Skip to main content

resource_tracker/collector/
disk.rs

1use crate::metrics::{DiskMetrics, DiskMountMetrics, DiskType};
2use std::collections::HashMap;
3use std::ffi::CString;
4use std::time::Instant;
5
/// Result alias used throughout this module; errors are boxed
/// `std::error::Error` trait objects.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Number of bytes in one 512-byte sector. Used to convert the sector count in
/// `/sys/block/<dev>/size` to bytes and as the fallback sector size.
const SECTOR_BYTES: u64 = 512;
9
10// ---------------------------------------------------------------------------
11// sysfs helpers
12// ---------------------------------------------------------------------------
13
/// Reads the file at `path` and returns its contents with leading and
/// trailing whitespace removed.
///
/// Returns `None` when the file cannot be read or contains only whitespace.
fn sysfs_read(path: &str) -> Option<String> {
    let raw = std::fs::read_to_string(path).ok()?;
    let value = raw.trim();
    if value.is_empty() {
        None
    } else {
        Some(value.to_string())
    }
}
20
21fn block_attr(device: &str, attr: &str) -> Option<String> {
22    sysfs_read(&format!("/sys/block/{}/{}", device, attr))
23}
24
25// ---------------------------------------------------------------------------
26// Hardware identity - read once at startup
27// ---------------------------------------------------------------------------
28
29#[derive(Clone)]
30struct DeviceInfo {
31    model: Option<String>,
32    vendor: Option<String>,
33    serial: Option<String>,
34    device_type: Option<DiskType>,
35    capacity_bytes: Option<u64>,
36    /// Physical sector size in bytes used for I/O accounting.
37    /// Read from `/sys/block/<dev>/queue/hw_sector_size`; falls back to 512.
38    sector_size: u32,
39}
40
41fn read_device_info(device: &str) -> DeviceInfo {
42    let model = block_attr(device, "device/model");
43    let vendor = block_attr(device, "device/vendor");
44    let serial = block_attr(device, "device/serial").or_else(|| block_attr(device, "device/wwid"));
45
46    let device_type = if device.starts_with("nvme") {
47        Some(DiskType::Nvme)
48    } else {
49        match block_attr(device, "queue/rotational").as_deref() {
50            Some("0") => Some(DiskType::Ssd),
51            Some("1") => Some(DiskType::Hdd),
52            _ => None,
53        }
54    };
55
56    // /sys/block/<dev>/size reports 512-byte logical sectors regardless of
57    // physical sector size, so capacity always uses SECTOR_BYTES (512).
58    let capacity_bytes = block_attr(device, "size")
59        .and_then(|s| s.parse::<u64>().ok())
60        .map(|sectors| sectors * SECTOR_BYTES);
61
62    // Physical sector size for I/O byte accounting.  On 4K-native NVMe drives
63    // this is 4096; on most SATA/HDD it is 512.  The kernel value is
64    // authoritative; fall back to 512 if absent or unparseable.
65    let sector_size = block_attr(device, "queue/hw_sector_size")
66        .and_then(|s| s.parse::<u32>().ok())
67        .filter(|&v| v >= 512)
68        .unwrap_or(u32::try_from(SECTOR_BYTES).unwrap_or(512));
69
70    DeviceInfo {
71        model,
72        vendor,
73        serial,
74        device_type,
75        capacity_bytes,
76        sector_size,
77    }
78}
79
80/// Discover all whole-disk block devices from /sys/block/ and cache their
81/// static identity. Called once in DiskCollector::new().
82fn discover_devices() -> HashMap<String, DeviceInfo> {
83    let Ok(entries) = std::fs::read_dir("/sys/block") else {
84        return HashMap::new();
85    };
86    entries
87        .flatten()
88        .filter_map(|e| {
89            let name = e.file_name().to_string_lossy().to_string();
90            if name.starts_with("loop") || name.starts_with("ram") {
91                return None;
92            }
93            let info = read_device_info(&name);
94            Some((name, info))
95        })
96        .collect()
97}
98
99// ---------------------------------------------------------------------------
100// Filesystem space - statvfs, polled each interval
101// ---------------------------------------------------------------------------
102
103fn statvfs_space(path: &str) -> Option<(u64, u64, u64)> {
104    let cpath = CString::new(path).ok()?;
105    unsafe {
106        let mut buf: libc::statvfs = std::mem::zeroed();
107        if libc::statvfs(cpath.as_ptr(), &mut buf) != 0 {
108            return None;
109        }
110        // f_frsize is the fundamental block size; fall back to f_bsize if zero.
111        let bs = if buf.f_frsize > 0 {
112            buf.f_frsize as u64
113        } else {
114            buf.f_bsize as u64
115        };
116        let total = buf.f_blocks * bs;
117        let avail = buf.f_bavail * bs;
118        let used = total.saturating_sub(buf.f_bfree * bs);
119        Some((total, used, avail))
120    }
121}
122
123/// Read /proc/mounts and return filesystem space for all mount points whose
124/// source device path starts with `/dev/<device_name>` (covers partitions too).
125fn mounts_for_device(device_name: &str) -> Vec<DiskMountMetrics> {
126    let content = match std::fs::read_to_string("/proc/mounts") {
127        Ok(c) => c,
128        Err(_) => return vec![],
129    };
130    let prefix = format!("/dev/{}", device_name);
131    content
132        .lines()
133        .filter(|line| line.starts_with(&prefix))
134        .filter_map(|line| {
135            let mut parts = line.split_whitespace();
136            let _source = parts.next()?;
137            let mount_point = parts.next()?.to_string();
138            let filesystem = parts.next()?.to_string();
139            let (total, used, avail) = statvfs_space(&mount_point)?;
140            let used_pct = if total > 0 {
141                used as f64 / total as f64 * 100.0
142            } else {
143                0.0
144            };
145            Some(DiskMountMetrics {
146                mount_point,
147                filesystem,
148                total_bytes: total,
149                used_bytes: used,
150                available_bytes: avail,
151                used_pct,
152            })
153        })
154        .collect()
155}
156
157// ---------------------------------------------------------------------------
158// Delta snapshot + Collector
159// ---------------------------------------------------------------------------
160
/// Cumulative per-device sector counters captured by one `collect()` call,
/// kept so the next call can turn the difference into a rate.
struct Snapshot {
    /// Moment the counters were sampled.
    instant: Instant,
    /// Cumulative sectors read, keyed by device name.
    sectors_read: HashMap<String, u64>,
    /// Cumulative sectors written, keyed by device name.
    sectors_written: HashMap<String, u64>,
}
166
167pub struct DiskCollector {
168    /// Static hardware identity, cached once in new().
169    device_cache: HashMap<String, DeviceInfo>,
170    prev: Option<Snapshot>,
171}
172
173impl DiskCollector {
174    pub fn new() -> Self {
175        Self {
176            device_cache: discover_devices(),
177            prev: None,
178        }
179    }
180
181    pub fn collect(&mut self) -> Result<Vec<DiskMetrics>> {
182        let diskstats = procfs::diskstats()?;
183        let now = Instant::now();
184
185        // Include every device that is a direct /sys/block entry - these are
186        // whole-disk devices (not partitions).  This matches Python's
187        // resource-tracker, which uses the same /sys/block membership check to
188        // distinguish whole disks from partitions.  Importantly, this keeps
189        // loop*, dm-*, and zram* devices which Python also includes.
190        let block_set: std::collections::HashSet<String> = std::fs::read_dir("/sys/block")
191            .map(|dir| {
192                dir.flatten()
193                    .map(|e| e.file_name().to_string_lossy().to_string())
194                    .collect()
195            })
196            .unwrap_or_default();
197
198        let devs: Vec<_> = diskstats
199            .iter()
200            .filter(|d| block_set.contains(&d.name))
201            .collect();
202
203        let sectors_read: HashMap<String, u64> = devs
204            .iter()
205            .map(|d| (d.name.clone(), d.sectors_read))
206            .collect();
207        let sectors_written: HashMap<String, u64> = devs
208            .iter()
209            .map(|d| (d.name.clone(), d.sectors_written))
210            .collect();
211
212        let mut metrics: Vec<DiskMetrics> = devs
213            .iter()
214            .map(|d| {
215                let info = self.device_cache.get(&d.name);
216
217                let sector_size: u32 = info
218                    .map_or(u32::try_from(SECTOR_BYTES).unwrap_or(512), |i| {
219                        i.sector_size
220                    });
221                let sector_size_f64 = f64::from(sector_size);
222                let sector_size_u64 = u64::from(sector_size);
223
224                let (read_bps, write_bps) = match &self.prev {
225                    None => (0.0, 0.0),
226                    Some(prev) => {
227                        let secs = (now - prev.instant).as_secs_f64().max(0.001);
228                        let sr = sectors_read[&d.name];
229                        let sw = sectors_written[&d.name];
230                        let psr = prev.sectors_read.get(&d.name).copied().unwrap_or(sr);
231                        let psw = prev.sectors_written.get(&d.name).copied().unwrap_or(sw);
232                        // u64 -> f64 is lossy for very large values but no From impl exists in std.
233                        (
234                            sr.saturating_sub(psr) as f64 * sector_size_f64 / secs,
235                            sw.saturating_sub(psw) as f64 * sector_size_f64 / secs,
236                        )
237                    }
238                };
239
240                DiskMetrics {
241                    device: d.name.clone(),
242                    model: info.and_then(|i| i.model.clone()),
243                    vendor: info.and_then(|i| i.vendor.clone()),
244                    serial: info.and_then(|i| i.serial.clone()),
245                    device_type: info.and_then(|i| i.device_type.clone()),
246                    capacity_bytes: info.and_then(|i| i.capacity_bytes),
247                    mounts: mounts_for_device(&d.name),
248                    read_bytes_per_sec: read_bps,
249                    write_bytes_per_sec: write_bps,
250                    read_bytes_total: sectors_read[&d.name] * sector_size_u64,
251                    write_bytes_total: sectors_written[&d.name] * sector_size_u64,
252                }
253            })
254            .collect();
255
256        metrics.sort_by(|a, b| a.device.cmp(&b.device));
257        self.prev = Some(Snapshot {
258            instant: now,
259            sectors_read,
260            sectors_written,
261        });
262        Ok(metrics)
263    }
264}
265
266// ---------------------------------------------------------------------------
267// Unit tests
268// ---------------------------------------------------------------------------
269
#[cfg(test)]
mod tests {
    use super::*;

    // T-DSK-SECTOR: pure arithmetic check - the same sector count scaled by
    // 4096 is exactly 8x the value scaled by 512.  It does not call into the
    // collector.
    #[test]
    fn test_sector_size_4k_gives_8x_bytes() {
        let sectors: u64 = 1000;
        let small_sector: u32 = 512;
        let large_sector: u32 = 4096;

        let small_total = sectors * u64::from(small_sector);
        let large_total = sectors * u64::from(large_sector);

        assert_eq!(
            large_total,
            small_total * 8,
            "4K sector should produce 8x the bytes of 512-byte sector"
        );
    }

    // A device with no sysfs directory has no hw_sector_size attribute, so
    // read_device_info must report the 512-byte default.
    #[test]
    fn test_sector_size_fallback_is_512() {
        let DeviceInfo { sector_size, .. } = read_device_info("__nonexistent_device__");
        assert_eq!(sector_size, 512);
    }

    // T-DSK-01: the first collect() succeeds and, with no earlier snapshot to
    // diff against, reports 0.0 for every I/O rate.
    #[test]
    fn test_disk_first_collect_rates_zero() {
        let mut collector = DiskCollector::new();
        let metrics = collector.collect().expect("first collect() should succeed");
        for disk in &metrics {
            assert_eq!(
                disk.read_bytes_per_sec, 0.0,
                "read_bytes_per_sec must be 0.0 on first collect for {}",
                disk.device
            );
            assert_eq!(
                disk.write_bytes_per_sec, 0.0,
                "write_bytes_per_sec must be 0.0 on first collect for {}",
                disk.device
            );
        }
    }

    // T-DSK-02: the second collect() succeeds and never reports a negative
    // I/O rate.
    #[test]
    fn test_disk_second_collect_rates_nonneg() {
        let mut collector = DiskCollector::new();
        let _ = collector.collect().expect("first collect() failed");
        let metrics = collector.collect().expect("second collect() failed");
        for disk in &metrics {
            assert!(
                disk.read_bytes_per_sec >= 0.0,
                "read_bytes_per_sec must be >= 0.0 for {}",
                disk.device
            );
            assert!(
                disk.write_bytes_per_sec >= 0.0,
                "write_bytes_per_sec must be >= 0.0 for {}",
                disk.device
            );
        }
    }

    // T-DSK-03: collect() returns devices in ascending name order.
    #[test]
    fn test_disk_results_sorted_by_device() {
        let mut collector = DiskCollector::new();
        let metrics = collector.collect().expect("collect() failed");
        let names: Vec<&str> = metrics.iter().map(|disk| disk.device.as_str()).collect();
        let sorted = {
            let mut expected = names.clone();
            expected.sort();
            expected
        };
        assert_eq!(names, sorted, "disk metrics must be sorted by device name");
    }

    // T-DSK-04: cumulative byte totals never go down between two consecutive
    // calls for a device present in both samples.
    #[test]
    fn test_disk_totals_nondecreasing() {
        let mut collector = DiskCollector::new();
        let first = collector.collect().expect("first collect() failed");
        let second = collector.collect().expect("second collect() failed");

        let mut first_map: std::collections::HashMap<&str, (u64, u64)> =
            std::collections::HashMap::new();
        for disk in &first {
            first_map.insert(
                disk.device.as_str(),
                (disk.read_bytes_total, disk.write_bytes_total),
            );
        }

        for disk in &second {
            // Devices that only appear in the second sample have no baseline.
            let Some(&(prev_r, prev_w)) = first_map.get(disk.device.as_str()) else {
                continue;
            };
            assert!(
                disk.read_bytes_total >= prev_r,
                "read_bytes_total decreased for {}: {} < {}",
                disk.device,
                disk.read_bytes_total,
                prev_r
            );
            assert!(
                disk.write_bytes_total >= prev_w,
                "write_bytes_total decreased for {}: {} < {}",
                disk.device,
                disk.write_bytes_total,
                prev_w
            );
        }
    }

    // T-DSK-05: for a device without a sysfs directory every optional field of
    // DeviceInfo is None (sector_size, checked separately, falls back to 512).
    #[test]
    fn test_read_device_info_nonexistent_all_none() {
        let DeviceInfo {
            model,
            vendor,
            serial,
            device_type,
            capacity_bytes,
            ..
        } = read_device_info("__nonexistent__");
        assert!(model.is_none(), "model must be None for missing device");
        assert!(vendor.is_none(), "vendor must be None for missing device");
        assert!(serial.is_none(), "serial must be None for missing device");
        assert!(
            device_type.is_none(),
            "device_type must be None for missing device"
        );
        assert!(
            capacity_bytes.is_none(),
            "capacity_bytes must be None for missing device"
        );
    }
}