resource_tracker/collector/cpu.rs
1use crate::metrics::CpuMetrics;
2use procfs::prelude::*;
3use procfs::process::all_processes;
4use procfs::{CpuTime, KernelStats};
5use std::collections::HashMap;
6use std::time::Instant;
7
8type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
10// ---------------------------------------------------------------------------
11// Tick helpers
12// ---------------------------------------------------------------------------
13
14fn cpu_total(c: &CpuTime) -> u64 {
15 c.user
16 + c.nice
17 + c.system
18 + c.idle
19 + c.iowait.unwrap_or(0)
20 + c.irq.unwrap_or(0)
21 + c.softirq.unwrap_or(0)
22 + c.steal.unwrap_or(0)
23}
24
25fn cpu_idle(c: &CpuTime) -> u64 {
26 c.idle + c.iowait.unwrap_or(0)
27}
28
29/// Per-core utilization percentage (0.0–100.0, clamped).
30fn core_util_pct(prev: &CpuTime, curr: &CpuTime) -> f64 {
31 util_pct_from_ticks(
32 cpu_total(prev),
33 cpu_idle(prev),
34 cpu_total(curr),
35 cpu_idle(curr),
36 )
37 .clamp(0.0, 100.0)
38}
39
40/// Aggregate utilization expressed as fractional cores in use (0.0..n_cores).
41/// Not clamped: kernel rounding can produce values very slightly above n_cores.
42fn aggregate_util_cores(prev: &CpuTime, curr: &CpuTime, n_cores: usize) -> f64 {
43 util_pct_from_ticks(
44 cpu_total(prev),
45 cpu_idle(prev),
46 cpu_total(curr),
47 cpu_idle(curr),
48 ) / 100.0
49 * n_cores as f64
50}
51
/// Pure math: percentage of non-idle ticks between two snapshots.
///
/// Takes raw pre-computed totals/idles so it can be unit-tested without
/// constructing a `CpuTime`.
///
/// The result is always within 0.0..=100.0:
/// * a total that did not advance (or went backwards) yields 0.0;
/// * an idle counter that went backwards is treated as "no idle ticks";
/// * an idle delta larger than the total delta (inconsistent snapshots) is
///   capped at the total delta, so the result never goes negative.
fn util_pct_from_ticks(prev_total: u64, prev_idle: u64, curr_total: u64, curr_idle: u64) -> f64 {
    let delta_total = curr_total.saturating_sub(prev_total);
    if delta_total == 0 {
        return 0.0;
    }
    // Cap idle at total: idle is a subset of total, so anything larger can
    // only come from inconsistent counters and must not produce a negative.
    let delta_idle = curr_idle.saturating_sub(prev_idle).min(delta_total);
    // Subtract in integer space so large counters lose no precision before
    // the conversion to floating point.
    let delta_busy = delta_total - delta_idle;
    delta_busy as f64 / delta_total as f64 * 100.0
}
63
64// ---------------------------------------------------------------------------
65// Process-tree helpers
66// ---------------------------------------------------------------------------
67
68/// Returns a map of { pid to (utime, stime) } for every process in the tree
69/// rooted at `root_pid` (root included). Processes that have already exited
70/// are silently skipped: this is a TOCTOU race we accept.
71fn process_tree_ticks(root_pid: i32) -> HashMap<i32, (u64, u64)> {
72 // Collect all readable processes in one pass.
73 let all: Vec<_> = match all_processes() {
74 Ok(iter) => iter.filter_map(|r| r.ok()).collect(),
75 Err(_) => return HashMap::new(),
76 };
77
78 // Single .stat() read per process: build both the parent->children map and
79 // the pid->(utime+cutime, stime+cstime) map in one pass to halve /proc I/O.
80 //
81 // cutime/cstime (CPU time of waited-for children) is included so that
82 // short-lived child processes that both start AND exit within a single
83 // measurement interval are still captured: once a child is reaped its
84 // ticks roll up into the parent's cutime/cstime.
85 //
86 // Double-counting guard: if a process was alive at the previous snapshot
87 // and exits before the current one, its pre-snapshot ticks are already in
88 // prev_proc_ticks AND will re-appear via the parent's cutime delta.
89 // CpuCollector::collect() subtracts the prev ticks of all such exited
90 // processes to cancel that overcounting.
91 let mut children: HashMap<i32, Vec<i32>> = HashMap::new();
92 let ticks_for: HashMap<i32, (u64, u64)> = all
93 .iter()
94 .filter_map(|proc| {
95 proc.stat().ok().map(|s| {
96 children.entry(s.ppid).or_default().push(proc.pid);
97 let user = s.utime + u64::try_from(s.cutime).unwrap_or(0);
98 let system = s.stime + u64::try_from(s.cstime).unwrap_or(0);
99 (proc.pid, (user, system))
100 })
101 })
102 .collect();
103
104 // BFS from root_pid, collecting (utime, stime) for every reachable node.
105 let mut result = HashMap::new();
106 let mut queue = vec![root_pid];
107 while let Some(pid) = queue.pop() {
108 if let Some(&ticks) = ticks_for.get(&pid) {
109 result.insert(pid, ticks);
110 }
111 if let Some(kids) = children.get(&pid) {
112 queue.extend(kids);
113 }
114 }
115 result
116}
117
118/// Sum of PSS and VmRSS across all given PIDs, each converted to MiB.
119/// One `Process::open` per PID reads both sources. PSS matches Python
120/// `memory_mib`; RSS is retained for consumers that need resident set size.
121fn process_tree_memory_mib(pids: &[i32]) -> (u64, u64) {
122 let mut pss_kib = 0u64;
123 let mut rss_kib = 0u64;
124 for &pid in pids {
125 let Some(proc_) = procfs::process::Process::new(pid).ok() else {
126 continue;
127 };
128 if let Ok(rollup) = proc_.smaps_rollup() {
129 if let Some(bytes) = rollup
130 .memory_map_rollup
131 .iter()
132 .find_map(|m| m.extension.map.get("Pss").copied())
133 {
134 pss_kib += bytes / 1024;
135 }
136 }
137 if let Ok(status) = proc_.status() {
138 if let Some(vmrss) = status.vmrss {
139 rss_kib += vmrss;
140 }
141 }
142 }
143 (pss_kib / 1024, rss_kib / 1024)
144}
145
146/// Per-process cumulative disk I/O bytes from /proc/pid/io.
147/// Returns { pid -> (read_bytes, write_bytes) }.
148/// PIDs whose /proc/pid/io is unreadable (e.g. different UID without ptrace)
149/// are silently omitted -- the delta for those PIDs will be 0.
150fn process_tree_io(pids: &[i32]) -> HashMap<i32, (u64, u64)> {
151 pids.iter()
152 .filter_map(|&pid| {
153 let io = procfs::process::Process::new(pid).ok()?.io().ok()?;
154 Some((pid, (io.read_bytes, io.write_bytes)))
155 })
156 .collect()
157}
158
159// ---------------------------------------------------------------------------
160// Snapshot + Collector
161// ---------------------------------------------------------------------------
162
/// One capture of the counters `CpuCollector::collect` diffs against the
/// next capture to produce per-interval metrics.
struct Snapshot {
    /// Aggregate across all logical CPUs (the "cpu" summary line in /proc/stat).
    total: CpuTime,
    /// Per-logical-CPU entries (cpu0, cpu1, …).
    per_core: Vec<CpuTime>,
    /// `Instant` taken after all /proc reads of the poll cycle; the
    /// difference between two snapshots is the elapsed-time denominator for
    /// the process CPU rate (Δcpu_secs / Δtimestamp).
    instant: Instant,
    /// { pid -> (utime + cutime, stime + cstime) } for the root process and
    /// all descendants, as returned by `process_tree_ticks`.
    /// Empty when no PID is being tracked.
    proc_ticks: HashMap<i32, (u64, u64)>,
    /// { pid -> (read_bytes, write_bytes) } as returned by `process_tree_io`.
    /// Empty when no PID is tracked; PIDs whose counters are unreadable are absent.
    proc_io: HashMap<i32, (u64, u64)>,
}
178
/// Stateful collector: each `collect()` call takes a new snapshot and
/// reports metrics relative to the snapshot kept from the previous call.
pub struct CpuCollector {
    /// Root PID of the process tree to track. None = system-only metrics.
    pid: Option<i32>,
    /// Snapshot stored by the previous `collect()` call; `None` until the
    /// first call has completed.
    prev: Option<Snapshot>,
}
184
185impl CpuCollector {
186 pub fn new(pid: Option<i32>) -> Self {
187 Self { pid, prev: None }
188 }
189
190 pub fn collect(&mut self) -> Result<CpuMetrics> {
191 let tps = procfs::ticks_per_second() as f64;
192
193 // Total number of existing processes - matches Python resource-tracker's
194 // `processes` column. Counted by listing numeric entries in /proc,
195 // which is O(n_procs) but cheap for a polling interval.
196 let process_count = std::fs::read_dir("/proc")
197 .map(|dir| {
198 let n = dir
199 .filter_map(|e| e.ok())
200 .filter(|e| {
201 e.file_name()
202 .to_string_lossy()
203 .chars()
204 .all(|c| c.is_ascii_digit())
205 })
206 .count();
207 u32::try_from(n).unwrap_or(0)
208 })
209 .unwrap_or(0);
210
211 let proc_ticks = match self.pid {
212 Some(root) => process_tree_ticks(root),
213 None => HashMap::new(),
214 };
215
216 // Read process I/O and memory only when tracking a PID.
217 let proc_io = if self.pid.is_some() {
218 let pids: Vec<i32> = proc_ticks.keys().copied().collect();
219 process_tree_io(&pids)
220 } else {
221 HashMap::new()
222 };
223
224 // Process memory is instantaneous (not a delta), compute before storing prev.
225 let (process_pss_mib, process_rss_mib) = if self.pid.is_some() {
226 let pids: Vec<i32> = proc_ticks.keys().copied().collect();
227 let (pss, rss) = process_tree_memory_mib(&pids);
228 (Some(pss), Some(rss))
229 } else {
230 (None, None)
231 };
232
233 // Capture /proc/stat after process-tree reads, then record wall time so
234 // system and process snapshots and the elapsed denominator share the
235 // same end point in the poll cycle (issue #20).
236 let stats = KernelStats::current()?;
237 let now = Instant::now();
238
239 let curr = Snapshot {
240 total: stats.total,
241 per_core: stats.cpu_time,
242 instant: now,
243 proc_ticks,
244 proc_io,
245 };
246
247 let metrics = match &self.prev {
248 // First call: store baseline and return zeros. The caller should
249 // sleep for one interval then call collect() again for real data.
250 None => CpuMetrics {
251 utilization_pct: 0.0,
252 per_core_pct: vec![0.0; curr.per_core.len()],
253 utime_secs: 0.0,
254 stime_secs: 0.0,
255 process_count,
256 process_cores_used: self.pid.map(|_| 0.0),
257 process_child_count: self
258 .pid
259 .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0)),
260 process_utime_secs: self.pid.map(|_| 0.0),
261 process_stime_secs: self.pid.map(|_| 0.0),
262 process_pss_mib,
263 process_rss_mib,
264 process_disk_read_bytes: self.pid.map(|_| 0),
265 process_disk_write_bytes: self.pid.map(|_| 0),
266 process_gpu_usage: None, // filled by main.rs after GPU query
267 process_gpu_vram_mib: None, // filled by main.rs after GPU query
268 process_gpu_utilized: None,
269 process_tree_pids: curr.proc_ticks.keys().copied().collect(),
270 },
271
272 Some(prev) => {
273 let n_cores = curr.per_core.len();
274
275 // Per-interval CPU time deltas - matches Python resource-tracker's
276 // utime/stime columns (delta ticks / ticks_per_second).
277 let utime_secs = (curr.total.user + curr.total.nice)
278 .saturating_sub(prev.total.user + prev.total.nice)
279 as f64
280 / tps;
281 let stime_secs = curr.total.system.saturating_sub(prev.total.system) as f64 / tps;
282
283 let per_core_pct = prev
284 .per_core
285 .iter()
286 .zip(curr.per_core.iter())
287 .map(|(p, c)| core_util_pct(p, c))
288 .collect();
289
290 // Cutime double-counting correction (issue #20, bug 1).
291 //
292 // proc_ticks stores (utime + cutime, stime + cstime) so that
293 // short-lived children that start AND exit within one interval are
294 // captured via the parent's cutime delta.
295 //
296 // Side-effect: if a child was alive at the prev snapshot, its
297 // pre-snapshot ticks appear both in prev_proc_ticks[child] AND
298 // in the parent's cutime delta once the child is reaped. That
299 // double-counts the child's pre-snapshot ticks.
300 //
301 // Fix: sum the prev ticks of every PID in prev that is absent
302 // from curr (it exited), then subtract that sum from the raw
303 // delta. This cancels exactly the overcounting without
304 // affecting short-lived processes (which were never in prev, so
305 // their prev ticks are zero).
306 let (exited_utime, exited_stime): (u64, u64) = if self.pid.is_some() {
307 prev.proc_ticks
308 .iter()
309 .filter(|(pid, _)| !curr.proc_ticks.contains_key(pid))
310 .fold((0u64, 0u64), |(au, as_), (_, &(pu, ps))| {
311 (au + pu, as_ + ps)
312 })
313 } else {
314 (0, 0)
315 };
316
317 let utilization_pct = aggregate_util_cores(&prev.total, &curr.total, n_cores);
318
319 let process_child_count = self
320 .pid
321 .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0));
322
323 // Per-tree utime and stime deltas (seconds this interval).
324 let process_utime_secs = self.pid.map(|_| {
325 let raw: u64 = curr
326 .proc_ticks
327 .iter()
328 .map(|(pid, &(cu, _))| {
329 let pu = prev.proc_ticks.get(pid).map(|&(u, _)| u).unwrap_or(cu);
330 cu.saturating_sub(pu)
331 })
332 .sum();
333 raw.saturating_sub(exited_utime) as f64 / tps
334 });
335
336 let process_stime_secs = self.pid.map(|_| {
337 let raw: u64 = curr
338 .proc_ticks
339 .iter()
340 .map(|(pid, &(_, cs))| {
341 let ps = prev.proc_ticks.get(pid).map(|&(_, s)| s).unwrap_or(cs);
342 cs.saturating_sub(ps)
343 })
344 .sum();
345 raw.saturating_sub(exited_stime) as f64 / tps
346 });
347
348 // Fractional cores = (Δutime + Δstime) / Δtimestamp, matching
349 // Python ProcessTracker.cpu_usage (process_utime_secs +
350 // process_stime_secs share the same corrected tick deltas).
351 let process_cores_used = match (self.pid, process_utime_secs, process_stime_secs) {
352 (Some(_), Some(u), Some(s)) => {
353 let elapsed = (curr.instant - prev.instant).as_secs_f64().max(0.001);
354 Some(((u + s) / elapsed).max(0.0))
355 }
356 _ => None,
357 };
358
359 // Per-interval disk I/O deltas across the process tree.
360 let process_disk_read_bytes = self.pid.map(|_| {
361 curr.proc_io
362 .iter()
363 .map(|(pid, &(cr, _))| {
364 let pr = prev.proc_io.get(pid).map(|&(r, _)| r).unwrap_or(cr);
365 cr.saturating_sub(pr)
366 })
367 .sum::<u64>()
368 });
369
370 let process_disk_write_bytes = self.pid.map(|_| {
371 curr.proc_io
372 .iter()
373 .map(|(pid, &(_, cw))| {
374 let pw = prev.proc_io.get(pid).map(|&(_, w)| w).unwrap_or(cw);
375 cw.saturating_sub(pw)
376 })
377 .sum::<u64>()
378 });
379
380 CpuMetrics {
381 utilization_pct,
382 per_core_pct,
383 utime_secs,
384 stime_secs,
385 process_count,
386 process_cores_used,
387 process_child_count,
388 process_utime_secs,
389 process_stime_secs,
390 process_pss_mib,
391 process_rss_mib,
392 process_disk_read_bytes,
393 process_disk_write_bytes,
394 process_gpu_usage: None, // filled by main.rs after GPU query
395 process_gpu_vram_mib: None, // filled by main.rs after GPU query
396 process_gpu_utilized: None,
397 process_tree_pids: curr.proc_ticks.keys().copied().collect(),
398 }
399 }
400 };
401
402 self.prev = Some(curr);
403 Ok(metrics)
404 }
405}
406
407// ---------------------------------------------------------------------------
408// Unit tests
409// ---------------------------------------------------------------------------
410
411#[cfg(test)]
412mod tests {
413 use super::*;
414
415 // Tests use `util_pct_from_ticks` directly -- `CpuTime` has private fields
416 // and cannot be constructed in tests. All branching logic in
417 // `aggregate_util_cores` and `core_util_pct` delegates to this one
418 // pure function, so testing it covers all paths.
419 //
420 // Tick layout: (prev_total, prev_idle, curr_total, curr_idle)
421
422 #[test]
423 fn test_util_pct_all_idle_is_zero() {
424 // All new ticks went to idle.
425 assert_eq!(util_pct_from_ticks(0, 0, 1600, 1600), 0.0);
426 }
427
428 #[test]
429 fn test_util_pct_fully_busy_is_100() {
430 // 1600 new ticks, 0 idle -> 100%.
431 let pct = util_pct_from_ticks(0, 0, 1600, 0);
432 assert!((pct - 100.0).abs() < 0.01, "expected 100.0, got {pct}");
433 }
434
435 #[test]
436 fn test_util_pct_half_busy_is_50() {
437 // 1600 new ticks, 800 idle -> 50%.
438 let pct = util_pct_from_ticks(0, 0, 1600, 800);
439 assert!((pct - 50.0).abs() < 0.01, "expected 50.0, got {pct}");
440 }
441
442 #[test]
443 fn test_util_pct_no_delta_is_zero() {
444 // Identical snapshots: no elapsed ticks.
445 assert_eq!(util_pct_from_ticks(100, 50, 100, 50), 0.0);
446 }
447
448 /// Aggregate util converts the percentage to fractional cores and does NOT clamp.
449 /// 99.9% busy on a 4-core machine -> ~3.996 cores, not forced to <= 4.0.
450 #[test]
451 fn test_aggregate_util_cores_no_clamp() {
452 // 999 active ticks, 1 idle, total 1000 -> 99.9% -> 99.9/100*4 = 3.996
453 let pct = util_pct_from_ticks(0, 0, 1000, 1);
454 let cores = pct / 100.0 * 4.0_f64;
455 assert!(cores > 3.9, "expected close to 4.0, got {cores}");
456 assert!(
457 cores < 4.05,
458 "should not greatly exceed n_cores, got {cores}"
459 );
460 }
461
    /// Per-core values are clamped to 0–100 by `core_util_pct`. This test pins
    /// the upper boundary only: a fully busy interval yields a raw value of
    /// 100 and clamping leaves that value unchanged.
    ///
    /// NOTE: the raw formula cannot produce a value above 100 (the idle delta
    /// it subtracts is never negative), so this test does not -- and cannot --
    /// show the clamp lowering a result.
    #[test]
    fn test_util_pct_raw_is_not_clamped() {
        // 100% busy -- raw result is exactly 100, clamp has no effect here.
        let raw = util_pct_from_ticks(0, 0, 1000, 0);
        assert!((raw - 100.0).abs() < 0.01);
        // A value already at the upper bound passes through the clamp unchanged.
        assert_eq!(raw.clamp(0.0, 100.0), 100.0);
    }
472
473 // T-CPU-06: the first call to collect() returns 0.0 for all delta fields
474 // (utilization_pct, per_core_pct, utime_secs, stime_secs). A warm-up
475 // sleep then a second collect() produces real data.
476 #[test]
477 fn test_first_collect_returns_zero_for_delta_fields() {
478 let mut collector = CpuCollector::new(None);
479 let metrics = collector.collect().expect("first collect failed");
480 assert_eq!(
481 metrics.utilization_pct, 0.0,
482 "utilization_pct must be 0.0 on first collect, got {}",
483 metrics.utilization_pct
484 );
485 assert!(
486 metrics.per_core_pct.iter().all(|&v| v == 0.0),
487 "per_core_pct must be all-zero on first collect: {:?}",
488 metrics.per_core_pct
489 );
490 assert_eq!(
491 metrics.utime_secs, 0.0,
492 "utime_secs must be 0.0 on first collect, got {}",
493 metrics.utime_secs
494 );
495 assert_eq!(
496 metrics.stime_secs, 0.0,
497 "stime_secs must be 0.0 on first collect, got {}",
498 metrics.stime_secs
499 );
500 }
501
502 // T-CPU-07: first collect() with PID tracking returns Some for process fields.
503 #[test]
504 fn test_first_collect_with_pid_returns_some_process_fields() {
505 let pid = i32::try_from(std::process::id()).expect("PID too large");
506 let mut collector = CpuCollector::new(Some(pid));
507 let m = collector.collect().expect("collect() failed");
508 assert!(
509 m.process_cores_used.is_some(),
510 "process_cores_used must be Some when PID is tracked"
511 );
512 assert!(
513 m.process_child_count.is_some(),
514 "process_child_count must be Some when PID is tracked"
515 );
516 assert!(
517 m.process_pss_mib.is_some(),
518 "process_pss_mib must be Some when PID is tracked"
519 );
520 assert!(
521 m.process_rss_mib.is_some(),
522 "process_rss_mib must be Some when PID is tracked"
523 );
524 assert!(
525 m.process_utime_secs.is_some(),
526 "process_utime_secs must be Some when PID is tracked"
527 );
528 assert!(
529 m.process_stime_secs.is_some(),
530 "process_stime_secs must be Some when PID is tracked"
531 );
532 assert!(
533 m.process_disk_read_bytes.is_some(),
534 "process_disk_read_bytes must be Some when PID is tracked"
535 );
536 assert!(
537 m.process_disk_write_bytes.is_some(),
538 "process_disk_write_bytes must be Some when PID is tracked"
539 );
540 }
541
542 // T-CPU-08: process tree memory (PSS and RSS) is positive for the running test process.
543 #[test]
544 fn test_process_tree_memory_nonzero_for_self() {
545 let pid = i32::try_from(std::process::id()).expect("PID too large");
546 let (pss, rss) = process_tree_memory_mib(&[pid]);
547 assert!(
548 pss > 0,
549 "PSS for the current process should be > 0, got {pss}"
550 );
551 assert!(
552 rss > 0,
553 "RSS for the current process should be > 0, got {rss}"
554 );
555 assert!(
556 pss <= rss,
557 "PSS ({pss}) should not exceed RSS ({rss}) for a single process"
558 );
559 }
560
561 // T-CPU-09: process_tree_ticks contains the root PID.
562 // PID 1 (init/systemd) is used because it is always present and readable
563 // on any Linux host. Using std::process::id() is unreliable under
564 // llvm-cov instrumentation: the instrumented binary's own /proc entry
565 // can be transiently unreadable when many tests run in parallel.
566 #[test]
567 fn test_process_tree_ticks_contains_root_pid() {
568 let ticks = process_tree_ticks(1);
569 assert!(
570 ticks.contains_key(&1),
571 "process_tree_ticks(1) must contain PID 1 (init/systemd is always present)"
572 );
573 }
574
575 // T-CPU-10: second collect() with PID tracking produces non-negative cores.
576 #[test]
577 fn test_second_collect_with_pid_nonneg_cores() {
578 let pid = i32::try_from(std::process::id()).expect("PID too large");
579 let mut collector = CpuCollector::new(Some(pid));
580 let _ = collector.collect().expect("first collect() failed");
581 let m = collector.collect().expect("second collect() failed");
582 let cores = m
583 .process_cores_used
584 .expect("process_cores_used must be Some");
585 assert!(
586 cores >= 0.0,
587 "process_cores_used must be >= 0.0, got {cores}"
588 );
589 }
590
591 // T-CPU-11: second collect() with no PID still returns None for all process fields.
592 #[test]
593 fn test_second_collect_no_pid_all_process_fields_none() {
594 let mut collector = CpuCollector::new(None);
595 let _ = collector.collect().expect("first collect() failed");
596 let m = collector.collect().expect("second collect() failed");
597 assert!(
598 m.process_cores_used.is_none(),
599 "process_cores_used must be None when not tracking"
600 );
601 assert!(
602 m.process_child_count.is_none(),
603 "process_child_count must be None when not tracking"
604 );
605 assert!(
606 m.process_pss_mib.is_none(),
607 "process_pss_mib must be None when not tracking"
608 );
609 assert!(
610 m.process_rss_mib.is_none(),
611 "process_rss_mib must be None when not tracking"
612 );
613 assert!(
614 m.process_utime_secs.is_none(),
615 "process_utime_secs must be None when not tracking"
616 );
617 assert!(
618 m.process_stime_secs.is_none(),
619 "process_stime_secs must be None when not tracking"
620 );
621 assert!(
622 m.process_disk_read_bytes.is_none(),
623 "process_disk_read_bytes must be None when not tracking"
624 );
625 assert!(
626 m.process_disk_write_bytes.is_none(),
627 "process_disk_write_bytes must be None when not tracking"
628 );
629 }
630
631 // T-CPU-12: process_count > 0 (at least one process is always visible).
632 #[test]
633 fn test_process_count_positive() {
634 let mut collector = CpuCollector::new(None);
635 let m = collector.collect().expect("collect() failed");
636 assert!(
637 m.process_count > 0,
638 "process_count must be > 0, got {}",
639 m.process_count
640 );
641 }
642
643 // -----------------------------------------------------------------------
644 // Issue #20 regression tests: process CPU must never exceed system CPU
645 // -----------------------------------------------------------------------
646
647 // T-CPU-13: cutime correction formula -- direct arithmetic verification.
648 //
649 // A child with 500 pre-snapshot user ticks exits between samples and is
650 // reaped by its parent. The parent's cutime delta therefore covers the
651 // child's full 2500-tick lifetime. The raw delta overcounts by 500 (the
652 // pre-snapshot portion already counted via the child's prev entry).
653 // The correction must subtract exactly those 500 ticks.
654 #[test]
655 fn test_cutime_correction_cancels_exited_child_ticks() {
656 let prev: HashMap<i32, (u64, u64)> = [
657 (200, (50, 0)), // parent: 50 own ticks at warm-up
658 (100, (500, 0)), // child: 500 ticks at warm-up
659 ]
660 .iter()
661 .cloned()
662 .collect();
663
664 // Between samples: child accumulates 2000 more ticks then exits.
665 // Parent's cutime = child's full lifetime = 500 + 2000 = 2500.
666 // Parent runs 250 own ticks.
667 let curr: HashMap<i32, (u64, u64)> =
668 [(200, (50 + 250 + 2500, 0))].iter().cloned().collect();
669
670 let raw: u64 = curr
671 .iter()
672 .map(|(pid, &(cu, cs))| {
673 let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
674 cu.saturating_sub(pu) + cs.saturating_sub(ps)
675 })
676 .sum();
677 assert_eq!(
678 raw, 2750,
679 "raw delta must include the double-counted pre-snapshot child ticks"
680 );
681
682 let exited: u64 = prev
683 .iter()
684 .filter(|(pid, _)| !curr.contains_key(pid))
685 .map(|(_, &(pu, ps))| pu + ps)
686 .sum();
687 assert_eq!(
688 exited, 500,
689 "exited ticks must equal the child's pre-snapshot tick count"
690 );
691
692 let corrected = raw.saturating_sub(exited);
693 // Correct answer: parent own delta (250) + child post-snapshot delta (2000) = 2250.
694 assert_eq!(
695 corrected, 2250,
696 "corrected delta must exclude the child's pre-snapshot ticks"
697 );
698 }
699
700 // T-CPU-14: cutime correction handles cascaded exits.
701 //
702 // Both a child and grandchild exit between samples. Root's cutime ends up
703 // containing the full lifetimes of both. Subtracting all exited PIDs'
704 // pre-snapshot ticks must leave only the ticks actually earned in the
705 // interval regardless of exit depth.
706 #[test]
707 fn test_cutime_correction_handles_cascaded_exits() {
708 let prev: HashMap<i32, (u64, u64)> = [
709 (7, (0, 0)), // root: no prior ticks
710 (8, (100, 0)), // child: 100 pre-snapshot ticks
711 (9, (200, 0)), // grandchild: 200 pre-snapshot ticks
712 ]
713 .iter()
714 .cloned()
715 .collect();
716
717 // Grandchild earns 50 ticks and exits; reaped by child.
718 // child cutime → 200 + 50 = 250.
719 // Child earns 50 own ticks then exits; reaped by root.
720 // child lifetime = 100 + 50 + 250 = 400.
721 // root cutime → 400.
722 // Root earns 30 own ticks.
723 let curr: HashMap<i32, (u64, u64)> = [(7, (30 + 400, 0))].iter().cloned().collect();
724
725 let raw: u64 = curr
726 .iter()
727 .map(|(pid, &(cu, cs))| {
728 let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
729 cu.saturating_sub(pu) + cs.saturating_sub(ps)
730 })
731 .sum();
732 // raw = 430; overcounts by child_prev (100) + grandchild_prev (200) = 300.
733 assert_eq!(raw, 430);
734
735 let exited: u64 = prev
736 .iter()
737 .filter(|(pid, _)| !curr.contains_key(pid))
738 .map(|(_, &(pu, ps))| pu + ps)
739 .sum();
740 assert_eq!(
741 exited, 300,
742 "exited = child pre-snap (100) + grandchild pre-snap (200)"
743 );
744
745 let corrected = raw.saturating_sub(exited);
746 // Correct: root own (30) + child own delta (50) + grandchild own delta (50) = 130.
747 assert_eq!(corrected, 130);
748 }
749
    // T-CPU-15: process CPU must not exceed system CPU when a long-running
    // child exits between two measurement snapshots.
    //
    // On busy servers the tracked process often has long-standing children
    // that accumulate significant CPU ticks over many intervals. When such a
    // child exits between the warm-up and the real sample, its entire lifetime
    // rolls into the parent's cutime delta. Without the double-counting
    // correction those pre-snapshot ticks are counted a second time, pushing
    // the process metric above the system metric.
    //
    // We compare absolute CPU seconds (process_utime_secs + process_stime_secs
    // vs utime_secs + stime_secs) rather than fractional cores because both
    // quantities share the same tps divisor and kernel tick accounting.
    // A fractional-cores comparison divides by wall-clock elapsed, which makes
    // the ratio unstable when the measurement window is very short (a fixed
    // iteration burn finishes in microseconds on fast CPUs, leaving
    // elapsed << TOCTOU gap and inflating process_cores_used).
    //
    // Sequence: spawn busy child -> sleep -> warm-up collect -> kill + reap
    // child -> second collect -> compare. The order of these steps is what
    // the test depends on.
    #[test]
    fn test_process_cores_used_does_not_exceed_system_utilization() {
        // Track the test process itself so the spawned child is part of its tree.
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));

        // Spawn a CPU-busy child to simulate a long-running process on a
        // busy server. A shell busy-loop accumulates real utime ticks.
        let mut child = std::process::Command::new("sh")
            .args(["-c", "while true; do :; done"])
            .spawn()
            .expect("failed to spawn sh busy-loop -- required for T-CPU-15");

        // Let the child accumulate pre-snapshot CPU ticks for 200 ms.
        // At 100 HZ that yields ~20 ticks = ~0.2 s that would be double-counted
        // without the cutime correction.
        std::thread::sleep(std::time::Duration::from_millis(200));

        // Warm-up: child is alive with ~200 ms of accumulated CPU ticks.
        let _ = collector.collect().expect("warm-up collect failed");

        // Kill the child immediately after warm-up. Its full lifetime ticks
        // (including the ~0.2 s pre-snapshot portion) roll into parent's cutime
        // delta in the next collect(). Without the correction those pre-snapshot
        // ticks are double-counted, inflating proc_cpu well above sys_cpu.
        // Errors from kill/wait are deliberately ignored.
        child.kill().ok();
        child.wait().ok();

        let m = collector.collect().expect("second collect failed");

        let proc_utime = m
            .process_utime_secs
            .expect("process_utime_secs must be Some");
        let proc_stime = m
            .process_stime_secs
            .expect("process_stime_secs must be Some");
        // Both sides are CPU seconds for the same interval.
        let proc_cpu = proc_utime + proc_stime;
        let sys_cpu = m.utime_secs + m.stime_secs;

        // 15 % relative + 50 ms absolute tolerance for the TOCTOU gap between
        // /proc/PID/stat and /proc/stat reads. Without the cutime correction,
        // proc_cpu would be inflated by ~0.2 s (pre-snapshot child ticks),
        // which far exceeds this tolerance and makes the assertion fail.
        let tolerance = sys_cpu * 0.15 + 0.05;
        assert!(
            proc_cpu <= sys_cpu + tolerance,
            "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
             must not exceed system CPU ({sys_cpu:.3}s) -- cutime double-counting regression \
             for issue #20"
        );
    }
817
    // T-CPU-16: process_utime_secs must not exceed system utime_secs after a
    // child process exits between the warm-up and the real sample.
    //
    // This directly exercises the cutime double-counting bug from issue #20:
    // without the correction, the child's pre-snapshot ticks are counted twice
    // (once via the child's prev entry, once via the parent's cutime delta),
    // pushing process_utime_secs above utime_secs on an otherwise idle system.
    //
    // Sequence: spawn finite child -> short sleep -> warm-up collect -> wait
    // for the child to finish -> second collect -> compare user-time only.
    #[test]
    fn test_process_utime_no_double_count_after_child_exits() {
        // Track the test process itself so the spawned child is part of its tree.
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));

        // Spawn a child that burns a little CPU then exits naturally.
        // `sh` must be available on any Linux host used for testing.
        let mut child = std::process::Command::new("sh")
            .args(["-c", "for i in $(seq 1 20000); do :; done"])
            .spawn()
            .expect("failed to spawn sh -- required for T-CPU-16");

        // Let the child accumulate real ticks before the warm-up snapshot so
        // there is a meaningful pre-snapshot tick count to double-count.
        std::thread::sleep(std::time::Duration::from_millis(20));

        // Warm-up: child is alive; its ticks are stored in prev_proc_ticks.
        let _ = collector.collect().expect("warm-up collect failed");

        // Reap the child. Its full-lifetime ticks roll into parent's cutime.
        // Unlike T-CPU-15 the child is not killed; wait() blocks until the
        // loop finishes on its own.
        let _ = child.wait().expect("failed to wait for child");

        // Real collect: child is absent from curr_proc_ticks but parent's
        // cutime has grown by the child's entire lifetime. Without the
        // correction the overcounting would inflate process_utime_secs.
        let m = collector.collect().expect("second collect failed");

        let proc_utime = m
            .process_utime_secs
            .expect("process_utime_secs must be Some when a PID is tracked");
        let sys_utime = m.utime_secs;

        // Allow 5% relative + 50 ms absolute tolerance for /proc timing jitter.
        let tolerance = sys_utime * 0.05 + 0.05;
        assert!(
            proc_utime <= sys_utime + tolerance,
            "process_utime_secs ({proc_utime:.3}s) exceeds system utime_secs ({sys_utime:.3}s) -- \
             cutime double-counting regression (issue #20)"
        );
    }
865
866 // T-CPU-17: multi-interval accumulation -- child tracked across two snapshots
867 // before exiting.
868 //
869 // This is the scenario shown in examples/repro_cpu_cutime_spike.rs: a child
870 // burns CPU across several measurement intervals, then exits in the final one.
871 // The cutime delta for that final interval equals the child's ENTIRE lifetime,
872 // not just the ticks accumulated since the previous snapshot.
873 //
874 // The correction must use the MOST RECENT prev_proc_ticks (updated after the
875 // intermediate collect), not the original warm-up ticks. If self.prev were
876 // not updated between intervals, exited_utime would be too small and the
877 // overcounting would not be fully cancelled.
878 //
879 // Without the correction: proc_cpu ≈ child's lifetime at intermediate snapshot
880 // >> sys_cpu for that short final window.
881 // With the correction: proc_cpu ≈ only post-intermediate child ticks ≈ 0.
#[test]
fn test_cutime_correction_multi_interval_child_exit() {
    /// Kills and reaps the wrapped child when dropped, so a panic in any
    /// `expect` below cannot leave the busy-loop shell spinning after the
    /// test has finished.
    struct KillOnDrop(std::process::Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best effort: on the success path the child has already been
            // killed and reaped, and errors here must not mask a test panic.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let pid = i32::try_from(std::process::id()).expect("PID too large");
    let mut collector = CpuCollector::new(Some(pid));

    // Spawn a CPU-busy child that accumulates real utime ticks.
    let mut child = KillOnDrop(
        std::process::Command::new("sh")
            .args(["-c", "while true; do :; done"])
            .spawn()
            .expect("failed to spawn sh busy-loop -- required for T-CPU-17"),
    );

    // Interval 1 warm-up: child is alive with some initial ticks.
    std::thread::sleep(std::time::Duration::from_millis(100));
    let _ = collector.collect().expect("warm-up collect failed");

    // Interval 2: child continues burning CPU. The collector's stored
    // per-process ticks are refreshed here, so the next correction baseline
    // is the child's tick count at this point.
    std::thread::sleep(std::time::Duration::from_millis(100));
    let _ = collector.collect().expect("intermediate collect failed");

    // Interval 3 (final): kill and reap the child immediately so its ENTIRE
    // lifetime rolls into the parent's cutime. The correction must subtract
    // the tick count recorded at interval 2 (not the warm-up tick count),
    // leaving only what the child burned after the intermediate snapshot.
    //
    // `kill` stays best effort, but the reap must succeed: without it the
    // child's ticks never reach cutime and the test would pass vacuously.
    let _ = child.0.kill();
    child
        .0
        .wait()
        .expect("failed to reap child -- required for T-CPU-17");

    let m = collector.collect().expect("final collect failed");

    let proc_utime = m
        .process_utime_secs
        .expect("process_utime_secs must be Some");
    let proc_stime = m
        .process_stime_secs
        .expect("process_stime_secs must be Some");
    let proc_cpu = proc_utime + proc_stime;
    let sys_cpu = m.utime_secs + m.stime_secs;

    // 15 % relative + 50 ms absolute tolerance for TOCTOU jitter.
    // Without the correction, proc_cpu would include ~200 ms of pre-snapshot
    // child ticks, far exceeding sys_cpu for this short measurement window.
    let tolerance = sys_cpu * 0.15 + 0.05;
    assert!(
        proc_cpu <= sys_cpu + tolerance,
        "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
         must not exceed system CPU ({sys_cpu:.3}s) across multiple intervals -- \
         cutime multi-interval regression for issue #20"
    );
}
930
931 // T-CPU-18: PSS (via smaps_rollup) correctly tracks a file-backed mapping.
932 //
933 // This is the regression test for the fix shown in
934 // examples/repro_memory_rss_vs_used.rs. The old VmRSS approach overcounted
935 // shared pages: when N processes map the same file each contributes its full
936 // mapping size to the VmRSS sum, but PSS via /proc/pid/smaps_rollup
937 // attributes only each process's proportional share.
938 //
939 // For a sole mapper with MAP_PRIVATE and all pages touched:
940 // - RSS increases by >= mapping_mib (all pages in physical RAM)
941 // - PSS increases by >= mapping_mib (sole mapper gets full proportional share)
942 // - PSS <= RSS (PSS never over-reports)
943 // - |PSS_delta - RSS_delta| <= 1 MiB (sole-mapper PSS == RSS for the region)
944 //
945 // The last invariant is the regression guard: if PSS were broken (zero or
946 // reading the wrong field) the delta would diverge from the RSS delta even
947 // though PSS <= RSS holds trivially for zero.
948 //
949 // The multi-process case (N workers sharing the same file, causing
950 // tree_pss << tree_rss) is demonstrated in examples/repro_memory_rss_vs_used.rs.
#[test]
fn test_pss_tracks_file_backed_mapping() {
    use std::fs;
    use std::io::Write as _;
    use std::os::unix::io::AsRawFd;

    /// Deletes the temp file when dropped, so a panic before the explicit
    /// cleanup (failed write, failed mmap) cannot leak it.
    struct RemoveOnDrop(std::path::PathBuf);

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    const MAPPING_MIB: usize = 4;
    const MAPPING_SIZE: usize = MAPPING_MIB * 1024 * 1024;
    // Stride used to touch the mapping. Assumes the system page size is at
    // least 4 KiB, in which case every page is hit at least once.
    const TOUCH_STRIDE: usize = 4096;

    let pid = i32::try_from(std::process::id()).expect("PID too large");
    // Honour TMPDIR (falls back to /tmp) so the test also works in sandboxes.
    let temp_file = RemoveOnDrop(
        std::env::temp_dir().join(format!("rt_test_pss_{}", std::process::id())),
    );
    let path = &temp_file.0;

    let (pss_before, rss_before) = process_tree_memory_mib(&[pid]);

    // Write a temp file that this process will map read-only.
    {
        let mut f = fs::File::create(path).expect("cannot create temp file for T-CPU-18");
        let chunk = vec![0xABu8; 64 * 1024];
        for _ in 0..(MAPPING_SIZE / chunk.len()) {
            f.write_all(&chunk).expect("write failed");
        }
    }

    let file = fs::File::open(path).expect("cannot open temp file for T-CPU-18");
    // SAFETY: a null hint with a valid open fd and in-range length/offset;
    // the result is checked against MAP_FAILED before any dereference.
    let ptr = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            MAPPING_SIZE,
            libc::PROT_READ,
            libc::MAP_PRIVATE,
            file.as_raw_fd(),
            0,
        )
    };
    assert_ne!(ptr, libc::MAP_FAILED, "mmap failed in T-CPU-18");

    // Touch every page to bring all pages into physical RAM (RSS and PSS).
    // Volatile reads are used because the values are otherwise unused: an
    // optimizing build could delete plain reads and leave the pages untouched.
    let base = ptr.cast::<u8>();
    for offset in (0..MAPPING_SIZE).step_by(TOUCH_STRIDE) {
        // SAFETY: `offset < MAPPING_SIZE`, so the address lies inside the
        // live, readable mapping created above.
        let _ = unsafe { std::ptr::read_volatile(base.add(offset)) };
    }

    let (pss_after, rss_after) = process_tree_memory_mib(&[pid]);

    // Clean up before asserting so a failure does not leak resources.
    // SAFETY: `ptr`/`MAPPING_SIZE` describe exactly the mapping returned by
    // mmap above, and no pointer into it is used after this call.
    unsafe { libc::munmap(ptr, MAPPING_SIZE) };
    drop(temp_file);

    let pss_delta = pss_after.saturating_sub(pss_before);
    let rss_delta = rss_after.saturating_sub(rss_before);

    assert!(
        rss_delta >= MAPPING_MIB as u64,
        "RSS must increase by >= {MAPPING_MIB} MiB after touching the mapping: \
         before={rss_before} MiB, after={rss_after} MiB (delta={rss_delta} MiB)"
    );
    assert!(
        pss_delta >= MAPPING_MIB as u64,
        "PSS must increase by >= {MAPPING_MIB} MiB as sole mapper of the file: \
         before={pss_before} MiB, after={pss_after} MiB (delta={pss_delta} MiB)"
    );
    assert!(
        pss_after <= rss_after,
        "PSS ({pss_after} MiB) must not exceed RSS ({rss_after} MiB)"
    );
    // For the sole mapper the PSS delta and RSS delta must agree within 1 MiB.
    // A regression that breaks smaps_rollup reading (e.g. returning 0 for PSS)
    // would leave pss_delta == 0 while rss_delta >= MAPPING_MIB.
    let skew = pss_delta.abs_diff(rss_delta);
    assert!(
        skew <= 1,
        "PSS delta ({pss_delta} MiB) and RSS delta ({rss_delta} MiB) must agree within \
         1 MiB for a sole mapper -- larger skew indicates smaps_rollup is not being read"
    );
}
1028}