1use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
17pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;
19
/// Maximum number of PUT attempts for one batch: the first try plus retries with
/// exponential backoff between them.
const MAX_UPLOAD_ATTEMPTS: u32 = 3;

/// Number of consecutive failed batches after which an extra data-loss warning is printed
/// and the failure counter starts over.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;
27
28pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34 let mut out = String::with_capacity(samples.len() * 256);
35 out.push_str(csv_header());
36 out.push('\n');
37 samples.iter().for_each(|s| {
38 out.push_str(&sample_to_csv_row(s, interval_secs));
39 out.push('\n');
40 });
41 out
42}
43
44pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47 encoder
48 .write_all(data)
49 .map_err(|e| format!("gzip write failed: {e}"))?;
50 encoder
51 .finish()
52 .map_err(|e| format!("gzip finish failed: {e}"))
53}
54
55pub struct BatchUploader {
60 pub buffer: SampleBuffer,
62 shutdown: Arc<AtomicBool>,
64 upload_interval_secs: u64,
66 sample_interval_secs: u64,
68}
69
70impl BatchUploader {
71 pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
74 let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
75 let uploader = Self {
76 buffer: Arc::clone(&buffer),
77 shutdown: Arc::new(AtomicBool::new(false)),
78 upload_interval_secs,
79 sample_interval_secs,
80 };
81 (uploader, buffer)
82 }
83
84 pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
87 Arc::clone(&self.shutdown)
88 }
89
90 pub fn spawn(
103 self,
104 ctx: Arc<Mutex<RunContext>>,
105 agent: ureq::Agent,
106 api_base: String,
107 token: String,
108 ) -> Option<std::thread::JoinHandle<Vec<String>>> {
109 crate::thread_util::spawn_named("sentinel-upload", move || {
110 let mut region_cache = RegionCache::new();
111 let mut seq: u32 = 0;
112 let mut consecutive_failures: u32 = 0;
113 let mut uploaded_uris: Vec<String> = Vec::new();
114
115 let tick = Duration::from_millis(250);
118 let ticks_per_interval = (self.upload_interval_secs * 4).max(1);
119
120 loop {
121 let shutting_down = self.shutdown.load(Ordering::Relaxed);
122
123 if !shutting_down {
124 (0..ticks_per_interval)
125 .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
126 .for_each(|_| std::thread::sleep(tick));
127 }
128
129 let batch: Vec<Sample> = {
131 let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
132 std::mem::take(&mut *guard)
133 };
134
135 if batch.is_empty() {
136 if shutting_down {
137 break;
138 }
139 continue;
140 }
141
142 let csv = samples_to_csv(&batch, self.sample_interval_secs);
144 let compressed = match gzip_compress(csv.as_bytes()) {
145 Ok(b) => b,
146 Err(e) => {
147 eprintln!("warn: upload batch {seq} gzip failed: {e}");
148 if shutting_down {
149 break;
150 }
151 continue;
152 }
153 };
154
155 {
157 let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
158 if ctx_guard.creds_expiring_soon()
159 && let Err(e) =
160 refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
161 {
162 eprintln!("warn: credential refresh failed: {e}");
163 }
164 }
165
166 let (bucket, prefix, creds, run_id) = {
168 let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
169 let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
170 crate::sentinel::s3::S3Uri {
171 bucket: String::new(),
172 key: String::new(),
173 }
174 });
175 (
176 uri.bucket,
177 uri.key,
178 ctx_guard.credentials.clone(),
179 ctx_guard.run_id.clone(),
180 )
181 };
182
183 if bucket.is_empty() {
184 eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
185 if shutting_down {
186 break;
187 }
188 continue;
189 }
190
191 let region = region_cache.get_or_detect(&bucket);
192 let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");
193
194 let result: Result<String, String> = {
198 let mut last_err = String::new();
199 let mut uploaded_uri: Option<String> = None;
200 for attempt in 0..MAX_UPLOAD_ATTEMPTS {
201 if attempt > 0 {
202 std::thread::sleep(Duration::from_secs(1u64 << attempt));
203 }
204 match s3_put(&agent, &bucket, &key, ®ion, &compressed, &creds) {
205 Ok(uri) => {
206 uploaded_uri = Some(uri);
207 break;
208 }
209 Err(e) => {
210 last_err = if last_err.is_empty() {
211 e
212 } else {
213 format!("{last_err}; retry{attempt}: {e}")
214 };
215 }
216 }
217 }
218 match uploaded_uri {
219 Some(uri) => Ok(uri),
220 None => Err(last_err),
221 }
222 };
223
224 match result {
225 Ok(uri) => {
226 uploaded_uris.push(uri);
227 seq += 1;
228 consecutive_failures = 0;
229 }
230 Err(e) => {
231 consecutive_failures += 1;
232 eprintln!(
233 "warn: S3 upload failed (attempt {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
234 );
235 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
236 eprintln!(
237 "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
238 );
239 consecutive_failures = 0;
240 }
241 }
242 }
243
244 if shutting_down {
245 break;
246 }
247 }
248 uploaded_uris
249 })
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
    use crate::output::csv::csv_header;
    use std::io::Read;

    /// Build a `Sample` with fixed timestamp, CPU and memory values.
    ///
    /// Every optional or list field is left empty. This is the shared fixture for the CSV and
    /// uploader tests below.
    fn minimal_sample() -> Sample {
        Sample {
            timestamp_secs: 1_000_000,
            actual_interval_ms: None,
            job_name: None,
            tracked_pid: None,
            cpu: CpuMetrics {
                utilization_pct: 1.0,
                process_utime_secs: None,
                process_stime_secs: None,
                process_pss_mib: None,
                process_rss_mib: None,
                process_disk_read_bytes: None,
                process_disk_write_bytes: None,
                process_gpu_vram_mib: None,
                process_gpu_utilized: None,
                process_tree_pids: vec![],
                ..Default::default()
            },
            memory: MemoryMetrics {
                free_mib: 512,
                used_mib: 512,
                ..Default::default()
            },
            network: vec![],
            disk: vec![],
            gpu: vec![],
        }
    }

    /// Shutdown with an empty buffer must be prompt even with a 60 s upload interval.
    ///
    /// The wait loop re-checks the flag every 250 ms tick, and an empty batch seen during
    /// shutdown ends the thread's loop.
    #[test]
    fn test_upload_thread_shuts_down_promptly() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, _buf) = BatchUploader::new(60, 1);
        let flag = uploader.shutdown_flag();

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        // Short global timeout; localhost port 1 serves as a (presumably unreachable) API base.
        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(1)))
            .build()
            .new_agent();

        let handle = uploader
            .spawn(
                ctx,
                agent,
                "http://127.0.0.1:1".to_string(),
                "token".to_string(),
            )
            .expect("upload thread must spawn in test");

        let t0 = Instant::now();
        // Request shutdown while the thread is most likely inside its 60 s wait.
        // `Ordering` comes from the parent module through `use super::*`.
        flag.store(true, Ordering::Relaxed);
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread took {elapsed:?} to shut down; expected < 2 s"
        );
    }

    /// `gzip_compress` output must start with the gzip magic bytes and round-trip through
    /// `GzDecoder` to the exact CSV text.
    ///
    /// The decoded text must begin with the header line, and every data row must have as
    /// many columns as the header.
    #[test]
    fn test_gzip_compress_decompresses_to_valid_csv() {
        let samples = vec![minimal_sample(), minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");

        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");

        // `GzDecoder` is imported at file level under `#[cfg(test)]`.
        let mut decoder = GzDecoder::new(&compressed[..]);
        let mut decompressed = String::new();
        decoder
            .read_to_string(&mut decompressed)
            .expect("gzip decompression failed");
        assert_eq!(
            decompressed, csv,
            "decompressed content does not match original CSV"
        );

        let first_line = decompressed.lines().next().expect("empty output");
        assert_eq!(first_line, csv_header(), "first line is not the CSV header");

        // Naive comma split: assumes no field contains an embedded or quoted comma (TODO confirm).
        let header_cols = csv_header().split(',').count();
        decompressed
            .lines()
            .skip(1)
            .enumerate()
            .for_each(|(i, line)| {
                assert!(!line.is_empty(), "unexpected empty data line at index {i}");
                let cols = line.split(',').count();
                assert_eq!(
                    cols, header_cols,
                    "data row {i} has {cols} columns, expected {header_cols}: {line}"
                );
            });
    }

    /// Every chunk produced by splitting on `\n` must itself end with `\n`.
    ///
    /// In other words, `samples_to_csv` leaves no unterminated trailing line.
    #[test]
    fn test_samples_to_csv_all_lines_end_with_newline() {
        let samples = vec![minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        csv.split_inclusive('\n').for_each(|chunk| {
            assert!(
                chunk.ends_with('\n'),
                "line does not end with newline: {chunk:?}"
            );
        });
    }

    /// A non-empty batch with an unparseable `upload_uri_prefix` is skipped, and the thread
    /// still exits promptly.
    ///
    /// Shutdown is requested before the thread starts, so it skips the wait and drains the
    /// single sample. It then warns that the prefix cannot be parsed and exits. The
    /// far-future expiry presumably means no credential refresh is attempted.
    #[test]
    fn test_upload_thread_processes_batch_with_invalid_uri() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        {
            // Scope the guard so the lock is released before the thread starts.
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        let ctx = Arc::new(Mutex::new(RunContext {
            // Deliberately not an `s3://` URI.
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        // Set before spawning, so the first pass is already the shutdown pass.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader
            .spawn(
                ctx,
                agent,
                "http://127.0.0.1:1".to_string(),
                "token".to_string(),
            )
            .expect("upload thread must spawn in test");

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
        );
    }

    /// Integration test against the real Sentinel API.
    ///
    /// Skipped unless `SENTINEL_API_TOKEN` is set; `SENTINEL_API_BASE` overrides the default
    /// endpoint. The test starts a run, uploads one sample through the uploader, and expects
    /// an `s3://` URI back. It then closes the run with the uploaded URIs.
    #[test]
    fn test_upload_roundtrip_real_api() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-01: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("upload-roundtrip-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        // Shutdown before spawn: the thread makes exactly one upload pass, then exits.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader
            .spawn(
                Arc::clone(&ctx_arc),
                agent.clone(),
                api_base.clone(),
                token.clone(),
            )
            .expect("upload thread must spawn in test");
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-01: uris={uris:?}");

        assert!(
            !uris.is_empty(),
            "expected at least one S3 URI; S3 upload may have failed"
        );
        assert!(
            uris[0].starts_with("s3://"),
            "URI must have s3:// scheme: {}",
            uris[0]
        );

        let ctx_guard = ctx_arc.lock().unwrap();
        // No inline CSV (`None`): the data is referenced only through the uploaded S3 URIs.
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
        eprintln!("T-UPL-INT-01: close_run ok");
    }

    /// Integration test: forcing the credential expiry to the Unix epoch should make the
    /// thread call `refresh_credentials` before uploading.
    ///
    /// Skipped unless `SENTINEL_API_TOKEN` is set. If no URI came back, the CSV is sent
    /// inline to `close_run` instead, so the run is still closed.
    #[test]
    fn test_upload_thread_refreshes_expiring_credentials() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-02: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let mut ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("cred-refresh-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);

        // Make the credentials look long expired so `creds_expiring_soon()` returns true.
        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        flag.store(true, Ordering::Relaxed);

        let handle = uploader
            .spawn(
                Arc::clone(&ctx_arc),
                agent.clone(),
                api_base.clone(),
                token.clone(),
            )
            .expect("upload thread must spawn in test");
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-02: uris={uris:?}");

        let ctx_guard = ctx_arc.lock().unwrap();
        // Fall back to inline CSV only when the S3 route produced nothing.
        let csv = if uris.is_empty() {
            Some(samples_to_csv(&[minimal_sample()], 1))
        } else {
            None
        };
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
        assert!(
            result.is_ok(),
            "close_run after credential refresh failed: {result:?}"
        );
        eprintln!("T-UPL-INT-02: close_run ok");
    }

    /// Every PUT to a non-existent bucket fails, but the thread must log the failure and
    /// still exit on shutdown.
    ///
    /// The 20 s budget covers the 2 s and 4 s retry backoffs plus request timeouts. It also
    /// covers any time spent detecting the bucket region, which is not visible here.
    #[test]
    fn test_upload_thread_handles_s3_failure_gracefully() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        {
            // Scope the guard so the lock is released before the thread starts.
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        let ctx = Arc::new(Mutex::new(RunContext {
            // Parseable URI, but the bucket and keys are fake, so the PUT cannot succeed.
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            // Short timeout keeps each failing attempt fast.
            .timeout_global(Some(Duration::from_millis(200)))
            .build()
            .new_agent();

        // Shutdown before spawn: one pass with retries, then exit.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader
            .spawn(
                ctx,
                agent,
                "http://127.0.0.1:1".to_string(),
                "token".to_string(),
            )
            .expect("upload thread must spawn in test");

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(20),
            "upload thread took {elapsed:?}; expected < 20 s"
        );
    }

    /// An empty input slice yields exactly one line, the header, terminated by `\n`.
    #[test]
    fn test_samples_to_csv_empty_slice() {
        let csv = samples_to_csv(&[], 1);
        let mut lines = csv.lines();
        assert_eq!(
            lines.next(),
            Some(csv_header()),
            "first line must be the CSV header"
        );
        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
        assert!(csv.ends_with('\n'), "output must end with a newline");
    }

    /// Empty passes must not end the thread, and a later batch must still be processed.
    ///
    /// With an upload interval of `0`, each pass waits a single 250 ms tick. During the
    /// 700 ms sleep the thread therefore sees several empty buffers and keeps looping. A
    /// sample is then pushed and shutdown requested. The invalid URI makes that batch a
    /// quick skip, so the thread exits well under 2 s.
    #[test]
    fn test_upload_thread_skips_empty_batch_then_processes() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let ctx = Arc::new(Mutex::new(RunContext {
            // Unparseable prefix, so the batch is skipped without any network access.
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        let handle = uploader
            .spawn(
                ctx,
                agent,
                "http://127.0.0.1:1".to_string(),
                "token".to_string(),
            )
            .expect("upload thread must spawn in test");

        // Let a few empty passes run (about 250 ms each).
        std::thread::sleep(Duration::from_millis(700));

        buf.lock().unwrap().push(minimal_sample());
        // Requested immediately after the push; the sample is still drained before exit.
        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "thread took {elapsed:?} after sample push; expected < 2 s"
        );
    }

    /// Three batches, pushed 10 s apart, each fail against a fake bucket. This drives the
    /// consecutive-failure counter up to `MAX_CONSECUTIVE_FAILURES` and through its reset;
    /// the thread must still exit afterwards.
    ///
    /// NOTE(review): this test sleeps about 20 s in total.
    #[test]
    fn test_upload_thread_resets_consecutive_failures() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(50)))
            .build()
            .new_agent();

        // Interval 0: the thread polls the buffer every 250 ms tick.
        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let handle = uploader
            .spawn(
                Arc::clone(&ctx),
                agent,
                "http://127.0.0.1:1".to_string(),
                "token".to_string(),
            )
            .expect("upload thread must spawn in test");

        // Long enough for one failing batch's retry cycle (2 s + 4 s of backoff plus
        // timeouts) to finish before the next sample arrives.
        let batch_wait = Duration::from_secs(10);
        for i in 0..3u32 {
            buf.lock().unwrap().push(minimal_sample());
            if i < 2 {
                std::thread::sleep(batch_wait);
            }
        }

        // The third batch may still be in flight; shutdown takes effect after it.
        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(15),
            "thread did not exit after consecutive-failure reset: {elapsed:?}"
        );
    }

    /// `BatchUploader::new` must hand out the very same buffer `Arc` it keeps internally.
    #[test]
    fn test_batch_uploader_new_shares_buffer() {
        use std::sync::Arc;
        let (uploader, buf) = BatchUploader::new(60, 1);
        assert!(
            Arc::ptr_eq(&uploader.buffer, &buf),
            "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
        );
    }
}
863}