// resource_tracker/collector/cpu.rs

1use crate::metrics::CpuMetrics;
2use procfs::prelude::*;
3use procfs::process::all_processes;
4use procfs::{CpuTime, KernelStats};
5use std::collections::HashMap;
6use std::time::Instant;
7
/// Crate-local shorthand: all fallible paths in this module box their error
/// (procfs parse failures, std::io errors) into one trait object.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
10// ---------------------------------------------------------------------------
11// Tick helpers
12// ---------------------------------------------------------------------------
13
14fn cpu_total(c: &CpuTime) -> u64 {
15    c.user
16        + c.nice
17        + c.system
18        + c.idle
19        + c.iowait.unwrap_or(0)
20        + c.irq.unwrap_or(0)
21        + c.softirq.unwrap_or(0)
22        + c.steal.unwrap_or(0)
23}
24
25fn cpu_idle(c: &CpuTime) -> u64 {
26    c.idle + c.iowait.unwrap_or(0)
27}
28
29/// Per-core utilization percentage (0.0–100.0, clamped).
30fn core_util_pct(prev: &CpuTime, curr: &CpuTime) -> f64 {
31    util_pct_from_ticks(
32        cpu_total(prev),
33        cpu_idle(prev),
34        cpu_total(curr),
35        cpu_idle(curr),
36    )
37    .clamp(0.0, 100.0)
38}
39
40/// Aggregate utilization expressed as fractional cores in use (0.0..n_cores).
41/// Not clamped: kernel rounding can produce values very slightly above n_cores.
42fn aggregate_util_cores(prev: &CpuTime, curr: &CpuTime, n_cores: usize) -> f64 {
43    util_pct_from_ticks(
44        cpu_total(prev),
45        cpu_idle(prev),
46        cpu_total(curr),
47        cpu_idle(curr),
48    ) / 100.0
49        * n_cores as f64
50}
51
/// Pure math: percentage of non-idle ticks between two snapshots (0.0–100.0
/// before any clamping).  Operates on pre-computed totals/idles so unit tests
/// can exercise it without constructing a `CpuTime` (which has private fields).
fn util_pct_from_ticks(prev_total: u64, prev_idle: u64, curr_total: u64, curr_idle: u64) -> f64 {
    // Counters only move forward; saturate to shrug off /proc read anomalies.
    let elapsed = curr_total.saturating_sub(prev_total);
    if elapsed == 0 {
        // Identical snapshots (or a counter reset): nothing to measure.
        return 0.0;
    }
    let delta_idle = curr_idle.saturating_sub(prev_idle) as f64;
    let delta_total = elapsed as f64;
    (delta_total - delta_idle) / delta_total * 100.0
}
63
64// ---------------------------------------------------------------------------
65// Process-tree helpers
66// ---------------------------------------------------------------------------
67
68/// Returns a map of { pid to (utime, stime) } for every process in the tree
69/// rooted at `root_pid` (root included).  Processes that have already exited
70/// are silently skipped: this is a TOCTOU race we accept.
71fn process_tree_ticks(root_pid: i32) -> HashMap<i32, (u64, u64)> {
72    // Collect all readable processes in one pass.
73    let all: Vec<_> = match all_processes() {
74        Ok(iter) => iter.filter_map(|r| r.ok()).collect(),
75        Err(_) => return HashMap::new(),
76    };
77
78    // Build a parent to children lookup.
79    let mut children: HashMap<i32, Vec<i32>> = HashMap::new();
80    all.iter().for_each(|proc| {
81        if let Ok(stat) = proc.stat() {
82            children.entry(stat.ppid).or_default().push(proc.pid);
83        }
84    });
85
86    // Build a pid to (utime, stime) lookup.
87    let ticks_for: HashMap<i32, (u64, u64)> = all
88        .iter()
89        .filter_map(|proc| proc.stat().ok().map(|s| (proc.pid, (s.utime, s.stime))))
90        .collect();
91
92    // BFS from root_pid, collecting (utime, stime) for every reachable node.
93    let mut result = HashMap::new();
94    let mut queue = vec![root_pid];
95    while let Some(pid) = queue.pop() {
96        if let Some(&ticks) = ticks_for.get(&pid) {
97            result.insert(pid, ticks);
98        }
99        if let Some(kids) = children.get(&pid) {
100            queue.extend(kids);
101        }
102    }
103    result
104}
105
106/// Sum of VmRSS (kB) across all given PIDs, converted to MiB.
107/// Processes that have exited or whose /proc/pid/status is unreadable are skipped.
108fn process_tree_rss_mib(pids: &[i32]) -> u64 {
109    let total_kib: u64 = pids
110        .iter()
111        .filter_map(|&pid| {
112            procfs::process::Process::new(pid)
113                .ok()
114                .and_then(|p| p.status().ok())
115                .and_then(|s| s.vmrss)
116        })
117        .sum();
118    total_kib / 1024
119}
120
121/// Per-process cumulative disk I/O bytes from /proc/pid/io.
122/// Returns { pid -> (read_bytes, write_bytes) }.
123/// PIDs whose /proc/pid/io is unreadable (e.g. different UID without ptrace)
124/// are silently omitted -- the delta for those PIDs will be 0.
125fn process_tree_io(pids: &[i32]) -> HashMap<i32, (u64, u64)> {
126    pids.iter()
127        .filter_map(|&pid| {
128            let io = procfs::process::Process::new(pid).ok()?.io().ok()?;
129            Some((pid, (io.read_bytes, io.write_bytes)))
130        })
131        .collect()
132}
133
134// ---------------------------------------------------------------------------
135// Snapshot + Collector
136// ---------------------------------------------------------------------------
137
/// Point-in-time capture of every counter that `collect()` diffs against on
/// the next poll.
struct Snapshot {
    /// Aggregate across all logical CPUs (the "cpu" summary line in /proc/stat).
    total: CpuTime,
    /// Per-logical-CPU entries (cpu0, cpu1, …).
    per_core: Vec<CpuTime>,
    /// Wall-clock time of this snapshot, used to normalize process tick deltas.
    instant: Instant,
    /// { pid -> (utime, stime) } for root process + all descendants.
    /// Empty when no PID is being tracked.
    proc_ticks: HashMap<i32, (u64, u64)>,
    /// { pid -> (read_bytes, write_bytes) } from /proc/pid/io.
    /// Empty when no PID is tracked or /proc/pid/io is unreadable.
    proc_io: HashMap<i32, (u64, u64)>,
}
152
/// Stateful CPU + process-tree metrics collector: each `collect()` call diffs
/// fresh /proc counters against the previous snapshot.
pub struct CpuCollector {
    /// Root PID of the process tree to track. None = system-only metrics.
    pid: Option<i32>,
    /// Previous snapshot; None until the first collect() establishes a baseline.
    prev: Option<Snapshot>,
}
158
159impl CpuCollector {
160    pub fn new(pid: Option<i32>) -> Self {
161        Self { pid, prev: None }
162    }
163
164    pub fn collect(&mut self) -> Result<CpuMetrics> {
165        let stats = KernelStats::current()?;
166        let now = Instant::now();
167
168        let tps = procfs::ticks_per_second() as f64;
169
170        // Total number of existing processes - matches Python resource-tracker's
171        // `processes` column.  Counted by listing numeric entries in /proc,
172        // which is O(n_procs) but cheap for a polling interval.
173        let process_count = std::fs::read_dir("/proc")
174            .map(|dir| {
175                let n = dir
176                    .filter_map(|e| e.ok())
177                    .filter(|e| {
178                        e.file_name()
179                            .to_string_lossy()
180                            .chars()
181                            .all(|c| c.is_ascii_digit())
182                    })
183                    .count();
184                u32::try_from(n).unwrap_or(0)
185            })
186            .unwrap_or(0);
187
188        let proc_ticks = match self.pid {
189            Some(root) => process_tree_ticks(root),
190            None => HashMap::new(),
191        };
192
193        // Read process I/O and RSS only when tracking a PID.
194        let proc_io = if self.pid.is_some() {
195            let pids: Vec<i32> = proc_ticks.keys().copied().collect();
196            process_tree_io(&pids)
197        } else {
198            HashMap::new()
199        };
200
201        // RSS is instantaneous (not a delta), so compute it before storing prev.
202        let process_rss_mib = if self.pid.is_some() {
203            let pids: Vec<i32> = proc_ticks.keys().copied().collect();
204            Some(process_tree_rss_mib(&pids))
205        } else {
206            None
207        };
208
209        let curr = Snapshot {
210            total: stats.total,
211            per_core: stats.cpu_time,
212            instant: now,
213            proc_ticks,
214            proc_io,
215        };
216
217        let metrics = match &self.prev {
218            // First call: store baseline and return zeros. The caller should
219            // sleep for one interval then call collect() again for real data.
220            None => CpuMetrics {
221                utilization_pct: 0.0,
222                per_core_pct: vec![0.0; curr.per_core.len()],
223                utime_secs: 0.0,
224                stime_secs: 0.0,
225                process_count,
226                process_cores_used: self.pid.map(|_| 0.0),
227                process_child_count: self
228                    .pid
229                    .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0)),
230                process_utime_secs: self.pid.map(|_| 0.0),
231                process_stime_secs: self.pid.map(|_| 0.0),
232                process_rss_mib,
233                process_disk_read_bytes: self.pid.map(|_| 0),
234                process_disk_write_bytes: self.pid.map(|_| 0),
235                process_gpu_usage: None, // filled by main.rs after GPU query
236                process_gpu_vram_mib: None, // filled by main.rs after GPU query
237                process_gpu_utilized: None,
238                process_tree_pids: curr.proc_ticks.keys().copied().collect(),
239            },
240
241            Some(prev) => {
242                let n_cores = curr.per_core.len();
243
244                // Per-interval CPU time deltas - matches Python resource-tracker's
245                // utime/stime columns (delta ticks / ticks_per_second).
246                let utime_secs = (curr.total.user + curr.total.nice)
247                    .saturating_sub(prev.total.user + prev.total.nice)
248                    as f64
249                    / tps;
250                let stime_secs = curr.total.system.saturating_sub(prev.total.system) as f64 / tps;
251
252                let per_core_pct = prev
253                    .per_core
254                    .iter()
255                    .zip(curr.per_core.iter())
256                    .map(|(p, c)| core_util_pct(p, c))
257                    .collect();
258
259                // Fractional cores = total (utime+stime) tick delta / (elapsed × tps)
260                let process_cores_used = self.pid.map(|_| {
261                    let elapsed = (curr.instant - prev.instant).as_secs_f64().max(0.001);
262                    let delta: u64 = curr
263                        .proc_ticks
264                        .iter()
265                        .map(|(pid, &(cu, cs))| {
266                            let (pu, ps) = prev.proc_ticks.get(pid).copied().unwrap_or((cu, cs));
267                            cu.saturating_sub(pu) + cs.saturating_sub(ps)
268                        })
269                        .sum();
270                    (delta as f64 / (elapsed * tps)).max(0.0)
271                });
272
273                let process_child_count = self
274                    .pid
275                    .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0));
276
277                // Per-tree utime and stime deltas (seconds this interval).
278                let process_utime_secs = self.pid.map(|_| {
279                    let delta: u64 = curr
280                        .proc_ticks
281                        .iter()
282                        .map(|(pid, &(cu, _))| {
283                            let pu = prev.proc_ticks.get(pid).map(|&(u, _)| u).unwrap_or(cu);
284                            cu.saturating_sub(pu)
285                        })
286                        .sum();
287                    delta as f64 / tps
288                });
289
290                let process_stime_secs = self.pid.map(|_| {
291                    let delta: u64 = curr
292                        .proc_ticks
293                        .iter()
294                        .map(|(pid, &(_, cs))| {
295                            let ps = prev.proc_ticks.get(pid).map(|&(_, s)| s).unwrap_or(cs);
296                            cs.saturating_sub(ps)
297                        })
298                        .sum();
299                    delta as f64 / tps
300                });
301
302                // Per-interval disk I/O deltas across the process tree.
303                let process_disk_read_bytes = self.pid.map(|_| {
304                    curr.proc_io
305                        .iter()
306                        .map(|(pid, &(cr, _))| {
307                            let pr = prev.proc_io.get(pid).map(|&(r, _)| r).unwrap_or(cr);
308                            cr.saturating_sub(pr)
309                        })
310                        .sum::<u64>()
311                });
312
313                let process_disk_write_bytes = self.pid.map(|_| {
314                    curr.proc_io
315                        .iter()
316                        .map(|(pid, &(_, cw))| {
317                            let pw = prev.proc_io.get(pid).map(|&(_, w)| w).unwrap_or(cw);
318                            cw.saturating_sub(pw)
319                        })
320                        .sum::<u64>()
321                });
322
323                CpuMetrics {
324                    utilization_pct: aggregate_util_cores(&prev.total, &curr.total, n_cores),
325                    per_core_pct,
326                    utime_secs,
327                    stime_secs,
328                    process_count,
329                    process_cores_used,
330                    process_child_count,
331                    process_utime_secs,
332                    process_stime_secs,
333                    process_rss_mib,
334                    process_disk_read_bytes,
335                    process_disk_write_bytes,
336                    process_gpu_usage: None, // filled by main.rs after GPU query
337                    process_gpu_vram_mib: None, // filled by main.rs after GPU query
338                    process_gpu_utilized: None,
339                    process_tree_pids: curr.proc_ticks.keys().copied().collect(),
340                }
341            }
342        };
343
344        self.prev = Some(curr);
345        Ok(metrics)
346    }
347}
348
349// ---------------------------------------------------------------------------
350// Unit tests
351// ---------------------------------------------------------------------------
352
#[cfg(test)]
mod tests {
    use super::*;

    // Tests use `util_pct_from_ticks` directly -- `CpuTime` has private fields
    // and cannot be constructed in tests.  All branching logic in
    // `aggregate_util_cores` and `core_util_pct` delegates to this one
    // pure function, so testing it covers all paths.
    //
    // Tick layout: (prev_total, prev_idle, curr_total, curr_idle)
    //
    // Exact `assert_eq!` on 0.0 is safe here: the zero cases return the
    // literal 0.0, not a computed float.

    #[test]
    fn test_util_pct_all_idle_is_zero() {
        // All new ticks went to idle.
        assert_eq!(util_pct_from_ticks(0, 0, 1600, 1600), 0.0);
    }

    #[test]
    fn test_util_pct_fully_busy_is_100() {
        // 1600 new ticks, 0 idle -> 100%.
        let pct = util_pct_from_ticks(0, 0, 1600, 0);
        assert!((pct - 100.0).abs() < 0.01, "expected 100.0, got {pct}");
    }

    #[test]
    fn test_util_pct_half_busy_is_50() {
        // 1600 new ticks, 800 idle -> 50%.
        let pct = util_pct_from_ticks(0, 0, 1600, 800);
        assert!((pct - 50.0).abs() < 0.01, "expected 50.0, got {pct}");
    }

    #[test]
    fn test_util_pct_no_delta_is_zero() {
        // Identical snapshots: no elapsed ticks.
        assert_eq!(util_pct_from_ticks(100, 50, 100, 50), 0.0);
    }

    /// Aggregate util converts the percentage to fractional cores and does NOT clamp.
    /// 99.9% busy on a 4-core machine -> ~3.996 cores, not forced to <= 4.0.
    #[test]
    fn test_aggregate_util_cores_no_clamp() {
        // 999 active ticks, 1 idle, total 1000 -> 99.9% -> 99.9/100*4 = 3.996
        let pct = util_pct_from_ticks(0, 0, 1000, 1);
        let cores = pct / 100.0 * 4.0_f64;
        assert!(cores > 3.9, "expected close to 4.0, got {cores}");
        assert!(
            cores < 4.05,
            "should not greatly exceed n_cores, got {cores}"
        );
    }

    /// Per-core values are clamped to 100 by `core_util_pct`; verify the
    /// underlying math exceeds 100 without the clamp (so the clamp is doing work).
    #[test]
    fn test_util_pct_raw_is_not_clamped() {
        // 100% busy -- raw result is exactly 100, clamp has no effect here.
        let raw = util_pct_from_ticks(0, 0, 1000, 0);
        assert!((raw - 100.0).abs() < 0.01);
        // Apply clamp explicitly to show it would cap any value > 100.
        assert_eq!(raw.clamp(0.0, 100.0), 100.0);
    }

    // The tests below exercise the collector against the live /proc
    // filesystem, so they are inherently Linux-only.

    // T-CPU-06: the first call to collect() returns 0.0 for all delta fields
    // (utilization_pct, per_core_pct, utime_secs, stime_secs).  A warm-up
    // sleep then a second collect() produces real data.
    #[test]
    fn test_first_collect_returns_zero_for_delta_fields() {
        let mut collector = CpuCollector::new(None);
        let metrics = collector.collect().expect("first collect failed");
        assert_eq!(
            metrics.utilization_pct, 0.0,
            "utilization_pct must be 0.0 on first collect, got {}",
            metrics.utilization_pct
        );
        assert!(
            metrics.per_core_pct.iter().all(|&v| v == 0.0),
            "per_core_pct must be all-zero on first collect: {:?}",
            metrics.per_core_pct
        );
        assert_eq!(
            metrics.utime_secs, 0.0,
            "utime_secs must be 0.0 on first collect, got {}",
            metrics.utime_secs
        );
        assert_eq!(
            metrics.stime_secs, 0.0,
            "stime_secs must be 0.0 on first collect, got {}",
            metrics.stime_secs
        );
    }

    // T-CPU-07: first collect() with PID tracking returns Some for process fields.
    #[test]
    fn test_first_collect_with_pid_returns_some_process_fields() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));
        let m = collector.collect().expect("collect() failed");
        assert!(
            m.process_cores_used.is_some(),
            "process_cores_used must be Some when PID is tracked"
        );
        assert!(
            m.process_child_count.is_some(),
            "process_child_count must be Some when PID is tracked"
        );
        assert!(
            m.process_rss_mib.is_some(),
            "process_rss_mib must be Some when PID is tracked"
        );
        assert!(
            m.process_utime_secs.is_some(),
            "process_utime_secs must be Some when PID is tracked"
        );
        assert!(
            m.process_stime_secs.is_some(),
            "process_stime_secs must be Some when PID is tracked"
        );
        assert!(
            m.process_disk_read_bytes.is_some(),
            "process_disk_read_bytes must be Some when PID is tracked"
        );
        assert!(
            m.process_disk_write_bytes.is_some(),
            "process_disk_write_bytes must be Some when PID is tracked"
        );
    }

    // T-CPU-08: process_tree_rss_mib returns a positive value for the running test process.
    #[test]
    fn test_process_tree_rss_mib_nonzero_for_self() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let rss = process_tree_rss_mib(&[pid]);
        assert!(
            rss > 0,
            "RSS for the current process should be > 0, got {rss}"
        );
    }

    // T-CPU-09: process_tree_ticks contains the root PID.
    // PID 1 (init/systemd) is used because it is always present and readable
    // on any Linux host. Using std::process::id() is unreliable under
    // llvm-cov instrumentation: the instrumented binary's own /proc entry
    // can be transiently unreadable when many tests run in parallel.
    #[test]
    fn test_process_tree_ticks_contains_root_pid() {
        let ticks = process_tree_ticks(1);
        assert!(
            ticks.contains_key(&1),
            "process_tree_ticks(1) must contain PID 1 (init/systemd is always present)"
        );
    }

    // T-CPU-10: second collect() with PID tracking produces non-negative cores.
    #[test]
    fn test_second_collect_with_pid_nonneg_cores() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));
        let _ = collector.collect().expect("first collect() failed");
        let m = collector.collect().expect("second collect() failed");
        let cores = m
            .process_cores_used
            .expect("process_cores_used must be Some");
        assert!(
            cores >= 0.0,
            "process_cores_used must be >= 0.0, got {cores}"
        );
    }

    // T-CPU-11: second collect() with no PID still returns None for all process fields.
    #[test]
    fn test_second_collect_no_pid_all_process_fields_none() {
        let mut collector = CpuCollector::new(None);
        let _ = collector.collect().expect("first collect() failed");
        let m = collector.collect().expect("second collect() failed");
        assert!(
            m.process_cores_used.is_none(),
            "process_cores_used must be None when not tracking"
        );
        assert!(
            m.process_child_count.is_none(),
            "process_child_count must be None when not tracking"
        );
        assert!(
            m.process_rss_mib.is_none(),
            "process_rss_mib must be None when not tracking"
        );
        assert!(
            m.process_utime_secs.is_none(),
            "process_utime_secs must be None when not tracking"
        );
        assert!(
            m.process_stime_secs.is_none(),
            "process_stime_secs must be None when not tracking"
        );
        assert!(
            m.process_disk_read_bytes.is_none(),
            "process_disk_read_bytes must be None when not tracking"
        );
        assert!(
            m.process_disk_write_bytes.is_none(),
            "process_disk_write_bytes must be None when not tracking"
        );
    }

    // T-CPU-12: process_count > 0 (at least one process is always visible).
    #[test]
    fn test_process_count_positive() {
        let mut collector = CpuCollector::new(None);
        let m = collector.collect().expect("collect() failed");
        assert!(
            m.process_count > 0,
            "process_count must be > 0, got {}",
            m.process_count
        );
    }
}