1use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
/// Thread-safe, reference-counted queue of [`Sample`]s.
///
/// [`BatchUploader::new`] hands one handle back to the caller and keeps a
/// clone of the same `Arc` for the upload thread, which drains the vector on
/// every cycle.
pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;
19
/// Number of batches that must fail in a row before the upload thread prints
/// its "data may be lost" warning and resets the failure counter.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// Maximum number of `s3_put` attempts made for a single batch before the
/// batch is reported as failed.
const MAX_UPLOAD_ATTEMPTS: u32 = 3;
27
28pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34 let mut out = String::with_capacity(samples.len() * 256);
35 out.push_str(csv_header());
36 out.push('\n');
37 samples.iter().for_each(|s| {
38 out.push_str(&sample_to_csv_row(s, interval_secs));
39 out.push('\n');
40 });
41 out
42}
43
44pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47 encoder
48 .write_all(data)
49 .map_err(|e| format!("gzip write failed: {e}"))?;
50 encoder
51 .finish()
52 .map_err(|e| format!("gzip finish failed: {e}"))
53}
54
/// Periodically drains a shared sample buffer and uploads each drained batch
/// as a gzip-compressed CSV object via `s3_put`.
///
/// Build one with [`BatchUploader::new`], keep the returned [`SampleBuffer`]
/// for producers, and call [`BatchUploader::spawn`] to start the worker thread.
pub struct BatchUploader {
    /// Queue of samples waiting to be uploaded; shares its allocation with the
    /// handle returned from [`BatchUploader::new`].
    pub buffer: SampleBuffer,
    /// Stop request flag polled by the upload thread; exposed through
    /// [`BatchUploader::shutdown_flag`].
    shutdown: Arc<AtomicBool>,
    /// Seconds the upload thread waits between two drains of `buffer`.
    upload_interval_secs: u64,
    /// Sampling interval forwarded to [`samples_to_csv`] when a batch is
    /// serialized.
    sample_interval_secs: u64,
}
69
70impl BatchUploader {
71 pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
74 let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
75 let uploader = Self {
76 buffer: Arc::clone(&buffer),
77 shutdown: Arc::new(AtomicBool::new(false)),
78 upload_interval_secs,
79 sample_interval_secs,
80 };
81 (uploader, buffer)
82 }
83
84 pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
87 Arc::clone(&self.shutdown)
88 }
89
90 pub fn spawn(
103 self,
104 ctx: Arc<Mutex<RunContext>>,
105 agent: ureq::Agent,
106 api_base: String,
107 token: String,
108 ) -> std::thread::JoinHandle<Vec<String>> {
109 std::thread::spawn(move || {
110 let mut region_cache = RegionCache::new();
111 let mut seq: u32 = 0;
112 let mut consecutive_failures: u32 = 0;
113 let mut uploaded_uris: Vec<String> = Vec::new();
114
115 let tick = Duration::from_millis(250);
118 let ticks_per_interval = (self.upload_interval_secs * 4).max(1);
119
120 loop {
121 let shutting_down = self.shutdown.load(Ordering::Relaxed);
122
123 if !shutting_down {
124 (0..ticks_per_interval)
125 .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
126 .for_each(|_| std::thread::sleep(tick));
127 }
128
129 let batch: Vec<Sample> = {
131 let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
132 std::mem::take(&mut *guard)
133 };
134
135 if batch.is_empty() {
136 if shutting_down {
137 break;
138 }
139 continue;
140 }
141
142 let csv = samples_to_csv(&batch, self.sample_interval_secs);
144 let compressed = match gzip_compress(csv.as_bytes()) {
145 Ok(b) => b,
146 Err(e) => {
147 eprintln!("warn: upload batch {seq} gzip failed: {e}");
148 if shutting_down {
149 break;
150 }
151 continue;
152 }
153 };
154
155 {
157 let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
158 if ctx_guard.creds_expiring_soon()
159 && let Err(e) =
160 refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
161 {
162 eprintln!("warn: credential refresh failed: {e}");
163 }
164 }
165
166 let (bucket, prefix, creds, run_id) = {
168 let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
169 let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
170 crate::sentinel::s3::S3Uri {
171 bucket: String::new(),
172 key: String::new(),
173 }
174 });
175 (
176 uri.bucket,
177 uri.key,
178 ctx_guard.credentials.clone(),
179 ctx_guard.run_id.clone(),
180 )
181 };
182
183 if bucket.is_empty() {
184 eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
185 if shutting_down {
186 break;
187 }
188 continue;
189 }
190
191 let region = region_cache.get_or_detect(&bucket);
192 let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");
193
194 let result: Result<String, String> = {
198 let mut last_err = String::new();
199 let mut uploaded_uri: Option<String> = None;
200 for attempt in 0..MAX_UPLOAD_ATTEMPTS {
201 if attempt > 0 {
202 std::thread::sleep(Duration::from_secs(1u64 << attempt));
203 }
204 match s3_put(&agent, &bucket, &key, ®ion, &compressed, &creds) {
205 Ok(uri) => {
206 uploaded_uri = Some(uri);
207 break;
208 }
209 Err(e) => {
210 last_err = if last_err.is_empty() {
211 e
212 } else {
213 format!("{last_err}; retry{attempt}: {e}")
214 };
215 }
216 }
217 }
218 match uploaded_uri {
219 Some(uri) => Ok(uri),
220 None => Err(last_err),
221 }
222 };
223
224 match result {
225 Ok(uri) => {
226 uploaded_uris.push(uri);
227 seq += 1;
228 consecutive_failures = 0;
229 }
230 Err(e) => {
231 consecutive_failures += 1;
232 eprintln!(
233 "warn: S3 upload failed (attempt {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
234 );
235 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
236 eprintln!(
237 "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
238 );
239 consecutive_failures = 0;
240 }
241 }
242 }
243
244 if shutting_down {
245 break;
246 }
247 }
248 uploaded_uris
249 })
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
    use crate::output::csv::csv_header;
    use crate::sentinel::run::RunContext;
    use crate::sentinel::s3::UploadCredentials;
    use std::io::Read;
    use std::thread::JoinHandle;
    use std::time::Instant;

    /// Builds a sample with the smallest set of explicitly populated fields.
    fn minimal_sample() -> Sample {
        let cpu = CpuMetrics {
            utilization_pct: 1.0,
            process_utime_secs: None,
            process_stime_secs: None,
            process_pss_mib: None,
            process_rss_mib: None,
            process_disk_read_bytes: None,
            process_disk_write_bytes: None,
            process_gpu_vram_mib: None,
            process_gpu_utilized: None,
            process_tree_pids: vec![],
            ..Default::default()
        };
        let memory = MemoryMetrics {
            free_mib: 512,
            used_mib: 512,
            ..Default::default()
        };
        Sample {
            timestamp_secs: 1_000_000,
            job_name: None,
            tracked_pid: None,
            cpu,
            memory,
            network: vec![],
            disk: vec![],
            gpu: vec![],
        }
    }

    /// Shared run context with run id `"r"`, far-future credential expiry and
    /// the given prefix / credential strings.
    fn shared_ctx(
        upload_uri_prefix: &str,
        access_key_id: &str,
        secret_access_key: &str,
        session_token: &str,
    ) -> Arc<Mutex<RunContext>> {
        let credentials = UploadCredentials {
            access_key_id: access_key_id.to_string(),
            secret_access_key: secret_access_key.to_string(),
            session_token: session_token.to_string(),
            expires_at: "2099-01-01T00:00:00Z".to_string(),
        };
        Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: upload_uri_prefix.to_string(),
            credentials,
        }))
    }

    /// Context with one-letter placeholder credentials.
    fn placeholder_ctx(upload_uri_prefix: &str) -> Arc<Mutex<RunContext>> {
        shared_ctx(upload_uri_prefix, "k", "s", "t")
    }

    /// Context that targets a bucket name presumed not to exist, with
    /// example-style credentials.
    fn fake_bucket_ctx() -> Arc<Mutex<RunContext>> {
        shared_ctx(
            "s3://fake-nonexistent-bucket-xyz/prefix",
            "AKIAIOSFODNN7EXAMPLE",
            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            "token",
        )
    }

    /// HTTP agent whose only customization is the global timeout.
    fn agent_with_timeout(timeout: Duration) -> ureq::Agent {
        ureq::config::Config::builder()
            .timeout_global(Some(timeout))
            .build()
            .new_agent()
    }

    /// Starts the upload thread against a local API address that is not
    /// expected to accept connections.
    fn spawn_offline(
        uploader: BatchUploader,
        ctx: Arc<Mutex<RunContext>>,
        agent: ureq::Agent,
    ) -> JoinHandle<Vec<String>> {
        uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        )
    }

    /// Joins the upload thread and reports how long the join took.
    fn join_timed(handle: JoinHandle<Vec<String>>) -> Duration {
        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        t0.elapsed()
    }

    /// Reads `(token, api_base)` for the live-API tests, or prints a skip
    /// notice and returns `None` when no token is configured.
    fn live_api_settings() -> Option<(String, String)> {
        let token = std::env::var("SENTINEL_API_TOKEN")
            .ok()
            .filter(|t| !t.is_empty());
        let Some(token) = token else {
            eprintln!("skip: SENTINEL_API_TOKEN not set");
            return None;
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        Some((token, api_base))
    }

    /// With an empty buffer and a 60 s interval, raising the shutdown flag
    /// must end the thread within two seconds.
    #[test]
    fn test_upload_thread_shuts_down_promptly() {
        let (uploader, _buf) = BatchUploader::new(60, 1);
        let flag = uploader.shutdown_flag();

        let ctx = placeholder_ctx("s3://b/p");
        let agent = agent_with_timeout(Duration::from_secs(1));
        let handle = spawn_offline(uploader, ctx, agent);

        let t0 = Instant::now();
        flag.store(true, Ordering::Relaxed);
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread took {elapsed:?} to shut down; expected < 2 s"
        );
    }

    /// Compressing CSV output and decompressing it again must round-trip and
    /// yield a header plus rows with a matching column count.
    #[test]
    fn test_gzip_compress_decompresses_to_valid_csv() {
        let samples = vec![minimal_sample(), minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");

        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");

        let mut decompressed = String::new();
        GzDecoder::new(&compressed[..])
            .read_to_string(&mut decompressed)
            .expect("gzip decompression failed");
        assert_eq!(
            decompressed, csv,
            "decompressed content does not match original CSV"
        );

        let first_line = decompressed.lines().next().expect("empty output");
        assert_eq!(first_line, csv_header(), "first line is not the CSV header");

        let header_cols = csv_header().split(',').count();
        for (i, line) in decompressed.lines().skip(1).enumerate() {
            assert!(!line.is_empty(), "unexpected empty data line at index {i}");
            let cols = line.split(',').count();
            assert_eq!(
                cols, header_cols,
                "data row {i} has {cols} columns, expected {header_cols}: {line}"
            );
        }
    }

    /// Every chunk of the CSV output, split after each newline, must itself
    /// end with a newline.
    #[test]
    fn test_samples_to_csv_all_lines_end_with_newline() {
        let csv = samples_to_csv(&[minimal_sample()], 1);
        for chunk in csv.split_inclusive('\n') {
            assert!(
                chunk.ends_with('\n'),
                "line does not end with newline: {chunk:?}"
            );
        }
    }

    /// A buffered sample combined with an unparsable upload prefix must be
    /// skipped without delaying shutdown.
    #[test]
    fn test_upload_thread_processes_batch_with_invalid_uri() {
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        let ctx = placeholder_ctx("invalid-not-an-s3-uri");
        let agent = agent_with_timeout(Duration::from_millis(100));

        // Request shutdown before the thread starts so it makes a single pass.
        flag.store(true, Ordering::Relaxed);

        let handle = spawn_offline(uploader, ctx, agent);
        let elapsed = join_timed(handle);

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
        );
    }

    /// Live-API round trip: start a run, upload one batch, close the run with
    /// the resulting URIs. Skipped when `SENTINEL_API_TOKEN` is not set.
    #[test]
    fn test_upload_roundtrip_real_api() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};

        let Some((token, api_base)) = live_api_settings() else {
            return;
        };
        eprintln!("T-UPL-INT-01: api_base={api_base}");

        let agent = agent_with_timeout(Duration::from_secs(30));

        let job = JobMetadata {
            job_name: Some("upload-roundtrip-test".to_string()),
            ..Default::default()
        };
        let ctx = start_run(
            &agent,
            &api_base,
            &token,
            &job,
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        // Single-pass mode: the thread uploads the buffered sample and exits.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-01: uris={uris:?}");

        assert!(
            !uris.is_empty(),
            "expected at least one S3 URI; S3 upload may have failed"
        );
        assert!(
            uris[0].starts_with("s3://"),
            "URI must have s3:// scheme: {}",
            uris[0]
        );

        let ctx_guard = ctx_arc.lock().unwrap();
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
        eprintln!("T-UPL-INT-01: close_run ok");
    }

    /// Live-API check that a run can still be closed after the upload thread
    /// ran with credentials forced to look expired. Skipped when
    /// `SENTINEL_API_TOKEN` is not set.
    #[test]
    fn test_upload_thread_refreshes_expiring_credentials() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};

        let Some((token, api_base)) = live_api_settings() else {
            return;
        };
        eprintln!("T-UPL-INT-02: api_base={api_base}");

        let agent = agent_with_timeout(Duration::from_secs(30));

        let job = JobMetadata {
            job_name: Some("cred-refresh-test".to_string()),
            ..Default::default()
        };
        let mut ctx = start_run(
            &agent,
            &api_base,
            &token,
            &job,
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);

        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-02: uris={uris:?}");

        let ctx_guard = ctx_arc.lock().unwrap();
        // When nothing reached S3, close the run with inline CSV instead.
        let csv = uris
            .is_empty()
            .then(|| samples_to_csv(&[minimal_sample()], 1));
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
        assert!(
            result.is_ok(),
            "close_run after credential refresh failed: {result:?}"
        );
        eprintln!("T-UPL-INT-02: close_run ok");
    }

    /// An upload that cannot succeed must neither panic the thread nor keep
    /// it alive beyond the retry budget.
    #[test]
    fn test_upload_thread_handles_s3_failure_gracefully() {
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        let ctx = fake_bucket_ctx();
        let agent = agent_with_timeout(Duration::from_millis(200));

        flag.store(true, Ordering::Relaxed);

        let handle = spawn_offline(uploader, ctx, agent);
        let elapsed = join_timed(handle);

        assert!(
            elapsed < Duration::from_secs(20),
            "upload thread took {elapsed:?}; expected < 20 s"
        );
    }

    /// An empty slice serializes to exactly the header line plus a newline.
    #[test]
    fn test_samples_to_csv_empty_slice() {
        let csv = samples_to_csv(&[], 1);
        let mut lines = csv.lines();
        assert_eq!(
            lines.next(),
            Some(csv_header()),
            "first line must be the CSV header"
        );
        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
        assert!(csv.ends_with('\n'), "output must end with a newline");
    }

    /// The thread idles over empty batches, then handles a late sample and
    /// exits quickly once shutdown is requested.
    #[test]
    fn test_upload_thread_skips_empty_batch_then_processes() {
        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let ctx = placeholder_ctx("invalid-not-an-s3-uri");
        let agent = agent_with_timeout(Duration::from_millis(100));
        let handle = spawn_offline(uploader, ctx, agent);

        // Give the thread time to spin through a few empty cycles.
        std::thread::sleep(Duration::from_millis(700));

        buf.lock().unwrap().push(minimal_sample());
        flag.store(true, Ordering::Relaxed);

        let elapsed = join_timed(handle);

        assert!(
            elapsed < Duration::from_secs(2),
            "thread took {elapsed:?} after sample push; expected < 2 s"
        );
    }

    /// Feeds three batches, spaced ten seconds apart, to an uploader that
    /// cannot reach its bucket, then verifies the thread still exits.
    #[test]
    fn test_upload_thread_resets_consecutive_failures() {
        let ctx = fake_bucket_ctx();
        let agent = agent_with_timeout(Duration::from_millis(50));

        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let handle = spawn_offline(uploader, Arc::clone(&ctx), agent);

        let batch_wait = Duration::from_secs(10);
        for i in 0..3u32 {
            buf.lock().unwrap().push(minimal_sample());
            // No pause after the last push; shutdown follows immediately.
            if i < 2 {
                std::thread::sleep(batch_wait);
            }
        }

        flag.store(true, Ordering::Relaxed);

        let elapsed = join_timed(handle);

        assert!(
            elapsed < Duration::from_secs(15),
            "thread did not exit after consecutive-failure reset: {elapsed:?}"
        );
    }

    /// `new` must hand back the same `Arc` allocation it stores internally.
    #[test]
    fn test_batch_uploader_new_shares_buffer() {
        let (uploader, buf) = BatchUploader::new(60, 1);
        assert!(
            Arc::ptr_eq(&uploader.buffer, &buf),
            "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
        );
    }
}