resource_tracker/collector/
disk.rs1use crate::metrics::{DiskMetrics, DiskMountMetrics, DiskType};
2use std::collections::HashMap;
3use std::ffi::CString;
4use std::time::Instant;
5
6type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
7
8const SECTOR_BYTES: u64 = 512;
9
10fn sysfs_read(path: &str) -> Option<String> {
15 std::fs::read_to_string(path)
16 .ok()
17 .map(|s| s.trim().to_string())
18 .filter(|s| !s.is_empty())
19}
20
21fn block_attr(device: &str, attr: &str) -> Option<String> {
22 sysfs_read(&format!("/sys/block/{}/{}", device, attr))
23}
24
25#[derive(Clone)]
30struct DeviceInfo {
31 model: Option<String>,
32 vendor: Option<String>,
33 serial: Option<String>,
34 device_type: Option<DiskType>,
35 capacity_bytes: Option<u64>,
36 sector_size: u32,
39}
40
41fn read_device_info(device: &str) -> DeviceInfo {
42 let model = block_attr(device, "device/model");
43 let vendor = block_attr(device, "device/vendor");
44 let serial = block_attr(device, "device/serial").or_else(|| block_attr(device, "device/wwid"));
45
46 let device_type = if device.starts_with("nvme") {
47 Some(DiskType::Nvme)
48 } else {
49 match block_attr(device, "queue/rotational").as_deref() {
50 Some("0") => Some(DiskType::Ssd),
51 Some("1") => Some(DiskType::Hdd),
52 _ => None,
53 }
54 };
55
56 let capacity_bytes = block_attr(device, "size")
59 .and_then(|s| s.parse::<u64>().ok())
60 .map(|sectors| sectors * SECTOR_BYTES);
61
62 let sector_size = block_attr(device, "queue/hw_sector_size")
66 .and_then(|s| s.parse::<u32>().ok())
67 .filter(|&v| v >= 512)
68 .unwrap_or(u32::try_from(SECTOR_BYTES).unwrap_or(512));
69
70 DeviceInfo {
71 model,
72 vendor,
73 serial,
74 device_type,
75 capacity_bytes,
76 sector_size,
77 }
78}
79
80fn discover_devices() -> HashMap<String, DeviceInfo> {
83 let Ok(entries) = std::fs::read_dir("/sys/block") else {
84 return HashMap::new();
85 };
86 entries
87 .flatten()
88 .filter_map(|e| {
89 let name = e.file_name().to_string_lossy().to_string();
90 if name.starts_with("loop") || name.starts_with("ram") {
91 return None;
92 }
93 let info = read_device_info(&name);
94 Some((name, info))
95 })
96 .collect()
97}
98
99fn statvfs_space(path: &str) -> Option<(u64, u64, u64)> {
104 let cpath = CString::new(path).ok()?;
105 unsafe {
106 let mut buf: libc::statvfs = std::mem::zeroed();
107 if libc::statvfs(cpath.as_ptr(), &mut buf) != 0 {
108 return None;
109 }
110 let bs = if buf.f_frsize > 0 {
112 buf.f_frsize as u64
113 } else {
114 buf.f_bsize as u64
115 };
116 let total = buf.f_blocks * bs;
117 let avail = buf.f_bavail * bs;
118 let used = total.saturating_sub(buf.f_bfree * bs);
119 Some((total, used, avail))
120 }
121}
122
123fn mounts_for_device(device_name: &str) -> Vec<DiskMountMetrics> {
126 let content = match std::fs::read_to_string("/proc/mounts") {
127 Ok(c) => c,
128 Err(_) => return vec![],
129 };
130 let prefix = format!("/dev/{}", device_name);
131 content
132 .lines()
133 .filter(|line| line.starts_with(&prefix))
134 .filter_map(|line| {
135 let mut parts = line.split_whitespace();
136 let _source = parts.next()?;
137 let mount_point = parts.next()?.to_string();
138 let filesystem = parts.next()?.to_string();
139 let (total, used, avail) = statvfs_space(&mount_point)?;
140 let used_pct = if total > 0 {
141 used as f64 / total as f64 * 100.0
142 } else {
143 0.0
144 };
145 Some(DiskMountMetrics {
146 mount_point,
147 filesystem,
148 total_bytes: total,
149 used_bytes: used,
150 available_bytes: avail,
151 used_pct,
152 })
153 })
154 .collect()
155}
156
157struct Snapshot {
162 instant: Instant,
163 sectors_read: HashMap<String, u64>,
164 sectors_written: HashMap<String, u64>,
165}
166
167pub struct DiskCollector {
168 device_cache: HashMap<String, DeviceInfo>,
170 prev: Option<Snapshot>,
171}
172
173impl DiskCollector {
174 pub fn new() -> Self {
175 Self {
176 device_cache: discover_devices(),
177 prev: None,
178 }
179 }
180
181 pub fn collect(&mut self) -> Result<Vec<DiskMetrics>> {
182 let diskstats = procfs::diskstats()?;
183 let now = Instant::now();
184
185 let block_set: std::collections::HashSet<String> = std::fs::read_dir("/sys/block")
191 .map(|dir| {
192 dir.flatten()
193 .map(|e| e.file_name().to_string_lossy().to_string())
194 .collect()
195 })
196 .unwrap_or_default();
197
198 let devs: Vec<_> = diskstats
199 .iter()
200 .filter(|d| block_set.contains(&d.name))
201 .collect();
202
203 let sectors_read: HashMap<String, u64> = devs
204 .iter()
205 .map(|d| (d.name.clone(), d.sectors_read))
206 .collect();
207 let sectors_written: HashMap<String, u64> = devs
208 .iter()
209 .map(|d| (d.name.clone(), d.sectors_written))
210 .collect();
211
212 let mut metrics: Vec<DiskMetrics> = devs
213 .iter()
214 .map(|d| {
215 let info = self.device_cache.get(&d.name);
216
217 let sector_size: u32 = info
218 .map_or(u32::try_from(SECTOR_BYTES).unwrap_or(512), |i| {
219 i.sector_size
220 });
221 let sector_size_f64 = f64::from(sector_size);
222 let sector_size_u64 = u64::from(sector_size);
223
224 let (read_bps, write_bps) = match &self.prev {
225 None => (0.0, 0.0),
226 Some(prev) => {
227 let secs = (now - prev.instant).as_secs_f64().max(0.001);
228 let sr = sectors_read[&d.name];
229 let sw = sectors_written[&d.name];
230 let psr = prev.sectors_read.get(&d.name).copied().unwrap_or(sr);
231 let psw = prev.sectors_written.get(&d.name).copied().unwrap_or(sw);
232 (
234 sr.saturating_sub(psr) as f64 * sector_size_f64 / secs,
235 sw.saturating_sub(psw) as f64 * sector_size_f64 / secs,
236 )
237 }
238 };
239
240 DiskMetrics {
241 device: d.name.clone(),
242 model: info.and_then(|i| i.model.clone()),
243 vendor: info.and_then(|i| i.vendor.clone()),
244 serial: info.and_then(|i| i.serial.clone()),
245 device_type: info.and_then(|i| i.device_type.clone()),
246 capacity_bytes: info.and_then(|i| i.capacity_bytes),
247 mounts: mounts_for_device(&d.name),
248 read_bytes_per_sec: read_bps,
249 write_bytes_per_sec: write_bps,
250 read_bytes_total: sectors_read[&d.name] * sector_size_u64,
251 write_bytes_total: sectors_written[&d.name] * sector_size_u64,
252 }
253 })
254 .collect();
255
256 metrics.sort_by(|a, b| a.device.cmp(&b.device));
257 self.prev = Some(Snapshot {
258 instant: now,
259 sectors_read,
260 sectors_written,
261 });
262 Ok(metrics)
263 }
264}
265
266#[cfg(test)]
271mod tests {
272 use super::*;
273
274 #[test]
278 fn test_sector_size_4k_gives_8x_bytes() {
279 let sector_delta: u64 = 1000;
280 let sector_size_512: u32 = 512;
281 let sector_size_4096: u32 = 4096;
282
283 let bytes_512 = sector_delta * u64::from(sector_size_512);
284 let bytes_4096 = sector_delta * u64::from(sector_size_4096);
285
286 assert_eq!(
287 bytes_4096,
288 bytes_512 * 8,
289 "4K sector should produce 8x the bytes of 512-byte sector"
290 );
291 }
292
293 #[test]
296 fn test_sector_size_fallback_is_512() {
297 let info = read_device_info("__nonexistent_device__");
298 assert_eq!(info.sector_size, 512);
299 }
300
301 #[test]
303 fn test_disk_first_collect_rates_zero() {
304 let mut collector = DiskCollector::new();
305 let metrics = collector.collect().expect("first collect() should succeed");
306 metrics.iter().for_each(|d| {
307 assert_eq!(
308 d.read_bytes_per_sec, 0.0,
309 "read_bytes_per_sec must be 0.0 on first collect for {}",
310 d.device
311 );
312 assert_eq!(
313 d.write_bytes_per_sec, 0.0,
314 "write_bytes_per_sec must be 0.0 on first collect for {}",
315 d.device
316 );
317 });
318 }
319
320 #[test]
322 fn test_disk_second_collect_rates_nonneg() {
323 let mut collector = DiskCollector::new();
324 let _ = collector.collect().expect("first collect() failed");
325 let metrics = collector.collect().expect("second collect() failed");
326 metrics.iter().for_each(|d| {
327 assert!(
328 d.read_bytes_per_sec >= 0.0,
329 "read_bytes_per_sec must be >= 0.0 for {}",
330 d.device
331 );
332 assert!(
333 d.write_bytes_per_sec >= 0.0,
334 "write_bytes_per_sec must be >= 0.0 for {}",
335 d.device
336 );
337 });
338 }
339
340 #[test]
342 fn test_disk_results_sorted_by_device() {
343 let mut collector = DiskCollector::new();
344 let metrics = collector.collect().expect("collect() failed");
345 let names: Vec<&str> = metrics.iter().map(|d| d.device.as_str()).collect();
346 let mut sorted = names.clone();
347 sorted.sort();
348 assert_eq!(names, sorted, "disk metrics must be sorted by device name");
349 }
350
351 #[test]
353 fn test_disk_totals_nondecreasing() {
354 let mut collector = DiskCollector::new();
355 let first = collector.collect().expect("first collect() failed");
356 let second = collector.collect().expect("second collect() failed");
357 let first_map: std::collections::HashMap<&str, (u64, u64)> = first
358 .iter()
359 .map(|d| (d.device.as_str(), (d.read_bytes_total, d.write_bytes_total)))
360 .collect();
361 second.iter().for_each(|d| {
362 if let Some(&(prev_r, prev_w)) = first_map.get(d.device.as_str()) {
363 assert!(
364 d.read_bytes_total >= prev_r,
365 "read_bytes_total decreased for {}: {} < {}",
366 d.device,
367 d.read_bytes_total,
368 prev_r
369 );
370 assert!(
371 d.write_bytes_total >= prev_w,
372 "write_bytes_total decreased for {}: {} < {}",
373 d.device,
374 d.write_bytes_total,
375 prev_w
376 );
377 }
378 });
379 }
380
381 #[test]
384 fn test_read_device_info_nonexistent_all_none() {
385 let info = read_device_info("__nonexistent__");
386 assert!(
387 info.model.is_none(),
388 "model must be None for missing device"
389 );
390 assert!(
391 info.vendor.is_none(),
392 "vendor must be None for missing device"
393 );
394 assert!(
395 info.serial.is_none(),
396 "serial must be None for missing device"
397 );
398 assert!(
399 info.device_type.is_none(),
400 "device_type must be None for missing device"
401 );
402 assert!(
403 info.capacity_bytes.is_none(),
404 "capacity_bytes must be None for missing device"
405 );
406 }
407}