Skip to main content

resource_tracker/
main.rs

1#![doc = include_str!("../README.md")]
2
3#[cfg(not(target_os = "linux"))]
4compile_error!(
5    "resource-tracker only supports Linux; /proc and cgroup interfaces are Linux-specific."
6);
7
8mod collector;
9mod config;
10mod metrics;
11mod output;
12mod sentinel;
13mod thread_util;
14
15extern crate libc;
16
17use collector::{
18    CpuCollector, DiskCollector, GpuCollector, MemoryCollector, NetworkCollector,
19    collect_host_info, spawn_cloud_discovery,
20};
21use config::{Config, OutputFormat};
22use metrics::CloudInfo;
23use metrics::Sample;
24use sentinel::{BatchUploader, RunContext, SentinelClient, close_run, samples_to_csv, start_run};
25use std::io::Write;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
30// ---------------------------------------------------------------------------
31// SIGTERM handler
32// ---------------------------------------------------------------------------
33
34static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);
35
36extern "C" fn handle_sigterm(_: libc::c_int) {
37    SIGTERM_RECEIVED.store(true, Ordering::Relaxed);
38}
39
40// ---------------------------------------------------------------------------
41// Graceful shutdown
42// ---------------------------------------------------------------------------
43
44/// Flush remaining samples, close the Sentinel run, then exit.
45///
46/// Called on both shell-wrapper child exit and SIGTERM.  Replaces the former
47/// bare `std::process::exit()` calls so the upload thread always gets a chance
48/// to flush.
49fn shutdown(
50    exit_code: i32,
51    sentinel: Option<&SentinelClient>,
52    run_ctx: Option<Arc<Mutex<RunContext>>>,
53    shutdown_flag: Option<Arc<AtomicBool>>,
54    upload_handle: Option<std::thread::JoinHandle<Vec<String>>>,
55    remaining: Vec<Sample>,
56    interval_secs: u64,
57) -> ! {
58    if let (Some(client), Some(ctx_arc), Some(flag), Some(handle)) =
59        (sentinel, run_ctx, shutdown_flag, upload_handle)
60    {
61        // Signal the upload thread to flush its buffer to S3, then wait for it.
62        // The thread performs one final S3 upload of any remaining buffered samples
63        // before it exits, and returns the list of all successfully uploaded URIs.
64        flag.store(true, Ordering::Relaxed);
65        let uploaded_uris = handle.join().unwrap_or_default();
66
67        // Route selection:
68        //   S3 route   -- at least one batch was uploaded; uploaded_uris is non-empty.
69        //                 The final flush is already included in uploaded_uris.
70        //   Inline route -- no S3 uploads (short run or all S3 failures); send all
71        //                   collected samples as a raw CSV string.
72        let remaining_csv = if uploaded_uris.is_empty() && !remaining.is_empty() {
73            Some(samples_to_csv(&remaining, interval_secs))
74        } else {
75            None
76        };
77
78        let ctx = ctx_arc.lock().unwrap_or_else(|e| e.into_inner());
79        if let Err(e) = close_run(
80            &client.agent,
81            &client.api_base,
82            &client.token,
83            &ctx,
84            Some(exit_code),
85            remaining_csv,
86            &uploaded_uris,
87        ) {
88            eprintln!("warn: sentinel close_run failed: {e}");
89        }
90    }
91
92    std::process::exit(exit_code);
93}
94
95// ---------------------------------------------------------------------------
96// main
97// ---------------------------------------------------------------------------
98
99fn main() {
100    // Install SIGTERM and SIGINT handlers so the binary can flush before exiting.
101    // Both signals set the same flag and trigger the same graceful shutdown path.
102    unsafe {
103        libc::signal(
104            libc::SIGTERM,
105            handle_sigterm as *const () as libc::sighandler_t,
106        );
107        libc::signal(
108            libc::SIGINT,
109            handle_sigterm as *const () as libc::sighandler_t,
110        );
111    }
112
113    let mut config = Config::load();
114
115    // -----------------------------------------------------------------------
116    // Output sink: stdout (default), file (--output), or suppressed (--quiet).
117    // Warnings and errors always go to stderr via eprintln! regardless.
118    // -----------------------------------------------------------------------
119    let mut out_file: Option<std::io::BufWriter<std::fs::File>> = if config.quiet {
120        None
121    } else {
122        config.output_file.as_deref().map(|path| {
123            std::io::BufWriter::new(std::fs::File::create(path).unwrap_or_else(|e| {
124                eprintln!("error: cannot open output file {path}: {e}");
125                std::process::exit(1);
126            }))
127        })
128    };
129
130    // Emit one line of metric output to the selected sink.
131    // quiet=true  -> no-op
132    // output_file -> write to file and flush (so `tail -f` works)
133    // default     -> eprintln! to stderr (keeps stdout clean for the tracked app)
134    macro_rules! emit {
135        ($($arg:tt)*) => {
136            if !config.quiet {
137                if let Some(ref mut f) = out_file {
138                    let _ = writeln!(f, $($arg)*);
139                    let _ = f.flush();
140                } else {
141                    eprintln!($($arg)*);
142                }
143            }
144        }
145    }
146
147    let interval = Duration::from_secs(config.interval_secs);
148
149    // Shell-wrapper child is spawned after warm-up so cloud IMDS probes (ureq may
150    // use helper threads) do not race with fork-heavy stressors under PID limits.
151    let mut child: Option<std::process::Child> = None;
152
153    let mut cpu = CpuCollector::new(config.pid);
154    let memory = MemoryCollector::new();
155    let mut network = NetworkCollector::new();
156    let mut disk = DiskCollector::new(interval);
157    let mut gpu = GpuCollector::new();
158
159    // Collect static GPU info now so host discovery can derive GPU host fields.
160    let initial_gpus = gpu.collect().unwrap_or_default();
161
162    // Host discovery: fast, local, no I/O.
163    let host_info = collect_host_info(&initial_gpus);
164
165    // Warm-up: prime delta state in stateful collectors while cloud probes run
166    // in the background. spawn_cloud_discovery returns a channel Receiver so
167    // the caller never blocks on probe completion -- try_recv() picks up the
168    // result if probes finished during the sleep, or leaves cloud_info as None
169    // to be resolved later (per-tick poll in the main loop, or recv_timeout
170    // before start_run for Sentinel runs).
171    let cloud_rx = spawn_cloud_discovery();
172    let _ = cpu.collect();
173    let _ = network.collect();
174    let _ = disk.collect();
175    std::thread::sleep(interval);
176    // Non-blocking: on most non-cloud machines all probes fail fast
177    // (EHOSTUNREACH); on cloud machines the matching probe returns in < 100 ms.
178    // Either way the result is typically waiting by the time we reach here.
179    let mut cloud_info: Option<CloudInfo> = cloud_rx.as_ref().and_then(|rx| rx.try_recv().ok());
180
181    // -----------------------------------------------------------------------
182    // Shell-wrapper mode: spawn the tracked command after warm-up / cloud probe.
183    // -----------------------------------------------------------------------
184    if !config.command.is_empty() {
185        let (program, args) = config.command.split_first().expect("command is non-empty");
186        match std::process::Command::new(program).args(args).spawn() {
187            Ok(c) => {
188                config.pid = Some(i32::try_from(c.id()).unwrap_or(i32::MAX));
189                cpu.set_tracked_pid(config.pid);
190                child = Some(c);
191            }
192            Err(e) => {
193                eprintln!("error: failed to spawn {:?}: {e}", program);
194                std::process::exit(1);
195            }
196        }
197    }
198
199    // -----------------------------------------------------------------------
200    // Sentinel API setup (gated on SENTINEL_API_TOKEN being set).
201    // -----------------------------------------------------------------------
202    let sentinel = SentinelClient::from_env();
203
204    let (run_ctx_arc, sample_buffer, upload_shutdown_flag, upload_handle) = match &sentinel {
205        None => (None, None, None, None),
206        Some(client) => {
207            // Bounded wait: give cloud discovery a chance to complete before
208            // start_run so the run record carries cloud metadata. IMDS probes
209            // run in parallel and finish within IMDS_TIMEOUT (1 s); 3 s is a
210            // generous ceiling for unusual network paths. Pure metric runs
211            // (no Sentinel token) skip this entirely.
212            if cloud_info.is_none() {
213                if let Some(ref rx) = cloud_rx {
214                    cloud_info = rx.recv_timeout(Duration::from_secs(3)).ok();
215                }
216            }
217            let default_cloud = CloudInfo::default();
218            match start_run(
219                &client.agent,
220                &client.api_base,
221                &client.token,
222                &config.metadata,
223                config.pid,
224                &host_info,
225                cloud_info.as_ref().unwrap_or(&default_cloud),
226            ) {
227                Err(e) => {
228                    eprintln!("warn: sentinel start_run failed: {e}; streaming disabled");
229                    (None, None, None, None)
230                }
231                Ok(ctx) => {
232                    let ctx_arc = Arc::new(Mutex::new(ctx));
233                    let upload_interval = std::env::var("TRACKER_UPLOAD_INTERVAL")
234                        .ok()
235                        .and_then(|v| v.parse().ok())
236                        .unwrap_or(60u64);
237                    let (uploader, buf) = BatchUploader::new(upload_interval, config.interval_secs);
238                    let flag = uploader.shutdown_flag();
239                    let upload_handle = uploader.spawn(
240                        Arc::clone(&ctx_arc),
241                        SentinelClient::new_upload_agent(),
242                        client.api_base.clone(),
243                        client.token.clone(),
244                    );
245                    if upload_handle.is_none() {
246                        eprintln!(
247                            "warn: sentinel background upload disabled; samples will be flushed inline on exit"
248                        );
249                    }
250                    (Some(ctx_arc), Some(buf), Some(flag), upload_handle)
251                }
252            }
253        }
254    };
255
256    // Emit CSV header once before the loop.
257    if config.format == OutputFormat::Csv {
258        emit!("{}", output::csv::csv_header());
259    }
260
261    // Samples collected since the last S3 batch upload (for local fallback).
262    let mut unflushed: Vec<Sample> = Vec::new();
263
264    // Tracks the Instant at the start of each loop iteration so we can
265    // compute the actual elapsed interval between samples and sleep only
266    // for the remainder of the nominal interval (deadline-based scheduling).
267    let mut prev_loop_start: Option<Instant> = None;
268
269    // -----------------------------------------------------------------------
270    // Main sampling loop
271    // -----------------------------------------------------------------------
272    loop {
273        // Poll for cloud discovery result if not yet received. Typically a
274        // no-op because probes complete within IMDS_TIMEOUT and the warm-up
275        // sleep covers that window. Ensures the channel is drained and
276        // cloud_info is populated for any future use.
277        if cloud_info.is_none()
278            && let Some(ref rx) = cloud_rx
279            && let Ok(info) = rx.try_recv()
280        {
281            cloud_info = Some(info);
282        }
283
284        let loop_start = Instant::now();
285
286        // Actual elapsed since the previous iteration started.  None on the
287        // first real sample (no prior loop start to compare against).
288        let actual_interval_ms: Option<u64> = prev_loop_start
289            .map(|p| u64::try_from((loop_start - p).as_millis()).unwrap_or(u64::MAX));
290
291        let timestamp_secs = SystemTime::now()
292            .duration_since(UNIX_EPOCH)
293            .unwrap_or_default()
294            .as_secs();
295
296        let mut sample = Sample {
297            timestamp_secs,
298            actual_interval_ms,
299            job_name: config.metadata.job_name.clone(),
300            tracked_pid: config.pid,
301            cpu: cpu.collect().unwrap_or_default(),
302            memory: memory.collect().unwrap_or_default(),
303            network: network.collect().unwrap_or_default(),
304            disk: disk.collect().unwrap_or_default(),
305            gpu: gpu.collect().unwrap_or_default(),
306        };
307
308        // Augment with per-process GPU stats.
309        // With --pid: filter to the tracked process tree.
310        // Without --pid: report system-wide GPU allocation (all processes).
311        let (vram_mib, gpu_usage, gpu_utilized) =
312            if config.pid.is_some() && !sample.cpu.process_tree_pids.is_empty() {
313                let pids_u32: Vec<u32> = sample
314                    .cpu
315                    .process_tree_pids
316                    .iter()
317                    .filter_map(|&p| u32::try_from(p).ok())
318                    .collect();
319                gpu.process_gpu_info(&pids_u32, interval)
320            } else {
321                gpu.all_gpu_process_info(interval)
322            };
323        sample.cpu.process_gpu_vram_mib = vram_mib;
324        sample.cpu.process_gpu_usage = gpu_usage;
325        sample.cpu.process_gpu_utilized = gpu_utilized;
326
327        // Emit to selected output sink.
328        match config.format {
329            OutputFormat::Json => match serde_json::to_value(&sample) {
330                Ok(mut v) => {
331                    v[format!("{}-version", env!("CARGO_PKG_NAME"))] =
332                        serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string());
333                    emit!("{}", v);
334                }
335                Err(e) => eprintln!("warn: json serialize error: {e}"),
336            },
337            OutputFormat::Csv => {
338                emit!(
339                    "{}",
340                    output::csv::sample_to_csv_row(&sample, config.interval_secs)
341                );
342            }
343        }
344
345        // Push to sentinel buffer (if streaming is active).
346        if let Some(ref buf) = sample_buffer {
347            buf.lock()
348                .unwrap_or_else(|e| e.into_inner())
349                .push(sample.clone());
350        }
351        unflushed.push(sample);
352
353        // -----------------------------------------------------------------------
354        // Shell-wrapper exit check
355        // -----------------------------------------------------------------------
356        if let Some(ref mut c) = child {
357            match c.try_wait() {
358                Ok(Some(status)) => {
359                    let code = status.code().unwrap_or(1);
360                    shutdown(
361                        code,
362                        sentinel.as_ref(),
363                        run_ctx_arc,
364                        upload_shutdown_flag,
365                        upload_handle,
366                        unflushed,
367                        config.interval_secs,
368                    );
369                }
370                Ok(None) => {}
371                Err(e) => eprintln!("warn: error checking child status: {e}"),
372            }
373        }
374
375        // SIGTERM received: flush and exit cleanly.
376        if SIGTERM_RECEIVED.load(Ordering::Relaxed) {
377            shutdown(
378                0,
379                sentinel.as_ref(),
380                run_ctx_arc,
381                upload_shutdown_flag,
382                upload_handle,
383                unflushed,
384                config.interval_secs,
385            );
386        }
387
388        prev_loop_start = Some(loop_start);
389
390        // Deadline-based sleep: sleep only for the time remaining in the
391        // nominal interval.  If collection itself took longer than the
392        // interval, skip sleeping entirely and start the next sample right
393        // away.  This prevents drift accumulation and matches the Python
394        // resource-tracker's timer approach.
395        let elapsed = loop_start.elapsed();
396        if let Some(remaining) = interval.checked_sub(elapsed) {
397            std::thread::sleep(remaining);
398        }
399    }
400}
401
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Verify that SIGINT sets SIGTERM_RECEIVED, triggering the same graceful
    /// shutdown path as SIGTERM.  The test resets the flag, installs the
    /// handler, raises SIGINT, restores the previous disposition, and only then
    /// asserts.  Restoring first keeps process-wide signal state clean for other
    /// tests even if an assertion fails.
    #[test]
    fn test_sigint_sets_shutdown_flag() {
        // Reset in case a previous test left the flag set.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);

        // Install the handler for SIGINT (mirrors what main() does), keeping
        // the previous disposition so it can be put back afterwards.
        // SAFETY: `handle_sigterm` is an `extern "C" fn(c_int)` whose body is a
        // single atomic store.
        let previous = unsafe {
            libc::signal(
                libc::SIGINT,
                handle_sigterm as *const () as libc::sighandler_t,
            )
        };
        assert_ne!(previous, libc::SIG_ERR, "failed to install SIGINT handler");

        // Raise SIGINT on the calling thread.  POSIX guarantees raise() does not
        // return until the handler has returned.
        // SAFETY: the handler installed above is async-signal-safe.
        let raise_rc = unsafe { libc::raise(libc::SIGINT) };

        // Capture and clear the flag, then restore the previous disposition
        // before asserting, so a failure cannot leak modified global state.
        let observed = SIGTERM_RECEIVED.swap(false, Ordering::SeqCst);
        // SAFETY: `previous` is the disposition returned by signal() above.
        unsafe {
            libc::signal(libc::SIGINT, previous);
        }

        assert_eq!(raise_rc, 0, "raise(SIGINT) failed");
        assert!(
            observed,
            "SIGTERM_RECEIVED flag must be true after SIGINT"
        );
    }
}