resource_tracker/sentinel/upload.rs

1//! Background batch upload thread: buffers samples, serializes as CSV,
2//! gzip-compresses, and uploads to S3 every 60 seconds (configurable).
3
4use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
17/// Shared sample buffer: main thread pushes, upload thread drains.
18pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;
19
20// Number of consecutive failed batches after which an escalated warning is
21// logged and the failure counter resets; buffering and uploading continue.
22const MAX_CONSECUTIVE_FAILURES: u32 = 3;
23
24// Total upload attempts per batch (1 initial + N-1 retries).
25// Delay before attempt i (i > 0) is 2^i seconds: 2 s, 4 s, 8 s, …
26const MAX_UPLOAD_ATTEMPTS: u32 = 3;
27
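As a quick illustration of the schedule these two constants imply (the helper below is hypothetical and not part of this module): attempt 0 runs immediately and attempt i sleeps `1u64 << i` seconds, mirroring the retry loop in `spawn`.

```rust
// Hypothetical helper, shown for illustration only: the per-attempt delays implied
// by MAX_UPLOAD_ATTEMPTS together with the `1u64 << attempt` back-off in `spawn`.
fn backoff_delays_secs(max_attempts: u32) -> Vec<u64> {
    (0..max_attempts)
        .map(|attempt| if attempt == 0 { 0 } else { 1u64 << attempt })
        .collect()
}

// backoff_delays_secs(3) == [0, 2, 4]: attempt 0 is immediate, then 2 s and 4 s
// pass before the two retries.
```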
28// ---------------------------------------------------------------------------
29// CSV serialization helper
30// ---------------------------------------------------------------------------
31
32/// Serialize a slice of samples as a complete CSV string (header + rows).
33pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34    let mut out = String::with_capacity(samples.len() * 256);
35    out.push_str(csv_header());
36    out.push('\n');
37    samples.iter().for_each(|s| {
38        out.push_str(&sample_to_csv_row(s, interval_secs));
39        out.push('\n');
40    });
41    out
42}
43
44/// Gzip-compress `data` using the default compression level.
45pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47    encoder
48        .write_all(data)
49        .map_err(|e| format!("gzip write failed: {e}"))?;
50    encoder
51        .finish()
52        .map_err(|e| format!("gzip finish failed: {e}"))
53}
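Taken together, `samples_to_csv` and `gzip_compress` form the serialization path the upload thread applies to every drained batch. A minimal sketch of that pairing, reusing this module's items (the `batch_body` name is illustrative, not a function defined here):

```rust
// Sketch only: turn a drained batch into the `.csv.gz` bytes handed to `s3_put`.
fn batch_body(samples: &[Sample], sample_interval_secs: u64) -> Result<Vec<u8>, String> {
    let csv = samples_to_csv(samples, sample_interval_secs); // header + one row per sample
    gzip_compress(csv.as_bytes())                            // gzip (RFC 1952) stream
}
```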
54
55// ---------------------------------------------------------------------------
56// BatchUploader
57// ---------------------------------------------------------------------------
58
59pub struct BatchUploader {
60    /// Buffer shared with the main thread.
61    pub buffer: SampleBuffer,
62    /// Set to true by `request_shutdown()` to trigger a final flush.
63    shutdown: Arc<AtomicBool>,
64    /// Polling interval for the upload thread (seconds, default 60).
65    upload_interval_secs: u64,
66    /// Sampling interval (seconds) -- needed to compute per-interval byte counts in CSV.
67    sample_interval_secs: u64,
68}
69
70impl BatchUploader {
71    /// Create a new `BatchUploader` and return the shared `SampleBuffer`
72    /// so the main thread can push samples into it.
73    pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
74        let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
75        let uploader = Self {
76            buffer: Arc::clone(&buffer),
77            shutdown: Arc::new(AtomicBool::new(false)),
78            upload_interval_secs,
79            sample_interval_secs,
80        };
81        (uploader, buffer)
82    }
83
84    /// Clone the shutdown flag so `main.rs` can signal the upload thread to
85    /// flush and exit after moving `self` into the spawned thread.
86    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
87        Arc::clone(&self.shutdown)
88    }
89
90    /// Spawn the background upload thread.
91    ///
92    /// The thread wakes every `upload_interval_secs`, drains the buffer, builds
93    /// a `.csv.gz` batch (gzip-compressed CSV, `Content-Type: application/gzip`),
94    /// and uploads it to S3.  On shutdown signal it performs one final flush
95    /// before exiting.
96    ///
97    /// Returns a `JoinHandle<Vec<String>>` whose value is the list of all
98    /// successfully uploaded S3 URIs (e.g. `"s3://bucket/prefix/run-id/000000.csv.gz"`).
99    /// The caller uses this list to decide the `/finish` route:
100    /// - non-empty → `data_source: "s3"` with `data_uris`
101    /// - empty     → `data_source: "inline"` with `data_csv`
102    pub fn spawn(
103        self,
104        ctx: Arc<Mutex<RunContext>>,
105        agent: ureq::Agent,
106        api_base: String,
107        token: String,
108    ) -> std::thread::JoinHandle<Vec<String>> {
109        std::thread::spawn(move || {
110            let mut region_cache = RegionCache::new();
111            let mut seq: u32 = 0;
112            let mut consecutive_failures: u32 = 0;
113            let mut uploaded_uris: Vec<String> = Vec::new();
114
115            // Break the upload interval into 250 ms ticks so a shutdown signal
116            // is noticed within 250 ms rather than waiting a full 60 seconds.
117            let tick = Duration::from_millis(250);
118            let ticks_per_interval = (self.upload_interval_secs * 4).max(1);
119
120            loop {
121                let shutting_down = self.shutdown.load(Ordering::Relaxed);
122
123                if !shutting_down {
124                    (0..ticks_per_interval)
125                        .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
126                        .for_each(|_| std::thread::sleep(tick));
127                }
128
129                // Drain buffer under a minimal lock window.
130                let batch: Vec<Sample> = {
131                    let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
132                    std::mem::take(&mut *guard)
133                };
134
135                if batch.is_empty() {
136                    if shutting_down {
137                        break;
138                    }
139                    continue;
140                }
141
142                // Serialize and compress outside the lock.
143                let csv = samples_to_csv(&batch, self.sample_interval_secs);
144                let compressed = match gzip_compress(csv.as_bytes()) {
145                    Ok(b) => b,
146                    Err(e) => {
147                        eprintln!("warn: upload batch {seq} gzip failed: {e}");
148                        if shutting_down {
149                            break;
150                        }
151                        continue;
152                    }
153                };
154
155                // Check credentials near expires_at; refresh if needed.
156                {
157                    let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
158                    if ctx_guard.creds_expiring_soon()
159                        && let Err(e) =
160                            refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
161                    {
162                        eprintln!("warn: credential refresh failed: {e}");
163                    }
164                }
165
166                // Build the S3 key.
167                let (bucket, prefix, creds, run_id) = {
168                    let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
169                    let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
170                        crate::sentinel::s3::S3Uri {
171                            bucket: String::new(),
172                            key: String::new(),
173                        }
174                    });
175                    (
176                        uri.bucket,
177                        uri.key,
178                        ctx_guard.credentials.clone(),
179                        ctx_guard.run_id.clone(),
180                    )
181                };
182
183                if bucket.is_empty() {
184                    eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
185                    if shutting_down {
186                        break;
187                    }
188                    continue;
189                }
190
191                let region = region_cache.get_or_detect(&bucket);
192                let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");
193
194                // Upload with exponential backoff.
195                // Attempt 0 is immediate; attempt i (i > 0) sleeps 2^i seconds first.
196                // With MAX_UPLOAD_ATTEMPTS=3: delays are 2 s and 4 s before retries.
197                let result: Result<String, String> = {
198                    let mut last_err = String::new();
199                    let mut uploaded_uri: Option<String> = None;
200                    for attempt in 0..MAX_UPLOAD_ATTEMPTS {
201                        if attempt > 0 {
202                            std::thread::sleep(Duration::from_secs(1u64 << attempt));
203                        }
204                        match s3_put(&agent, &bucket, &key, &region, &compressed, &creds) {
205                            Ok(uri) => {
206                                uploaded_uri = Some(uri);
207                                break;
208                            }
209                            Err(e) => {
210                                last_err = if last_err.is_empty() {
211                                    e
212                                } else {
213                                    format!("{last_err}; retry{attempt}: {e}")
214                                };
215                            }
216                        }
217                    }
218                    match uploaded_uri {
219                        Some(uri) => Ok(uri),
220                        None => Err(last_err),
221                    }
222                };
223
224                match result {
225                    Ok(uri) => {
226                        uploaded_uris.push(uri);
227                        seq += 1;
228                        consecutive_failures = 0;
229                    }
230                    Err(e) => {
231                        consecutive_failures += 1;
232                        eprintln!(
233                            "warn: S3 upload failed (consecutive batch failure {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
234                        );
235                        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
236                            eprintln!(
237                                "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
238                            );
239                            consecutive_failures = 0;
240                        }
241                    }
242                }
243
244                if shutting_down {
245                    break;
246                }
247            }
248            uploaded_uris
249        })
250    }
251}
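The doc comments on `new`, `shutdown_flag`, and `spawn` describe how the main thread is expected to drive this type. A hedged end-to-end sketch of that lifecycle follows; the surrounding setup (`RunContext`, agent, API base, token) is assumed to exist already, and the function name is illustrative rather than something defined in `main.rs`.

```rust
use std::sync::{Arc, Mutex, atomic::Ordering};

// Illustrative wiring only; the real setup lives in main.rs.
fn drive_uploader(
    ctx: Arc<Mutex<RunContext>>,
    agent: ureq::Agent,
    api_base: String,
    token: String,
    samples: Vec<Sample>,
) -> Vec<String> {
    let (uploader, buffer) = BatchUploader::new(60, 1);
    // Grab the shutdown flag before `uploader` is moved into the thread.
    let shutdown = uploader.shutdown_flag();
    let handle = uploader.spawn(ctx, agent, api_base, token);

    // Main thread: push samples into the shared buffer as they are collected.
    for s in samples {
        buffer.lock().unwrap().push(s);
    }

    // On exit: request a final flush, then join to collect the uploaded URIs.
    shutdown.store(true, Ordering::Relaxed);
    let uploaded_uris = handle.join().expect("upload thread panicked");

    // Non-empty => /finish with data_source "s3"; empty => fall back to inline CSV.
    uploaded_uris
}
```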
252
253// ---------------------------------------------------------------------------
254// Unit tests
255// ---------------------------------------------------------------------------
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
261    use crate::output::csv::csv_header;
262    use std::io::Read;
263
264    fn minimal_sample() -> Sample {
265        Sample {
266            timestamp_secs: 1_000_000,
267            job_name: None,
268            tracked_pid: None,
269            cpu: CpuMetrics {
270                utilization_pct: 1.0,
271                process_utime_secs: None,
272                process_stime_secs: None,
273                process_rss_mib: None,
274                process_disk_read_bytes: None,
275                process_disk_write_bytes: None,
276                process_gpu_vram_mib: None,
277                process_gpu_utilized: None,
278                process_tree_pids: vec![],
279                ..Default::default()
280            },
281            memory: MemoryMetrics {
282                free_mib: 512,
283                used_mib: 512,
284                ..Default::default()
285            },
286            network: vec![],
287            disk: vec![],
288            gpu: vec![],
289        }
290    }
291
292    // T-STR-01: upload thread exits within 2 s of the shutdown flag being set,
293    // even when the upload interval is 60 s.
294    // Without the tick-based sleep the thread would block for the full interval.
295    #[test]
296    fn test_upload_thread_shuts_down_promptly() {
297        use crate::sentinel::run::RunContext;
298        use crate::sentinel::s3::UploadCredentials;
299        use std::sync::{Arc, Mutex};
300        use std::time::{Duration, Instant};
301
302        let (uploader, _buf) = BatchUploader::new(60, 1);
303        let flag = uploader.shutdown_flag();
304
305        let ctx = Arc::new(Mutex::new(RunContext {
306            run_id: "r".to_string(),
307            upload_uri_prefix: "s3://b/p".to_string(),
308            credentials: UploadCredentials {
309                access_key_id: "k".to_string(),
310                secret_access_key: "s".to_string(),
311                session_token: "t".to_string(),
312                expires_at: "2099-01-01T00:00:00Z".to_string(),
313            },
314        }));
315
316        let agent = ureq::config::Config::builder()
317            .timeout_global(Some(Duration::from_secs(1)))
318            .build()
319            .new_agent();
320
321        // Buffer is empty so the thread will never attempt an upload.
322        let handle = uploader.spawn(
323            ctx,
324            agent,
325            "http://127.0.0.1:1".to_string(),
326            "token".to_string(),
327        );
328
329        // Signal shutdown immediately and measure how long join takes.
330        let t0 = Instant::now();
331        flag.store(true, Ordering::Relaxed);
332        handle.join().expect("upload thread panicked");
333        let elapsed = t0.elapsed();
334
335        assert!(
336            elapsed < Duration::from_secs(2),
337            "upload thread took {elapsed:?} to shut down; expected < 2 s"
338        );
339    }
340
341    // T-STR-02: batch body decompresses to valid CSV (header + data rows).
342    //
343    // Spec Section 9.2.2: "A batch upload request contains Content-Encoding: gzip
344    // and the body decompresses to valid CSV or JSONL."
345    // This test verifies the compress/decompress round-trip and CSV structure.
346    #[test]
347    fn test_gzip_compress_decompresses_to_valid_csv() {
348        let samples = vec![minimal_sample(), minimal_sample()];
349        let csv = samples_to_csv(&samples, 1);
350        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");
351
352        // Gzip magic bytes (RFC 1952 Section 2.3.1).
353        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");
354
355        // Decompress must round-trip to identical bytes.
356        let mut decoder = GzDecoder::new(&compressed[..]);
357        let mut decompressed = String::new();
358        decoder
359            .read_to_string(&mut decompressed)
360            .expect("gzip decompression failed");
361        assert_eq!(
362            decompressed, csv,
363            "decompressed content does not match original CSV"
364        );
365
366        // First line must be the CSV header.
367        let first_line = decompressed.lines().next().expect("empty output");
368        assert_eq!(first_line, csv_header(), "first line is not the CSV header");
369
370        // Every data row must have the same column count as the header.
371        let header_cols = csv_header().split(',').count();
372        decompressed
373            .lines()
374            .skip(1)
375            .enumerate()
376            .for_each(|(i, line)| {
377                assert!(!line.is_empty(), "unexpected empty data line at index {i}");
378                let cols = line.split(',').count();
379                assert_eq!(
380                    cols, header_cols,
381                    "data row {i} has {cols} columns, expected {header_cols}: {line}"
382                );
383            });
384    }
385
386    // Every line produced by samples_to_csv (header and data rows) ends with '\n'.
387    #[test]
388    fn test_samples_to_csv_all_lines_end_with_newline() {
389        let samples = vec![minimal_sample()];
390        let csv = samples_to_csv(&samples, 1);
391        csv.split_inclusive('\n').for_each(|chunk| {
392            assert!(
393                chunk.ends_with('\n'),
394                "line does not end with newline: {chunk:?}"
395            );
396        });
397    }
398
399    // T-STR-05: when the buffer is non-empty at shutdown and the upload URI is
400    // invalid (cannot be parsed into bucket+key), the thread serializes the batch,
401    // compresses it, then skips the S3 put and exits cleanly.
402    //
403    // This test covers the CSV-serialize → gzip → bucket-empty → shutdown path
404    // inside the upload thread without requiring a real S3 endpoint.
405    #[test]
406    fn test_upload_thread_processes_batch_with_invalid_uri() {
407        use crate::sentinel::run::RunContext;
408        use crate::sentinel::s3::UploadCredentials;
409        use std::sync::atomic::Ordering;
410        use std::sync::{Arc, Mutex};
411        use std::time::{Duration, Instant};
412
413        let (uploader, buf) = BatchUploader::new(1, 1);
414        let flag = uploader.shutdown_flag();
415
416        // Push a sample into the buffer so the thread has a non-empty batch.
417        {
418            let mut guard = buf.lock().unwrap();
419            guard.push(minimal_sample());
420        }
421
422        // Invalid upload_uri_prefix: parse_s3_uri will fail and bucket will be empty.
423        let ctx = Arc::new(Mutex::new(RunContext {
424            run_id: "r".to_string(),
425            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
426            credentials: UploadCredentials {
427                access_key_id: "k".to_string(),
428                secret_access_key: "s".to_string(),
429                session_token: "t".to_string(),
430                expires_at: "2099-01-01T00:00:00Z".to_string(),
431            },
432        }));
433
434        let agent = ureq::config::Config::builder()
435            .timeout_global(Some(Duration::from_millis(100)))
436            .build()
437            .new_agent();
438
439        // Signal shutdown before spawning so the thread skips the sleep phase.
440        flag.store(true, Ordering::Relaxed);
441
442        let handle = uploader.spawn(
443            ctx,
444            agent,
445            "http://127.0.0.1:1".to_string(),
446            "token".to_string(),
447        );
448
449        let t0 = Instant::now();
450        handle.join().expect("upload thread panicked");
451        let elapsed = t0.elapsed();
452
453        assert!(
454            elapsed < Duration::from_secs(2),
455            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
456        );
457    }
458
459    // T-UPL-INT-01: full roundtrip -- sample serialized, gzip-compressed, and
460    // PUT to the real Sentinel S3 bucket.  Covers the Ok(uri) success arm inside
461    // spawn() (uploaded_uris.push, seq += 1, consecutive_failures = 0).
462    // Skips automatically when SENTINEL_API_TOKEN is absent.
463    #[test]
464    fn test_upload_roundtrip_real_api() {
465        use crate::config::JobMetadata;
466        use crate::metrics::{CloudInfo, HostInfo};
467        use crate::sentinel::run::{close_run, start_run};
468        use std::sync::atomic::Ordering;
469        use std::sync::{Arc, Mutex};
470        use std::time::Duration;
471
472        let token = match std::env::var("SENTINEL_API_TOKEN") {
473            Ok(t) if !t.is_empty() => t,
474            _ => {
475                eprintln!("skip: SENTINEL_API_TOKEN not set");
476                return;
477            }
478        };
479        let api_base = std::env::var("SENTINEL_API_BASE")
480            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
481        eprintln!("T-UPL-INT-01: api_base={api_base}");
482
483        let agent = ureq::config::Config::builder()
484            .timeout_global(Some(Duration::from_secs(30)))
485            .build()
486            .new_agent();
487
488        let ctx = start_run(
489            &agent,
490            &api_base,
491            &token,
492            &JobMetadata {
493                job_name: Some("upload-roundtrip-test".to_string()),
494                ..Default::default()
495            },
496            None,
497            &HostInfo::default(),
498            &CloudInfo::default(),
499        )
500        .expect("start_run failed");
501        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);
502
503        let ctx_arc = Arc::new(Mutex::new(ctx));
504        let (uploader, buf) = BatchUploader::new(1, 1);
505        let flag = uploader.shutdown_flag();
506
507        buf.lock().unwrap().push(minimal_sample());
508
509        // Signal shutdown before spawning: the thread processes one batch then exits.
510        flag.store(true, Ordering::Relaxed);
511
512        let handle = uploader.spawn(
513            Arc::clone(&ctx_arc),
514            agent.clone(),
515            api_base.clone(),
516            token.clone(),
517        );
518        let uris = handle.join().expect("upload thread panicked");
519        eprintln!("T-UPL-INT-01: uris={uris:?}");
520
521        assert!(
522            !uris.is_empty(),
523            "expected at least one S3 URI; S3 upload may have failed"
524        );
525        assert!(
526            uris[0].starts_with("s3://"),
527            "URI must have s3:// scheme: {}",
528            uris[0]
529        );
530
531        // Close the run via the S3 route.
532        let ctx_guard = ctx_arc.lock().unwrap();
533        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
534        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
535        eprintln!("T-UPL-INT-01: close_run ok");
536    }
537
538    // T-UPL-INT-02: upload thread calls refresh_credentials when expires_at is in the
539    // past.  Covers the creds_expiring_soon() -> refresh_credentials block.
540    // The actual S3 credentials issued by start_run are still valid; setting
541    // expires_at to 1970 only causes our code to request a refresh -- the server
542    // returns fresh credentials and the upload proceeds normally.
543    // Skips automatically when SENTINEL_API_TOKEN is absent.
544    #[test]
545    fn test_upload_thread_refreshes_expiring_credentials() {
546        use crate::config::JobMetadata;
547        use crate::metrics::{CloudInfo, HostInfo};
548        use crate::sentinel::run::{close_run, start_run};
549        use std::sync::atomic::Ordering;
550        use std::sync::{Arc, Mutex};
551        use std::time::Duration;
552
553        let token = match std::env::var("SENTINEL_API_TOKEN") {
554            Ok(t) if !t.is_empty() => t,
555            _ => {
556                eprintln!("skip: SENTINEL_API_TOKEN not set");
557                return;
558            }
559        };
560        let api_base = std::env::var("SENTINEL_API_BASE")
561            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
562        eprintln!("T-UPL-INT-02: api_base={api_base}");
563
564        let agent = ureq::config::Config::builder()
565            .timeout_global(Some(Duration::from_secs(30)))
566            .build()
567            .new_agent();
568
569        let mut ctx = start_run(
570            &agent,
571            &api_base,
572            &token,
573            &JobMetadata {
574                job_name: Some("cred-refresh-test".to_string()),
575                ..Default::default()
576            },
577            None,
578            &HostInfo::default(),
579            &CloudInfo::default(),
580        )
581        .expect("start_run failed");
582        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);
583
584        // Force expires_at into the past so creds_expiring_soon() returns true.
585        // The underlying S3 session token is still valid; this only triggers the
586        // refresh call path in the upload thread.
587        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
588        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");
589
590        let ctx_arc = Arc::new(Mutex::new(ctx));
591        let (uploader, buf) = BatchUploader::new(1, 1);
592        let flag = uploader.shutdown_flag();
593
594        buf.lock().unwrap().push(minimal_sample());
595
596        flag.store(true, Ordering::Relaxed);
597
598        let handle = uploader.spawn(
599            Arc::clone(&ctx_arc),
600            agent.clone(),
601            api_base.clone(),
602            token.clone(),
603        );
604        let uris = handle.join().expect("upload thread panicked");
605        eprintln!("T-UPL-INT-02: uris={uris:?}");
606
607        // After refresh the upload should succeed via S3 or fall back to inline.
608        let ctx_guard = ctx_arc.lock().unwrap();
609        let csv = if uris.is_empty() {
610            Some(samples_to_csv(&[minimal_sample()], 1))
611        } else {
612            None
613        };
614        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
615        assert!(
616            result.is_ok(),
617            "close_run after credential refresh failed: {result:?}"
618        );
619        eprintln!("T-UPL-INT-02: close_run ok");
620    }
621
622    // T-STR-06: when the batch cannot be uploaded (nonexistent bucket), the thread
623    // makes up to MAX_UPLOAD_ATTEMPTS attempts, logs a warning, and moves on.
624    // The shutdown flag is pre-set, so the thread exits after that one failed batch.
625    // NOTE: the retry back-off sleeps 2 s + 4 s, so this test takes several seconds.
626    #[test]
627    fn test_upload_thread_handles_s3_failure_gracefully() {
628        use crate::sentinel::run::RunContext;
629        use crate::sentinel::s3::UploadCredentials;
630        use std::sync::atomic::Ordering;
631        use std::sync::{Arc, Mutex};
632        use std::time::{Duration, Instant};
633
634        let (uploader, buf) = BatchUploader::new(1, 1);
635        let flag = uploader.shutdown_flag();
636
637        // Push a sample into the buffer.
638        {
639            let mut guard = buf.lock().unwrap();
640            guard.push(minimal_sample());
641        }
642
643        // A real-looking S3 URI whose bucket does not exist, so every upload attempt fails.
644        let ctx = Arc::new(Mutex::new(RunContext {
645            run_id: "r".to_string(),
646            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
647            credentials: UploadCredentials {
648                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
649                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
650                session_token: "token".to_string(),
651                expires_at: "2099-01-01T00:00:00Z".to_string(),
652            },
653        }));
654
655        // Very short timeout so the S3 attempt fails fast.
656        let agent = ureq::config::Config::builder()
657            .timeout_global(Some(Duration::from_millis(200)))
658            .build()
659            .new_agent();
660
661        // Pre-set shutdown: the thread will process one batch, fail the upload,
662        // then exit on the shutdown check.
663        flag.store(true, Ordering::Relaxed);
664
665        let handle = uploader.spawn(
666            ctx,
667            agent,
668            "http://127.0.0.1:1".to_string(),
669            "token".to_string(),
670        );
671
672        let t0 = Instant::now();
673        handle.join().expect("upload thread panicked");
674        let elapsed = t0.elapsed();
675
676        // Even with the retry back-off (2 s + 4 s sleeps) this completes well
677        // within the bound below, because the 200 ms agent timeout makes each
678        // S3 attempt fail almost immediately.
679        // Timing breakdown: region detection (up to 2 s TCP timeout) +
680        // 3 × 200 ms agent timeouts + 2 s + 4 s retry sleeps = ~8.6 s max.
681        // Allow 20 s to accommodate slow DNS on any CI/dev host.
682        assert!(
683            elapsed < Duration::from_secs(20),
684            "upload thread took {elapsed:?}; expected < 20 s"
685        );
686    }
687
688    // T-COV-01: empty sample slice produces only the CSV header line.
689    #[test]
690    fn test_samples_to_csv_empty_slice() {
691        let csv = samples_to_csv(&[], 1);
692        let mut lines = csv.lines();
693        assert_eq!(
694            lines.next(),
695            Some(csv_header()),
696            "first line must be the CSV header"
697        );
698        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
699        assert!(csv.ends_with('\n'), "output must end with a newline");
700    }
701
702    // T-STR-07: the upload thread hits the empty-batch `continue` path at least
703    // twice before processing a sample pushed after those cycles, then exits
704    // on the shutdown signal.  Covers lines 135-139 in the non-shutdown branch.
705    #[test]
706    fn test_upload_thread_skips_empty_batch_then_processes() {
707        use crate::sentinel::run::RunContext;
708        use crate::sentinel::s3::UploadCredentials;
709        use std::sync::atomic::Ordering;
710        use std::sync::{Arc, Mutex};
711        use std::time::{Duration, Instant};
712
713        // upload_interval=0 → ticks_per_interval = (0*4).max(1) = 1 → 250 ms per cycle.
714        let (uploader, buf) = BatchUploader::new(0, 1);
715        let flag = uploader.shutdown_flag();
716
717        // Invalid URI → bucket empty after parse → S3 call is skipped entirely.
718        let ctx = Arc::new(Mutex::new(RunContext {
719            run_id: "r".to_string(),
720            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
721            credentials: UploadCredentials {
722                access_key_id: "k".to_string(),
723                secret_access_key: "s".to_string(),
724                session_token: "t".to_string(),
725                expires_at: "2099-01-01T00:00:00Z".to_string(),
726            },
727        }));
728
729        let agent = ureq::config::Config::builder()
730            .timeout_global(Some(Duration::from_millis(100)))
731            .build()
732            .new_agent();
733
734        let handle = uploader.spawn(
735            ctx,
736            agent,
737            "http://127.0.0.1:1".to_string(),
738            "token".to_string(),
739        );
740
741        // Let the thread execute at least two empty-buffer iterations (2 × 250 ms).
742        std::thread::sleep(Duration::from_millis(700));
743
744        // Push a sample then signal shutdown.  The thread drains it on the next wake,
745        // serializes, gzip-compresses, skips S3 (empty bucket), and exits.
746        buf.lock().unwrap().push(minimal_sample());
747        flag.store(true, Ordering::Relaxed);
748
749        let t0 = Instant::now();
750        handle.join().expect("upload thread panicked");
751        let elapsed = t0.elapsed();
752
753        assert!(
754            elapsed < Duration::from_secs(2),
755            "thread took {elapsed:?} after sample push; expected < 2 s"
756        );
757    }
758
759    // T-STR-08: after MAX_CONSECUTIVE_FAILURES (3) consecutive batch failures the
760    // thread resets the counter to 0 and continues rather than exiting.
761    //
762    // Three distinct batches are forced by pushing one sample at a time and
763    // sleeping between pushes so each drain is a separate upload attempt.
764    // Shutdown is signaled together with the third push so the thread processes
765    // batch 3 (hits the reset branch) and then exits on the shutdown check.
766    //
767    // NOTE: each failed batch runs through MAX_UPLOAD_ATTEMPTS attempts with
768    // exponential back-off before the retries (2 s + 4 s = 6 s) plus region
769    // detection (~2 s on the first batch).  Total wall-clock time is roughly 26 s.
770    #[test]
771    fn test_upload_thread_resets_consecutive_failures() {
772        use crate::sentinel::run::RunContext;
773        use crate::sentinel::s3::UploadCredentials;
774        use std::sync::atomic::Ordering;
775        use std::sync::{Arc, Mutex};
776        use std::time::{Duration, Instant};
777
778        let ctx = Arc::new(Mutex::new(RunContext {
779            run_id: "r".to_string(),
780            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
781            credentials: UploadCredentials {
782                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
783                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
784                session_token: "token".to_string(),
785                expires_at: "2099-01-01T00:00:00Z".to_string(),
786            },
787        }));
788
789        let agent = ureq::config::Config::builder()
790            .timeout_global(Some(Duration::from_millis(50)))
791            .build()
792            .new_agent();
793
794        // upload_interval=0 → thread wakes every 250 ms between upload cycles.
795        let (uploader, buf) = BatchUploader::new(0, 1);
796        let flag = uploader.shutdown_flag();
797
798        let handle = uploader.spawn(
799            Arc::clone(&ctx),
800            agent,
801            "http://127.0.0.1:1".to_string(),
802            "token".to_string(),
803        );
804
805        // Each batch takes ≤10 s: region detection (~2 s first time) +
806        // 3 × 50 ms agent timeout + 2 s + 4 s retry back-off.
807        // Push one sample per batch; sleep between pushes so each drain is
808        // a distinct batch (batch N drains before sample N+1 arrives).
809        //
810        // Batch 1 → consecutive_failures = 1
811        // Batch 2 → consecutive_failures = 2
812        // Batch 3 → consecutive_failures = 3 → RESET to 0 (lines 235-240)
813        let batch_wait = Duration::from_secs(10);
814        for i in 0..3u32 {
815            buf.lock().unwrap().push(minimal_sample());
816            if i < 2 {
817                std::thread::sleep(batch_wait);
818            }
819        }
820
821        // Signal shutdown together with the third sample so the thread
822        // processes batch 3, resets the counter, then exits.
823        flag.store(true, Ordering::Relaxed);
824
825        let t0 = Instant::now();
826        handle.join().expect("upload thread panicked");
827        let elapsed = t0.elapsed();
828
829        // From the shutdown signal: batch 3 takes ≤10 s, then the thread exits.
830        assert!(
831            elapsed < Duration::from_secs(15),
832            "thread did not exit after consecutive-failure reset: {elapsed:?}"
833        );
834    }
835
836    // T-STR-09: BatchUploader::new wires the uploader's internal buffer and the
837    // returned SampleBuffer to the same Arc allocation.
838    #[test]
839    fn test_batch_uploader_new_shares_buffer() {
840        use std::sync::Arc;
841        let (uploader, buf) = BatchUploader::new(60, 1);
842        assert!(
843            Arc::ptr_eq(&uploader.buffer, &buf),
844            "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
845        );
846    }
847}