1use crate::metrics::{DiskMetrics, DiskMountMetrics, DiskType};
2use std::collections::HashMap;
3use std::ffi::CString;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Instant;
6
7type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
8
9const SECTOR_BYTES: u64 = 512;
10
11fn sysfs_read(path: &str) -> Option<String> {
16 std::fs::read_to_string(path)
17 .ok()
18 .map(|s| s.trim().to_string())
19 .filter(|s| !s.is_empty())
20}
21
22fn block_attr(device: &str, attr: &str) -> Option<String> {
23 sysfs_read(&format!("/sys/block/{}/{}", device, attr))
24}
25
26#[derive(Clone)]
31struct DeviceInfo {
32 model: Option<String>,
33 vendor: Option<String>,
34 serial: Option<String>,
35 device_type: Option<DiskType>,
36 capacity_bytes: Option<u64>,
37 sector_size: u32,
40}
41
42fn read_device_info(device: &str) -> DeviceInfo {
43 let model = block_attr(device, "device/model");
44 let vendor = block_attr(device, "device/vendor");
45 let serial = block_attr(device, "device/serial").or_else(|| block_attr(device, "device/wwid"));
46
47 let device_type = if device.starts_with("nvme") {
48 Some(DiskType::Nvme)
49 } else {
50 match block_attr(device, "queue/rotational").as_deref() {
51 Some("0") => Some(DiskType::Ssd),
52 Some("1") => Some(DiskType::Hdd),
53 _ => None,
54 }
55 };
56
57 let capacity_bytes = block_attr(device, "size")
60 .and_then(|s| s.parse::<u64>().ok())
61 .filter(|§ors| sectors > 0)
62 .map(|sectors| sectors * SECTOR_BYTES);
63
64 let sector_size = block_attr(device, "queue/hw_sector_size")
68 .and_then(|s| s.parse::<u32>().ok())
69 .filter(|&v| v >= 512)
70 .unwrap_or(u32::try_from(SECTOR_BYTES).unwrap_or(512));
71
72 DeviceInfo {
73 model,
74 vendor,
75 serial,
76 device_type,
77 capacity_bytes,
78 sector_size,
79 }
80}
81
82fn discover_devices() -> HashMap<String, DeviceInfo> {
85 let Ok(entries) = std::fs::read_dir("/sys/block") else {
86 return HashMap::new();
87 };
88 entries
89 .flatten()
90 .filter_map(|e| {
91 let name = e.file_name().to_string_lossy().to_string();
92 if name.starts_with("loop") || name.starts_with("ram") {
93 return None;
94 }
95 let info = read_device_info(&name);
96 Some((name, info))
97 })
98 .collect()
99}
100
101fn device_source_prefixes(device_name: &str) -> Vec<String> {
122 let primary = format!("/dev/{}", device_name);
123 let mut prefixes = vec![primary.clone()];
124
125 if device_name.starts_with("dm-") {
126 if let Some(name) = sysfs_read(&format!("/sys/block/{}/dm/name", device_name)) {
127 prefixes.push(format!("/dev/mapper/{}", name));
128 }
129 }
130
131 if dev_root_is_on_device(device_name, &primary) {
132 prefixes.push("/dev/root".to_string());
133 }
134
135 prefixes
136}
137
138fn dev_root_is_on_device(device_name: &str, primary: &str) -> bool {
154 if let Ok(target) = std::fs::read_link("/dev/root") {
156 let t = target.to_string_lossy();
157 let resolved = if t.starts_with('/') {
158 t.to_string()
159 } else {
160 format!("/dev/{}", t)
161 };
162 return resolved.starts_with(primary);
163 }
164
165 let mountinfo = std::fs::read_to_string("/proc/self/mountinfo").unwrap_or_default();
167 let Some(root_devnum) = mountinfo.lines().find_map(|line| {
168 let mut f = line.splitn(6, ' ');
171 f.next()?; f.next()?; let devnum = f.next()?.to_string(); f.next()?; let mpt = f.next()?; if mpt != "/" {
177 return None;
178 }
179 let sep = line.find(" - ")?;
181 let source = line[sep + 3..].split_whitespace().nth(1)?;
182 if source == "/dev/root" {
183 Some(devnum)
184 } else {
185 None
186 }
187 }) else {
188 return false;
189 };
190
191 let sys_base = format!("/sys/block/{}", device_name);
193 let Ok(entries) = std::fs::read_dir(&sys_base) else {
194 return false;
195 };
196 entries.flatten().any(|e| {
197 let pname = e.file_name().to_string_lossy().to_string();
198 pname.starts_with(device_name)
199 && sysfs_read(&format!("{}/{}/dev", sys_base, pname))
200 .map_or(false, |dev| dev == root_devnum)
201 })
202}
203
204fn statvfs_space(path: &str) -> Option<(u64, u64, u64)> {
205 let cpath = CString::new(path).ok()?;
206 unsafe {
207 let mut buf: libc::statvfs = std::mem::zeroed();
208 if libc::statvfs(cpath.as_ptr(), &mut buf) != 0 {
209 return None;
210 }
211 let bs = if buf.f_frsize > 0 {
213 buf.f_frsize as u64
214 } else {
215 buf.f_bsize as u64
216 };
217 let total = buf.f_blocks * bs;
218 let avail = buf.f_bavail * bs;
219 let used = total.saturating_sub(buf.f_bfree * bs);
220 Some((total, used, avail))
221 }
222}
223
224fn mounts_for_device(device_name: &str) -> Vec<DiskMountMetrics> {
237 let content = match std::fs::read_to_string("/proc/mounts") {
238 Ok(c) => c,
239 Err(_) => return vec![],
240 };
241 let prefixes = device_source_prefixes(device_name);
242 let mut seen_sources: std::collections::HashSet<String> = std::collections::HashSet::new();
243 let mut result = Vec::new();
244
245 for line in content.lines() {
246 if !prefixes.iter().any(|p| line.starts_with(p.as_str())) {
247 continue;
248 }
249 let mut parts = line.split_whitespace();
250 let (Some(source), Some(mount_point), Some(filesystem)) =
251 (parts.next(), parts.next(), parts.next())
252 else {
253 continue;
254 };
255
256 if mount_point.starts_with("/proc")
258 || mount_point.starts_with("/sys")
259 || mount_point.starts_with("/dev")
260 || mount_point.starts_with("/run")
261 {
262 continue;
263 }
264
265 if !seen_sources.insert(source.to_string()) {
268 continue;
269 }
270
271 let Some((total, used, avail)) = statvfs_space(mount_point) else {
272 continue;
273 };
274
275 if total == 0 {
277 continue;
278 }
279
280 let used_pct = used as f64 / total as f64 * 100.0;
281 result.push(DiskMountMetrics {
282 mount_point: mount_point.to_string(),
283 filesystem: filesystem.to_string(),
284 total_bytes: total,
285 used_bytes: used,
286 available_bytes: avail,
287 used_pct,
288 });
289 }
290 result
291}
292
293fn collect_zfs_spaces(timeout: std::time::Duration) -> Vec<(String, DiskMountMetrics)> {
309 if !std::path::Path::new("/proc/spl/kstat/zfs").exists() {
313 return vec![];
314 }
315
316 let (tx, rx) = std::sync::mpsc::channel();
319 static ZFS_HELPER_WARNED: AtomicBool = AtomicBool::new(false);
320 if ZFS_HELPER_WARNED.load(Ordering::Relaxed) {
321 return vec![];
322 }
323 if crate::thread_util::spawn_named("zpool-list", move || {
324 let _ = tx.send(
325 std::process::Command::new("zpool")
326 .args(["list", "-Hp", "-o", "name,size,allocated,free"])
327 .output(),
328 );
329 })
330 .is_none()
331 {
332 if !ZFS_HELPER_WARNED.swap(true, Ordering::Relaxed) {
333 eprintln!("warn: skipping zfs pool stats: cannot spawn helper thread");
334 }
335 return vec![];
336 }
337 let out = match rx.recv_timeout(timeout) {
338 Ok(Ok(o)) if o.status.success() => o,
339 _ => return vec![],
340 };
341 let stdout = match std::str::from_utf8(&out.stdout) {
342 Ok(s) => s,
343 Err(_) => return vec![],
344 };
345
346 let mount_map = zfs_pool_mount_map();
347 let mut result = Vec::new();
348
349 for line in stdout.lines() {
350 let mut parts = line.split('\t');
351 let (Some(name), Some(size), Some(allocated), Some(free)) =
352 (parts.next(), parts.next(), parts.next(), parts.next())
353 else {
354 continue;
355 };
356 let total: u64 = match size.parse() {
357 Ok(v) => v,
358 Err(_) => continue,
359 };
360 if total == 0 {
361 continue;
362 }
363 let used: u64 = allocated.parse().unwrap_or(0);
364 let avail: u64 = free.parse().unwrap_or(0);
365 let mount_point = mount_map.get(name).cloned().unwrap_or_default();
366 result.push((
367 name.to_string(),
368 DiskMountMetrics {
369 mount_point,
370 filesystem: "zfs".to_string(),
371 total_bytes: total,
372 used_bytes: used,
373 available_bytes: avail,
374 used_pct: used as f64 / total as f64 * 100.0,
375 },
376 ));
377 }
378 result
379}
380
381fn zfs_pool_mount_map() -> HashMap<String, String> {
385 let content = std::fs::read_to_string("/proc/mounts").unwrap_or_default();
386 let mut map: HashMap<String, (usize, String)> = HashMap::new();
387 for line in content.lines() {
388 let mut parts = line.split_whitespace();
389 let (Some(source), Some(mount_point), Some(fs_type)) =
390 (parts.next(), parts.next(), parts.next())
391 else {
392 continue;
393 };
394 if fs_type != "zfs" {
395 continue;
396 }
397 let pool_name = source.split('/').next().unwrap_or(source).to_string();
398 let depth = source.split('/').count();
399 let entry = map.entry(pool_name).or_insert((usize::MAX, String::new()));
400 if depth < entry.0 {
401 *entry = (depth, mount_point.to_string());
402 }
403 }
404 map.into_iter().map(|(k, (_, v))| (k, v)).collect()
405}
406
407struct Snapshot {
412 instant: Instant,
413 sectors_read: HashMap<String, u64>,
414 sectors_written: HashMap<String, u64>,
415}
416
417pub struct DiskCollector {
418 device_cache: HashMap<String, DeviceInfo>,
420 prev: Option<Snapshot>,
421 zfs_timeout: std::time::Duration,
425}
426
427impl DiskCollector {
428 pub fn new(interval: std::time::Duration) -> Self {
429 Self {
430 device_cache: discover_devices(),
431 prev: None,
432 zfs_timeout: interval / 2,
433 }
434 }
435
436 pub fn collect(&mut self) -> Result<Vec<DiskMetrics>> {
437 let diskstats = procfs::diskstats()?;
438 let now = Instant::now();
439
440 let block_set: std::collections::HashSet<String> = std::fs::read_dir("/sys/block")
445 .map(|dir| {
446 dir.flatten()
447 .filter_map(|e| {
448 let name = e.file_name().to_string_lossy().to_string();
449 if name.starts_with("loop") || name.starts_with("ram") {
450 None
451 } else {
452 Some(name)
453 }
454 })
455 .collect()
456 })
457 .unwrap_or_default();
458
459 let devs: Vec<_> = diskstats
460 .iter()
461 .filter(|d| block_set.contains(&d.name))
462 .collect();
463
464 let sectors_read: HashMap<String, u64> = devs
465 .iter()
466 .map(|d| (d.name.clone(), d.sectors_read))
467 .collect();
468 let sectors_written: HashMap<String, u64> = devs
469 .iter()
470 .map(|d| (d.name.clone(), d.sectors_written))
471 .collect();
472
473 let mut metrics: Vec<DiskMetrics> = devs
474 .iter()
475 .map(|d| {
476 let info = self.device_cache.get(&d.name);
477
478 let sector_size: u32 = info
479 .map_or(u32::try_from(SECTOR_BYTES).unwrap_or(512), |i| {
480 i.sector_size
481 });
482 let sector_size_f64 = f64::from(sector_size);
483 let sector_size_u64 = u64::from(sector_size);
484
485 let (read_bps, write_bps) = match &self.prev {
486 None => (0.0, 0.0),
487 Some(prev) => {
488 let secs = (now - prev.instant).as_secs_f64().max(0.001);
489 let sr = sectors_read[&d.name];
490 let sw = sectors_written[&d.name];
491 let psr = prev.sectors_read.get(&d.name).copied().unwrap_or(sr);
492 let psw = prev.sectors_written.get(&d.name).copied().unwrap_or(sw);
493 (
495 sr.saturating_sub(psr) as f64 * sector_size_f64 / secs,
496 sw.saturating_sub(psw) as f64 * sector_size_f64 / secs,
497 )
498 }
499 };
500
501 DiskMetrics {
502 device: d.name.clone(),
503 model: info.and_then(|i| i.model.clone()),
504 vendor: info.and_then(|i| i.vendor.clone()),
505 serial: info.and_then(|i| i.serial.clone()),
506 device_type: info.and_then(|i| i.device_type.clone()),
507 capacity_bytes: info.and_then(|i| i.capacity_bytes),
508 mounts: mounts_for_device(&d.name),
509 read_bytes_per_sec: read_bps,
510 write_bytes_per_sec: write_bps,
511 read_bytes_total: sectors_read[&d.name] * sector_size_u64,
512 write_bytes_total: sectors_written[&d.name] * sector_size_u64,
513 }
514 })
515 .collect();
516
517 for (pool_name, mount) in collect_zfs_spaces(self.zfs_timeout) {
521 let total = mount.total_bytes;
522 metrics.push(DiskMetrics {
523 device: format!("zfs:{}", pool_name),
524 model: None,
525 vendor: None,
526 serial: None,
527 device_type: None,
528 capacity_bytes: Some(total),
529 mounts: vec![mount],
530 read_bytes_per_sec: 0.0,
535 write_bytes_per_sec: 0.0,
536 read_bytes_total: 0,
537 write_bytes_total: 0,
538 });
539 }
540
541 metrics.sort_by(|a, b| a.device.cmp(&b.device));
542 self.prev = Some(Snapshot {
543 instant: now,
544 sectors_read,
545 sectors_written,
546 });
547 Ok(metrics)
548 }
549}
550
551#[cfg(test)]
556mod tests {
557 use super::*;
558
559 #[test]
563 fn test_sector_size_4k_gives_8x_bytes() {
564 let sector_delta: u64 = 1000;
565 let sector_size_512: u32 = 512;
566 let sector_size_4096: u32 = 4096;
567
568 let bytes_512 = sector_delta * u64::from(sector_size_512);
569 let bytes_4096 = sector_delta * u64::from(sector_size_4096);
570
571 assert_eq!(
572 bytes_4096,
573 bytes_512 * 8,
574 "4K sector should produce 8x the bytes of 512-byte sector"
575 );
576 }
577
578 #[test]
581 fn test_sector_size_fallback_is_512() {
582 let info = read_device_info("__nonexistent_device__");
583 assert_eq!(info.sector_size, 512);
584 }
585
586 #[test]
588 fn test_disk_first_collect_rates_zero() {
589 let mut collector = DiskCollector::new(std::time::Duration::from_secs(1));
590 let metrics = collector.collect().expect("first collect() should succeed");
591 metrics.iter().for_each(|d| {
592 assert_eq!(
593 d.read_bytes_per_sec, 0.0,
594 "read_bytes_per_sec must be 0.0 on first collect for {}",
595 d.device
596 );
597 assert_eq!(
598 d.write_bytes_per_sec, 0.0,
599 "write_bytes_per_sec must be 0.0 on first collect for {}",
600 d.device
601 );
602 });
603 }
604
605 #[test]
607 fn test_disk_second_collect_rates_nonneg() {
608 let mut collector = DiskCollector::new(std::time::Duration::from_secs(1));
609 let _ = collector.collect().expect("first collect() failed");
610 let metrics = collector.collect().expect("second collect() failed");
611 metrics.iter().for_each(|d| {
612 assert!(
613 d.read_bytes_per_sec >= 0.0,
614 "read_bytes_per_sec must be >= 0.0 for {}",
615 d.device
616 );
617 assert!(
618 d.write_bytes_per_sec >= 0.0,
619 "write_bytes_per_sec must be >= 0.0 for {}",
620 d.device
621 );
622 });
623 }
624
625 #[test]
627 fn test_disk_results_sorted_by_device() {
628 let mut collector = DiskCollector::new(std::time::Duration::from_secs(1));
629 let metrics = collector.collect().expect("collect() failed");
630 let names: Vec<&str> = metrics.iter().map(|d| d.device.as_str()).collect();
631 let mut sorted = names.clone();
632 sorted.sort();
633 assert_eq!(names, sorted, "disk metrics must be sorted by device name");
634 }
635
636 #[test]
638 fn test_disk_totals_nondecreasing() {
639 let mut collector = DiskCollector::new(std::time::Duration::from_secs(1));
640 let first = collector.collect().expect("first collect() failed");
641 let second = collector.collect().expect("second collect() failed");
642 let first_map: std::collections::HashMap<&str, (u64, u64)> = first
643 .iter()
644 .map(|d| (d.device.as_str(), (d.read_bytes_total, d.write_bytes_total)))
645 .collect();
646 second.iter().for_each(|d| {
647 if let Some(&(prev_r, prev_w)) = first_map.get(d.device.as_str()) {
648 assert!(
649 d.read_bytes_total >= prev_r,
650 "read_bytes_total decreased for {}: {} < {}",
651 d.device,
652 d.read_bytes_total,
653 prev_r
654 );
655 assert!(
656 d.write_bytes_total >= prev_w,
657 "write_bytes_total decreased for {}: {} < {}",
658 d.device,
659 d.write_bytes_total,
660 prev_w
661 );
662 }
663 });
664 }
665
666 #[test]
669 fn test_read_device_info_nonexistent_all_none() {
670 let info = read_device_info("__nonexistent__");
671 assert!(
672 info.model.is_none(),
673 "model must be None for missing device"
674 );
675 assert!(
676 info.vendor.is_none(),
677 "vendor must be None for missing device"
678 );
679 assert!(
680 info.serial.is_none(),
681 "serial must be None for missing device"
682 );
683 assert!(
684 info.device_type.is_none(),
685 "device_type must be None for missing device"
686 );
687 assert!(
688 info.capacity_bytes.is_none(),
689 "capacity_bytes must be None for missing device"
690 );
691 }
692}