Skip to main content

resource_tracker/sentinel/
upload.rs

1//! Background batch upload thread: buffers samples, serializes as CSV,
2//! gzip-compresses, and uploads to S3 every 60 seconds (configurable).
3
4use crate::metrics::Sample;
5use crate::output::csv::{csv_header, sample_to_csv_row};
6use crate::sentinel::run::{RunContext, refresh_credentials};
7use crate::sentinel::s3::{RegionCache, parse_s3_uri, s3_put};
8use flate2::{Compression, write::GzEncoder};
9use std::io::Write;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14#[cfg(test)]
15use flate2::read::GzDecoder;
16
/// Shared sample buffer: the main thread pushes samples, the upload thread
/// drains them (both sides go through the same `Mutex`).
pub type SampleBuffer = Arc<Mutex<Vec<Sample>>>;

// Number of consecutive failed upload cycles after which the upload thread
// logs a warning, discards the pending batch, and resets its failure counter.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

// Total S3 PUT attempts per upload cycle (1 initial + N-1 retries).
// Delay before attempt i (i > 0) is 2^i seconds: 2 s, 4 s, 8 s, …
// With the current value of 3 the delays are 2 s and 4 s.
const MAX_UPLOAD_ATTEMPTS: u32 = 3;
27
28// ---------------------------------------------------------------------------
29// CSV serialization helper
30// ---------------------------------------------------------------------------
31
32/// Serialize a slice of samples as a complete CSV string (header + rows).
33pub fn samples_to_csv(samples: &[Sample], interval_secs: u64) -> String {
34    let mut out = String::with_capacity(samples.len() * 256);
35    out.push_str(csv_header());
36    out.push('\n');
37    samples.iter().for_each(|s| {
38        out.push_str(&sample_to_csv_row(s, interval_secs));
39        out.push('\n');
40    });
41    out
42}
43
44/// Gzip-compress `data` using the default compression level.
45pub fn gzip_compress(data: &[u8]) -> Result<Vec<u8>, String> {
46    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
47    encoder
48        .write_all(data)
49        .map_err(|e| format!("gzip write failed: {e}"))?;
50    encoder
51        .finish()
52        .map_err(|e| format!("gzip finish failed: {e}"))
53}
54
55// ---------------------------------------------------------------------------
56// BatchUploader
57// ---------------------------------------------------------------------------
58
/// Background uploader that periodically ships buffered samples to S3.
///
/// Create it with `BatchUploader::new`, keep the returned `SampleBuffer` (and
/// the flag from `shutdown_flag()`) on the producer side, then move the
/// uploader into its own thread with `spawn`.
pub struct BatchUploader {
    /// Buffer shared with the main thread.
    pub buffer: SampleBuffer,
    /// Set to true through the handle returned by `shutdown_flag()` to
    /// trigger a final flush and make the upload thread exit.
    shutdown: Arc<AtomicBool>,
    /// Polling interval for the upload thread (seconds, default 60).
    upload_interval_secs: u64,
    /// Sampling interval (seconds) -- needed to compute per-interval byte counts in CSV;
    /// forwarded to `samples_to_csv` for every batch.
    sample_interval_secs: u64,
}
69
70impl BatchUploader {
71    /// Create a new `BatchUploader` and return the shared `SampleBuffer`
72    /// so the main thread can push samples into it.
73    pub fn new(upload_interval_secs: u64, sample_interval_secs: u64) -> (Self, SampleBuffer) {
74        let buffer = Arc::new(Mutex::new(Vec::<Sample>::new()));
75        let uploader = Self {
76            buffer: Arc::clone(&buffer),
77            shutdown: Arc::new(AtomicBool::new(false)),
78            upload_interval_secs,
79            sample_interval_secs,
80        };
81        (uploader, buffer)
82    }
83
84    /// Clone the shutdown flag so `main.rs` can signal the upload thread to
85    /// flush and exit after moving `self` into the spawned thread.
86    pub fn shutdown_flag(&self) -> Arc<AtomicBool> {
87        Arc::clone(&self.shutdown)
88    }
89
90    /// Spawn the background upload thread.
91    ///
92    /// The thread wakes every `upload_interval_secs`, drains the buffer, builds
93    /// a `.csv.gz` batch (gzip-compressed CSV, `Content-Type: application/gzip`),
94    /// and uploads it to S3.  On shutdown signal it performs one final flush
95    /// before exiting.
96    ///
97    /// Returns a `JoinHandle<Vec<String>>` whose value is the list of all
98    /// successfully uploaded S3 URIs (e.g. `"s3://bucket/prefix/run-id/000000.csv.gz"`).
99    /// The caller uses this list to decide the `/finish` route:
100    /// - non-empty → `data_source: "s3"` with `data_uris`
101    /// - empty     → `data_source: "inline"` with `data_csv`
102    pub fn spawn(
103        self,
104        ctx: Arc<Mutex<RunContext>>,
105        agent: ureq::Agent,
106        api_base: String,
107        token: String,
108    ) -> std::thread::JoinHandle<Vec<String>> {
109        std::thread::spawn(move || {
110            let mut region_cache = RegionCache::new();
111            let mut seq: u32 = 0;
112            let mut consecutive_failures: u32 = 0;
113            let mut uploaded_uris: Vec<String> = Vec::new();
114
115            // Break the upload interval into 250 ms ticks so a shutdown signal
116            // is noticed within 250 ms rather than waiting a full 60 seconds.
117            let tick = Duration::from_millis(250);
118            let ticks_per_interval = (self.upload_interval_secs * 4).max(1);
119
120            loop {
121                let shutting_down = self.shutdown.load(Ordering::Relaxed);
122
123                if !shutting_down {
124                    (0..ticks_per_interval)
125                        .take_while(|_| !self.shutdown.load(Ordering::Relaxed))
126                        .for_each(|_| std::thread::sleep(tick));
127                }
128
129                // Drain buffer under a minimal lock window.
130                let batch: Vec<Sample> = {
131                    let mut guard = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
132                    std::mem::take(&mut *guard)
133                };
134
135                if batch.is_empty() {
136                    if shutting_down {
137                        break;
138                    }
139                    continue;
140                }
141
142                // Serialize and compress outside the lock.
143                let csv = samples_to_csv(&batch, self.sample_interval_secs);
144                let compressed = match gzip_compress(csv.as_bytes()) {
145                    Ok(b) => b,
146                    Err(e) => {
147                        eprintln!("warn: upload batch {seq} gzip failed: {e}");
148                        if shutting_down {
149                            break;
150                        }
151                        continue;
152                    }
153                };
154
155                // Check credentials near expires_at; refresh if needed.
156                {
157                    let mut ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
158                    if ctx_guard.creds_expiring_soon()
159                        && let Err(e) =
160                            refresh_credentials(&agent, &api_base, &token, &mut ctx_guard)
161                    {
162                        eprintln!("warn: credential refresh failed: {e}");
163                    }
164                }
165
166                // Build the S3 key.
167                let (bucket, prefix, creds, run_id) = {
168                    let ctx_guard = ctx.lock().unwrap_or_else(|e| e.into_inner());
169                    let uri = parse_s3_uri(&ctx_guard.upload_uri_prefix).unwrap_or_else(|_| {
170                        crate::sentinel::s3::S3Uri {
171                            bucket: String::new(),
172                            key: String::new(),
173                        }
174                    });
175                    (
176                        uri.bucket,
177                        uri.key,
178                        ctx_guard.credentials.clone(),
179                        ctx_guard.run_id.clone(),
180                    )
181                };
182
183                if bucket.is_empty() {
184                    eprintln!("warn: upload_uri_prefix could not be parsed; skipping batch {seq}");
185                    if shutting_down {
186                        break;
187                    }
188                    continue;
189                }
190
191                let region = region_cache.get_or_detect(&bucket);
192                let key = format!("{prefix}/{run_id}/{seq:06}.csv.gz");
193
194                // Upload with exponential backoff.
195                // Attempt 0 is immediate; attempt i (i > 0) sleeps 2^i seconds first.
196                // With MAX_UPLOAD_ATTEMPTS=3: delays are 2 s and 4 s before retries.
197                let result: Result<String, String> = {
198                    let mut last_err = String::new();
199                    let mut uploaded_uri: Option<String> = None;
200                    for attempt in 0..MAX_UPLOAD_ATTEMPTS {
201                        if attempt > 0 {
202                            std::thread::sleep(Duration::from_secs(1u64 << attempt));
203                        }
204                        match s3_put(&agent, &bucket, &key, &region, &compressed, &creds) {
205                            Ok(uri) => {
206                                uploaded_uri = Some(uri);
207                                break;
208                            }
209                            Err(e) => {
210                                last_err = if last_err.is_empty() {
211                                    e
212                                } else {
213                                    format!("{last_err}; retry{attempt}: {e}")
214                                };
215                            }
216                        }
217                    }
218                    match uploaded_uri {
219                        Some(uri) => Ok(uri),
220                        None => Err(last_err),
221                    }
222                };
223
224                match result {
225                    Ok(uri) => {
226                        uploaded_uris.push(uri);
227                        seq += 1;
228                        consecutive_failures = 0;
229                    }
230                    Err(e) => {
231                        consecutive_failures += 1;
232                        eprintln!(
233                            "warn: S3 upload failed (attempt {consecutive_failures}/{MAX_CONSECUTIVE_FAILURES}): {e}"
234                        );
235                        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
236                            eprintln!(
237                                "warn: {MAX_CONSECUTIVE_FAILURES} consecutive upload failures; buffering continues but data may be lost"
238                            );
239                            consecutive_failures = 0;
240                        }
241                    }
242                }
243
244                if shutting_down {
245                    break;
246                }
247            }
248            uploaded_uris
249        })
250    }
251}
252
253// ---------------------------------------------------------------------------
254// Unit tests
255// ---------------------------------------------------------------------------
256
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::{CpuMetrics, MemoryMetrics, Sample};
    use crate::output::csv::csv_header;
    use std::io::Read;

    // Smallest useful `Sample`: fixed timestamp, no job or tracked process,
    // 1% CPU utilization, free_mib/used_mib = 512, and no network, disk, or
    // GPU entries.
    fn minimal_sample() -> Sample {
        Sample {
            timestamp_secs: 1_000_000,
            job_name: None,
            tracked_pid: None,
            cpu: CpuMetrics {
                utilization_pct: 1.0,
                process_utime_secs: None,
                process_stime_secs: None,
                process_pss_mib: None,
                process_rss_mib: None,
                process_disk_read_bytes: None,
                process_disk_write_bytes: None,
                process_gpu_vram_mib: None,
                process_gpu_utilized: None,
                process_tree_pids: vec![],
                ..Default::default()
            },
            memory: MemoryMetrics {
                free_mib: 512,
                used_mib: 512,
                ..Default::default()
            },
            network: vec![],
            disk: vec![],
            gpu: vec![],
        }
    }

    // T-STR-01: upload thread exits within 2 s of the shutdown flag being set,
    // even when the upload interval is 60 s.
    // Without the tick-based sleep the thread would block for the full interval.
    #[test]
    fn test_upload_thread_shuts_down_promptly() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, _buf) = BatchUploader::new(60, 1);
        let flag = uploader.shutdown_flag();

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(1)))
            .build()
            .new_agent();

        // Buffer is empty so the thread will never attempt an upload.
        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        // Signal shutdown immediately and measure how long join takes
        // (the thread is already inside its 250 ms tick loop).
        let t0 = Instant::now();
        flag.store(true, Ordering::Relaxed);
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread took {elapsed:?} to shut down; expected < 2 s"
        );
    }

    // T-STR-02: batch body decompresses to valid CSV (header + data rows).
    //
    // Spec Section 9.2.2: "A batch upload request contains Content-Encoding: gzip
    // and the body decompresses to valid CSV or JSONL."
    // This test verifies the compress/decompress round-trip and CSV structure.
    #[test]
    fn test_gzip_compress_decompresses_to_valid_csv() {
        let samples = vec![minimal_sample(), minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        let compressed = gzip_compress(csv.as_bytes()).expect("gzip_compress failed");

        // Gzip magic bytes (RFC 1952 Section 2.3.1).
        assert_eq!(&compressed[..2], b"\x1f\x8b", "missing gzip magic bytes");

        // Decompress must round-trip to identical bytes.
        let mut decoder = GzDecoder::new(&compressed[..]);
        let mut decompressed = String::new();
        decoder
            .read_to_string(&mut decompressed)
            .expect("gzip decompression failed");
        assert_eq!(
            decompressed, csv,
            "decompressed content does not match original CSV"
        );

        // First line must be the CSV header.
        let first_line = decompressed.lines().next().expect("empty output");
        assert_eq!(first_line, csv_header(), "first line is not the CSV header");

        // Every data row must have the same column count as the header.
        // (A plain split on ',' is enough because minimal_sample() yields no
        // quoted fields.)
        let header_cols = csv_header().split(',').count();
        decompressed
            .lines()
            .skip(1)
            .enumerate()
            .for_each(|(i, line)| {
                assert!(!line.is_empty(), "unexpected empty data line at index {i}");
                let cols = line.split(',').count();
                assert_eq!(
                    cols, header_cols,
                    "data row {i} has {cols} columns, expected {header_cols}: {line}"
                );
            });
    }

    // Every line produced by samples_to_csv (header and data rows) ends with '\n'.
    #[test]
    fn test_samples_to_csv_all_lines_end_with_newline() {
        let samples = vec![minimal_sample()];
        let csv = samples_to_csv(&samples, 1);
        csv.split_inclusive('\n').for_each(|chunk| {
            assert!(
                chunk.ends_with('\n'),
                "line does not end with newline: {chunk:?}"
            );
        });
    }

    // T-STR-05: when the buffer is non-empty at shutdown and the upload URI is
    // invalid (cannot be parsed into bucket+key), the thread serializes the batch,
    // compresses it, then skips the S3 put and exits cleanly.
    //
    // This test covers the CSV-serialize → gzip → bucket-empty → shutdown path
    // inside the upload thread without requiring a real S3 endpoint.
    #[test]
    fn test_upload_thread_processes_batch_with_invalid_uri() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        // Push a sample into the buffer so the thread has a non-empty batch.
        {
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        // Invalid upload_uri_prefix: parse_s3_uri will fail and bucket will be empty.
        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        // Signal shutdown before spawning so the thread skips the sleep phase.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "upload thread with non-empty batch took {elapsed:?}; expected < 2 s"
        );
    }

    // T-UPL-INT-01: full roundtrip -- sample serialized, gzip-compressed, and
    // PUT to the real Sentinel S3 bucket.  Covers the Ok(uri) success arm inside
    // spawn() (uploaded_uris.push, seq += 1, consecutive_failures = 0).
    // Skips automatically when SENTINEL_API_TOKEN is absent.
    #[test]
    fn test_upload_roundtrip_real_api() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-01: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("upload-roundtrip-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-01: run_id={}", ctx.run_id);

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        // Signal shutdown before spawning: the thread processes one batch then exits.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-01: uris={uris:?}");

        assert!(
            !uris.is_empty(),
            "expected at least one S3 URI; S3 upload may have failed"
        );
        assert!(
            uris[0].starts_with("s3://"),
            "URI must have s3:// scheme: {}",
            uris[0]
        );

        // Close the run via the S3 route.
        let ctx_guard = ctx_arc.lock().unwrap();
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), None, &uris);
        assert!(result.is_ok(), "close_run (S3 route) failed: {result:?}");
        eprintln!("T-UPL-INT-01: close_run ok");
    }

    // T-UPL-INT-02: upload thread calls refresh_credentials when expires_at is in the
    // past.  Covers the creds_expiring_soon() -> refresh_credentials block.
    // The actual S3 credentials issued by start_run are still valid; setting
    // expires_at to 1970 only causes our code to request a refresh -- the server
    // returns fresh credentials and the upload proceeds normally.
    // Skips automatically when SENTINEL_API_TOKEN is absent.
    #[test]
    fn test_upload_thread_refreshes_expiring_credentials() {
        use crate::config::JobMetadata;
        use crate::metrics::{CloudInfo, HostInfo};
        use crate::sentinel::run::{close_run, start_run};
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;

        let token = match std::env::var("SENTINEL_API_TOKEN") {
            Ok(t) if !t.is_empty() => t,
            _ => {
                eprintln!("skip: SENTINEL_API_TOKEN not set");
                return;
            }
        };
        let api_base = std::env::var("SENTINEL_API_BASE")
            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
        eprintln!("T-UPL-INT-02: api_base={api_base}");

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let mut ctx = start_run(
            &agent,
            &api_base,
            &token,
            &JobMetadata {
                job_name: Some("cred-refresh-test".to_string()),
                ..Default::default()
            },
            None,
            &HostInfo::default(),
            &CloudInfo::default(),
        )
        .expect("start_run failed");
        eprintln!("T-UPL-INT-02: run_id={}", ctx.run_id);

        // Force expires_at into the past so creds_expiring_soon() returns true.
        // The underlying S3 session token is still valid; this only triggers the
        // refresh call path in the upload thread.
        ctx.credentials.expires_at = "1970-01-01T00:00:00Z".to_string();
        eprintln!("T-UPL-INT-02: expires_at forced to epoch; creds_expiring_soon() will be true");

        let ctx_arc = Arc::new(Mutex::new(ctx));
        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        buf.lock().unwrap().push(minimal_sample());

        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            Arc::clone(&ctx_arc),
            agent.clone(),
            api_base.clone(),
            token.clone(),
        );
        let uris = handle.join().expect("upload thread panicked");
        eprintln!("T-UPL-INT-02: uris={uris:?}");

        // After refresh the upload should succeed via S3 or fall back to inline.
        let ctx_guard = ctx_arc.lock().unwrap();
        let csv = if uris.is_empty() {
            Some(samples_to_csv(&[minimal_sample()], 1))
        } else {
            None
        };
        let result = close_run(&agent, &api_base, &token, &ctx_guard, Some(0), csv, &uris);
        assert!(
            result.is_ok(),
            "close_run after credential refresh failed: {result:?}"
        );
        eprintln!("T-UPL-INT-02: close_run ok");
    }

    // T-STR-06: when every S3 PUT for a batch fails, the thread makes
    // MAX_UPLOAD_ATTEMPTS attempts with exponential back-off, logs the failure,
    // and exits cleanly without panicking.  The shutdown flag is set before
    // spawning, so exactly one batch is processed.
    // NOTE: this test takes several seconds because the retry back-off sleeps
    // 2 s + 4 s between attempts.
    #[test]
    fn test_upload_thread_handles_s3_failure_gracefully() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let (uploader, buf) = BatchUploader::new(1, 1);
        let flag = uploader.shutdown_flag();

        // Push a sample into the buffer.
        {
            let mut guard = buf.lock().unwrap();
            guard.push(minimal_sample());
        }

        // A real-looking S3 URI for a bucket that should not exist, with dummy
        // credentials, so every PUT is expected to fail (auth/DNS error or the
        // 200 ms agent timeout).  The API base below points at 127.0.0.1:1
        // (closed port); no credential refresh is triggered (expiry is 2099).
        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        // Very short timeout so each S3 attempt fails fast.
        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(200)))
            .build()
            .new_agent();

        // Pre-set shutdown: the thread will process one batch, fail the upload,
        // then exit on the shutdown check.
        flag.store(true, Ordering::Relaxed);

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        // The retries are not skipped: all MAX_UPLOAD_ATTEMPTS attempts run,
        // with 2 s + 4 s back-off sleeps between them.
        // Timing breakdown: region detection (up to 2 s TCP timeout) +
        // 3 × 200 ms agent timeouts + 2 s + 4 s retry sleeps = ~8.6 s max.
        // Allow 20 s to accommodate slow DNS on any CI/dev host.
        assert!(
            elapsed < Duration::from_secs(20),
            "upload thread took {elapsed:?}; expected < 20 s"
        );
    }

    // T-COV-01: empty sample slice produces only the CSV header line.
    #[test]
    fn test_samples_to_csv_empty_slice() {
        let csv = samples_to_csv(&[], 1);
        let mut lines = csv.lines();
        assert_eq!(
            lines.next(),
            Some(csv_header()),
            "first line must be the CSV header"
        );
        assert_eq!(lines.next(), None, "empty slice must produce no data rows");
        assert!(csv.ends_with('\n'), "output must end with a newline");
    }

    // T-STR-07: the upload thread hits the empty-batch `continue` path at least
    // twice before processing a sample pushed after those cycles, then exits
    // on the shutdown signal.  Covers the empty-buffer `continue` branch taken
    // while the thread is not shutting down.
    #[test]
    fn test_upload_thread_skips_empty_batch_then_processes() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        // upload_interval=0 → ticks_per_interval = (0*4).max(1) = 1 → 250 ms per cycle.
        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        // Invalid URI → bucket empty after parse → S3 call is skipped entirely.
        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "invalid-not-an-s3-uri".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(100)))
            .build()
            .new_agent();

        let handle = uploader.spawn(
            ctx,
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        // Let the thread execute at least two empty-buffer iterations (2 × 250 ms).
        std::thread::sleep(Duration::from_millis(700));

        // Push a sample then signal shutdown.  The thread drains it on the next wake,
        // serializes, gzip-compresses, skips S3 (empty bucket), and exits.
        buf.lock().unwrap().push(minimal_sample());
        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "thread took {elapsed:?} after sample push; expected < 2 s"
        );
    }

    // T-STR-08: after MAX_CONSECUTIVE_FAILURES (3) consecutive failed upload
    // cycles the thread resets the counter to 0 and continues rather than exiting.
    //
    // Failing cycles are kept going by pushing one sample at a time and
    // sleeping between pushes, so the thread always has data to (fail to)
    // upload.  Shutdown is signaled together with the third push so the thread
    // finishes its remaining failing cycle(s) and then exits on the shutdown
    // check.
    //
    // NOTE: each failed cycle runs through MAX_UPLOAD_ATTEMPTS retries with
    // exponential back-off (2 s + 4 s = 6 s) plus region detection (~2 s on
    // the first batch).  Total wall-clock time is approximately 26 s.
    #[test]
    fn test_upload_thread_resets_consecutive_failures() {
        use crate::sentinel::run::RunContext;
        use crate::sentinel::s3::UploadCredentials;
        use std::sync::atomic::Ordering;
        use std::sync::{Arc, Mutex};
        use std::time::{Duration, Instant};

        let ctx = Arc::new(Mutex::new(RunContext {
            run_id: "r".to_string(),
            upload_uri_prefix: "s3://fake-nonexistent-bucket-xyz/prefix".to_string(),
            credentials: UploadCredentials {
                access_key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
                secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
                session_token: "token".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        }));

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_millis(50)))
            .build()
            .new_agent();

        // upload_interval=0 → thread wakes every 250 ms between upload cycles.
        let (uploader, buf) = BatchUploader::new(0, 1);
        let flag = uploader.shutdown_flag();

        let handle = uploader.spawn(
            Arc::clone(&ctx),
            agent,
            "http://127.0.0.1:1".to_string(),
            "token".to_string(),
        );

        // Each failing cycle takes ≤10 s: region detection (~2 s first time) +
        // 3 × 50 ms agent timeout + 2 s + 4 s retry back-off.
        // Push one sample at a time and sleep between pushes so the thread
        // keeps running failing upload cycles.
        //
        // Failed cycle 1 → consecutive_failures = 1
        // Failed cycle 2 → consecutive_failures = 2
        // Failed cycle 3 → consecutive_failures = 3 → reset to 0
        let batch_wait = Duration::from_secs(10);
        for i in 0..3u32 {
            buf.lock().unwrap().push(minimal_sample());
            if i < 2 {
                std::thread::sleep(batch_wait);
            }
        }

        // Signal shutdown together with the third sample so the thread
        // finishes its remaining failing cycle(s), then exits.
        flag.store(true, Ordering::Relaxed);

        let t0 = Instant::now();
        handle.join().expect("upload thread panicked");
        let elapsed = t0.elapsed();

        // From the shutdown signal, the remaining failing cycle(s) must finish
        // and the thread must exit within the bound.
        assert!(
            elapsed < Duration::from_secs(15),
            "thread did not exit after consecutive-failure reset: {elapsed:?}"
        );
    }

    // T-STR-09: BatchUploader::new wires the uploader's internal buffer and the
    // returned SampleBuffer to the same Arc allocation.
    #[test]
    fn test_batch_uploader_new_shares_buffer() {
        use std::sync::Arc;
        let (uploader, buf) = BatchUploader::new(60, 1);
        assert!(
            Arc::ptr_eq(&uploader.buffer, &buf),
            "uploader.buffer and the returned SampleBuffer must point to the same Arc allocation"
        );
    }
}