Skip to main content

resource_tracker/collector/
cpu.rs

1use crate::metrics::CpuMetrics;
2use procfs::prelude::*;
3use procfs::process::all_processes;
4use procfs::{CpuTime, KernelStats};
5use std::collections::HashMap;
6use std::time::Instant;
7
8type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
10// ---------------------------------------------------------------------------
11// Tick helpers
12// ---------------------------------------------------------------------------
13
14fn cpu_total(c: &CpuTime) -> u64 {
15    c.user
16        + c.nice
17        + c.system
18        + c.idle
19        + c.iowait.unwrap_or(0)
20        + c.irq.unwrap_or(0)
21        + c.softirq.unwrap_or(0)
22        + c.steal.unwrap_or(0)
23}
24
25fn cpu_idle(c: &CpuTime) -> u64 {
26    c.idle + c.iowait.unwrap_or(0)
27}
28
29/// Per-core utilization percentage (0.0–100.0, clamped).
30fn core_util_pct(prev: &CpuTime, curr: &CpuTime) -> f64 {
31    util_pct_from_ticks(
32        cpu_total(prev),
33        cpu_idle(prev),
34        cpu_total(curr),
35        cpu_idle(curr),
36    )
37    .clamp(0.0, 100.0)
38}
39
40/// Aggregate utilization expressed as fractional cores in use (0.0..n_cores).
41/// Not clamped: kernel rounding can produce values very slightly above n_cores.
42fn aggregate_util_cores(prev: &CpuTime, curr: &CpuTime, n_cores: usize) -> f64 {
43    util_pct_from_ticks(
44        cpu_total(prev),
45        cpu_idle(prev),
46        cpu_total(curr),
47        cpu_idle(curr),
48    ) / 100.0
49        * n_cores as f64
50}
51
/// Pure math: share of non-idle ticks between two snapshots, as a percentage.
///
/// Works on pre-computed totals/idles so it can be unit-tested without
/// constructing a `CpuTime`.  Both deltas saturate at zero when a counter
/// went backwards, and a zero total delta yields 0.0.  No clamping is done
/// here, so an idle delta larger than the total delta gives a negative value.
fn util_pct_from_ticks(prev_total: u64, prev_idle: u64, curr_total: u64, curr_idle: u64) -> f64 {
    let elapsed_ticks = curr_total.saturating_sub(prev_total);
    if elapsed_ticks == 0 {
        return 0.0;
    }
    let idle_ticks = curr_idle.saturating_sub(prev_idle);
    let (elapsed, idle) = (elapsed_ticks as f64, idle_ticks as f64);
    (elapsed - idle) / elapsed * 100.0
}
63
64// ---------------------------------------------------------------------------
65// Process-tree helpers
66// ---------------------------------------------------------------------------
67
68/// Returns a map of { pid to (utime, stime) } for every process in the tree
69/// rooted at `root_pid` (root included).  Processes that have already exited
70/// are silently skipped: this is a TOCTOU race we accept.
71fn process_tree_ticks(root_pid: i32) -> HashMap<i32, (u64, u64)> {
72    // Collect all readable processes in one pass.
73    let all: Vec<_> = match all_processes() {
74        Ok(iter) => iter.filter_map(|r| r.ok()).collect(),
75        Err(_) => return HashMap::new(),
76    };
77
78    // Single .stat() read per process: build both the parent->children map and
79    // the pid->(utime+cutime, stime+cstime) map in one pass to halve /proc I/O.
80    //
81    // cutime/cstime (CPU time of waited-for children) is included so that
82    // short-lived child processes that both start AND exit within a single
83    // measurement interval are still captured: once a child is reaped its
84    // ticks roll up into the parent's cutime/cstime.
85    //
86    // Double-counting guard: if a process was alive at the previous snapshot
87    // and exits before the current one, its pre-snapshot ticks are already in
88    // prev_proc_ticks AND will re-appear via the parent's cutime delta.
89    // CpuCollector::collect() subtracts the prev ticks of all such exited
90    // processes to cancel that overcounting.
91    let mut children: HashMap<i32, Vec<i32>> = HashMap::new();
92    let ticks_for: HashMap<i32, (u64, u64)> = all
93        .iter()
94        .filter_map(|proc| {
95            proc.stat().ok().map(|s| {
96                children.entry(s.ppid).or_default().push(proc.pid);
97                let user = s.utime + u64::try_from(s.cutime).unwrap_or(0);
98                let system = s.stime + u64::try_from(s.cstime).unwrap_or(0);
99                (proc.pid, (user, system))
100            })
101        })
102        .collect();
103
104    // BFS from root_pid, collecting (utime, stime) for every reachable node.
105    let mut result = HashMap::new();
106    let mut queue = vec![root_pid];
107    while let Some(pid) = queue.pop() {
108        if let Some(&ticks) = ticks_for.get(&pid) {
109            result.insert(pid, ticks);
110        }
111        if let Some(kids) = children.get(&pid) {
112            queue.extend(kids);
113        }
114    }
115    result
116}
117
118/// Sum of PSS and VmRSS across all given PIDs, each converted to MiB.
119/// One `Process::open` per PID reads both sources. PSS matches Python
120/// `memory_mib`; RSS is retained for consumers that need resident set size.
121fn process_tree_memory_mib(pids: &[i32]) -> (u64, u64) {
122    let mut pss_kib = 0u64;
123    let mut rss_kib = 0u64;
124    for &pid in pids {
125        let Some(proc_) = procfs::process::Process::new(pid).ok() else {
126            continue;
127        };
128        if let Ok(rollup) = proc_.smaps_rollup() {
129            if let Some(bytes) = rollup
130                .memory_map_rollup
131                .iter()
132                .find_map(|m| m.extension.map.get("Pss").copied())
133            {
134                pss_kib += bytes / 1024;
135            }
136        }
137        if let Ok(status) = proc_.status() {
138            if let Some(vmrss) = status.vmrss {
139                rss_kib += vmrss;
140            }
141        }
142    }
143    (pss_kib / 1024, rss_kib / 1024)
144}
145
146/// Per-process cumulative disk I/O bytes from /proc/pid/io.
147/// Returns { pid -> (read_bytes, write_bytes) }.
148/// PIDs whose /proc/pid/io is unreadable (e.g. different UID without ptrace)
149/// are silently omitted -- the delta for those PIDs will be 0.
150fn process_tree_io(pids: &[i32]) -> HashMap<i32, (u64, u64)> {
151    pids.iter()
152        .filter_map(|&pid| {
153            let io = procfs::process::Process::new(pid).ok()?.io().ok()?;
154            Some((pid, (io.read_bytes, io.write_bytes)))
155        })
156        .collect()
157}
158
159// ---------------------------------------------------------------------------
160// Snapshot + Collector
161// ---------------------------------------------------------------------------
162
/// Raw counters captured by one `collect()` call, kept so the next call can
/// compute per-interval deltas against them.
struct Snapshot {
    /// Aggregate across all logical CPUs (the "cpu" summary line in /proc/stat).
    total: CpuTime,
    /// Per-logical-CPU entries (cpu0, cpu1, …).
    per_core: Vec<CpuTime>,
    /// Monotonic `Instant` taken after all /proc reads; the difference
    /// between two snapshots is the denominator of the process CPU rate
    /// (Δcpu_secs / Δtimestamp).
    instant: Instant,
    /// { pid -> (utime + cutime, stime + cstime) } for root process + all
    /// descendants, as returned by `process_tree_ticks`.
    /// Empty when no PID is being tracked.
    proc_ticks: HashMap<i32, (u64, u64)>,
    /// { pid -> (read_bytes, write_bytes) } from /proc/pid/io.
    /// Empty when no PID is tracked or /proc/pid/io is unreadable.
    proc_io: HashMap<i32, (u64, u64)>,
}
178
/// Stateful collector: each `collect()` call reports metrics for the interval
/// since the previous call.
pub struct CpuCollector {
    /// Root PID of the process tree to track. None = system-only metrics.
    pid: Option<i32>,
    /// Snapshot stored by the previous `collect()` call; `None` until the
    /// first call has run.
    prev: Option<Snapshot>,
}
184
185impl CpuCollector {
186    pub fn new(pid: Option<i32>) -> Self {
187        Self { pid, prev: None }
188    }
189
190    pub fn collect(&mut self) -> Result<CpuMetrics> {
191        let tps = procfs::ticks_per_second() as f64;
192
193        // Total number of existing processes - matches Python resource-tracker's
194        // `processes` column.  Counted by listing numeric entries in /proc,
195        // which is O(n_procs) but cheap for a polling interval.
196        let process_count = std::fs::read_dir("/proc")
197            .map(|dir| {
198                let n = dir
199                    .filter_map(|e| e.ok())
200                    .filter(|e| {
201                        e.file_name()
202                            .to_string_lossy()
203                            .chars()
204                            .all(|c| c.is_ascii_digit())
205                    })
206                    .count();
207                u32::try_from(n).unwrap_or(0)
208            })
209            .unwrap_or(0);
210
211        let proc_ticks = match self.pid {
212            Some(root) => process_tree_ticks(root),
213            None => HashMap::new(),
214        };
215
216        // Read process I/O and memory only when tracking a PID.
217        let proc_io = if self.pid.is_some() {
218            let pids: Vec<i32> = proc_ticks.keys().copied().collect();
219            process_tree_io(&pids)
220        } else {
221            HashMap::new()
222        };
223
224        // Process memory is instantaneous (not a delta), compute before storing prev.
225        let (process_pss_mib, process_rss_mib) = if self.pid.is_some() {
226            let pids: Vec<i32> = proc_ticks.keys().copied().collect();
227            let (pss, rss) = process_tree_memory_mib(&pids);
228            (Some(pss), Some(rss))
229        } else {
230            (None, None)
231        };
232
233        // Capture /proc/stat after process-tree reads, then record wall time so
234        // system and process snapshots and the elapsed denominator share the
235        // same end point in the poll cycle (issue #20).
236        let stats = KernelStats::current()?;
237        let now = Instant::now();
238
239        let curr = Snapshot {
240            total: stats.total,
241            per_core: stats.cpu_time,
242            instant: now,
243            proc_ticks,
244            proc_io,
245        };
246
247        let metrics = match &self.prev {
248            // First call: store baseline and return zeros. The caller should
249            // sleep for one interval then call collect() again for real data.
250            None => CpuMetrics {
251                utilization_pct: 0.0,
252                per_core_pct: vec![0.0; curr.per_core.len()],
253                utime_secs: 0.0,
254                stime_secs: 0.0,
255                process_count,
256                process_cores_used: self.pid.map(|_| 0.0),
257                process_child_count: self
258                    .pid
259                    .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0)),
260                process_utime_secs: self.pid.map(|_| 0.0),
261                process_stime_secs: self.pid.map(|_| 0.0),
262                process_pss_mib,
263                process_rss_mib,
264                process_disk_read_bytes: self.pid.map(|_| 0),
265                process_disk_write_bytes: self.pid.map(|_| 0),
266                process_gpu_usage: None, // filled by main.rs after GPU query
267                process_gpu_vram_mib: None, // filled by main.rs after GPU query
268                process_gpu_utilized: None,
269                process_tree_pids: curr.proc_ticks.keys().copied().collect(),
270            },
271
272            Some(prev) => {
273                let n_cores = curr.per_core.len();
274
275                // Per-interval CPU time deltas - matches Python resource-tracker's
276                // utime/stime columns (delta ticks / ticks_per_second).
277                let utime_secs = (curr.total.user + curr.total.nice)
278                    .saturating_sub(prev.total.user + prev.total.nice)
279                    as f64
280                    / tps;
281                let stime_secs = curr.total.system.saturating_sub(prev.total.system) as f64 / tps;
282
283                let per_core_pct = prev
284                    .per_core
285                    .iter()
286                    .zip(curr.per_core.iter())
287                    .map(|(p, c)| core_util_pct(p, c))
288                    .collect();
289
290                // Cutime double-counting correction (issue #20, bug 1).
291                //
292                // proc_ticks stores (utime + cutime, stime + cstime) so that
293                // short-lived children that start AND exit within one interval are
294                // captured via the parent's cutime delta.
295                //
296                // Side-effect: if a child was alive at the prev snapshot, its
297                // pre-snapshot ticks appear both in prev_proc_ticks[child] AND
298                // in the parent's cutime delta once the child is reaped.  That
299                // double-counts the child's pre-snapshot ticks.
300                //
301                // Fix: sum the prev ticks of every PID in prev that is absent
302                // from curr (it exited), then subtract that sum from the raw
303                // delta.  This cancels exactly the overcounting without
304                // affecting short-lived processes (which were never in prev, so
305                // their prev ticks are zero).
306                let (exited_utime, exited_stime): (u64, u64) = if self.pid.is_some() {
307                    prev.proc_ticks
308                        .iter()
309                        .filter(|(pid, _)| !curr.proc_ticks.contains_key(pid))
310                        .fold((0u64, 0u64), |(au, as_), (_, &(pu, ps))| {
311                            (au + pu, as_ + ps)
312                        })
313                } else {
314                    (0, 0)
315                };
316
317                let utilization_pct = aggregate_util_cores(&prev.total, &curr.total, n_cores);
318
319                let process_child_count = self
320                    .pid
321                    .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0));
322
323                // Per-tree utime and stime deltas (seconds this interval).
324                let process_utime_secs = self.pid.map(|_| {
325                    let raw: u64 = curr
326                        .proc_ticks
327                        .iter()
328                        .map(|(pid, &(cu, _))| {
329                            let pu = prev.proc_ticks.get(pid).map(|&(u, _)| u).unwrap_or(cu);
330                            cu.saturating_sub(pu)
331                        })
332                        .sum();
333                    raw.saturating_sub(exited_utime) as f64 / tps
334                });
335
336                let process_stime_secs = self.pid.map(|_| {
337                    let raw: u64 = curr
338                        .proc_ticks
339                        .iter()
340                        .map(|(pid, &(_, cs))| {
341                            let ps = prev.proc_ticks.get(pid).map(|&(_, s)| s).unwrap_or(cs);
342                            cs.saturating_sub(ps)
343                        })
344                        .sum();
345                    raw.saturating_sub(exited_stime) as f64 / tps
346                });
347
348                // Fractional cores = (Δutime + Δstime) / Δtimestamp, matching
349                // Python ProcessTracker.cpu_usage (process_utime_secs +
350                // process_stime_secs share the same corrected tick deltas).
351                let process_cores_used = match (self.pid, process_utime_secs, process_stime_secs) {
352                    (Some(_), Some(u), Some(s)) => {
353                        let elapsed = (curr.instant - prev.instant).as_secs_f64().max(0.001);
354                        Some(((u + s) / elapsed).max(0.0))
355                    }
356                    _ => None,
357                };
358
359                // Per-interval disk I/O deltas across the process tree.
360                let process_disk_read_bytes = self.pid.map(|_| {
361                    curr.proc_io
362                        .iter()
363                        .map(|(pid, &(cr, _))| {
364                            let pr = prev.proc_io.get(pid).map(|&(r, _)| r).unwrap_or(cr);
365                            cr.saturating_sub(pr)
366                        })
367                        .sum::<u64>()
368                });
369
370                let process_disk_write_bytes = self.pid.map(|_| {
371                    curr.proc_io
372                        .iter()
373                        .map(|(pid, &(_, cw))| {
374                            let pw = prev.proc_io.get(pid).map(|&(_, w)| w).unwrap_or(cw);
375                            cw.saturating_sub(pw)
376                        })
377                        .sum::<u64>()
378                });
379
380                CpuMetrics {
381                    utilization_pct,
382                    per_core_pct,
383                    utime_secs,
384                    stime_secs,
385                    process_count,
386                    process_cores_used,
387                    process_child_count,
388                    process_utime_secs,
389                    process_stime_secs,
390                    process_pss_mib,
391                    process_rss_mib,
392                    process_disk_read_bytes,
393                    process_disk_write_bytes,
394                    process_gpu_usage: None, // filled by main.rs after GPU query
395                    process_gpu_vram_mib: None, // filled by main.rs after GPU query
396                    process_gpu_utilized: None,
397                    process_tree_pids: curr.proc_ticks.keys().copied().collect(),
398                }
399            }
400        };
401
402        self.prev = Some(curr);
403        Ok(metrics)
404    }
405}
406
407// ---------------------------------------------------------------------------
408// Unit tests
409// ---------------------------------------------------------------------------
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414
415    // Tests use `util_pct_from_ticks` directly -- `CpuTime` has private fields
416    // and cannot be constructed in tests.  All branching logic in
417    // `aggregate_util_cores` and `core_util_pct` delegates to this one
418    // pure function, so testing it covers all paths.
419    //
420    // Tick layout: (prev_total, prev_idle, curr_total, curr_idle)
421
422    #[test]
423    fn test_util_pct_all_idle_is_zero() {
424        // All new ticks went to idle.
425        assert_eq!(util_pct_from_ticks(0, 0, 1600, 1600), 0.0);
426    }
427
428    #[test]
429    fn test_util_pct_fully_busy_is_100() {
430        // 1600 new ticks, 0 idle -> 100%.
431        let pct = util_pct_from_ticks(0, 0, 1600, 0);
432        assert!((pct - 100.0).abs() < 0.01, "expected 100.0, got {pct}");
433    }
434
435    #[test]
436    fn test_util_pct_half_busy_is_50() {
437        // 1600 new ticks, 800 idle -> 50%.
438        let pct = util_pct_from_ticks(0, 0, 1600, 800);
439        assert!((pct - 50.0).abs() < 0.01, "expected 50.0, got {pct}");
440    }
441
442    #[test]
443    fn test_util_pct_no_delta_is_zero() {
444        // Identical snapshots: no elapsed ticks.
445        assert_eq!(util_pct_from_ticks(100, 50, 100, 50), 0.0);
446    }
447
448    /// Aggregate util converts the percentage to fractional cores and does NOT clamp.
449    /// 99.9% busy on a 4-core machine -> ~3.996 cores, not forced to <= 4.0.
450    #[test]
451    fn test_aggregate_util_cores_no_clamp() {
452        // 999 active ticks, 1 idle, total 1000 -> 99.9% -> 99.9/100*4 = 3.996
453        let pct = util_pct_from_ticks(0, 0, 1000, 1);
454        let cores = pct / 100.0 * 4.0_f64;
455        assert!(cores > 3.9, "expected close to 4.0, got {cores}");
456        assert!(
457            cores < 4.05,
458            "should not greatly exceed n_cores, got {cores}"
459        );
460    }
461
462    /// Per-core values are clamped to 100 by `core_util_pct`; verify the
463    /// underlying math exceeds 100 without the clamp (so the clamp is doing work).
464    #[test]
465    fn test_util_pct_raw_is_not_clamped() {
466        // 100% busy -- raw result is exactly 100, clamp has no effect here.
467        let raw = util_pct_from_ticks(0, 0, 1000, 0);
468        assert!((raw - 100.0).abs() < 0.01);
469        // Apply clamp explicitly to show it would cap any value > 100.
470        assert_eq!(raw.clamp(0.0, 100.0), 100.0);
471    }
472
473    // T-CPU-06: the first call to collect() returns 0.0 for all delta fields
474    // (utilization_pct, per_core_pct, utime_secs, stime_secs).  A warm-up
475    // sleep then a second collect() produces real data.
476    #[test]
477    fn test_first_collect_returns_zero_for_delta_fields() {
478        let mut collector = CpuCollector::new(None);
479        let metrics = collector.collect().expect("first collect failed");
480        assert_eq!(
481            metrics.utilization_pct, 0.0,
482            "utilization_pct must be 0.0 on first collect, got {}",
483            metrics.utilization_pct
484        );
485        assert!(
486            metrics.per_core_pct.iter().all(|&v| v == 0.0),
487            "per_core_pct must be all-zero on first collect: {:?}",
488            metrics.per_core_pct
489        );
490        assert_eq!(
491            metrics.utime_secs, 0.0,
492            "utime_secs must be 0.0 on first collect, got {}",
493            metrics.utime_secs
494        );
495        assert_eq!(
496            metrics.stime_secs, 0.0,
497            "stime_secs must be 0.0 on first collect, got {}",
498            metrics.stime_secs
499        );
500    }
501
502    // T-CPU-07: first collect() with PID tracking returns Some for process fields.
503    #[test]
504    fn test_first_collect_with_pid_returns_some_process_fields() {
505        let pid = i32::try_from(std::process::id()).expect("PID too large");
506        let mut collector = CpuCollector::new(Some(pid));
507        let m = collector.collect().expect("collect() failed");
508        assert!(
509            m.process_cores_used.is_some(),
510            "process_cores_used must be Some when PID is tracked"
511        );
512        assert!(
513            m.process_child_count.is_some(),
514            "process_child_count must be Some when PID is tracked"
515        );
516        assert!(
517            m.process_pss_mib.is_some(),
518            "process_pss_mib must be Some when PID is tracked"
519        );
520        assert!(
521            m.process_rss_mib.is_some(),
522            "process_rss_mib must be Some when PID is tracked"
523        );
524        assert!(
525            m.process_utime_secs.is_some(),
526            "process_utime_secs must be Some when PID is tracked"
527        );
528        assert!(
529            m.process_stime_secs.is_some(),
530            "process_stime_secs must be Some when PID is tracked"
531        );
532        assert!(
533            m.process_disk_read_bytes.is_some(),
534            "process_disk_read_bytes must be Some when PID is tracked"
535        );
536        assert!(
537            m.process_disk_write_bytes.is_some(),
538            "process_disk_write_bytes must be Some when PID is tracked"
539        );
540    }
541
542    // T-CPU-08: process tree memory (PSS and RSS) is positive for the running test process.
543    #[test]
544    fn test_process_tree_memory_nonzero_for_self() {
545        let pid = i32::try_from(std::process::id()).expect("PID too large");
546        let (pss, rss) = process_tree_memory_mib(&[pid]);
547        assert!(
548            pss > 0,
549            "PSS for the current process should be > 0, got {pss}"
550        );
551        assert!(
552            rss > 0,
553            "RSS for the current process should be > 0, got {rss}"
554        );
555        assert!(
556            pss <= rss,
557            "PSS ({pss}) should not exceed RSS ({rss}) for a single process"
558        );
559    }
560
561    // T-CPU-09: process_tree_ticks contains the root PID.
562    // PID 1 (init/systemd) is used because it is always present and readable
563    // on any Linux host. Using std::process::id() is unreliable under
564    // llvm-cov instrumentation: the instrumented binary's own /proc entry
565    // can be transiently unreadable when many tests run in parallel.
566    #[test]
567    fn test_process_tree_ticks_contains_root_pid() {
568        let ticks = process_tree_ticks(1);
569        assert!(
570            ticks.contains_key(&1),
571            "process_tree_ticks(1) must contain PID 1 (init/systemd is always present)"
572        );
573    }
574
575    // T-CPU-10: second collect() with PID tracking produces non-negative cores.
576    #[test]
577    fn test_second_collect_with_pid_nonneg_cores() {
578        let pid = i32::try_from(std::process::id()).expect("PID too large");
579        let mut collector = CpuCollector::new(Some(pid));
580        let _ = collector.collect().expect("first collect() failed");
581        let m = collector.collect().expect("second collect() failed");
582        let cores = m
583            .process_cores_used
584            .expect("process_cores_used must be Some");
585        assert!(
586            cores >= 0.0,
587            "process_cores_used must be >= 0.0, got {cores}"
588        );
589    }
590
591    // T-CPU-11: second collect() with no PID still returns None for all process fields.
592    #[test]
593    fn test_second_collect_no_pid_all_process_fields_none() {
594        let mut collector = CpuCollector::new(None);
595        let _ = collector.collect().expect("first collect() failed");
596        let m = collector.collect().expect("second collect() failed");
597        assert!(
598            m.process_cores_used.is_none(),
599            "process_cores_used must be None when not tracking"
600        );
601        assert!(
602            m.process_child_count.is_none(),
603            "process_child_count must be None when not tracking"
604        );
605        assert!(
606            m.process_pss_mib.is_none(),
607            "process_pss_mib must be None when not tracking"
608        );
609        assert!(
610            m.process_rss_mib.is_none(),
611            "process_rss_mib must be None when not tracking"
612        );
613        assert!(
614            m.process_utime_secs.is_none(),
615            "process_utime_secs must be None when not tracking"
616        );
617        assert!(
618            m.process_stime_secs.is_none(),
619            "process_stime_secs must be None when not tracking"
620        );
621        assert!(
622            m.process_disk_read_bytes.is_none(),
623            "process_disk_read_bytes must be None when not tracking"
624        );
625        assert!(
626            m.process_disk_write_bytes.is_none(),
627            "process_disk_write_bytes must be None when not tracking"
628        );
629    }
630
631    // T-CPU-12: process_count > 0 (at least one process is always visible).
632    #[test]
633    fn test_process_count_positive() {
634        let mut collector = CpuCollector::new(None);
635        let m = collector.collect().expect("collect() failed");
636        assert!(
637            m.process_count > 0,
638            "process_count must be > 0, got {}",
639            m.process_count
640        );
641    }
642
643    // -----------------------------------------------------------------------
644    // Issue #20 regression tests: process CPU must never exceed system CPU
645    // -----------------------------------------------------------------------
646
647    // T-CPU-13: cutime correction formula -- direct arithmetic verification.
648    //
649    // A child with 500 pre-snapshot user ticks exits between samples and is
650    // reaped by its parent.  The parent's cutime delta therefore covers the
651    // child's full 2500-tick lifetime.  The raw delta overcounts by 500 (the
652    // pre-snapshot portion already counted via the child's prev entry).
653    // The correction must subtract exactly those 500 ticks.
654    #[test]
655    fn test_cutime_correction_cancels_exited_child_ticks() {
656        let prev: HashMap<i32, (u64, u64)> = [
657            (200, (50, 0)),  // parent: 50 own ticks at warm-up
658            (100, (500, 0)), // child:  500 ticks at warm-up
659        ]
660        .iter()
661        .cloned()
662        .collect();
663
664        // Between samples: child accumulates 2000 more ticks then exits.
665        // Parent's cutime = child's full lifetime = 500 + 2000 = 2500.
666        // Parent runs 250 own ticks.
667        let curr: HashMap<i32, (u64, u64)> =
668            [(200, (50 + 250 + 2500, 0))].iter().cloned().collect();
669
670        let raw: u64 = curr
671            .iter()
672            .map(|(pid, &(cu, cs))| {
673                let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
674                cu.saturating_sub(pu) + cs.saturating_sub(ps)
675            })
676            .sum();
677        assert_eq!(
678            raw, 2750,
679            "raw delta must include the double-counted pre-snapshot child ticks"
680        );
681
682        let exited: u64 = prev
683            .iter()
684            .filter(|(pid, _)| !curr.contains_key(pid))
685            .map(|(_, &(pu, ps))| pu + ps)
686            .sum();
687        assert_eq!(
688            exited, 500,
689            "exited ticks must equal the child's pre-snapshot tick count"
690        );
691
692        let corrected = raw.saturating_sub(exited);
693        // Correct answer: parent own delta (250) + child post-snapshot delta (2000) = 2250.
694        assert_eq!(
695            corrected, 2250,
696            "corrected delta must exclude the child's pre-snapshot ticks"
697        );
698    }
699
700    // T-CPU-14: cutime correction handles cascaded exits.
701    //
702    // Both a child and grandchild exit between samples.  Root's cutime ends up
703    // containing the full lifetimes of both.  Subtracting all exited PIDs'
704    // pre-snapshot ticks must leave only the ticks actually earned in the
705    // interval regardless of exit depth.
706    #[test]
707    fn test_cutime_correction_handles_cascaded_exits() {
708        let prev: HashMap<i32, (u64, u64)> = [
709            (7, (0, 0)),   // root:        no prior ticks
710            (8, (100, 0)), // child:       100 pre-snapshot ticks
711            (9, (200, 0)), // grandchild:  200 pre-snapshot ticks
712        ]
713        .iter()
714        .cloned()
715        .collect();
716
717        // Grandchild earns 50 ticks and exits; reaped by child.
718        //   child cutime → 200 + 50 = 250.
719        // Child earns 50 own ticks then exits; reaped by root.
720        //   child lifetime = 100 + 50 + 250 = 400.
721        //   root cutime → 400.
722        // Root earns 30 own ticks.
723        let curr: HashMap<i32, (u64, u64)> = [(7, (30 + 400, 0))].iter().cloned().collect();
724
725        let raw: u64 = curr
726            .iter()
727            .map(|(pid, &(cu, cs))| {
728                let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
729                cu.saturating_sub(pu) + cs.saturating_sub(ps)
730            })
731            .sum();
732        // raw = 430; overcounts by child_prev (100) + grandchild_prev (200) = 300.
733        assert_eq!(raw, 430);
734
735        let exited: u64 = prev
736            .iter()
737            .filter(|(pid, _)| !curr.contains_key(pid))
738            .map(|(_, &(pu, ps))| pu + ps)
739            .sum();
740        assert_eq!(
741            exited, 300,
742            "exited = child pre-snap (100) + grandchild pre-snap (200)"
743        );
744
745        let corrected = raw.saturating_sub(exited);
746        // Correct: root own (30) + child own delta (50) + grandchild own delta (50) = 130.
747        assert_eq!(corrected, 130);
748    }
749
750    // T-CPU-15: process CPU must not exceed system CPU when a long-running
751    // child exits between two measurement snapshots.
752    //
753    // On busy servers the tracked process often has long-standing children
754    // that accumulate significant CPU ticks over many intervals.  When such a
755    // child exits between the warm-up and the real sample, its entire lifetime
756    // rolls into the parent's cutime delta.  Without the double-counting
757    // correction those pre-snapshot ticks are counted a second time, pushing
758    // the process metric above the system metric.
759    //
760    // We compare absolute CPU seconds (process_utime_secs + process_stime_secs
761    // vs utime_secs + stime_secs) rather than fractional cores because both
762    // quantities share the same tps divisor and kernel tick accounting.
763    // fractional-cores comparison divides by wall-clock elapsed, which makes
764    // the ratio unstable when the measurement window is very short (a fixed
765    // iteration burn finishes in microseconds on fast CPUs, leaving
766    // elapsed << TOCTOU gap and inflating process_cores_used).
767    #[test]
768    fn test_process_cores_used_does_not_exceed_system_utilization() {
769        let pid = i32::try_from(std::process::id()).expect("PID too large");
770        let mut collector = CpuCollector::new(Some(pid));
771
772        // Spawn a CPU-busy child to simulate a long-running process on a
773        // busy server.  A shell busy-loop accumulates real utime ticks.
774        let mut child = std::process::Command::new("sh")
775            .args(["-c", "while true; do :; done"])
776            .spawn()
777            .expect("failed to spawn sh busy-loop -- required for T-CPU-15");
778
779        // Let the child accumulate pre-snapshot CPU ticks for 200 ms.
780        // At 100 HZ that yields ~20 ticks = ~0.2 s that would be double-counted
781        // without the cutime correction.
782        std::thread::sleep(std::time::Duration::from_millis(200));
783
784        // Warm-up: child is alive with ~200 ms of accumulated CPU ticks.
785        let _ = collector.collect().expect("warm-up collect failed");
786
787        // Kill the child immediately after warm-up.  Its full lifetime ticks
788        // (including the ~0.2 s pre-snapshot portion) roll into parent's cutime
789        // delta in the next collect().  Without the correction those pre-snapshot
790        // ticks are double-counted, inflating proc_cpu well above sys_cpu.
791        child.kill().ok();
792        child.wait().ok();
793
794        let m = collector.collect().expect("second collect failed");
795
796        let proc_utime = m
797            .process_utime_secs
798            .expect("process_utime_secs must be Some");
799        let proc_stime = m
800            .process_stime_secs
801            .expect("process_stime_secs must be Some");
802        let proc_cpu = proc_utime + proc_stime;
803        let sys_cpu = m.utime_secs + m.stime_secs;
804
805        // 15 % relative + 50 ms absolute tolerance for the TOCTOU gap between
806        // /proc/PID/stat and /proc/stat reads.  Without the cutime correction,
807        // proc_cpu would be inflated by ~0.2 s (pre-snapshot child ticks),
808        // which far exceeds this tolerance and makes the assertion fail.
809        let tolerance = sys_cpu * 0.15 + 0.05;
810        assert!(
811            proc_cpu <= sys_cpu + tolerance,
812            "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
813             must not exceed system CPU ({sys_cpu:.3}s) -- cutime double-counting regression \
814             for issue #20"
815        );
816    }
817
818    // T-CPU-16: process_utime_secs must not exceed system utime_secs after a
819    // child process exits between the warm-up and the real sample.
820    //
821    // This directly exercises the cutime double-counting bug from issue #20:
822    // without the correction, the child's pre-snapshot ticks are counted twice
823    // (once via the child's prev entry, once via the parent's cutime delta),
824    // pushing process_utime_secs above utime_secs on an otherwise idle system.
825    #[test]
826    fn test_process_utime_no_double_count_after_child_exits() {
827        let pid = i32::try_from(std::process::id()).expect("PID too large");
828        let mut collector = CpuCollector::new(Some(pid));
829
830        // Spawn a child that burns a little CPU then exits naturally.
831        // `sh` must be available on any Linux host used for testing.
832        let mut child = std::process::Command::new("sh")
833            .args(["-c", "for i in $(seq 1 20000); do :; done"])
834            .spawn()
835            .expect("failed to spawn sh -- required for T-CPU-16");
836
837        // Let the child accumulate real ticks before the warm-up snapshot so
838        // there is a meaningful pre-snapshot tick count to double-count.
839        std::thread::sleep(std::time::Duration::from_millis(20));
840
841        // Warm-up: child is alive; its ticks are stored in prev_proc_ticks.
842        let _ = collector.collect().expect("warm-up collect failed");
843
844        // Reap the child.  Its full-lifetime ticks roll into parent's cutime.
845        let _ = child.wait().expect("failed to wait for child");
846
847        // Real collect: child is absent from curr_proc_ticks but parent's
848        // cutime has grown by the child's entire lifetime.  Without the
849        // correction the overcounting would inflate process_utime_secs.
850        let m = collector.collect().expect("second collect failed");
851
852        let proc_utime = m
853            .process_utime_secs
854            .expect("process_utime_secs must be Some when a PID is tracked");
855        let sys_utime = m.utime_secs;
856
857        // Allow 5% relative + 50 ms absolute tolerance for /proc timing jitter.
858        let tolerance = sys_utime * 0.05 + 0.05;
859        assert!(
860            proc_utime <= sys_utime + tolerance,
861            "process_utime_secs ({proc_utime:.3}s) exceeds system utime_secs ({sys_utime:.3}s) -- \
862             cutime double-counting regression (issue #20)"
863        );
864    }
865
866    // T-CPU-17: multi-interval accumulation -- child tracked across two snapshots
867    // before exiting.
868    //
869    // This is the scenario shown in examples/repro_cpu_cutime_spike.rs: a child
870    // burns CPU across several measurement intervals, then exits in the final one.
871    // The cutime delta for that final interval equals the child's ENTIRE lifetime,
872    // not just the ticks accumulated since the previous snapshot.
873    //
874    // The correction must use the MOST RECENT prev_proc_ticks (updated after the
875    // intermediate collect), not the original warm-up ticks.  If self.prev were
876    // not updated between intervals, exited_utime would be too small and the
877    // overcounting would not be fully cancelled.
878    //
879    // Without the correction: proc_cpu ≈ child's lifetime at intermediate snapshot
880    //   >> sys_cpu for that short final window.
881    // With the correction:    proc_cpu ≈ only post-intermediate child ticks ≈ 0.
882    #[test]
883    fn test_cutime_correction_multi_interval_child_exit() {
884        let pid = i32::try_from(std::process::id()).expect("PID too large");
885        let mut collector = CpuCollector::new(Some(pid));
886
887        // Spawn a CPU-busy child that accumulates real utime ticks.
888        let mut child = std::process::Command::new("sh")
889            .args(["-c", "while true; do :; done"])
890            .spawn()
891            .expect("failed to spawn sh busy-loop -- required for T-CPU-17");
892
893        // Interval 1 warm-up: child is alive with some initial ticks.
894        std::thread::sleep(std::time::Duration::from_millis(100));
895        let _ = collector.collect().expect("warm-up collect failed");
896
897        // Interval 2: child continues burning CPU. self.prev is updated so the
898        // next correction baseline is the child's tick count at this point.
899        std::thread::sleep(std::time::Duration::from_millis(100));
900        let _ = collector.collect().expect("intermediate collect failed");
901
902        // Interval 3 (final): kill child immediately so its full lifetime since
903        // interval 2 rolls into parent's cutime.  The correction must subtract
904        // the interval-2 tick count (not the warm-up tick count).
905        child.kill().ok();
906        child.wait().ok();
907
908        let m = collector.collect().expect("final collect failed");
909
910        let proc_utime = m
911            .process_utime_secs
912            .expect("process_utime_secs must be Some");
913        let proc_stime = m
914            .process_stime_secs
915            .expect("process_stime_secs must be Some");
916        let proc_cpu = proc_utime + proc_stime;
917        let sys_cpu = m.utime_secs + m.stime_secs;
918
919        // 15 % relative + 50 ms absolute tolerance for TOCTOU jitter.
920        // Without the correction, proc_cpu would include ~200 ms of pre-snapshot
921        // child ticks, far exceeding sys_cpu for this short measurement window.
922        let tolerance = sys_cpu * 0.15 + 0.05;
923        assert!(
924            proc_cpu <= sys_cpu + tolerance,
925            "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
926             must not exceed system CPU ({sys_cpu:.3}s) across multiple intervals -- \
927             cutime multi-interval regression for issue #20"
928        );
929    }
930
931    // T-CPU-18: PSS (via smaps_rollup) correctly tracks a file-backed mapping.
932    //
933    // This is the regression test for the fix shown in
934    // examples/repro_memory_rss_vs_used.rs.  The old VmRSS approach overcounted
935    // shared pages: when N processes map the same file each contributes its full
936    // mapping size to the VmRSS sum, but PSS via /proc/pid/smaps_rollup
937    // attributes only each process's proportional share.
938    //
939    // For a sole mapper with MAP_PRIVATE and all pages touched:
940    //   - RSS increases by >= mapping_mib (all pages in physical RAM)
941    //   - PSS increases by >= mapping_mib (sole mapper gets full proportional share)
942    //   - PSS <= RSS (PSS never over-reports)
943    //   - |PSS_delta - RSS_delta| <= 1 MiB (sole-mapper PSS == RSS for the region)
944    //
945    // The last invariant is the regression guard: if PSS were broken (zero or
946    // reading the wrong field) the delta would diverge from the RSS delta even
947    // though PSS <= RSS holds trivially for zero.
948    //
949    // The multi-process case (N workers sharing the same file, causing
950    // tree_pss << tree_rss) is demonstrated in examples/repro_memory_rss_vs_used.rs.
951    #[test]
952    fn test_pss_tracks_file_backed_mapping() {
953        use std::fs;
954        use std::io::Write as _;
955        use std::os::unix::io::AsRawFd;
956
957        const MAPPING_MIB: usize = 4;
958        const MAPPING_SIZE: usize = MAPPING_MIB * 1024 * 1024;
959
960        let pid = i32::try_from(std::process::id()).expect("PID too large");
961        let path = format!("/tmp/rt_test_pss_{}", std::process::id());
962
963        let (pss_before, rss_before) = process_tree_memory_mib(&[pid]);
964
965        // Write a temp file that this process will map read-only.
966        {
967            let mut f = fs::File::create(&path).expect("cannot create temp file for T-CPU-18");
968            let chunk = vec![0xABu8; 64 * 1024];
969            for _ in 0..(MAPPING_SIZE / chunk.len()) {
970                f.write_all(&chunk).expect("write failed");
971            }
972        }
973
974        let file = fs::File::open(&path).expect("cannot open temp file for T-CPU-18");
975        let ptr = unsafe {
976            libc::mmap(
977                std::ptr::null_mut(),
978                MAPPING_SIZE,
979                libc::PROT_READ,
980                libc::MAP_PRIVATE,
981                file.as_raw_fd(),
982                0,
983            )
984        };
985        assert_ne!(ptr, libc::MAP_FAILED, "mmap failed in T-CPU-18");
986
987        // Touch every page to bring all pages into physical RAM (RSS and PSS).
988        let slice = unsafe { std::slice::from_raw_parts(ptr as *const u8, MAPPING_SIZE) };
989        let mut checksum = 0u64;
990        for offset in (0..MAPPING_SIZE).step_by(4096) {
991            checksum = checksum.wrapping_add(u64::from(slice[offset]));
992        }
993        let _ = checksum;
994
995        let (pss_after, rss_after) = process_tree_memory_mib(&[pid]);
996
997        // Clean up before asserting so a failure does not leak resources.
998        unsafe { libc::munmap(ptr, MAPPING_SIZE) };
999        fs::remove_file(&path).ok();
1000
1001        let pss_delta = pss_after.saturating_sub(pss_before);
1002        let rss_delta = rss_after.saturating_sub(rss_before);
1003
1004        assert!(
1005            rss_delta >= MAPPING_MIB as u64,
1006            "RSS must increase by >= {MAPPING_MIB} MiB after touching the mapping: \
1007             before={rss_before} MiB, after={rss_after} MiB (delta={rss_delta} MiB)"
1008        );
1009        assert!(
1010            pss_delta >= MAPPING_MIB as u64,
1011            "PSS must increase by >= {MAPPING_MIB} MiB as sole mapper of the file: \
1012             before={pss_before} MiB, after={pss_after} MiB (delta={pss_delta} MiB)"
1013        );
1014        assert!(
1015            pss_after <= rss_after,
1016            "PSS ({pss_after} MiB) must not exceed RSS ({rss_after} MiB)"
1017        );
1018        // For the sole mapper the PSS delta and RSS delta must agree within 1 MiB.
1019        // A regression that breaks smaps_rollup reading (e.g. returning 0 for PSS)
1020        // would leave pss_delta == 0 while rss_delta >= MAPPING_MIB.
1021        let skew = pss_delta.abs_diff(rss_delta);
1022        assert!(
1023            skew <= 1,
1024            "PSS delta ({pss_delta} MiB) and RSS delta ({rss_delta} MiB) must agree within \
1025             1 MiB for a sole mapper -- larger skew indicates smaps_rollup is not being read"
1026        );
1027    }
1028}