1use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
/// Shared, mutex-guarded buffer of collected samples: the sampler pushes
/// into it and the background upload thread drains it in batches.
pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;

/// After this many back-to-back failed batch uploads, a louder warning is
/// printed and the counter resets; uploads keep being attempted.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// S3 PUT attempts per batch (first try plus retries with exponential backoff).
const MAX_UPLOAD_ATTEMPTS: u32 = 3;
28pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34 let mut out = String::with_capacity(samples.len() * 256);
35 out.push_str(csv_header());
36 out.push('\n');
37 samples.iter().for_each(|s| {
38 out.push_str(&sample_to_csv_row(s, interval_secs));
39 out.push('\n');
40 });
41 out
42}
43
44pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47 encoder
48 .write_all(data)
49 .map_err(|e| format!("gzip write failed: {e}"))?;
50 encoder
51 .finish()
52 .map_err(|e| format!("gzip finish failed: {e}"))
53}
54
/// Owns the upload-side state of the sampling pipeline: the shared sample
/// buffer plus the timing knobs used by the background upload thread.
pub struct BatchUploader {
    /// Shared buffer drained once per upload interval. Public so the
    /// sampler (and tests) can push samples into it directly.
    pub buffer: SampleBuffer,
    /// Cooperative shutdown flag polled by the upload thread.
    shutdown: Arc<AtomicBool>,
    /// Seconds between batch uploads.
    upload_interval_secs: u64,
    /// Seconds between individual samples; forwarded into CSV serialization.
    sample_interval_secs: u64,
}
69
impl BatchUploader {
    /// Create an uploader plus a second handle to its sample buffer.
    ///
    /// The returned `SampleBuffer` points at the same allocation as
    /// `uploader.buffer`, so producers can keep pushing after the uploader
    /// itself has been consumed by [`BatchUploader::spawn`].
    pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
        let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
        let uploader = Self {
            buffer: Arc::clone(&buffer),
            shutdown: Arc::new(AtomicBool::new(false)),
            upload_interval_secs,
            sample_interval_secs,
        };
        (uploader, buffer)
    }

    /// Clone of the shutdown flag; set it to `true` to ask the upload
    /// thread to drain the buffer one last time and exit.
    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.shutdown)
    }

    /// Consume the uploader and start the background upload thread.
    ///
    /// Each cycle the thread: sleeps one upload interval (skipped when
    /// shutting down), drains the buffer, serializes the batch to gzipped
    /// CSV, refreshes credentials if they are close to expiry, and PUTs the
    /// object to S3 under `{prefix}/{run_id}/{seq:06}.csv.gz`.
    ///
    /// Joining the returned handle yields the list of successfully uploaded
    /// S3 URIs. Failed batches are logged and dropped (their samples are
    /// lost); `seq` only advances on success.
    pub fn spawn(
        self,
        ctx: Arc<Mutex<RunContext>>,
        agent: ureq::Agent,
        api_base: String,
        token: String,
    ) -> std::thread::JoinHandle<Vec<String>> {
        std::thread::spawn(move || {
            let mut region_cache = RegionCache::new();
            let mut seq: u32 = 0;
            let mut consecutive_failures: u32 = 0;
            let mut uploaded_uris: Vec<String> = Vec::new();

            // Sleep in 250 ms slices (4 ticks/second) so a shutdown request
            // is noticed within one tick instead of after a full interval.
            let tick = Duration::from_millis(250);
            let ticks_per_interval = (self.upload_interval_secs * 4).max(1);

            loop {
                let shutting_down = self.shutdown.load(Ordering::Relaxed);

                if !shutting_down {
                    // Interruptible sleep: stop ticking as soon as the flag flips.
                    (0..ticks_per_interval)
                        .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
                        .for_each(|_| std::thread::sleep(tick));
                }

                // Swap the buffer contents out under the lock; a poisoned
                // mutex is recovered (into_inner) rather than propagated.
                let batch: Vec<Sample> = {
                    let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
                    std::mem::take(&mut *guard)
                };

                if batch.is_empty() {
                    if shutting_down {
                        break;
                    }
                    continue;
                }

                let csv = samples_to_csv(&batch, self.sample_interval_secs);
                let compressed = match gzip_compress(csv.as_bytes()) {
                    Ok(b) => b,
                    Err(e) => {
                        // The batch has already been taken from the buffer,
                        // so a gzip failure drops these samples.
                        eprintln!("warn: upload batch {seq} gzip failed: {e}");
                        if shutting_down {
                            break;
                        }
                        continue;
                    }
                };

                // Proactively refresh credentials when they are about to
                // expire; a refresh failure is non-fatal (old creds kept).
                {
                    let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
                    if ctx_guard.creds_expiring_soon()
                        && let Err(e) =
                            refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
                    {
                        eprintln!("warn: credential refresh failed: {e}");
                    }
                }

                // Snapshot destination + credentials under the lock, then
                // release it before the (slow) network upload. An unparsable
                // prefix yields an empty bucket, handled just below.
                let (bucket, prefix, creds, run_id) = {
                    let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
                    let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
                        crate::sentinel::s3::S3Uri {
                            bucket: String::new(),
                            key: String::new(),
                        }
                    });
                    (
                        uri.bucket,
                        uri.key,
                        ctx_guard.credentials.clone(),
                        ctx_guard.run_id.clone(),
                    )
                };

                if bucket.is_empty() {
                    eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
                    if shutting_down {
                        break;
                    }
                    continue;
                }

                let region = region_cache.get_or_detect(&bucket);
                let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");

                // Up to MAX_UPLOAD_ATTEMPTS tries, sleeping 2 s then 4 s
                // (1 << attempt) before each retry. Errors accumulate into
                // one message so the final warning shows the full history.
                let result: Result<String, String> = {
                    let mut last_err = String::new();
                    let mut uploaded_uri: Option<String> = None;
                    for attempt in 0..MAX_UPLOAD_ATTEMPTS {
                        if attempt > 0 {
                            std::thread::sleep(Duration::from_secs(1u64 << attempt));
                        }
                        match s3_put(&agent, &bucket, &key, &region, &compressed, &creds) {
                            Ok(uri) => {
                                uploaded_uri = Some(uri);
                                break;
                            }
                            Err(e) => {
                                last_err = if last_err.is_empty() {
                                    e
                                } else {
                                    format!("{last_err}; retry{attempt}: {e}")
                                };
                            }
                        }
                    }
                    match uploaded_uri {
                        Some(uri) => Ok(uri),
                        None => Err(last_err),
                    }
                };

                match result {
                    Ok(uri) => {
                        uploaded_uris.push(uri);
                        seq += 1;
                        consecutive_failures = 0;
                    }
                    Err(e) => {
                        consecutive_failures += 1;
                        eprintln!(
                            "warn: S3 upload failed (attempt {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
                        );
                        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
                            eprintln!(
                                "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
                            );
                            consecutive_failures = 0;
                        }
                    }
                }

                // `shutting_down` was read before draining, so one final
                // batch is always processed after the flag is set.
                if shutting_down {
                    break;
                }
            }
            uploaded_uris
        })
    }
}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
    use crate::output::csv::csv_header;
    use std::io::Read;

    /// Smallest valid `Sample` used as the fixture across all tests below.
    fn minimal_sample() -> Sample {
        Sample {
            timestamp_secs: 1_000_000,
            job_name: None,
            tracked_pid: None,
            cpu: CpuMetrics {
                utilization_pct: 1.0,
                process_utime_secs: None,
                process_stime_secs: None,
                process_rss_mib: None,
                process_disk_read_bytes: None,
                process_disk_write_bytes: None,
                process_gpu_vram_mib: None,
                process_gpu_utilized: None,
                process_tree_pids: vec![],
                ..Default::default()
            },
            memory: MemoryMetrics {
                free_mib: 512,
                used_mib: 512,
                ..Default::default()
            },
            network: vec![],
            disk: vec![],
            gpu: vec![],
        }
    }

    // With an empty buffer, setting the shutdown flag must make the upload
    // thread exit well before a full 60 s upload interval elapses.
    #[test]
    fn test_upload_thread_shuts_down_promptly() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, _buf) = BatchUploader::new(60, 1);
        let flag = uploader.shutdown_flag();

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(1)))
            .build()
            .new_agent();

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        let t0 = Instant::now();
        flag.store(true, Ordering::Relaxed);
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread took {elapsed:?} to shut down; expected < 2 s"
        );
    }

    // Round-trip check: gzip output carries the magic bytes, decompresses
    // back to the original CSV, and every row has the header's column count.
    #[test]
    fn test_gzip_compress_decompresses_to_valid_csv() {
        let samples = vec![minimal_sample(), minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");

        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");

        let mut decoder = GzDecoder::new(&compressed[..]);
        let mut decompressed = String::new();
        decoder
            .read_to_string(&mut decompressed)
            .expect("gzip decompression failed");
        assert_eq!(
            decompressed, csv,
            "decompressed content does not match original CSV"
        );

        let first_line = decompressed.lines().next().expect("empty output");
        assert_eq!(first_line, csv_header(), "first line is not the CSV header");

        let header_cols = csv_header().split(',').count();
        decompressed
            .lines()
            .skip(1)
            .enumerate()
            .for_each(|(i, line)| {
                assert!(!line.is_empty(), "unexpected empty data line at index {i}");
                let cols = line.split(',').count();
                assert_eq!(
                    cols, header_cols,
                    "data row {i} has {cols} columns, expected {header_cols}: {line}"
                );
            });
    }

    // Every chunk produced by split_inclusive('\n') must end with the
    // newline, i.e. the CSV has no trailing unterminated line.
    #[test]
    fn test_samples_to_csv_all_lines_end_with_newline() {
        let samples = vec![minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        csv.split_inclusive('\n').for_each(|chunk| {
            assert!(
                chunk.ends_with('\n'),
                "line does not end with newline: {chunk:?}"
            );
        });
    }

    // A non-empty batch plus an unparsable upload_uri_prefix must be
    // skipped quickly (empty-bucket branch) without hanging the thread.
    #[test]
    fn test_upload_thread_processes_batch_with_invalid_uri() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        {
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
        );
    }

    // Integration test (skipped unless SENTINEL_API_TOKEN is set): full
    // start_run -> upload -> close_run round trip against the real API.
    #[test]
    fn test_upload_roundtrip_real_api() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-01: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("upload-roundtrip-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        // Flag set before spawn: the thread drains once, uploads, exits.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-01: uris={uris:?}");

        assert!(
            !uris.is_empty(),
            "expected at least one S3 URI; S3 upload may have failed"
        );
        assert!(
            uris[0].starts_with("s3://"),
            "URI must have s3:// scheme: {}",
            uris[0]
        );

        let ctx_guard = ctx_arc.lock().unwrap();
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
        eprintln!("T-UPL-INT-01: close_run ok");
    }

    // Integration test (skipped unless SENTINEL_API_TOKEN is set): forces
    // expires_at into the past so the thread must refresh credentials.
    #[test]
    fn test_upload_thread_refreshes_expiring_credentials() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-02: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let mut ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("cred-refresh-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);

        // Epoch timestamp guarantees creds_expiring_soon() returns true.
        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-02: uris={uris:?}");

        let ctx_guard = ctx_arc.lock().unwrap();
        // If the S3 upload did not happen, fall back to inline CSV so
        // close_run still has the data.
        let csv = if uris.is_empty() {
            Some(samples_to_csv(&[minimal_sample()], 1))
        } else {
            None
        };
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
        assert!(
            result.is_ok(),
            "close_run after credential refresh failed: {result:?}"
        );
        eprintln!("T-UPL-INT-02: close_run ok");
    }

    // A plausible-looking but unreachable bucket: the thread must survive
    // all retry attempts (with backoff sleeps) and still exit.
    #[test]
    fn test_upload_thread_handles_s3_failure_gracefully() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        {
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(200)))
            .build()
            .new_agent();

        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        // Generous bound: covers retry backoff (2 s + 4 s) plus timeouts.
        assert!(
            elapsed < Duration::from_secs(20),
            "upload thread took {elapsed:?}; expected < 20 s"
        );
    }

    // An empty input slice still yields the header line and a trailing
    // newline, but no data rows.
    #[test]
    fn test_samples_to_csv_empty_slice() {
        let csv = samples_to_csv(&[], 1);
        let mut lines = csv.lines();
        assert_eq!(
            lines.next(),
            Some(csv_header()),
            "first line must be the CSV header"
        );
        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
        assert!(csv.ends_with('\n'), "output must end with a newline");
    }

    // With a 0 s interval the thread spins through empty batches; a sample
    // pushed later plus the shutdown flag must still let it exit promptly.
    #[test]
    fn test_upload_thread_skips_empty_batch_then_processes() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        // Let the thread cycle through a few empty batches first.
        std::thread::sleep(Duration::from_millis(700));

        buf.lock().unwrap().push(minimal_sample());
        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "thread took {elapsed:?} after sample push; expected < 2 s"
        );
    }

    // Feeds three failing batches spaced apart so consecutive_failures
    // reaches the reset threshold; the thread must still exit cleanly.
    #[test]
    fn test_upload_thread_resets_consecutive_failures() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(50)))
            .build()
            .new_agent();

        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let handle = uploader.spawn(
            Arc::clone(&ctx),
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        // Long enough for one full failed upload cycle (retries included).
        let batch_wait = Duration::from_secs(10);
        for i in 0..3u32 {
            buf.lock().unwrap().push(minimal_sample());
            if i < 2 {
                std::thread::sleep(batch_wait);
            }
        }

        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(15),
            "thread did not exit after consecutive-failure reset: {elapsed:?}"
        );
    }

    // new() must hand back the very same Arc allocation it stores.
    #[test]
    fn test_batch_uploader_new_shares_buffer() {
        use std::sync::Arc;
        let (uploader, buf) = BatchUploader::new(60, 1);
        assert!(
            Arc::ptr_eq(&uploader.buffer, &buf),
            "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
        );
    }
}