1#![doc = include_str!("../README.md")]
2
3mod collector;
4mod config;
5mod metrics;
6mod output;
7mod sentinel;
8
9extern crate libc;
10
11use collector::{
12 CpuCollector, DiskCollector, GpuCollector, MemoryCollector, NetworkCollector,
13 collect_host_info, spawn_cloud_discovery,
14};
15use config::{Config, OutputFormat};
16use metrics::Sample;
17use sentinel::{BatchUploader, RunContext, SentinelClient, close_run, samples_to_csv, start_run};
18use std::io::Write;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);
28
29extern "C" fn handle_sigterm(_: libc::c_int) {
30 SIGTERM_RECEIVED.store(true, Ordering::Relaxed);
31}
32
33fn shutdown(
43 exit_code: i32,
44 sentinel: Option<&SentinelClient>,
45 run_ctx: Option<Arc<Mutex<RunContext>>>,
46 shutdown_flag: Option<Arc<AtomicBool>>,
47 upload_handle: Option<std::thread::JoinHandle<Vec<String>>>,
48 remaining: Vec<Sample>,
49 interval_secs: u64,
50) -> ! {
51 if let (Some(client), Some(ctx_arc), Some(flag), Some(handle)) =
52 (sentinel, run_ctx, shutdown_flag, upload_handle)
53 {
54 flag.store(true, Ordering::Relaxed);
58 let uploaded_uris = handle.join().unwrap_or_default();
59
60 let remaining_csv = if uploaded_uris.is_empty() && !remaining.is_empty() {
66 Some(samples_to_csv(&remaining, interval_secs))
67 } else {
68 None
69 };
70
71 let ctx = ctx_arc.lock().unwrap_or_else(|e| e.into_inner());
72 if let Err(e) = close_run(
73 &client.agent,
74 &client.api_base,
75 &client.token,
76 &ctx,
77 Some(exit_code),
78 remaining_csv,
79 &uploaded_uris,
80 ) {
81 eprintln!("warn: sentinel close_run failed: {e}");
82 }
83 }
84
85 std::process::exit(exit_code);
86}
87
88fn main() {
93 unsafe {
96 libc::signal(
97 libc::SIGTERM,
98 handle_sigterm as *const () as libc::sighandler_t,
99 );
100 libc::signal(
101 libc::SIGINT,
102 handle_sigterm as *const () as libc::sighandler_t,
103 );
104 }
105
106 let mut config = Config::load();
107
108 let mut out_file: Option<std::io::BufWriter<std::fs::File>> = if config.quiet {
113 None
114 } else {
115 config.output_file.as_deref().map(|path| {
116 std::io::BufWriter::new(std::fs::File::create(path).unwrap_or_else(|e| {
117 eprintln!("error: cannot open output file {path}: {e}");
118 std::process::exit(1);
119 }))
120 })
121 };
122
123 macro_rules! emit {
128 ($($arg:tt)*) => {
129 if !config.quiet {
130 if let Some(ref mut f) = out_file {
131 let _ = writeln!(f, $($arg)*);
132 let _ = f.flush();
133 } else {
134 eprintln!($($arg)*);
135 }
136 }
137 }
138 }
139
140 let mut child = if !config.command.is_empty() {
144 let (program, args) = config.command.split_first().expect("command is non-empty");
145 match std::process::Command::new(program).args(args).spawn() {
146 Ok(c) => {
147 config.pid = Some(i32::try_from(c.id()).unwrap_or(i32::MAX));
148 Some(c)
149 }
150 Err(e) => {
151 eprintln!("error: failed to spawn {:?}: {e}", program);
152 std::process::exit(1);
153 }
154 }
155 } else {
156 None
157 };
158
159 let interval = Duration::from_secs(config.interval_secs);
160
161 let mut cpu = CpuCollector::new(config.pid);
162 let memory = MemoryCollector::new();
163 let mut network = NetworkCollector::new();
164 let mut disk = DiskCollector::new();
165 let mut gpu = GpuCollector::new();
166
167 let initial_gpus = gpu.collect().unwrap_or_default();
169
170 let host_info = collect_host_info(&initial_gpus);
172
173 let cloud_handle = spawn_cloud_discovery();
175
176 let _ = cpu.collect();
179 let _ = network.collect();
180 let _ = disk.collect();
181 std::thread::sleep(interval);
182
183 let cloud_info = cloud_handle.join().unwrap_or_default();
185
186 let sentinel = SentinelClient::from_env();
190
191 let (run_ctx_arc, sample_buffer, upload_shutdown_flag, upload_handle) = match &sentinel {
192 None => (None, None, None, None),
193 Some(client) => {
194 match start_run(
195 &client.agent,
196 &client.api_base,
197 &client.token,
198 &config.metadata,
199 config.pid,
200 &host_info,
201 &cloud_info,
202 ) {
203 Err(e) => {
204 eprintln!("warn: sentinel start_run failed: {e}; streaming disabled");
205 (None, None, None, None)
206 }
207 Ok(ctx) => {
208 let ctx_arc = Arc::new(Mutex::new(ctx));
209 let upload_interval = std::env::var("TRACKER_UPLOAD_INTERVAL")
210 .ok()
211 .and_then(|v| v.parse().ok())
212 .unwrap_or(60u64);
213 let (uploader, buf) = BatchUploader::new(upload_interval, config.interval_secs);
214 let flag = uploader.shutdown_flag();
215 let handle = uploader.spawn(
216 Arc::clone(&ctx_arc),
217 client.agent.clone(),
218 client.api_base.clone(),
219 client.token.clone(),
220 );
221 (Some(ctx_arc), Some(buf), Some(flag), Some(handle))
222 }
223 }
224 }
225 };
226
227 if config.format == OutputFormat::Csv {
229 emit!("{}", output::csv::csv_header());
230 }
231
232 let mut unflushed: Vec<Sample> = Vec::new();
234
235 loop {
239 let timestamp_secs = SystemTime::now()
240 .duration_since(UNIX_EPOCH)
241 .unwrap_or_default()
242 .as_secs();
243
244 let mut sample = Sample {
245 timestamp_secs,
246 job_name: config.metadata.job_name.clone(),
247 tracked_pid: config.pid,
248 cpu: cpu.collect().unwrap_or_default(),
249 memory: memory.collect().unwrap_or_default(),
250 network: network.collect().unwrap_or_default(),
251 disk: disk.collect().unwrap_or_default(),
252 gpu: gpu.collect().unwrap_or_default(),
253 };
254
255 let (vram_mib, gpu_usage, gpu_utilized) =
259 if config.pid.is_some() && !sample.cpu.process_tree_pids.is_empty() {
260 let pids_u32: Vec<u32> = sample
261 .cpu
262 .process_tree_pids
263 .iter()
264 .filter_map(|&p| u32::try_from(p).ok())
265 .collect();
266 gpu.process_gpu_info(&pids_u32, interval)
267 } else {
268 gpu.all_gpu_process_info(interval)
269 };
270 sample.cpu.process_gpu_vram_mib = vram_mib;
271 sample.cpu.process_gpu_usage = gpu_usage;
272 sample.cpu.process_gpu_utilized = gpu_utilized;
273
274 match config.format {
276 OutputFormat::Json => match serde_json::to_value(&sample) {
277 Ok(mut v) => {
278 v[format!("{}-version", env!("CARGO_PKG_NAME"))] =
279 serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string());
280 emit!("{}", v);
281 }
282 Err(e) => eprintln!("warn: json serialize error: {e}"),
283 },
284 OutputFormat::Csv => {
285 emit!(
286 "{}",
287 output::csv::sample_to_csv_row(&sample, config.interval_secs)
288 );
289 }
290 }
291
292 if let Some(ref buf) = sample_buffer {
294 buf.lock()
295 .unwrap_or_else(|e| e.into_inner())
296 .push(sample.clone());
297 }
298 unflushed.push(sample);
299
300 if let Some(ref mut c) = child {
304 match c.try_wait() {
305 Ok(Some(status)) => {
306 let code = status.code().unwrap_or(1);
307 shutdown(
308 code,
309 sentinel.as_ref(),
310 run_ctx_arc,
311 upload_shutdown_flag,
312 upload_handle,
313 unflushed,
314 config.interval_secs,
315 );
316 }
317 Ok(None) => {}
318 Err(e) => eprintln!("warn: error checking child status: {e}"),
319 }
320 }
321
322 if SIGTERM_RECEIVED.load(Ordering::Relaxed) {
324 shutdown(
325 0,
326 sentinel.as_ref(),
327 run_ctx_arc,
328 upload_shutdown_flag,
329 upload_handle,
330 unflushed,
331 config.interval_secs,
332 );
333 }
334
335 std::thread::sleep(interval);
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Raising SIGINT with `handle_sigterm` installed must set
    /// `SIGTERM_RECEIVED`. The flag is reset and SIGINT is put back to its
    /// default disposition afterwards.
    #[test]
    fn test_sigint_sets_shutdown_flag() {
        // Begin from a known-clear flag.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);

        let handler = handle_sigterm as *const () as libc::sighandler_t;
        // SAFETY: the handler only performs an atomic store. `raise` runs it
        // on this thread before returning.
        unsafe {
            libc::signal(libc::SIGINT, handler);
            libc::raise(libc::SIGINT);
        }

        let flagged = SIGTERM_RECEIVED.load(Ordering::SeqCst);
        assert!(flagged, "SIGTERM_RECEIVED flag must be true after SIGINT");

        // Undo the test's side effects on global state.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);
        // SAFETY: SIG_DFL is always a valid disposition for SIGINT.
        unsafe {
            libc::signal(libc::SIGINT, libc::SIG_DFL);
        }
    }
}