Skip to main content

resource_tracker/
main.rs

1#![doc = include_str!("../README.md")]
2
3mod collector;
4mod config;
5mod metrics;
6mod output;
7mod sentinel;
8
9extern crate libc;
10
11use collector::{
12    CpuCollector, DiskCollector, GpuCollector, MemoryCollector, NetworkCollector,
13    collect_host_info, spawn_cloud_discovery,
14};
15use config::{Config, OutputFormat};
16use metrics::Sample;
17use sentinel::{BatchUploader, RunContext, SentinelClient, close_run, samples_to_csv, start_run};
18use std::io::Write;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23// ---------------------------------------------------------------------------
24// SIGTERM handler
25// ---------------------------------------------------------------------------
26
27static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);
28
29extern "C" fn handle_sigterm(_: libc::c_int) {
30    SIGTERM_RECEIVED.store(true, Ordering::Relaxed);
31}
32
33// ---------------------------------------------------------------------------
34// Graceful shutdown
35// ---------------------------------------------------------------------------
36
37/// Flush remaining samples, close the Sentinel run, then exit.
38///
39/// Called on both shell-wrapper child exit and SIGTERM.  Replaces the former
40/// bare `std::process::exit()` calls so the upload thread always gets a chance
41/// to flush.
42fn shutdown(
43    exit_code: i32,
44    sentinel: Option<&SentinelClient>,
45    run_ctx: Option<Arc<Mutex<RunContext>>>,
46    shutdown_flag: Option<Arc<AtomicBool>>,
47    upload_handle: Option<std::thread::JoinHandle<Vec<String>>>,
48    remaining: Vec<Sample>,
49    interval_secs: u64,
50) -> ! {
51    if let (Some(client), Some(ctx_arc), Some(flag), Some(handle)) =
52        (sentinel, run_ctx, shutdown_flag, upload_handle)
53    {
54        // Signal the upload thread to flush its buffer to S3, then wait for it.
55        // The thread performs one final S3 upload of any remaining buffered samples
56        // before it exits, and returns the list of all successfully uploaded URIs.
57        flag.store(true, Ordering::Relaxed);
58        let uploaded_uris = handle.join().unwrap_or_default();
59
60        // Route selection:
61        //   S3 route   -- at least one batch was uploaded; uploaded_uris is non-empty.
62        //                 The final flush is already included in uploaded_uris.
63        //   Inline route -- no S3 uploads (short run or all S3 failures); send all
64        //                   collected samples as a raw CSV string.
65        let remaining_csv = if uploaded_uris.is_empty() && !remaining.is_empty() {
66            Some(samples_to_csv(&remaining, interval_secs))
67        } else {
68            None
69        };
70
71        let ctx = ctx_arc.lock().unwrap_or_else(|e| e.into_inner());
72        if let Err(e) = close_run(
73            &client.agent,
74            &client.api_base,
75            &client.token,
76            &ctx,
77            Some(exit_code),
78            remaining_csv,
79            &uploaded_uris,
80        ) {
81            eprintln!("warn: sentinel close_run failed: {e}");
82        }
83    }
84
85    std::process::exit(exit_code);
86}
87
88// ---------------------------------------------------------------------------
89// main
90// ---------------------------------------------------------------------------
91
92fn main() {
93    // Install SIGTERM and SIGINT handlers so the binary can flush before exiting.
94    // Both signals set the same flag and trigger the same graceful shutdown path.
95    unsafe {
96        libc::signal(
97            libc::SIGTERM,
98            handle_sigterm as *const () as libc::sighandler_t,
99        );
100        libc::signal(
101            libc::SIGINT,
102            handle_sigterm as *const () as libc::sighandler_t,
103        );
104    }
105
106    let mut config = Config::load();
107
108    // -----------------------------------------------------------------------
109    // Output sink: stdout (default), file (--output), or suppressed (--quiet).
110    // Warnings and errors always go to stderr via eprintln! regardless.
111    // -----------------------------------------------------------------------
112    let mut out_file: Option<std::io::BufWriter<std::fs::File>> = if config.quiet {
113        None
114    } else {
115        config.output_file.as_deref().map(|path| {
116            std::io::BufWriter::new(std::fs::File::create(path).unwrap_or_else(|e| {
117                eprintln!("error: cannot open output file {path}: {e}");
118                std::process::exit(1);
119            }))
120        })
121    };
122
123    // Emit one line of metric output to the selected sink.
124    // quiet=true  -> no-op
125    // output_file -> write to file and flush (so `tail -f` works)
126    // default     -> eprintln! to stderr (keeps stdout clean for the tracked app)
127    macro_rules! emit {
128        ($($arg:tt)*) => {
129            if !config.quiet {
130                if let Some(ref mut f) = out_file {
131                    let _ = writeln!(f, $($arg)*);
132                    let _ = f.flush();
133                } else {
134                    eprintln!($($arg)*);
135                }
136            }
137        }
138    }
139
140    // -----------------------------------------------------------------------
141    // Shell-wrapper mode: spawn the command and track its PID automatically.
142    // -----------------------------------------------------------------------
143    let mut child = if !config.command.is_empty() {
144        let (program, args) = config.command.split_first().expect("command is non-empty");
145        match std::process::Command::new(program).args(args).spawn() {
146            Ok(c) => {
147                config.pid = Some(i32::try_from(c.id()).unwrap_or(i32::MAX));
148                Some(c)
149            }
150            Err(e) => {
151                eprintln!("error: failed to spawn {:?}: {e}", program);
152                std::process::exit(1);
153            }
154        }
155    } else {
156        None
157    };
158
159    let interval = Duration::from_secs(config.interval_secs);
160
161    let mut cpu = CpuCollector::new(config.pid);
162    let memory = MemoryCollector::new();
163    let mut network = NetworkCollector::new();
164    let mut disk = DiskCollector::new();
165    let mut gpu = GpuCollector::new();
166
167    // Collect static GPU info now so host discovery can derive GPU host fields.
168    let initial_gpus = gpu.collect().unwrap_or_default();
169
170    // Host discovery: fast, local, no I/O.
171    let host_info = collect_host_info(&initial_gpus);
172
173    // Cloud discovery: spawn before warm-up so probes run concurrently.
174    let cloud_handle = spawn_cloud_discovery();
175
176    // Warm-up: prime delta state in stateful collectors, then sleep one full
177    // interval so the first real sample has meaningful rates.
178    let _ = cpu.collect();
179    let _ = network.collect();
180    let _ = disk.collect();
181    std::thread::sleep(interval);
182
183    // Cloud probes are bounded by 2s each; they are done by now.
184    let cloud_info = cloud_handle.join().unwrap_or_default();
185
186    // -----------------------------------------------------------------------
187    // Sentinel API setup (gated on SENTINEL_API_TOKEN being set).
188    // -----------------------------------------------------------------------
189    let sentinel = SentinelClient::from_env();
190
191    let (run_ctx_arc, sample_buffer, upload_shutdown_flag, upload_handle) = match &sentinel {
192        None => (None, None, None, None),
193        Some(client) => {
194            match start_run(
195                &client.agent,
196                &client.api_base,
197                &client.token,
198                &config.metadata,
199                config.pid,
200                &host_info,
201                &cloud_info,
202            ) {
203                Err(e) => {
204                    eprintln!("warn: sentinel start_run failed: {e}; streaming disabled");
205                    (None, None, None, None)
206                }
207                Ok(ctx) => {
208                    let ctx_arc = Arc::new(Mutex::new(ctx));
209                    let upload_interval = std::env::var("TRACKER_UPLOAD_INTERVAL")
210                        .ok()
211                        .and_then(|v| v.parse().ok())
212                        .unwrap_or(60u64);
213                    let (uploader, buf) = BatchUploader::new(upload_interval, config.interval_secs);
214                    let flag = uploader.shutdown_flag();
215                    let handle = uploader.spawn(
216                        Arc::clone(&ctx_arc),
217                        client.agent.clone(),
218                        client.api_base.clone(),
219                        client.token.clone(),
220                    );
221                    (Some(ctx_arc), Some(buf), Some(flag), Some(handle))
222                }
223            }
224        }
225    };
226
227    // Emit CSV header once before the loop.
228    if config.format == OutputFormat::Csv {
229        emit!("{}", output::csv::csv_header());
230    }
231
232    // Samples collected since the last S3 batch upload (for local fallback).
233    let mut unflushed: Vec<Sample> = Vec::new();
234
235    // -----------------------------------------------------------------------
236    // Main sampling loop
237    // -----------------------------------------------------------------------
238    loop {
239        let timestamp_secs = SystemTime::now()
240            .duration_since(UNIX_EPOCH)
241            .unwrap_or_default()
242            .as_secs();
243
244        let mut sample = Sample {
245            timestamp_secs,
246            job_name: config.metadata.job_name.clone(),
247            tracked_pid: config.pid,
248            cpu: cpu.collect().unwrap_or_default(),
249            memory: memory.collect().unwrap_or_default(),
250            network: network.collect().unwrap_or_default(),
251            disk: disk.collect().unwrap_or_default(),
252            gpu: gpu.collect().unwrap_or_default(),
253        };
254
255        // Augment with per-process GPU stats.
256        // With --pid: filter to the tracked process tree.
257        // Without --pid: report system-wide GPU allocation (all processes).
258        let (vram_mib, gpu_usage, gpu_utilized) =
259            if config.pid.is_some() && !sample.cpu.process_tree_pids.is_empty() {
260                let pids_u32: Vec<u32> = sample
261                    .cpu
262                    .process_tree_pids
263                    .iter()
264                    .filter_map(|&p| u32::try_from(p).ok())
265                    .collect();
266                gpu.process_gpu_info(&pids_u32, interval)
267            } else {
268                gpu.all_gpu_process_info(interval)
269            };
270        sample.cpu.process_gpu_vram_mib = vram_mib;
271        sample.cpu.process_gpu_usage = gpu_usage;
272        sample.cpu.process_gpu_utilized = gpu_utilized;
273
274        // Emit to selected output sink.
275        match config.format {
276            OutputFormat::Json => match serde_json::to_value(&sample) {
277                Ok(mut v) => {
278                    v[format!("{}-version", env!("CARGO_PKG_NAME"))] =
279                        serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string());
280                    emit!("{}", v);
281                }
282                Err(e) => eprintln!("warn: json serialize error: {e}"),
283            },
284            OutputFormat::Csv => {
285                emit!(
286                    "{}",
287                    output::csv::sample_to_csv_row(&sample, config.interval_secs)
288                );
289            }
290        }
291
292        // Push to sentinel buffer (if streaming is active).
293        if let Some(ref buf) = sample_buffer {
294            buf.lock()
295                .unwrap_or_else(|e| e.into_inner())
296                .push(sample.clone());
297        }
298        unflushed.push(sample);
299
300        // -----------------------------------------------------------------------
301        // Shell-wrapper exit check
302        // -----------------------------------------------------------------------
303        if let Some(ref mut c) = child {
304            match c.try_wait() {
305                Ok(Some(status)) => {
306                    let code = status.code().unwrap_or(1);
307                    shutdown(
308                        code,
309                        sentinel.as_ref(),
310                        run_ctx_arc,
311                        upload_shutdown_flag,
312                        upload_handle,
313                        unflushed,
314                        config.interval_secs,
315                    );
316                }
317                Ok(None) => {}
318                Err(e) => eprintln!("warn: error checking child status: {e}"),
319            }
320        }
321
322        // SIGTERM received: flush and exit cleanly.
323        if SIGTERM_RECEIVED.load(Ordering::Relaxed) {
324            shutdown(
325                0,
326                sentinel.as_ref(),
327                run_ctx_arc,
328                upload_shutdown_flag,
329                upload_handle,
330                unflushed,
331                config.interval_secs,
332            );
333        }
334
335        std::thread::sleep(interval);
336    }
337}
338
339// ---------------------------------------------------------------------------
340// Tests
341// ---------------------------------------------------------------------------
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Verify that SIGINT sets SIGTERM_RECEIVED, triggering the same graceful
    /// shutdown path as SIGTERM.
    ///
    /// The test installs the handler, resets the flag, raises SIGINT, records
    /// the flag, restores the previous SIGINT disposition, and only then
    /// asserts -- so a failing assertion cannot leave the handler or the flag
    /// behind for other tests.
    #[test]
    fn test_sigint_sets_shutdown_flag() {
        // Reset in case a previous test left the flag set.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);

        // Install the handler for SIGINT (mirrors what main() does) and keep
        // the previous disposition so it can be put back afterwards.
        //
        // SAFETY: `handle_sigterm` only performs an atomic store.
        let previous = unsafe {
            libc::signal(
                libc::SIGINT,
                handle_sigterm as *const () as libc::sighandler_t,
            )
        };
        // If installation failed, raising SIGINT would hit the default action
        // and kill the test process, so stop here instead.
        assert_ne!(
            previous,
            libc::SIG_ERR,
            "failed to install SIGINT handler"
        );

        // Raise SIGINT on the current process.
        // SAFETY: the handler installed above is in place for SIGINT.
        unsafe {
            libc::raise(libc::SIGINT);
        }

        let flag_was_set = SIGTERM_RECEIVED.load(Ordering::SeqCst);

        // Clean up before asserting: reset the flag and restore whatever
        // SIGINT disposition was active before this test ran.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);
        // SAFETY: `previous` is the value returned by `signal` above and was
        // checked not to be SIG_ERR.
        unsafe {
            libc::signal(libc::SIGINT, previous);
        }

        assert!(
            flag_was_set,
            "SIGTERM_RECEIVED flag must be true after SIGINT"
        );
    }
}