Skip to main content

resource_tracker/sentinel/
upload.rs

1//! Background batch upload thread: buffers samples, serializes as CSV,
2//! gzip-compresses, and uploads to S3 every 60 seconds (configurable).
3
4use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
17/// Shared sample buffer: main thread pushes, upload thread drains.
18pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;
19
// Number of back-to-back failed batches after which the upload thread logs an
// extra "data may be lost" warning and resets its failure counter.  It does
// not stop the thread; per-batch retries are bounded by MAX_UPLOAD_ATTEMPTS.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

// Upload attempts made for a single batch (the first try plus N-1 retries).
// Attempt i (i > 0) is preceded by a 2^i second sleep: 2 s, then 4 s, …
const MAX_UPLOAD_ATTEMPTS: u32 = 3;
27
28// ---------------------------------------------------------------------------
29// CSV serialization helper
30// ---------------------------------------------------------------------------
31
32/// Serialize a slice of samples as a complete CSV string (header + rows).
33pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34    let mut out = String::with_capacity(samples.len() * 256);
35    out.push_str(csv_header());
36    out.push('\n');
37    samples.iter().for_each(|s| {
38        out.push_str(&sample_to_csv_row(s, interval_secs));
39        out.push('\n');
40    });
41    out
42}
43
44/// Gzip-compress `data` using the default compression level.
45pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47    encoder
48        .write_all(data)
49        .map_err(|e| format!("gzip write failed: {e}"))?;
50    encoder
51        .finish()
52        .map_err(|e| format!("gzip finish failed: {e}"))
53}
54
55// ---------------------------------------------------------------------------
56// BatchUploader
57// ---------------------------------------------------------------------------
58
59pub struct BatchUploader {
60    /// Buffer shared with the main thread.
61    pub buffer: SampleBuffer,
62    /// Set to true by `request_shutdown()` to trigger a final flush.
63    shutdown: Arc<AtomicBool>,
64    /// Polling interval for the upload thread (seconds, default 60).
65    upload_interval_secs: u64,
66    /// Sampling interval (seconds) -- needed to compute per-interval byte counts in CSV.
67    sample_interval_secs: u64,
68}
69
70impl BatchUploader {
71    /// Create a new `BatchUploader` and return the shared `SampleBuffer`
72    /// so the main thread can push samples into it.
73    pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
74        let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
75        let uploader = Self {
76            buffer: Arc::clone(&buffer),
77            shutdown: Arc::new(AtomicBool::new(false)),
78            upload_interval_secs,
79            sample_interval_secs,
80        };
81        (uploader, buffer)
82    }
83
84    /// Clone the shutdown flag so `main.rs` can signal the upload thread to
85    /// flush and exit after moving `self` into the spawned thread.
86    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
87        Arc::clone(&self.shutdown)
88    }
89
90    /// Spawn the background upload thread.
91    ///
92    /// The thread wakes every `upload_interval_secs`, drains the buffer, builds
93    /// a `.csv.gz` batch (gzip-compressed CSV, `Content-Type: application/gzip`),
94    /// and uploads it to S3.  On shutdown signal it performs one final flush
95    /// before exiting.
96    ///
97    /// Returns a `JoinHandle<Vec<String>>` whose value is the list of all
98    /// successfully uploaded S3 URIs (e.g. `"s3://bucket/prefix/run-id/000000.csv.gz"`).
99    /// The caller uses this list to decide the `/finish` route:
100    /// - non-empty → `data_source: "s3"` with `data_uris`
101    /// - empty     → `data_source: "inline"` with `data_csv`
102    pub fn spawn(
103        self,
104        ctx: Arc<Mutex<RunContext>>,
105        agent: ureq::Agent,
106        api_base: String,
107        token: String,
108    ) -> Option<std::thread::JoinHandle<Vec<String>>> {
109        crate::thread_util::spawn_named("sentinel-upload", move || {
110            let mut region_cache = RegionCache::new();
111            let mut seq: u32 = 0;
112            let mut consecutive_failures: u32 = 0;
113            let mut uploaded_uris: Vec<String> = Vec::new();
114
115            // Break the upload interval into 250 ms ticks so a shutdown signal
116            // is noticed within 250 ms rather than waiting a full 60 seconds.
117            let tick = Duration::from_millis(250);
118            let ticks_per_interval = (self.upload_interval_secs * 4).max(1);
119
120            loop {
121                let shutting_down = self.shutdown.load(Ordering::Relaxed);
122
123                if !shutting_down {
124                    (0..ticks_per_interval)
125                        .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
126                        .for_each(|_| std::thread::sleep(tick));
127                }
128
129                // Drain buffer under a minimal lock window.
130                let batch: Vec<Sample> = {
131                    let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
132                    std::mem::take(&mut *guard)
133                };
134
135                if batch.is_empty() {
136                    if shutting_down {
137                        break;
138                    }
139                    continue;
140                }
141
142                // Serialize and compress outside the lock.
143                let csv = samples_to_csv(&batch, self.sample_interval_secs);
144                let compressed = match gzip_compress(csv.as_bytes()) {
145                    Ok(b) => b,
146                    Err(e) => {
147                        eprintln!("warn: upload batch {seq} gzip failed: {e}");
148                        if shutting_down {
149                            break;
150                        }
151                        continue;
152                    }
153                };
154
155                // Check credentials near expires_at; refresh if needed.
156                {
157                    let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
158                    if ctx_guard.creds_expiring_soon()
159                        && let Err(e) =
160                            refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
161                    {
162                        eprintln!("warn: credential refresh failed: {e}");
163                    }
164                }
165
166                // Build the S3 key.
167                let (bucket, prefix, creds, run_id) = {
168                    let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
169                    let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
170                        crate::sentinel::s3::S3Uri {
171                            bucket: String::new(),
172                            key: String::new(),
173                        }
174                    });
175                    (
176                        uri.bucket,
177                        uri.key,
178                        ctx_guard.credentials.clone(),
179                        ctx_guard.run_id.clone(),
180                    )
181                };
182
183                if bucket.is_empty() {
184                    eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
185                    if shutting_down {
186                        break;
187                    }
188                    continue;
189                }
190
191                let region = region_cache.get_or_detect(&bucket);
192                let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");
193
194                // Upload with exponential backoff.
195                // Attempt 0 is immediate; attempt i (i > 0) sleeps 2^i seconds first.
196                // With MAX_UPLOAD_ATTEMPTS=3: delays are 2 s and 4 s before retries.
197                let result: Result<String, String> = {
198                    let mut last_err = String::new();
199                    let mut uploaded_uri: Option<String> = None;
200                    for attempt in 0..MAX_UPLOAD_ATTEMPTS {
201                        if attempt > 0 {
202                            std::thread::sleep(Duration::from_secs(1u64 << attempt));
203                        }
204                        match s3_put(&agent, &bucket, &key, &region, &compressed, &creds) {
205                            Ok(uri) => {
206                                uploaded_uri = Some(uri);
207                                break;
208                            }
209                            Err(e) => {
210                                last_err = if last_err.is_empty() {
211                                    e
212                                } else {
213                                    format!("{last_err}; retry{attempt}: {e}")
214                                };
215                            }
216                        }
217                    }
218                    match uploaded_uri {
219                        Some(uri) => Ok(uri),
220                        None => Err(last_err),
221                    }
222                };
223
224                match result {
225                    Ok(uri) => {
226                        uploaded_uris.push(uri);
227                        seq += 1;
228                        consecutive_failures = 0;
229                    }
230                    Err(e) => {
231                        consecutive_failures += 1;
232                        eprintln!(
233                            "warn: S3 upload failed (attempt {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
234                        );
235                        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
236                            eprintln!(
237                                "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
238                            );
239                            consecutive_failures = 0;
240                        }
241                    }
242                }
243
244                if shutting_down {
245                    break;
246                }
247            }
248            uploaded_uris
249        })
250    }
251}
252
253// ---------------------------------------------------------------------------
254// Unit tests
255// ---------------------------------------------------------------------------
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
261    use crate::output::csv::csv_header;
262    use std::io::Read;
263
264    fn minimal_sample() -> Sample {
265        Sample {
266            timestamp_secs: 1_000_000,
267            actual_interval_ms: None,
268            job_name: None,
269            tracked_pid: None,
270            cpu: CpuMetrics {
271                utilization_pct: 1.0,
272                process_utime_secs: None,
273                process_stime_secs: None,
274                process_pss_mib: None,
275                process_rss_mib: None,
276                process_disk_read_bytes: None,
277                process_disk_write_bytes: None,
278                process_gpu_vram_mib: None,
279                process_gpu_utilized: None,
280                process_tree_pids: vec![],
281                ..Default::default()
282            },
283            memory: MemoryMetrics {
284                free_mib: 512,
285                used_mib: 512,
286                ..Default::default()
287            },
288            network: vec![],
289            disk: vec![],
290            gpu: vec![],
291        }
292    }
293
294    // T-STR-01: upload thread exits within 2 s of the shutdown flag being set,
295    // even when the upload interval is 60 s.
296    // Without the tick-based sleep the thread would block for the full interval.
297    #[test]
298    fn test_upload_thread_shuts_down_promptly() {
299        use crate::sentinel::run::RunContext;
300        use crate::sentinel::s3::UploadCredentials;
301        use std::sync::{Arc, Mutex};
302        use std::time::{Duration, Instant};
303
304        let (uploader, _buf) = BatchUploader::new(60, 1);
305        let flag = uploader.shutdown_flag();
306
307        let ctx = Arc::new(Mutex::new(RunContext {
308            run_id: "r".to_string(),
309            upload_uri_prefix: "s3://b/p".to_string(),
310            credentials: UploadCredentials {
311                access_key_id: "k".to_string(),
312                secret_access_key: "s".to_string(),
313                session_token: "t".to_string(),
314                expires_at: "2099-01-01T00:00:00Z".to_string(),
315            },
316        }));
317
318        let agent = ureq::config::Config::builder()
319            .timeout_global(Some(Duration::from_secs(1)))
320            .build()
321            .new_agent();
322
323        // Buffer is empty so the thread will never attempt an upload.
324        let handle = uploader
325            .spawn(
326                ctx,
327                agent,
328                "http://127.0.0.1:1".to_string(),
329                "token".to_string(),
330            )
331            .expect("upload thread must spawn in test");
332
333        // Signal shutdown immediately and measure how long join takes.
334        let t0 = Instant::now();
335        flag.store(true, Ordering::Relaxed);
336        handle.join().expect("upload thread panicked");
337        let elapsed = t0.elapsed();
338
339        assert!(
340            elapsed < Duration::from_secs(2),
341            "upload thread took {elapsed:?} to shut down; expected < 2 s"
342        );
343    }
344
345    // T-STR-02: batch body decompresses to valid CSV (header + data rows).
346    //
347    // Spec Section 9.2.2: "A batch upload request contains Content-Encoding: gzip
348    // and the body decompresses to valid CSV or JSONL."
349    // This test verifies the compress/decompress round-trip and CSV structure.
350    #[test]
351    fn test_gzip_compress_decompresses_to_valid_csv() {
352        let samples = vec![minimal_sample(), minimal_sample()];
353        let csv = samples_to_csv(&samples, 1);
354        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");
355
356        // Gzip magic bytes (RFC 1952 Section 2.3.1).
357        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");
358
359        // Decompress must round-trip to identical bytes.
360        let mut decoder = GzDecoder::new(&compressed[..]);
361        let mut decompressed = String::new();
362        decoder
363            .read_to_string(&mut decompressed)
364            .expect("gzip decompression failed");
365        assert_eq!(
366            decompressed, csv,
367            "decompressed content does not match original CSV"
368        );
369
370        // First line must be the CSV header.
371        let first_line = decompressed.lines().next().expect("empty output");
372        assert_eq!(first_line, csv_header(), "first line is not the CSV header");
373
374        // Every data row must have the same column count as the header.
375        let header_cols = csv_header().split(',').count();
376        decompressed
377            .lines()
378            .skip(1)
379            .enumerate()
380            .for_each(|(i, line)| {
381                assert!(!line.is_empty(), "unexpected empty data line at index {i}");
382                let cols = line.split(',').count();
383                assert_eq!(
384                    cols, header_cols,
385                    "data row {i} has {cols} columns, expected {header_cols}: {line}"
386                );
387            });
388    }
389
390    // Every line produced by samples_to_csv (header and data rows) ends with '\n'.
391    #[test]
392    fn test_samples_to_csv_all_lines_end_with_newline() {
393        let samples = vec![minimal_sample()];
394        let csv = samples_to_csv(&samples, 1);
395        csv.split_inclusive('\n').for_each(|chunk| {
396            assert!(
397                chunk.ends_with('\n'),
398                "line does not end with newline: {chunk:?}"
399            );
400        });
401    }
402
403    // T-STR-05: when the buffer is non-empty at shutdown and the upload URI is
404    // invalid (cannot be parsed into bucket+key), the thread serializes the batch,
405    // compresses it, then skips the S3 put and exits cleanly.
406    //
407    // This test covers the CSV-serialize → gzip → bucket-empty → shutdown path
408    // inside the upload thread without requiring a real S3 endpoint.
409    #[test]
410    fn test_upload_thread_processes_batch_with_invalid_uri() {
411        use crate::sentinel::run::RunContext;
412        use crate::sentinel::s3::UploadCredentials;
413        use std::sync::atomic::Ordering;
414        use std::sync::{Arc, Mutex};
415        use std::time::{Duration, Instant};
416
417        let (uploader, buf) = BatchUploader::new(1, 1);
418        let flag = uploader.shutdown_flag();
419
420        // Push a sample into the buffer so the thread has a non-empty batch.
421        {
422            let mut guard = buf.lock().unwrap();
423            guard.push(minimal_sample());
424        }
425
426        // Invalid upload_uri_prefix: parse_s3_uri will fail and bucket will be empty.
427        let ctx = Arc::new(Mutex::new(RunContext {
428            run_id: "r".to_string(),
429            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
430            credentials: UploadCredentials {
431                access_key_id: "k".to_string(),
432                secret_access_key: "s".to_string(),
433                session_token: "t".to_string(),
434                expires_at: "2099-01-01T00:00:00Z".to_string(),
435            },
436        }));
437
438        let agent = ureq::config::Config::builder()
439            .timeout_global(Some(Duration::from_millis(100)))
440            .build()
441            .new_agent();
442
443        // Signal shutdown before spawning so the thread skips the sleep phase.
444        flag.store(true, Ordering::Relaxed);
445
446        let handle = uploader
447            .spawn(
448                ctx,
449                agent,
450                "http://127.0.0.1:1".to_string(),
451                "token".to_string(),
452            )
453            .expect("upload thread must spawn in test");
454
455        let t0 = Instant::now();
456        handle.join().expect("upload thread panicked");
457        let elapsed = t0.elapsed();
458
459        assert!(
460            elapsed < Duration::from_secs(2),
461            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
462        );
463    }
464
465    // T-UPL-INT-01: full roundtrip -- sample serialized, gzip-compressed, and
466    // PUT to the real Sentinel S3 bucket.  Covers the Ok(uri) success arm inside
467    // spawn() (uploaded_uris.push, seq += 1, consecutive_failures = 0).
468    // Skips automatically when SENTINEL_API_TOKEN is absent.
469    #[test]
470    fn test_upload_roundtrip_real_api() {
471        use crate::config::JobMetadata;
472        use crate::metrics::{CloudInfo, HostInfo};
473        use crate::sentinel::run::{close_run, start_run};
474        use std::sync::atomic::Ordering;
475        use std::sync::{Arc, Mutex};
476        use std::time::Duration;
477
478        let token = match std::env::var("SENTINEL_API_TOKEN") {
479            Ok(t) if !t.is_empty() => t,
480            _ => {
481                eprintln!("skip: SENTINEL_API_TOKEN not set");
482                return;
483            }
484        };
485        let api_base = std::env::var("SENTINEL_API_BASE")
486            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
487        eprintln!("T-UPL-INT-01: api_base={api_base}");
488
489        let agent = ureq::config::Config::builder()
490            .timeout_global(Some(Duration::from_secs(30)))
491            .build()
492            .new_agent();
493
494        let ctx = start_run(
495            &agent,
496            &api_base,
497            &token,
498            &JobMetadata {
499                job_name: Some("upload-roundtrip-test".to_string()),
500                ..Default::default()
501            },
502            None,
503            &HostInfo::default(),
504            &CloudInfo::default(),
505        )
506        .expect("start_run failed");
507        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);
508
509        let ctx_arc = Arc::new(Mutex::new(ctx));
510        let (uploader, buf) = BatchUploader::new(1, 1);
511        let flag = uploader.shutdown_flag();
512
513        buf.lock().unwrap().push(minimal_sample());
514
515        // Signal shutdown before spawning: the thread processes one batch then exits.
516        flag.store(true, Ordering::Relaxed);
517
518        let handle = uploader
519            .spawn(
520                Arc::clone(&ctx_arc),
521                agent.clone(),
522                api_base.clone(),
523                token.clone(),
524            )
525            .expect("upload thread must spawn in test");
526        let uris = handle.join().expect("upload thread panicked");
527        eprintln!("T-UPL-INT-01: uris={uris:?}");
528
529        assert!(
530            !uris.is_empty(),
531            "expected at least one S3 URI; S3 upload may have failed"
532        );
533        assert!(
534            uris[0].starts_with("s3://"),
535            "URI must have s3:// scheme: {}",
536            uris[0]
537        );
538
539        // Close the run via the S3 route.
540        let ctx_guard = ctx_arc.lock().unwrap();
541        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
542        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
543        eprintln!("T-UPL-INT-01: close_run ok");
544    }
545
546    // T-UPL-INT-02: upload thread calls refresh_credentials when expires_at is in the
547    // past.  Covers the creds_expiring_soon() -> refresh_credentials block.
548    // The actual S3 credentials issued by start_run are still valid; setting
549    // expires_at to 1970 only causes our code to request a refresh -- the server
550    // returns fresh credentials and the upload proceeds normally.
551    // Skips automatically when SENTINEL_API_TOKEN is absent.
552    #[test]
553    fn test_upload_thread_refreshes_expiring_credentials() {
554        use crate::config::JobMetadata;
555        use crate::metrics::{CloudInfo, HostInfo};
556        use crate::sentinel::run::{close_run, start_run};
557        use std::sync::atomic::Ordering;
558        use std::sync::{Arc, Mutex};
559        use std::time::Duration;
560
561        let token = match std::env::var("SENTINEL_API_TOKEN") {
562            Ok(t) if !t.is_empty() => t,
563            _ => {
564                eprintln!("skip: SENTINEL_API_TOKEN not set");
565                return;
566            }
567        };
568        let api_base = std::env::var("SENTINEL_API_BASE")
569            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
570        eprintln!("T-UPL-INT-02: api_base={api_base}");
571
572        let agent = ureq::config::Config::builder()
573            .timeout_global(Some(Duration::from_secs(30)))
574            .build()
575            .new_agent();
576
577        let mut ctx = start_run(
578            &agent,
579            &api_base,
580            &token,
581            &JobMetadata {
582                job_name: Some("cred-refresh-test".to_string()),
583                ..Default::default()
584            },
585            None,
586            &HostInfo::default(),
587            &CloudInfo::default(),
588        )
589        .expect("start_run failed");
590        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);
591
592        // Force expires_at into the past so creds_expiring_soon() returns true.
593        // The underlying S3 session token is still valid; this only triggers the
594        // refresh call path in the upload thread.
595        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
596        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");
597
598        let ctx_arc = Arc::new(Mutex::new(ctx));
599        let (uploader, buf) = BatchUploader::new(1, 1);
600        let flag = uploader.shutdown_flag();
601
602        buf.lock().unwrap().push(minimal_sample());
603
604        flag.store(true, Ordering::Relaxed);
605
606        let handle = uploader
607            .spawn(
608                Arc::clone(&ctx_arc),
609                agent.clone(),
610                api_base.clone(),
611                token.clone(),
612            )
613            .expect("upload thread must spawn in test");
614        let uris = handle.join().expect("upload thread panicked");
615        eprintln!("T-UPL-INT-02: uris={uris:?}");
616
617        // After refresh the upload should succeed via S3 or fall back to inline.
618        let ctx_guard = ctx_arc.lock().unwrap();
619        let csv = if uris.is_empty() {
620            Some(samples_to_csv(&[minimal_sample()], 1))
621        } else {
622            None
623        };
624        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
625        assert!(
626            result.is_ok(),
627            "close_run after credential refresh failed: {result:?}"
628        );
629        eprintln!("T-UPL-INT-02: close_run ok");
630    }
631
    // T-STR-06: when a valid-looking S3 bucket cannot be reached, the batch is
    // attempted MAX_UPLOAD_ATTEMPTS times, counted as one failed batch, and the
    // thread then exits cleanly.  The shutdown flag is set before spawning, so
    // exactly one batch is processed.
    // NOTE: this test takes ~7 s because the retry back-off sleeps 2 s + 4 s.
636    #[test]
637    fn test_upload_thread_handles_s3_failure_gracefully() {
638        use crate::sentinel::run::RunContext;
639        use crate::sentinel::s3::UploadCredentials;
640        use std::sync::atomic::Ordering;
641        use std::sync::{Arc, Mutex};
642        use std::time::{Duration, Instant};
643
644        let (uploader, buf) = BatchUploader::new(1, 1);
645        let flag = uploader.shutdown_flag();
646
647        // Push a sample into the buffer.
648        {
649            let mut guard = buf.lock().unwrap();
650            guard.push(minimal_sample());
651        }
652
653        // A real-looking S3 URI but bucket host is unreachable (port 1 = closed).
654        let ctx = Arc::new(Mutex::new(RunContext {
655            run_id: "r".to_string(),
656            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
657            credentials: UploadCredentials {
658                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
659                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
660                session_token: "token".to_string(),
661                expires_at: "2099-01-01T00:00:00Z".to_string(),
662            },
663        }));
664
665        // Very short timeout so the S3 attempt fails fast.
666        let agent = ureq::config::Config::builder()
667            .timeout_global(Some(Duration::from_millis(200)))
668            .build()
669            .new_agent();
670
671        // Pre-set shutdown: the thread will process one batch, fail the upload,
672        // then exit on the shutdown check.
673        flag.store(true, Ordering::Relaxed);
674
675        let handle = uploader
676            .spawn(
677                ctx,
678                agent,
679                "http://127.0.0.1:1".to_string(),
680                "token".to_string(),
681            )
682            .expect("upload thread must spawn in test");
683
684        let t0 = Instant::now();
685        handle.join().expect("upload thread panicked");
686        let elapsed = t0.elapsed();
687
688        // Should complete well within 5 s even with retry back-off (2+4 s),
689        // because the retries are skipped when timeout is 200 ms and the
690        // first attempt fails near-instantly.
691        // Timing breakdown: region detection (up to 2 s TCP timeout) +
692        // 3 × 200 ms agent timeouts + 2 s + 4 s retry sleeps = ~8.6 s max.
693        // Allow 20 s to accommodate slow DNS on any CI/dev host.
694        assert!(
695            elapsed < Duration::from_secs(20),
696            "upload thread took {elapsed:?}; expected < 20 s"
697        );
698    }
699
700    // T-COV-01: empty sample slice produces only the CSV header line.
701    #[test]
702    fn test_samples_to_csv_empty_slice() {
703        let csv = samples_to_csv(&[], 1);
704        let mut lines = csv.lines();
705        assert_eq!(
706            lines.next(),
707            Some(csv_header()),
708            "first line must be the CSV header"
709        );
710        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
711        assert!(csv.ends_with('\n'), "output must end with a newline");
712    }
713
714    // T-STR-07: the upload thread hits the empty-batch `continue` path at least
715    // twice before processing a sample pushed after those cycles, then exits
716    // on the shutdown signal.  Covers lines 135-139 in the non-shutdown branch.
717    #[test]
718    fn test_upload_thread_skips_empty_batch_then_processes() {
719        use crate::sentinel::run::RunContext;
720        use crate::sentinel::s3::UploadCredentials;
721        use std::sync::atomic::Ordering;
722        use std::sync::{Arc, Mutex};
723        use std::time::{Duration, Instant};
724
725        // upload_interval=0 → ticks_per_interval = (0*4).max(1) = 1 → 250 ms per cycle.
726        let (uploader, buf) = BatchUploader::new(0, 1);
727        let flag = uploader.shutdown_flag();
728
729        // Invalid URI → bucket empty after parse → S3 call is skipped entirely.
730        let ctx = Arc::new(Mutex::new(RunContext {
731            run_id: "r".to_string(),
732            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
733            credentials: UploadCredentials {
734                access_key_id: "k".to_string(),
735                secret_access_key: "s".to_string(),
736                session_token: "t".to_string(),
737                expires_at: "2099-01-01T00:00:00Z".to_string(),
738            },
739        }));
740
741        let agent = ureq::config::Config::builder()
742            .timeout_global(Some(Duration::from_millis(100)))
743            .build()
744            .new_agent();
745
746        let handle = uploader
747            .spawn(
748                ctx,
749                agent,
750                "http://127.0.0.1:1".to_string(),
751                "token".to_string(),
752            )
753            .expect("upload thread must spawn in test");
754
755        // Let the thread execute at least two empty-buffer iterations (2 × 250 ms).
756        std::thread::sleep(Duration::from_millis(700));
757
758        // Push a sample then signal shutdown.  The thread drains it on the next wake,
759        // serializes, gzip-compresses, skips S3 (empty bucket), and exits.
760        buf.lock().unwrap().push(minimal_sample());
761        flag.store(true, Ordering::Relaxed);
762
763        let t0 = Instant::now();
764        handle.join().expect("upload thread panicked");
765        let elapsed = t0.elapsed();
766
767        assert!(
768            elapsed < Duration::from_secs(2),
769            "thread took {elapsed:?} after sample push; expected < 2 s"
770        );
771    }
772
// T-STR-08: once MAX_CONSECUTIVE_FAILURES (3) batches in a row have failed,
// the upload thread must reset its failure counter to 0 and keep running
// instead of exiting.
//
// The test forces three separate batches by pushing a single sample at a
// time and sleeping between pushes, so every drain becomes its own upload
// attempt.  The shutdown flag is raised right after the third push: the
// thread handles batch 3 (taking the reset branch) and then leaves via the
// shutdown check.
//
// NOTE: this test is slow.  Every failed batch goes through
// MAX_UPLOAD_ATTEMPTS tries with exponential back-off (2 s + 4 s = 6 s), and
// the first batch additionally pays for region detection (~2 s).  Expect a
// total wall-clock time of approximately 26 s.
#[test]
fn test_upload_thread_resets_consecutive_failures() {
    use crate::sentinel::run::RunContext;
    use crate::sentinel::s3::UploadCredentials;
    use std::sync::{atomic::Ordering, Arc, Mutex};
    use std::time::{Duration, Instant};

    // Well-formed S3 prefix and example-style credentials pointing at a
    // bucket named to be non-existent, so every upload attempt fails.
    let credentials = UploadCredentials {
        access_key_id: String::from("AKIAIOSFODNN7EXAMPLE"),
        secret_access_key: String::from("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"),
        session_token: String::from("token"),
        expires_at: String::from("2099-01-01T00:00:00Z"),
    };
    let run_ctx = RunContext {
        run_id: String::from("r"),
        upload_uri_prefix: String::from("s3://fake-nonexistent-bucket-xyz/prefix"),
        credentials,
    };
    let shared_ctx = Arc::new(Mutex::new(run_ctx));

    // 50 ms global timeout keeps each individual HTTP attempt short.
    let agent_config = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_millis(50)))
        .build();
    let http_agent = agent_config.new_agent();

    // upload_interval = 0: the thread wakes every 250 ms between cycles.
    let (uploader, sample_buf) = BatchUploader::new(0, 1);
    let shutdown = uploader.shutdown_flag();

    let worker = uploader
        .spawn(
            Arc::clone(&shared_ctx),
            http_agent,
            String::from("http://127.0.0.1:1"),
            String::from("token"),
        )
        .expect("upload thread must spawn in test");

    // Time budget per batch is ≤10 s: region detection (~2 s, first batch
    // only) + 3 × 50 ms agent timeout + 2 s + 4 s of retry back-off.
    // Waiting that long after a push guarantees batch N has been drained
    // before sample N+1 shows up.
    //
    //   batch 1 → consecutive_failures = 1
    //   batch 2 → consecutive_failures = 2
    //   batch 3 → consecutive_failures = 3 → reset to 0
    let batch_wait = Duration::from_secs(10);

    // Batches 1 and 2: push one sample, then wait for it to be processed.
    for _ in 0..2u32 {
        sample_buf.lock().unwrap().push(minimal_sample());
        std::thread::sleep(batch_wait);
    }

    // Batch 3: push and immediately request shutdown, so the thread works
    // through this last batch, resets the counter, and then exits.
    sample_buf.lock().unwrap().push(minimal_sample());
    shutdown.store(true, Ordering::Relaxed);

    let joined_from = Instant::now();
    worker.join().expect("upload thread panicked");
    let elapsed = joined_from.elapsed();

    // Counting from the shutdown request, batch 3 needs ≤10 s and the thread
    // exits right after; 15 s leaves headroom.
    let deadline = Duration::from_secs(15);
    assert!(
        elapsed < deadline,
        "thread did not exit after consecutive-failure reset: {elapsed:?}"
    );
}
851
// T-STR-09: the SampleBuffer handed back by BatchUploader::new must be the
// very same Arc allocation that the uploader keeps in its `buffer` field.
#[test]
fn test_batch_uploader_new_shares_buffer() {
    use std::sync::Arc;

    let (uploader, returned_buf) = BatchUploader::new(60, 1);

    // Pointer equality, not value equality: both handles must share one
    // allocation.
    let same_allocation = Arc::ptr_eq(&uploader.buffer, &returned_buf);
    assert!(
        same_allocation,
        "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
    );
}
863}