resource_tracker/main.rs
1#![doc = include_str!("../README.md")]
2
3#[cfg(not(target_os = "linux"))]
4compile_error!(
5 "resource-tracker only supports Linux; /proc and cgroup interfaces are Linux-specific."
6);
7
8mod collector;
9mod config;
10mod metrics;
11mod output;
12mod sentinel;
13mod thread_util;
14
15extern crate libc;
16
17use collector::{
18 CpuCollector, DiskCollector, GpuCollector, MemoryCollector, NetworkCollector,
19 collect_host_info, spawn_cloud_discovery,
20};
21use config::{Config, OutputFormat};
22use metrics::CloudInfo;
23use metrics::Sample;
24use sentinel::{BatchUploader, RunContext, SentinelClient, close_run, samples_to_csv, start_run};
25use std::io::Write;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
30// ---------------------------------------------------------------------------
31// SIGTERM handler
32// ---------------------------------------------------------------------------
33
/// Shutdown request flag, raised by the SIGTERM/SIGINT handler and polled by
/// the main sampling loop once per tick.
static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);

/// Signal handler shared by SIGTERM and SIGINT.
///
/// The signal number is ignored because both signals are handled the same
/// way. The handler only records the request with a single atomic store; the
/// actual flush/close work happens later on the main thread.
extern "C" fn handle_sigterm(_signo: std::ffi::c_int) {
    SIGTERM_RECEIVED.store(true, Ordering::Relaxed);
}
39
40// ---------------------------------------------------------------------------
41// Graceful shutdown
42// ---------------------------------------------------------------------------
43
44/// Flush remaining samples, close the Sentinel run, then exit.
45///
46/// Called on both shell-wrapper child exit and SIGTERM. Replaces the former
47/// bare `std::process::exit()` calls so the upload thread always gets a chance
48/// to flush.
49fn shutdown(
50 exit_code: i32,
51 sentinel: Option<&SentinelClient>,
52 run_ctx: Option<Arc<Mutex<RunContext>>>,
53 shutdown_flag: Option<Arc<AtomicBool>>,
54 upload_handle: Option<std::thread::JoinHandle<Vec<String>>>,
55 remaining: Vec<Sample>,
56 interval_secs: u64,
57) -> ! {
58 if let (Some(client), Some(ctx_arc), Some(flag), Some(handle)) =
59 (sentinel, run_ctx, shutdown_flag, upload_handle)
60 {
61 // Signal the upload thread to flush its buffer to S3, then wait for it.
62 // The thread performs one final S3 upload of any remaining buffered samples
63 // before it exits, and returns the list of all successfully uploaded URIs.
64 flag.store(true, Ordering::Relaxed);
65 let uploaded_uris = handle.join().unwrap_or_default();
66
67 // Route selection:
68 // S3 route -- at least one batch was uploaded; uploaded_uris is non-empty.
69 // The final flush is already included in uploaded_uris.
70 // Inline route -- no S3 uploads (short run or all S3 failures); send all
71 // collected samples as a raw CSV string.
72 let remaining_csv = if uploaded_uris.is_empty() && !remaining.is_empty() {
73 Some(samples_to_csv(&remaining, interval_secs))
74 } else {
75 None
76 };
77
78 let ctx = ctx_arc.lock().unwrap_or_else(|e| e.into_inner());
79 if let Err(e) = close_run(
80 &client.agent,
81 &client.api_base,
82 &client.token,
83 &ctx,
84 Some(exit_code),
85 remaining_csv,
86 &uploaded_uris,
87 ) {
88 eprintln!("warn: sentinel close_run failed: {e}");
89 }
90 }
91
92 std::process::exit(exit_code);
93}
94
95// ---------------------------------------------------------------------------
96// main
97// ---------------------------------------------------------------------------
98
99fn main() {
100 // Install SIGTERM and SIGINT handlers so the binary can flush before exiting.
101 // Both signals set the same flag and trigger the same graceful shutdown path.
102 unsafe {
103 libc::signal(
104 libc::SIGTERM,
105 handle_sigterm as *const () as libc::sighandler_t,
106 );
107 libc::signal(
108 libc::SIGINT,
109 handle_sigterm as *const () as libc::sighandler_t,
110 );
111 }
112
113 let mut config = Config::load();
114
115 // -----------------------------------------------------------------------
116 // Output sink: stdout (default), file (--output), or suppressed (--quiet).
117 // Warnings and errors always go to stderr via eprintln! regardless.
118 // -----------------------------------------------------------------------
119 let mut out_file: Option<std::io::BufWriter<std::fs::File>> = if config.quiet {
120 None
121 } else {
122 config.output_file.as_deref().map(|path| {
123 std::io::BufWriter::new(std::fs::File::create(path).unwrap_or_else(|e| {
124 eprintln!("error: cannot open output file {path}: {e}");
125 std::process::exit(1);
126 }))
127 })
128 };
129
130 // Emit one line of metric output to the selected sink.
131 // quiet=true -> no-op
132 // output_file -> write to file and flush (so `tail -f` works)
133 // default -> eprintln! to stderr (keeps stdout clean for the tracked app)
134 macro_rules! emit {
135 ($($arg:tt)*) => {
136 if !config.quiet {
137 if let Some(ref mut f) = out_file {
138 let _ = writeln!(f, $($arg)*);
139 let _ = f.flush();
140 } else {
141 eprintln!($($arg)*);
142 }
143 }
144 }
145 }
146
147 let interval = Duration::from_secs(config.interval_secs);
148
149 // Shell-wrapper child is spawned after warm-up so cloud IMDS probes (ureq may
150 // use helper threads) do not race with fork-heavy stressors under PID limits.
151 let mut child: Option<std::process::Child> = None;
152
153 let mut cpu = CpuCollector::new(config.pid);
154 let memory = MemoryCollector::new();
155 let mut network = NetworkCollector::new();
156 let mut disk = DiskCollector::new(interval);
157 let mut gpu = GpuCollector::new();
158
159 // Collect static GPU info now so host discovery can derive GPU host fields.
160 let initial_gpus = gpu.collect().unwrap_or_default();
161
162 // Host discovery: fast, local, no I/O.
163 let host_info = collect_host_info(&initial_gpus);
164
165 // Warm-up: prime delta state in stateful collectors while cloud probes run
166 // in the background. spawn_cloud_discovery returns a channel Receiver so
167 // the caller never blocks on probe completion -- try_recv() picks up the
168 // result if probes finished during the sleep, or leaves cloud_info as None
169 // to be resolved later (per-tick poll in the main loop, or recv_timeout
170 // before start_run for Sentinel runs).
171 let cloud_rx = spawn_cloud_discovery();
172 let _ = cpu.collect();
173 let _ = network.collect();
174 let _ = disk.collect();
175 std::thread::sleep(interval);
176 // Non-blocking: on most non-cloud machines all probes fail fast
177 // (EHOSTUNREACH); on cloud machines the matching probe returns in < 100 ms.
178 // Either way the result is typically waiting by the time we reach here.
179 let mut cloud_info: Option<CloudInfo> = cloud_rx.as_ref().and_then(|rx| rx.try_recv().ok());
180
181 // -----------------------------------------------------------------------
182 // Shell-wrapper mode: spawn the tracked command after warm-up / cloud probe.
183 // -----------------------------------------------------------------------
184 if !config.command.is_empty() {
185 let (program, args) = config.command.split_first().expect("command is non-empty");
186 match std::process::Command::new(program).args(args).spawn() {
187 Ok(c) => {
188 config.pid = Some(i32::try_from(c.id()).unwrap_or(i32::MAX));
189 cpu.set_tracked_pid(config.pid);
190 child = Some(c);
191 }
192 Err(e) => {
193 eprintln!("error: failed to spawn {:?}: {e}", program);
194 std::process::exit(1);
195 }
196 }
197 }
198
199 // -----------------------------------------------------------------------
200 // Sentinel API setup (gated on SENTINEL_API_TOKEN being set).
201 // -----------------------------------------------------------------------
202 let sentinel = SentinelClient::from_env();
203
204 let (run_ctx_arc, sample_buffer, upload_shutdown_flag, upload_handle) = match &sentinel {
205 None => (None, None, None, None),
206 Some(client) => {
207 // Bounded wait: give cloud discovery a chance to complete before
208 // start_run so the run record carries cloud metadata. IMDS probes
209 // run in parallel and finish within IMDS_TIMEOUT (1 s); 3 s is a
210 // generous ceiling for unusual network paths. Pure metric runs
211 // (no Sentinel token) skip this entirely.
212 if cloud_info.is_none() {
213 if let Some(ref rx) = cloud_rx {
214 cloud_info = rx.recv_timeout(Duration::from_secs(3)).ok();
215 }
216 }
217 let default_cloud = CloudInfo::default();
218 match start_run(
219 &client.agent,
220 &client.api_base,
221 &client.token,
222 &config.metadata,
223 config.pid,
224 &host_info,
225 cloud_info.as_ref().unwrap_or(&default_cloud),
226 ) {
227 Err(e) => {
228 eprintln!("warn: sentinel start_run failed: {e}; streaming disabled");
229 (None, None, None, None)
230 }
231 Ok(ctx) => {
232 let ctx_arc = Arc::new(Mutex::new(ctx));
233 let upload_interval = std::env::var("TRACKER_UPLOAD_INTERVAL")
234 .ok()
235 .and_then(|v| v.parse().ok())
236 .unwrap_or(60u64);
237 let (uploader, buf) = BatchUploader::new(upload_interval, config.interval_secs);
238 let flag = uploader.shutdown_flag();
239 let upload_handle = uploader.spawn(
240 Arc::clone(&ctx_arc),
241 SentinelClient::new_upload_agent(),
242 client.api_base.clone(),
243 client.token.clone(),
244 );
245 if upload_handle.is_none() {
246 eprintln!(
247 "warn: sentinel background upload disabled; samples will be flushed inline on exit"
248 );
249 }
250 (Some(ctx_arc), Some(buf), Some(flag), upload_handle)
251 }
252 }
253 }
254 };
255
256 // Emit CSV header once before the loop.
257 if config.format == OutputFormat::Csv {
258 emit!("{}", output::csv::csv_header());
259 }
260
261 // Samples collected since the last S3 batch upload (for local fallback).
262 let mut unflushed: Vec<Sample> = Vec::new();
263
264 // Tracks the Instant at the start of each loop iteration so we can
265 // compute the actual elapsed interval between samples and sleep only
266 // for the remainder of the nominal interval (deadline-based scheduling).
267 let mut prev_loop_start: Option<Instant> = None;
268
269 // -----------------------------------------------------------------------
270 // Main sampling loop
271 // -----------------------------------------------------------------------
272 loop {
273 // Poll for cloud discovery result if not yet received. Typically a
274 // no-op because probes complete within IMDS_TIMEOUT and the warm-up
275 // sleep covers that window. Ensures the channel is drained and
276 // cloud_info is populated for any future use.
277 if cloud_info.is_none()
278 && let Some(ref rx) = cloud_rx
279 && let Ok(info) = rx.try_recv()
280 {
281 cloud_info = Some(info);
282 }
283
284 let loop_start = Instant::now();
285
286 // Actual elapsed since the previous iteration started. None on the
287 // first real sample (no prior loop start to compare against).
288 let actual_interval_ms: Option<u64> = prev_loop_start
289 .map(|p| u64::try_from((loop_start - p).as_millis()).unwrap_or(u64::MAX));
290
291 let timestamp_secs = SystemTime::now()
292 .duration_since(UNIX_EPOCH)
293 .unwrap_or_default()
294 .as_secs();
295
296 let mut sample = Sample {
297 timestamp_secs,
298 actual_interval_ms,
299 job_name: config.metadata.job_name.clone(),
300 tracked_pid: config.pid,
301 cpu: cpu.collect().unwrap_or_default(),
302 memory: memory.collect().unwrap_or_default(),
303 network: network.collect().unwrap_or_default(),
304 disk: disk.collect().unwrap_or_default(),
305 gpu: gpu.collect().unwrap_or_default(),
306 };
307
308 // Augment with per-process GPU stats.
309 // With --pid: filter to the tracked process tree.
310 // Without --pid: report system-wide GPU allocation (all processes).
311 let (vram_mib, gpu_usage, gpu_utilized) =
312 if config.pid.is_some() && !sample.cpu.process_tree_pids.is_empty() {
313 let pids_u32: Vec<u32> = sample
314 .cpu
315 .process_tree_pids
316 .iter()
317 .filter_map(|&p| u32::try_from(p).ok())
318 .collect();
319 gpu.process_gpu_info(&pids_u32, interval)
320 } else {
321 gpu.all_gpu_process_info(interval)
322 };
323 sample.cpu.process_gpu_vram_mib = vram_mib;
324 sample.cpu.process_gpu_usage = gpu_usage;
325 sample.cpu.process_gpu_utilized = gpu_utilized;
326
327 // Emit to selected output sink.
328 match config.format {
329 OutputFormat::Json => match serde_json::to_value(&sample) {
330 Ok(mut v) => {
331 v[format!("{}-version", env!("CARGO_PKG_NAME"))] =
332 serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string());
333 emit!("{}", v);
334 }
335 Err(e) => eprintln!("warn: json serialize error: {e}"),
336 },
337 OutputFormat::Csv => {
338 emit!(
339 "{}",
340 output::csv::sample_to_csv_row(&sample, config.interval_secs)
341 );
342 }
343 }
344
345 // Push to sentinel buffer (if streaming is active).
346 if let Some(ref buf) = sample_buffer {
347 buf.lock()
348 .unwrap_or_else(|e| e.into_inner())
349 .push(sample.clone());
350 }
351 unflushed.push(sample);
352
353 // -----------------------------------------------------------------------
354 // Shell-wrapper exit check
355 // -----------------------------------------------------------------------
356 if let Some(ref mut c) = child {
357 match c.try_wait() {
358 Ok(Some(status)) => {
359 let code = status.code().unwrap_or(1);
360 shutdown(
361 code,
362 sentinel.as_ref(),
363 run_ctx_arc,
364 upload_shutdown_flag,
365 upload_handle,
366 unflushed,
367 config.interval_secs,
368 );
369 }
370 Ok(None) => {}
371 Err(e) => eprintln!("warn: error checking child status: {e}"),
372 }
373 }
374
375 // SIGTERM received: flush and exit cleanly.
376 if SIGTERM_RECEIVED.load(Ordering::Relaxed) {
377 shutdown(
378 0,
379 sentinel.as_ref(),
380 run_ctx_arc,
381 upload_shutdown_flag,
382 upload_handle,
383 unflushed,
384 config.interval_secs,
385 );
386 }
387
388 prev_loop_start = Some(loop_start);
389
390 // Deadline-based sleep: sleep only for the time remaining in the
391 // nominal interval. If collection itself took longer than the
392 // interval, skip sleeping entirely and start the next sample right
393 // away. This prevents drift accumulation and matches the Python
394 // resource-tracker's timer approach.
395 let elapsed = loop_start.elapsed();
396 if let Some(remaining) = interval.checked_sub(elapsed) {
397 std::thread::sleep(remaining);
398 }
399 }
400}
401
402// ---------------------------------------------------------------------------
403// Tests
404// ---------------------------------------------------------------------------
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Verify that SIGINT sets SIGTERM_RECEIVED, triggering the same graceful
    /// shutdown path as SIGTERM.
    ///
    /// Installs the handler (mirroring main()), resets the flag, raises SIGINT
    /// and checks the flag. The flag and the *previous* SIGINT disposition are
    /// restored before any assertion, so a failure cannot leave process-wide
    /// signal state altered for other tests.
    #[test]
    fn test_sigint_sets_shutdown_flag() {
        // Reset in case a previous test left the flag set.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);

        // SAFETY: `handle_sigterm` has the C signal-handler signature and its
        // body is a single atomic store.
        let previous = unsafe {
            libc::signal(
                libc::SIGINT,
                handle_sigterm as *const () as libc::sighandler_t,
            )
        };
        assert_ne!(previous, libc::SIG_ERR, "failed to install SIGINT handler");

        // SAFETY: a SIGINT handler is installed above, so raising the signal
        // runs the handler instead of terminating the test process. POSIX
        // requires raise() to return only after the handler has returned.
        let raised = unsafe { libc::raise(libc::SIGINT) };
        let flag_set = SIGTERM_RECEIVED.load(Ordering::SeqCst);

        // Clean up before asserting so a failed assertion cannot leak state.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);
        // SAFETY: `previous` was returned by `signal` for SIGINT, so it is a
        // valid disposition to reinstall.
        unsafe {
            libc::signal(libc::SIGINT, previous);
        }

        assert_eq!(raised, 0, "raise(SIGINT) failed");
        assert!(flag_set, "SIGTERM_RECEIVED flag must be true after SIGINT");
    }
}