1use crate::metrics::CpuMetrics;
2use procfs::prelude::*;
3use procfs::process::all_processes;
4use procfs::{CpuTime, KernelStats};
5use std::collections::{HashMap, HashSet};
6use std::time::Instant;
7
/// Module-local result type: errors are boxed as `dyn std::error::Error` so
/// `?` can propagate any error type (e.g. from `KernelStats::current()`).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
/// Where cumulative CPU usage for this host/container is read from.
///
/// Selected once by `detect_cpu_source`, in priority order: cgroup v2
/// `cpu.stat`, then a cgroup v1 `cpuacct.usage` file, then `/proc/stat`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CpuSource {
    /// cgroup v2 `cpu.stat` (`usage_usec`, microseconds).
    CgroupV2,
    /// cgroup v1 `cpuacct.usage` (nanoseconds).
    CgroupV1,
    /// No cgroup accounting available; only host-wide `/proc/stat` is used.
    ProcStat,
}

impl CpuSource {
    /// Returns `true` when usage comes from a cgroup accounting file.
    fn is_cgroup(self) -> bool {
        match self {
            CpuSource::CgroupV2 | CpuSource::CgroupV1 => true,
            CpuSource::ProcStat => false,
        }
    }
}
30
/// CPU bandwidth limit imposed by the cgroup CFS quota, if any.
#[derive(Debug, Clone, Copy)]
struct CfsQuota {
    /// Quota divided by period, i.e. how many cores' worth of CPU time the
    /// cgroup may consume; `None` when no quota is set or it could not be read.
    max_cores: Option<f64>,
}
37
38#[allow(clippy::collapsible_if)]
41fn detect_cpu_source() -> CpuSource {
42 if let Ok(contents) = std::fs::read_to_string("/sys/fs/cgroup/cpu.stat") {
44 if contents.contains("usage_usec") {
45 return CpuSource::CgroupV2;
46 }
47 }
48 for path in &[
50 "/sys/fs/cgroup/cpuacct/cpuacct.usage",
51 "/sys/fs/cgroup/cpu,cpuacct/cpuacct.usage",
52 "/sys/fs/cgroup/cpu/cpuacct.usage",
53 ] {
54 if std::fs::read_to_string(path).is_ok() {
55 return CpuSource::CgroupV1;
56 }
57 }
58 CpuSource::ProcStat
59}
60
61#[allow(clippy::collapsible_if)]
63fn detect_cfs_quota() -> CfsQuota {
64 if let Ok(contents) = std::fs::read_to_string("/sys/fs/cgroup/cpu.max") {
66 let parts: Vec<&str> = contents.split_whitespace().collect();
67 if parts.len() == 2 && parts[0] != "max" {
68 if let (Ok(quota), Ok(period)) = (parts[0].parse::<f64>(), parts[1].parse::<f64>()) {
69 if period > 0.0 {
70 return CfsQuota {
71 max_cores: Some(quota / period),
72 };
73 }
74 }
75 }
76 }
77 for prefix in &[
79 "/sys/fs/cgroup/cpu",
80 "/sys/fs/cgroup/cpu,cpuacct",
81 "/sys/fs/cgroup/cpuacct",
82 ] {
83 let quota_path = format!("{}/cpu.cfs_quota_us", prefix);
84 let period_path = format!("{}/cpu.cfs_period_us", prefix);
85 if let (Ok(q_str), Ok(p_str)) = (
86 std::fs::read_to_string("a_path),
87 std::fs::read_to_string(&period_path),
88 ) {
89 if let (Ok(quota), Ok(period)) =
90 (q_str.trim().parse::<i64>(), p_str.trim().parse::<i64>())
91 {
92 if quota > 0 && period > 0 {
94 return CfsQuota {
95 max_cores: Some(quota as f64 / period as f64),
96 };
97 }
98 }
99 }
100 }
101 CfsQuota { max_cores: None }
102}
103
/// Returns the `usage_usec` counter (cumulative CPU time, microseconds) from
/// the cgroup v2 `cpu.stat` file.
///
/// Returns `None` if the file is unreadable, has no `usage_usec ` line, or the
/// first such line does not parse as `u64`.
fn read_cgroupv2_usage_usec() -> Option<u64> {
    std::fs::read_to_string("/sys/fs/cgroup/cpu.stat")
        .ok()?
        .lines()
        .find_map(|line| line.strip_prefix("usage_usec "))
        .and_then(|value| value.trim().parse().ok())
}
114
/// Returns the cgroup v1 `cpuacct.usage` counter (cumulative CPU time,
/// nanoseconds) from the first known location that is readable and parses.
fn read_cgroupv1_usage_ns() -> Option<u64> {
    [
        "/sys/fs/cgroup/cpuacct/cpuacct.usage",
        "/sys/fs/cgroup/cpu,cpuacct/cpuacct.usage",
        "/sys/fs/cgroup/cpu/cpuacct.usage",
    ]
    .into_iter()
    .find_map(|path| std::fs::read_to_string(path).ok()?.trim().parse::<u64>().ok())
}
131
132fn read_cgroup_usage_secs(source: CpuSource) -> Option<f64> {
135 match source {
136 CpuSource::CgroupV2 => read_cgroupv2_usage_usec().map(|usec| usec as f64 / 1_000_000.0),
137 CpuSource::CgroupV1 => read_cgroupv1_usage_ns().map(|ns| ns as f64 / 1_000_000_000.0),
138 CpuSource::ProcStat => None,
139 }
140}
141
142fn cpu_total(c: &CpuTime) -> u64 {
147 c.user
148 + c.nice
149 + c.system
150 + cpu_idle(c)
151 + c.irq.unwrap_or(0)
152 + c.softirq.unwrap_or(0)
153 + c.steal.unwrap_or(0)
154}
155
156fn cpu_idle(c: &CpuTime) -> u64 {
157 c.idle + c.iowait.unwrap_or(0)
158}
159
160fn core_util_pct(prev: &CpuTime, curr: &CpuTime) -> f64 {
162 util_pct_from_ticks(
163 cpu_total(prev),
164 cpu_idle(prev),
165 cpu_total(curr),
166 cpu_idle(curr),
167 )
168 .clamp(0.0, 100.0)
169}
170
171fn aggregate_util_cores(prev: &CpuTime, curr: &CpuTime, n_cores: usize) -> f64 {
174 util_pct_from_ticks(
175 cpu_total(prev),
176 cpu_idle(prev),
177 cpu_total(curr),
178 cpu_idle(curr),
179 ) / 100.0
180 * n_cores as f64
181}
182
/// Busy percentage over an interval, from cumulative total and idle tick counters.
///
/// Each counter delta saturates at zero, so a counter that went backwards
/// contributes nothing. When no total ticks elapsed, `0.0` is returned instead
/// of dividing by zero. The result is not clamped.
fn util_pct_from_ticks(prev_total: u64, prev_idle: u64, curr_total: u64, curr_idle: u64) -> f64 {
    let elapsed_ticks = curr_total.saturating_sub(prev_total);
    if elapsed_ticks == 0 {
        return 0.0;
    }
    let idle_ticks = curr_idle.saturating_sub(prev_idle) as f64;
    let elapsed_ticks = elapsed_ticks as f64;
    (elapsed_ticks - idle_ticks) / elapsed_ticks * 100.0
}
194
195fn process_tree_ticks(root_pid: i32) -> HashMap<i32, (u64, u64)> {
203 let all: Vec<_> = match all_processes() {
205 Ok(iter) => iter.filter_map(|r| r.ok()).collect(),
206 Err(_) => return HashMap::new(),
207 };
208
209 let mut children: HashMap<i32, Vec<i32>> = HashMap::new();
223 let ticks_for: HashMap<i32, (u64, u64)> = all
224 .iter()
225 .filter_map(|proc| {
226 proc.stat().ok().map(|s| {
227 children.entry(s.ppid).or_default().push(proc.pid);
228 let user = s.utime + u64::try_from(s.cutime).unwrap_or(0);
229 let system = s.stime + u64::try_from(s.cstime).unwrap_or(0);
230 (proc.pid, (user, system))
231 })
232 })
233 .collect();
234
235 let mut result = HashMap::new();
237 let mut queue = vec![root_pid];
238 while let Some(pid) = queue.pop() {
239 if let Some(&ticks) = ticks_for.get(&pid) {
240 result.insert(pid, ticks);
241 }
242 if let Some(kids) = children.get(&pid) {
243 queue.extend(kids);
244 }
245 }
246 result
247}
248
249fn process_tree_memory_mib(pids: &[i32]) -> (u64, u64) {
253 let mut pss_kib = 0u64;
254 let mut rss_kib = 0u64;
255 for &pid in pids {
256 let Some(proc_) = procfs::process::Process::new(pid).ok() else {
257 continue;
258 };
259 if let Ok(rollup) = proc_.smaps_rollup()
260 && let Some(bytes) = rollup
261 .memory_map_rollup
262 .iter()
263 .find_map(|m| m.extension.map.get("Pss").copied())
264 {
265 pss_kib += bytes / 1024;
266 }
267 if let Ok(status) = proc_.status()
268 && let Some(vmrss) = status.vmrss
269 {
270 rss_kib += vmrss;
271 }
272 }
273 (pss_kib / 1024, rss_kib / 1024)
274}
275
276fn process_tree_io(pids: &[i32]) -> HashMap<i32, (u64, u64)> {
281 pids.iter()
282 .filter_map(|&pid| {
283 let io = procfs::process::Process::new(pid).ok()?.io().ok()?;
284 Some((pid, (io.read_bytes, io.write_bytes)))
285 })
286 .collect()
287}
288
/// One sample of every counter `CpuCollector` diffs between `collect` calls.
struct Snapshot {
    /// Aggregate CPU row from `KernelStats::current()` (`stats.total`).
    total: CpuTime,
    /// Per-CPU rows from `KernelStats::current()` (`stats.cpu_time`).
    per_core: Vec<CpuTime>,
    /// When the sample was taken (captured after the process-tree scan).
    instant: Instant,
    /// Cumulative cgroup CPU usage in seconds; `None` without cgroup accounting.
    cgroup_usage_secs: Option<f64>,
    /// `pid -> (utime + cutime, stime + cstime)` in clock ticks for the tracked
    /// tree. Once stored as the previous snapshot, this may also contain entries
    /// carried forward for PIDs that disappeared from the latest scan.
    proc_ticks: HashMap<i32, (u64, u64)>,
    /// `pid -> (read_bytes, write_bytes)` cumulative I/O counters. Entries for
    /// vanished PIDs may likewise be carried forward.
    proc_io: HashMap<i32, (u64, u64)>,
}
310
/// Samples system, cgroup and (optionally) process-tree CPU usage, and turns
/// consecutive samples into per-interval `CpuMetrics`.
pub struct CpuCollector {
    /// Root PID of the tracked process tree; `None` disables per-process metrics.
    pid: Option<i32>,
    /// Snapshot from the previous `collect` call; `None` before the first call.
    prev: Option<Snapshot>,
    /// CPU accounting source detected at construction.
    cpu_source: CpuSource,
    /// CFS quota detected at construction.
    cfs_quota: CfsQuota,
    /// Per-CPU row count from `/proc/stat`, reduced to the CFS quota when one is
    /// set. Used to cap the reported cgroup core usage.
    effective_cores: f64,
    /// PIDs missing from the latest scan whose previous ticks were copied into
    /// `prev`. Tracked so a PID is carried forward for at most one interval.
    carried_forward: HashSet<i32>,
}
327
328impl CpuCollector {
329 pub fn new(pid: Option<i32>) -> Self {
330 let cpu_source = detect_cpu_source();
331 let cfs_quota = detect_cfs_quota();
332
333 let physical_cores = KernelStats::current()
335 .map(|s| s.cpu_time.len())
336 .unwrap_or(1) as f64;
337 let effective_cores = match cfs_quota.max_cores {
338 Some(quota) => physical_cores.min(quota),
339 None => physical_cores,
340 };
341
342 Self {
343 pid,
344 prev: None,
345 cpu_source,
346 cfs_quota,
347 effective_cores,
348 carried_forward: HashSet::new(),
349 }
350 }
351
352 pub fn set_tracked_pid(&mut self, pid: Option<i32>) {
354 self.pid = pid;
355 }
356
357 pub fn collect(&mut self) -> Result<CpuMetrics> {
358 let tps = procfs::ticks_per_second() as f64;
359
360 let process_count = std::fs::read_dir("/proc")
361 .map(|dir| {
362 let n = dir
363 .filter_map(|e| e.ok())
364 .filter(|e| {
365 e.file_name()
366 .to_string_lossy()
367 .chars()
368 .all(|c| c.is_ascii_digit())
369 })
370 .count();
371 u32::try_from(n).unwrap_or(0)
372 })
373 .unwrap_or(0);
374
375 let stats = KernelStats::current()?;
382
383 let cgroup_usage_secs = read_cgroup_usage_secs(self.cpu_source);
385
386 let proc_ticks = match self.pid {
388 Some(root) => process_tree_ticks(root),
389 None => HashMap::new(),
390 };
391
392 let now = Instant::now();
394
395 let proc_io = if self.pid.is_some() {
397 let pids: Vec<i32> = proc_ticks.keys().copied().collect();
398 process_tree_io(&pids)
399 } else {
400 HashMap::new()
401 };
402
403 let (process_pss_mib, process_rss_mib) = if self.pid.is_some() {
405 let pids: Vec<i32> = proc_ticks.keys().copied().collect();
406 let (pss, rss) = process_tree_memory_mib(&pids);
407 (Some(pss), Some(rss))
408 } else {
409 (None, None)
410 };
411
412 let mut curr = Snapshot {
413 total: stats.total,
414 per_core: stats.cpu_time,
415 instant: now,
416 cgroup_usage_secs,
417 proc_ticks,
418 proc_io,
419 };
420
421 let metrics = match &self.prev {
422 None => CpuMetrics {
425 utilization_pct: 0.0,
426 cgroup_utilization_pct: curr
427 .cgroup_usage_secs
428 .filter(|_| self.cpu_source.is_cgroup())
429 .map(|_| 0.0),
430 cgroup_usage_secs: curr
431 .cgroup_usage_secs
432 .filter(|_| self.cpu_source.is_cgroup())
433 .map(|_| 0.0),
434 per_core_pct: vec![0.0; curr.per_core.len()],
435 utime_secs: 0.0,
436 stime_secs: 0.0,
437 process_count,
438 process_cores_used: self.pid.map(|_| 0.0),
439 process_child_count: self
440 .pid
441 .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0)),
442 process_utime_secs: self.pid.map(|_| 0.0),
443 process_stime_secs: self.pid.map(|_| 0.0),
444 process_pss_mib,
445 process_rss_mib,
446 process_disk_read_bytes: self.pid.map(|_| 0),
447 process_disk_write_bytes: self.pid.map(|_| 0),
448 process_gpu_usage: None, process_gpu_vram_mib: None, process_gpu_utilized: None,
451 process_tree_pids: curr.proc_ticks.keys().copied().collect(),
452 },
453
454 Some(prev) => {
455 let n_cores = curr.per_core.len();
456 let elapsed = (curr.instant - prev.instant).as_secs_f64().max(0.001);
457
458 let utime_secs = (curr.total.user + curr.total.nice)
461 .saturating_sub(prev.total.user + prev.total.nice)
462 as f64
463 / tps;
464 let stime_secs = curr.total.system.saturating_sub(prev.total.system) as f64 / tps;
465
466 let per_core_pct = prev
467 .per_core
468 .iter()
469 .zip(curr.per_core.iter())
470 .map(|(p, c)| core_util_pct(p, c))
471 .collect();
472
473 let utilization_pct = aggregate_util_cores(&prev.total, &curr.total, n_cores);
476
477 let (cgroup_usage_secs, cgroup_utilization_pct) =
479 match (curr.cgroup_usage_secs, prev.cgroup_usage_secs) {
480 (Some(curr_cg), Some(prev_cg)) => {
481 let delta = (curr_cg - prev_cg).max(0.0);
482 let cores_used = delta / elapsed;
483 (Some(delta), Some(cores_used.min(self.effective_cores)))
484 }
485 _ => (None, None),
486 };
487
488 let (exited_utime, exited_stime): (u64, u64) = if self.pid.is_some() {
500 prev.proc_ticks
501 .iter()
502 .filter(|(pid, _)| !curr.proc_ticks.contains_key(pid))
503 .fold((0u64, 0u64), |(au, as_), (_, &(pu, ps))| {
504 (au + pu, as_ + ps)
505 })
506 } else {
507 (0, 0)
508 };
509
510 let process_child_count = self
511 .pid
512 .map(|_| u32::try_from(curr.proc_ticks.len().saturating_sub(1)).unwrap_or(0));
513
514 let process_utime_secs = self.pid.map(|_| {
516 let raw: u64 = curr
517 .proc_ticks
518 .iter()
519 .map(|(pid, &(cu, _))| {
520 let pu = prev.proc_ticks.get(pid).map(|&(u, _)| u).unwrap_or(cu);
521 cu.saturating_sub(pu)
522 })
523 .sum();
524 if exited_utime <= raw {
525 (raw - exited_utime) as f64 / tps
526 } else {
527 raw as f64 / tps
528 }
529 });
530
531 let process_stime_secs = self.pid.map(|_| {
532 let raw: u64 = curr
533 .proc_ticks
534 .iter()
535 .map(|(pid, &(_, cs))| {
536 let ps = prev.proc_ticks.get(pid).map(|&(_, s)| s).unwrap_or(cs);
537 cs.saturating_sub(ps)
538 })
539 .sum();
540 if exited_stime <= raw {
541 (raw - exited_stime) as f64 / tps
542 } else {
543 raw as f64 / tps
544 }
545 });
546
547 let process_cores_used = match (self.pid, process_utime_secs, process_stime_secs) {
553 (Some(_), Some(u), Some(s)) => {
554 let raw_cores = ((u + s) / elapsed).max(0.0);
555
556 let sys_total_delta =
560 cpu_total(&curr.total).saturating_sub(cpu_total(&prev.total));
561 let sys_idle_delta =
562 cpu_idle(&curr.total).saturating_sub(cpu_idle(&prev.total));
563 let sys_busy_secs = if sys_total_delta > 0 {
564 (sys_total_delta - sys_idle_delta.min(sys_total_delta)) as f64 / tps
565 } else {
566 f64::MAX
567 };
568 let tick_ratio_cap = sys_busy_secs / elapsed;
569
570 let quota_cap = self.cfs_quota.max_cores.unwrap_or(n_cores as f64);
572
573 let capped = raw_cores.min(tick_ratio_cap).min(quota_cap);
575
576 Some(capped)
577 }
578 _ => None,
579 };
580
581 let process_disk_read_bytes = self.pid.map(|_| {
583 curr.proc_io
584 .iter()
585 .map(|(pid, &(cr, _))| {
586 let pr = prev.proc_io.get(pid).map(|&(r, _)| r).unwrap_or(cr);
587 cr.saturating_sub(pr)
588 })
589 .sum::<u64>()
590 });
591
592 let process_disk_write_bytes = self.pid.map(|_| {
593 curr.proc_io
594 .iter()
595 .map(|(pid, &(_, cw))| {
596 let pw = prev.proc_io.get(pid).map(|&(_, w)| w).unwrap_or(cw);
597 cw.saturating_sub(pw)
598 })
599 .sum::<u64>()
600 });
601
602 CpuMetrics {
603 utilization_pct,
604 cgroup_utilization_pct,
605 cgroup_usage_secs,
606 per_core_pct,
607 utime_secs,
608 stime_secs,
609 process_count,
610 process_cores_used,
611 process_child_count,
612 process_utime_secs,
613 process_stime_secs,
614 process_pss_mib,
615 process_rss_mib,
616 process_disk_read_bytes,
617 process_disk_write_bytes,
618 process_gpu_usage: None, process_gpu_vram_mib: None, process_gpu_utilized: None,
621 process_tree_pids: curr.proc_ticks.keys().copied().collect(),
622 }
623 }
624 };
625
626 let mut new_carried = HashSet::new();
636 if let Some(ref prev_snap) = self.prev {
637 for (&pid, &ticks) in &prev_snap.proc_ticks {
638 if !curr.proc_ticks.contains_key(&pid) && !self.carried_forward.contains(&pid) {
639 curr.proc_ticks.insert(pid, ticks);
640 new_carried.insert(pid);
641 }
642 }
643 for (&pid, &io) in &prev_snap.proc_io {
644 if !curr.proc_io.contains_key(&pid) && !self.carried_forward.contains(&pid) {
645 curr.proc_io.insert(pid, io);
646 }
647 }
648 }
649 self.carried_forward = new_carried;
650
651 self.prev = Some(curr);
652 Ok(metrics)
653 }
654}
655
/// Tests for the CPU collector.
///
/// The pure tick-math tests need nothing special. The others read live `/proc`
/// data, spawn `sh` child processes, or `mmap` temp files under `/tmp`. They
/// therefore assume a Linux host where those are available.
#[cfg(test)]
mod tests {
    use super::*;

    /// An interval with only idle ticks is 0 % busy.
    #[test]
    fn test_util_pct_all_idle_is_zero() {
        assert_eq!(util_pct_from_ticks(0, 0, 1600, 1600), 0.0);
    }

    /// An interval with no idle ticks is 100 % busy.
    #[test]
    fn test_util_pct_fully_busy_is_100() {
        let pct = util_pct_from_ticks(0, 0, 1600, 0);
        assert!((pct - 100.0).abs() < 0.01, "expected 100.0, got {pct}");
    }

    /// Half of the elapsed ticks idle gives 50 %.
    #[test]
    fn test_util_pct_half_busy_is_50() {
        let pct = util_pct_from_ticks(0, 0, 1600, 800);
        assert!((pct - 50.0).abs() < 0.01, "expected 50.0, got {pct}");
    }

    /// No elapsed ticks must not divide by zero; the result is 0 %.
    #[test]
    fn test_util_pct_no_delta_is_zero() {
        assert_eq!(util_pct_from_ticks(100, 50, 100, 50), 0.0);
    }

    /// Scaling a near-100 % busy fraction to 4 cores gives close to, and not
    /// much above, 4 cores (mirrors `aggregate_util_cores`).
    #[test]
    fn test_aggregate_util_cores_no_clamp() {
        let pct = util_pct_from_ticks(0, 0, 1000, 1);
        let cores = pct / 100.0 * 4.0_f64;
        assert!(cores > 3.9, "expected close to 4.0, got {cores}");
        assert!(
            cores < 4.05,
            "should not greatly exceed n_cores, got {cores}"
        );
    }

    /// `util_pct_from_ticks` returns the raw value; clamping is left to callers.
    #[test]
    fn test_util_pct_raw_is_not_clamped() {
        let raw = util_pct_from_ticks(0, 0, 1000, 0);
        assert!((raw - 100.0).abs() < 0.01);
        assert_eq!(raw.clamp(0.0, 100.0), 100.0);
    }

    /// Without a previous snapshot, every system delta field reports zero.
    #[test]
    fn test_first_collect_returns_zero_for_delta_fields() {
        let mut collector = CpuCollector::new(None);
        let metrics = collector.collect().expect("first collect failed");
        assert_eq!(
            metrics.utilization_pct, 0.0,
            "utilization_pct must be 0.0 on first collect, got {}",
            metrics.utilization_pct
        );
        assert!(
            metrics.per_core_pct.iter().all(|&v| v == 0.0),
            "per_core_pct must be all-zero on first collect: {:?}",
            metrics.per_core_pct
        );
        assert_eq!(
            metrics.utime_secs, 0.0,
            "utime_secs must be 0.0 on first collect, got {}",
            metrics.utime_secs
        );
        assert_eq!(
            metrics.stime_secs, 0.0,
            "stime_secs must be 0.0 on first collect, got {}",
            metrics.stime_secs
        );
    }

    /// When a PID is tracked, every process field is `Some` even on the first collect.
    #[test]
    fn test_first_collect_with_pid_returns_some_process_fields() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));
        let m = collector.collect().expect("collect() failed");
        assert!(
            m.process_cores_used.is_some(),
            "process_cores_used must be Some when PID is tracked"
        );
        assert!(
            m.process_child_count.is_some(),
            "process_child_count must be Some when PID is tracked"
        );
        assert!(
            m.process_pss_mib.is_some(),
            "process_pss_mib must be Some when PID is tracked"
        );
        assert!(
            m.process_rss_mib.is_some(),
            "process_rss_mib must be Some when PID is tracked"
        );
        assert!(
            m.process_utime_secs.is_some(),
            "process_utime_secs must be Some when PID is tracked"
        );
        assert!(
            m.process_stime_secs.is_some(),
            "process_stime_secs must be Some when PID is tracked"
        );
        assert!(
            m.process_disk_read_bytes.is_some(),
            "process_disk_read_bytes must be Some when PID is tracked"
        );
        assert!(
            m.process_disk_write_bytes.is_some(),
            "process_disk_write_bytes must be Some when PID is tracked"
        );
    }

    /// The test process itself has non-zero PSS and RSS, and PSS <= RSS.
    #[test]
    fn test_process_tree_memory_nonzero_for_self() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let (pss, rss) = process_tree_memory_mib(&[pid]);
        assert!(
            pss > 0,
            "PSS for the current process should be > 0, got {pss}"
        );
        assert!(
            rss > 0,
            "RSS for the current process should be > 0, got {rss}"
        );
        assert!(
            pss <= rss,
            "PSS ({pss}) should not exceed RSS ({rss}) for a single process"
        );
    }

    /// The tree walk always includes the root PID. Assumes PID 1 is visible in
    /// this PID namespace.
    #[test]
    fn test_process_tree_ticks_contains_root_pid() {
        let ticks = process_tree_ticks(1);
        assert!(
            ticks.contains_key(&1),
            "process_tree_ticks(1) must contain PID 1 (init/systemd is always present)"
        );
    }

    /// A second collect produces a real (non-negative) core usage for the tracked tree.
    #[test]
    fn test_second_collect_with_pid_nonneg_cores() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));
        let _ = collector.collect().expect("first collect() failed");
        let m = collector.collect().expect("second collect() failed");
        let cores = m
            .process_cores_used
            .expect("process_cores_used must be Some");
        assert!(
            cores >= 0.0,
            "process_cores_used must be >= 0.0, got {cores}"
        );
    }

    /// Without a tracked PID, every process field stays `None` after a baseline exists.
    #[test]
    fn test_second_collect_no_pid_all_process_fields_none() {
        let mut collector = CpuCollector::new(None);
        let _ = collector.collect().expect("first collect() failed");
        let m = collector.collect().expect("second collect() failed");
        assert!(
            m.process_cores_used.is_none(),
            "process_cores_used must be None when not tracking"
        );
        assert!(
            m.process_child_count.is_none(),
            "process_child_count must be None when not tracking"
        );
        assert!(
            m.process_pss_mib.is_none(),
            "process_pss_mib must be None when not tracking"
        );
        assert!(
            m.process_rss_mib.is_none(),
            "process_rss_mib must be None when not tracking"
        );
        assert!(
            m.process_utime_secs.is_none(),
            "process_utime_secs must be None when not tracking"
        );
        assert!(
            m.process_stime_secs.is_none(),
            "process_stime_secs must be None when not tracking"
        );
        assert!(
            m.process_disk_read_bytes.is_none(),
            "process_disk_read_bytes must be None when not tracking"
        );
        assert!(
            m.process_disk_write_bytes.is_none(),
            "process_disk_write_bytes must be None when not tracking"
        );
    }

    /// At least the test process itself is counted in `/proc`.
    #[test]
    fn test_process_count_positive() {
        let mut collector = CpuCollector::new(None);
        let m = collector.collect().expect("collect() failed");
        assert!(
            m.process_count > 0,
            "process_count must be > 0, got {}",
            m.process_count
        );
    }

    /// Models the exited-child correction with hand-built tick maps.
    ///
    /// Parent 200 had 50 ticks and child 100 had 500 at the previous sample. The
    /// child exits and is reaped. The parent then shows 50 + 250 (its own work)
    /// + 2500 (the child's lifetime total, via cutime). Subtracting the child's
    /// 500 pre-snapshot ticks leaves 2250: the parent's 250 plus the child's
    /// in-interval 2000.
    #[test]
    fn test_cutime_correction_cancels_exited_child_ticks() {
        let prev: HashMap<i32, (u64, u64)> = [
            (200, (50, 0)),
            (100, (500, 0)),
        ]
        .iter()
        .cloned()
        .collect();

        let curr: HashMap<i32, (u64, u64)> =
            [(200, (50 + 250 + 2500, 0))].iter().cloned().collect();

        let raw: u64 = curr
            .iter()
            .map(|(pid, &(cu, cs))| {
                let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
                cu.saturating_sub(pu) + cs.saturating_sub(ps)
            })
            .sum();
        assert_eq!(
            raw, 2750,
            "raw delta must include the double-counted pre-snapshot child ticks"
        );

        let exited: u64 = prev
            .iter()
            .filter(|(pid, _)| !curr.contains_key(pid))
            .map(|(_, &(pu, ps))| pu + ps)
            .sum();
        assert_eq!(
            exited, 500,
            "exited ticks must equal the child's pre-snapshot tick count"
        );

        let corrected = raw.saturating_sub(exited);
        assert_eq!(
            corrected, 2250,
            "corrected delta must exclude the child's pre-snapshot ticks"
        );
    }

    /// A child (8) and grandchild (9) exit in the same interval. Both
    /// pre-snapshot tick counts are subtracted from the root's (7) grown cutime.
    #[test]
    fn test_cutime_correction_handles_cascaded_exits() {
        let prev: HashMap<i32, (u64, u64)> = [
            (7, (0, 0)),
            (8, (100, 0)),
            (9, (200, 0)),
        ]
        .iter()
        .cloned()
        .collect();

        let curr: HashMap<i32, (u64, u64)> = [(7, (30 + 400, 0))].iter().cloned().collect();

        let raw: u64 = curr
            .iter()
            .map(|(pid, &(cu, cs))| {
                let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
                cu.saturating_sub(pu) + cs.saturating_sub(ps)
            })
            .sum();
        assert_eq!(raw, 430);

        let exited: u64 = prev
            .iter()
            .filter(|(pid, _)| !curr.contains_key(pid))
            .map(|(_, &(pu, ps))| pu + ps)
            .sum();
        assert_eq!(
            exited, 300,
            "exited = child pre-snap (100) + grandchild pre-snap (200)"
        );

        let corrected = raw.saturating_sub(exited);
        assert_eq!(corrected, 130);
    }

    /// Regression test for issue #20 (T-CPU-15): after a busy child is killed
    /// and reaped between two collects, the tree's CPU time must not exceed the
    /// system's.
    #[test]
    fn test_process_cores_used_does_not_exceed_system_utilization() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));

        let mut child = std::process::Command::new("sh")
            .args(["-c", "while true; do :; done"])
            .spawn()
            .expect("failed to spawn sh busy-loop -- required for T-CPU-15");

        // Let the busy loop accumulate ticks before the baseline sample.
        std::thread::sleep(std::time::Duration::from_millis(200));

        let _ = collector.collect().expect("warm-up collect failed");

        // Reaping the child folds its ticks into this process's cutime/cstime.
        child.kill().ok();
        child.wait().ok();

        let m = collector.collect().expect("second collect failed");

        let proc_utime = m
            .process_utime_secs
            .expect("process_utime_secs must be Some");
        let proc_stime = m
            .process_stime_secs
            .expect("process_stime_secs must be Some");
        let proc_cpu = proc_utime + proc_stime;
        let sys_cpu = m.utime_secs + m.stime_secs;

        // Tolerance absorbs tick-granularity noise between the two sources.
        let tolerance = sys_cpu * 0.15 + 0.05;
        assert!(
            proc_cpu <= sys_cpu + tolerance,
            "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
             must not exceed system CPU ({sys_cpu:.3}s) -- cutime double-counting regression \
             for issue #20"
        );
    }

    /// Regression test for issue #20 (T-CPU-16): a short-lived child that exits
    /// naturally between collects must not inflate the tree's utime above the system's.
    #[test]
    fn test_process_utime_no_double_count_after_child_exits() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));

        let mut child = std::process::Command::new("sh")
            .args(["-c", "for i in $(seq 1 20000); do :; done"])
            .spawn()
            .expect("failed to spawn sh -- required for T-CPU-16");

        // Give the child a moment to start before the baseline sample.
        std::thread::sleep(std::time::Duration::from_millis(20));

        let _ = collector.collect().expect("warm-up collect failed");

        let _ = child.wait().expect("failed to wait for child");

        let m = collector.collect().expect("second collect failed");

        let proc_utime = m
            .process_utime_secs
            .expect("process_utime_secs must be Some when a PID is tracked");
        let sys_utime = m.utime_secs;

        let tolerance = sys_utime * 0.05 + 0.05;
        assert!(
            proc_utime <= sys_utime + tolerance,
            "process_utime_secs ({proc_utime:.3}s) exceeds system utime_secs ({sys_utime:.3}s) -- \
             cutime double-counting regression (issue #20)"
        );
    }

    /// Regression test for issue #20 (T-CPU-17): the child lives across several
    /// collect intervals before it is killed and reaped.
    #[test]
    fn test_cutime_correction_multi_interval_child_exit() {
        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let mut collector = CpuCollector::new(Some(pid));

        let mut child = std::process::Command::new("sh")
            .args(["-c", "while true; do :; done"])
            .spawn()
            .expect("failed to spawn sh busy-loop -- required for T-CPU-17");

        std::thread::sleep(std::time::Duration::from_millis(100));
        let _ = collector.collect().expect("warm-up collect failed");

        std::thread::sleep(std::time::Duration::from_millis(100));
        let _ = collector.collect().expect("intermediate collect failed");

        child.kill().ok();
        child.wait().ok();

        let m = collector.collect().expect("final collect failed");

        let proc_utime = m
            .process_utime_secs
            .expect("process_utime_secs must be Some");
        let proc_stime = m
            .process_stime_secs
            .expect("process_stime_secs must be Some");
        let proc_cpu = proc_utime + proc_stime;
        let sys_cpu = m.utime_secs + m.stime_secs;

        // Deliberately loose bound; it only guards against gross double-counting.
        let tolerance = sys_cpu * 1.0 + 0.50;
        assert!(
            proc_cpu <= sys_cpu + tolerance,
            "process CPU ({proc_cpu:.3}s = {proc_utime:.3}s utime + {proc_stime:.3}s stime) \
             must not exceed system CPU ({sys_cpu:.3}s) across multiple intervals -- \
             cutime multi-interval regression for issue #20"
        );
    }

    /// T-CPU-18: mapping and touching a 4 MiB private file mapping raises PSS
    /// and RSS by about 4 MiB. This process is the sole mapper, so the PSS and
    /// RSS deltas should agree, which shows PSS is really read from smaps_rollup.
    #[test]
    fn test_pss_tracks_file_backed_mapping() {
        use std::fs;
        use std::io::Write as _;
        use std::os::unix::io::AsRawFd;

        const MAPPING_MIB: usize = 4;
        const MAPPING_SIZE: usize = MAPPING_MIB * 1024 * 1024;

        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let path = format!("/tmp/rt_test_pss_{}", std::process::id());

        let (pss_before, rss_before) = process_tree_memory_mib(&[pid]);

        {
            let mut f = fs::File::create(&path).expect("cannot create temp file for T-CPU-18");
            let chunk = vec![0xABu8; 64 * 1024];
            for _ in 0..(MAPPING_SIZE / chunk.len()) {
                f.write_all(&chunk).expect("write failed");
            }
        }

        let file = fs::File::open(&path).expect("cannot open temp file for T-CPU-18");
        // SAFETY: null address hint, a length equal to the file size just written,
        // and a file descriptor that stays open (`file` is alive) for the call.
        // The result is checked against MAP_FAILED below.
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                MAPPING_SIZE,
                libc::PROT_READ,
                libc::MAP_PRIVATE,
                file.as_raw_fd(),
                0,
            )
        };
        assert_ne!(ptr, libc::MAP_FAILED, "mmap failed in T-CPU-18");

        // SAFETY: `ptr` is a successful PROT_READ mapping of MAPPING_SIZE bytes,
        // and the slice is not used after the munmap below.
        let slice = unsafe { std::slice::from_raw_parts(ptr as *const u8, MAPPING_SIZE) };
        // Touch one byte per 4 KiB page so every page becomes resident.
        let mut checksum = 0u64;
        for offset in (0..MAPPING_SIZE).step_by(4096) {
            checksum = checksum.wrapping_add(u64::from(slice[offset]));
        }
        let _ = checksum;

        let (pss_after, rss_after) = process_tree_memory_mib(&[pid]);

        // SAFETY: `ptr`/MAPPING_SIZE come from the successful mmap above, and
        // `slice` is no longer used.
        unsafe { libc::munmap(ptr, MAPPING_SIZE) };
        fs::remove_file(&path).ok();

        let pss_delta = pss_after.saturating_sub(pss_before);
        let rss_delta = rss_after.saturating_sub(rss_before);

        // Totals are truncated to whole MiB, so a real ~4 MiB delta can show as 3.
        const TRUNC_SLACK_MIB: u64 = 1;

        assert!(
            rss_delta + TRUNC_SLACK_MIB >= MAPPING_MIB as u64,
            "RSS must increase by >= {MAPPING_MIB} MiB after touching the mapping: \
             before={rss_before} MiB, after={rss_after} MiB (delta={rss_delta} MiB)"
        );
        assert!(
            pss_delta + TRUNC_SLACK_MIB >= MAPPING_MIB as u64,
            "PSS must increase by >= {MAPPING_MIB} MiB as sole mapper of the file: \
             before={pss_before} MiB, after={pss_after} MiB (delta={pss_delta} MiB)"
        );
        assert!(
            pss_after <= rss_after + TRUNC_SLACK_MIB,
            "PSS ({pss_after} MiB) must not exceed RSS ({rss_after} MiB)"
        );
        let skew = pss_delta.abs_diff(rss_delta);
        assert!(
            skew <= 1 + TRUNC_SLACK_MIB,
            "PSS delta ({pss_delta} MiB) and RSS delta ({rss_delta} MiB) must agree within \
             2 MiB for a sole mapper -- larger skew indicates smaps_rollup is not being read"
        );
    }

    /// Shows why `TRUNC_SLACK_MIB` is needed. Going from just over 8 MiB to just
    /// under 12 MiB (~4 MiB) truncates to 8 -> 11 MiB, a 3 MiB delta.
    #[test]
    fn test_mib_truncation_can_underreport_pss_delta() {
        let pss_before_bytes: u64 = (8 * 1024 + 1) * 1024;
        let pss_after_bytes: u64 = (12 * 1024 - 1) * 1024;
        let before_mib = (pss_before_bytes / 1024) / 1024;
        let after_mib = (pss_after_bytes / 1024) / 1024;
        let delta_mib = after_mib.saturating_sub(before_mib);
        assert_eq!(before_mib, 8);
        assert_eq!(after_mib, 11);
        assert_eq!(
            delta_mib, 3,
            "truncation makes ~4 MiB delta appear as 3 MiB"
        );

        assert!(delta_mib < 4);

        const TRUNC_SLACK_MIB: u64 = 1;
        assert!(delta_mib + TRUNC_SLACK_MIB >= 4);
    }

    /// Reads a process's PSS in KiB straight from smaps_rollup, avoiding MiB truncation.
    /// Panics if the process or its smaps_rollup is unavailable.
    fn read_pss_kib(pid: i32) -> u64 {
        let proc_ = procfs::process::Process::new(pid).expect("process not found");
        proc_
            .smaps_rollup()
            .expect("smaps_rollup unavailable")
            .memory_map_rollup
            .iter()
            .find_map(|m| m.extension.map.get("Pss").copied())
            .unwrap_or(0)
            / 1024
    }

    /// T-CPU-18b: the same mapping experiment as T-CPU-18, measured at KiB
    /// resolution with a small fixed slack.
    #[test]
    fn test_pss_tracks_file_backed_mapping_kib_resolution() {
        use std::fs;
        use std::io::Write as _;
        use std::os::unix::io::AsRawFd;

        const MAPPING_MIB: usize = 4;
        const MAPPING_SIZE: usize = MAPPING_MIB * 1024 * 1024;
        const EXPECTED_DELTA_KIB: u64 = (MAPPING_MIB * 1024) as u64;
        const SLACK_KIB: u64 = 64;

        let pid = i32::try_from(std::process::id()).expect("PID too large");
        let path = format!("/tmp/rt_test_pss_kib_{}", std::process::id());

        let pss_before_kib = read_pss_kib(pid);

        {
            let mut f = fs::File::create(&path).expect("cannot create temp file for T-CPU-18b");
            let chunk = vec![0xCDu8; 64 * 1024];
            for _ in 0..(MAPPING_SIZE / chunk.len()) {
                f.write_all(&chunk).expect("write failed");
            }
        }

        let file = fs::File::open(&path).expect("cannot open temp file for T-CPU-18b");
        // SAFETY: null address hint, a length equal to the file size just written,
        // and a file descriptor that stays open (`file` is alive) for the call.
        // The result is checked against MAP_FAILED below.
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                MAPPING_SIZE,
                libc::PROT_READ,
                libc::MAP_PRIVATE,
                file.as_raw_fd(),
                0,
            )
        };
        assert_ne!(ptr, libc::MAP_FAILED, "mmap failed in T-CPU-18b");

        // SAFETY: `ptr` is a successful PROT_READ mapping of MAPPING_SIZE bytes,
        // and the slice is not used after the munmap below.
        let slice = unsafe { std::slice::from_raw_parts(ptr as *const u8, MAPPING_SIZE) };
        // Touch one byte per 4 KiB page so every page becomes resident.
        let mut checksum = 0u64;
        for offset in (0..MAPPING_SIZE).step_by(4096) {
            checksum = checksum.wrapping_add(u64::from(slice[offset]));
        }
        let _ = checksum;

        let pss_after_kib = read_pss_kib(pid);

        // SAFETY: `ptr`/MAPPING_SIZE come from the successful mmap above, and
        // `slice` is no longer used.
        unsafe { libc::munmap(ptr, MAPPING_SIZE) };
        fs::remove_file(&path).ok();

        let pss_delta_kib = pss_after_kib.saturating_sub(pss_before_kib);

        assert!(
            pss_delta_kib + SLACK_KIB >= EXPECTED_DELTA_KIB,
            "PSS must increase by >= {EXPECTED_DELTA_KIB} KiB (±{SLACK_KIB} KiB) as sole \
             mapper: before={pss_before_kib} KiB, after={pss_after_kib} KiB \
             (delta={pss_delta_kib} KiB)"
        );
    }

    /// When the "exited" ticks exceed the raw tree growth, the correction is
    /// implausible. The raw delta must be kept instead of saturating to 0
    /// (mirrors the `exited <= raw` guard in `collect`).
    #[test]
    fn test_cutime_correction_skipped_when_exited_exceeds_raw() {
        let prev: HashMap<i32, (u64, u64)> =
            [(1, (500, 0)), (2, (50000, 0))].iter().cloned().collect();

        let curr: HashMap<i32, (u64, u64)> = [(1, (600, 0))].iter().cloned().collect();

        let raw: u64 = curr
            .iter()
            .map(|(pid, &(cu, cs))| {
                let (pu, ps) = prev.get(pid).copied().unwrap_or((cu, cs));
                cu.saturating_sub(pu) + cs.saturating_sub(ps)
            })
            .sum();
        assert_eq!(raw, 100, "raw delta is parent's own 100 ticks");

        let exited: u64 = prev
            .iter()
            .filter(|(pid, _)| !curr.contains_key(pid))
            .map(|(_, &(pu, ps))| pu + ps)
            .sum();
        assert_eq!(exited, 50000);

        // A naive saturating subtraction would wipe out the parent's real work.
        assert_eq!(raw.saturating_sub(exited), 0);

        let corrected = if exited <= raw { raw - exited } else { raw };
        assert_eq!(
            corrected, 100,
            "must preserve raw delta when correction is implausible"
        );
    }

    /// A PID missing from one scan is carried forward with its previous ticks.
    /// When it reappears, its delta spans the gap instead of being lost.
    #[test]
    fn test_carry_forward_spans_gap_for_reappearing_pid() {
        let prev: HashMap<i32, (u64, u64)> =
            [(1, (500, 0)), (2, (10000, 0))].iter().cloned().collect();

        // Interval N: PID 2 missing from the scan, so it is carried forward.
        let mut stored_prev: HashMap<i32, (u64, u64)> = [(1, (600, 0))].iter().cloned().collect();
        for (&pid, &ticks) in &prev {
            stored_prev.entry(pid).or_insert(ticks);
        }
        assert_eq!(
            stored_prev.get(&2),
            Some(&(10000, 0)),
            "child must be carried forward with prev ticks"
        );

        // Interval N+1: PID 2 is visible again.
        let curr: HashMap<i32, (u64, u64)> =
            [(1, (700, 0)), (2, (11000, 0))].iter().cloned().collect();

        let delta_with_cf: u64 = curr
            .iter()
            .map(|(pid, &(cu, cs))| {
                let (pu, ps) = stored_prev.get(pid).copied().unwrap_or((cu, cs));
                cu.saturating_sub(pu) + cs.saturating_sub(ps)
            })
            .sum();
        assert_eq!(
            delta_with_cf, 1100,
            "with carry-forward: parent delta (100) + child delta spanning gap (1000)"
        );

        let no_cf_prev: HashMap<i32, (u64, u64)> = [(1, (600, 0))].iter().cloned().collect();
        let delta_without_cf: u64 = curr
            .iter()
            .map(|(pid, &(cu, cs))| {
                let (pu, ps) = no_cf_prev.get(pid).copied().unwrap_or((cu, cs));
                cu.saturating_sub(pu) + cs.saturating_sub(ps)
            })
            .sum();
        assert_eq!(
            delta_without_cf, 100,
            "without carry-forward: only parent delta (100), child contribution lost"
        );
    }

    /// A PID is carried forward for at most one interval. The loops mirror the
    /// carry-forward step at the end of `collect`.
    #[test]
    fn test_carry_forward_limited_to_one_hop() {
        let mut carried_forward: HashSet<i32> = HashSet::new();

        let prev_ticks: HashMap<i32, (u64, u64)> =
            [(1, (500, 0)), (2, (10000, 0))].iter().cloned().collect();
        let mut curr_ticks: HashMap<i32, (u64, u64)> = [(1, (600, 0))].iter().cloned().collect();

        let mut new_carried = HashSet::new();
        for (&pid, &ticks) in &prev_ticks {
            if !curr_ticks.contains_key(&pid) && !carried_forward.contains(&pid) {
                curr_ticks.insert(pid, ticks);
                new_carried.insert(pid);
            }
        }
        carried_forward = new_carried;

        assert!(
            curr_ticks.contains_key(&2),
            "child must be carried forward in interval N"
        );
        assert!(
            carried_forward.contains(&2),
            "child must be in the carried-forward set"
        );

        // Interval N+1: PID 2 is still missing and was already carried once.
        let prev_ticks_n1 = curr_ticks.clone();
        let mut curr_ticks_n1: HashMap<i32, (u64, u64)> = [(1, (700, 0))].iter().cloned().collect();

        let mut new_carried_n1 = HashSet::new();
        for (&pid, &ticks) in &prev_ticks_n1 {
            if !curr_ticks_n1.contains_key(&pid) && !carried_forward.contains(&pid) {
                curr_ticks_n1.insert(pid, ticks);
                new_carried_n1.insert(pid);
            }
        }

        assert!(
            !curr_ticks_n1.contains_key(&2),
            "child must NOT be carried forward a second time"
        );
        assert!(
            !new_carried_n1.contains(&2),
            "child must NOT be in the new carried-forward set"
        );
    }
}