Skip to main content

resource_tracker/sentinel/
run.rs

1use crate::config::JobMetadata;
2use crate::metrics::{CloudInfo, HostInfo};
3use crate::sentinel::s3::UploadCredentials;
4use serde::{Deserialize, Serialize};
5
/// `skip_serializing_if` predicate for borrowed string slices: `true` when
/// the slice holds no elements, so the field is left out of the JSON.
fn slice_is_empty(v: &&[String]) -> bool {
    matches!(**v, [])
}
9
10// ---------------------------------------------------------------------------
11// Base64 encoding (stdlib only -- avoids a crate dependency)
12// ---------------------------------------------------------------------------
13
/// RFC 4648 base64 encoding (standard alphabet, `=` padding).
///
/// Compiled only for tests; written by hand so no base64 crate is needed.
#[cfg(test)]
fn base64_encode(input: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Character for the 6-bit group that starts `shift` bits up in `word`.
    let sextet = |word: u32, shift: u32| char::from(TABLE[((word >> shift) & 0x3f) as usize]);

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Zero-fill the missing bytes of a short final group; the padding
        // characters below stand in for the sextets built from them.
        let mut bytes = [0u8; 3];
        bytes[..group.len()].copy_from_slice(group);
        let word = (u32::from(bytes[0]) << 16) | (u32::from(bytes[1]) << 8) | u32::from(bytes[2]);

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
46
47// ---------------------------------------------------------------------------
48// API response types
49// ---------------------------------------------------------------------------
50
/// Response body of `POST /runs`, decoded by `start_run`.
#[derive(Debug, Deserialize)]
struct StartRunResponse {
    /// Server-assigned run identifier; later calls place it in the URL path
    /// (`/runs/{run_id}/...`).
    run_id: String,
    /// Copied unchanged into `RunContext::upload_uri_prefix`.
    upload_uri_prefix: String,
    /// Upload credentials, converted into `UploadCredentials` by `start_run`.
    upload_credentials: RawCredentials,
}
57
/// Upload credentials exactly as they appear in Sentinel API responses.
///
/// Field names match the live Sentinel API response (Python reference:
/// `sentinel_api.py` `register_run` docstring).
/// `expiration` is the documented Python name; `expires_at` is also accepted
/// (serde alias) in case the live API uses that name instead.  The field is
/// optional so a missing value does not abort the run -- callers fall back to
/// a far-future timestamp (credentials treated as always-fresh).
///
/// NOTE(review): the derived `Debug` output prints `secret_key` and
/// `session_token` in plain text; avoid formatting this type with `{:?}` in logs.
#[derive(Debug, Deserialize)]
struct RawCredentials {
    /// Mapped to `UploadCredentials::access_key_id`.
    access_key: String,
    /// Mapped to `UploadCredentials::secret_access_key`.
    secret_key: String,
    /// Mapped to `UploadCredentials::session_token`.
    session_token: String,
    /// Expiry timestamp text; a missing key deserializes as `None`.  It is
    /// later parsed by `parse_iso8601_secs`, so presumably ISO 8601 UTC --
    /// confirm against the API schema.
    #[serde(alias = "expires_at", default)]
    expiration: Option<String>,
}
72
/// Response body of the credential-refresh call, decoded by
/// `refresh_credentials`.
#[derive(Debug, Deserialize)]
struct RefreshCredentialsResponse {
    /// Replacement credentials, same shape as in the `POST /runs` response.
    upload_credentials: RawCredentials,
}
77
78// ---------------------------------------------------------------------------
79// RunContext
80// ---------------------------------------------------------------------------
81
/// State returned by `start_run` and referenced by all subsequent API calls.
///
/// NOTE(review): whether the derived `Debug` exposes secrets depends on the
/// `Debug` impl of `UploadCredentials`; check before logging with `{:?}`.
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Server-assigned run id, used in `/runs/{run_id}/...` request paths.
    pub run_id: String,
    /// Upload location prefix returned by `POST /runs`.
    pub upload_uri_prefix: String,
    /// Current upload credentials; replaced in place by `refresh_credentials`.
    pub credentials: UploadCredentials,
}
89
90impl RunContext {
91    /// Returns `true` when the STS credentials expire within 5 minutes.
92    /// Satisfies T-STR-04.
93    pub fn creds_expiring_soon(&self) -> bool {
94        match parse_iso8601_secs(&self.credentials.expires_at) {
95            Some(expires_at_secs) => {
96                let now = std::time::SystemTime::now()
97                    .duration_since(std::time::UNIX_EPOCH)
98                    .unwrap_or_default()
99                    .as_secs();
100                expires_at_secs.saturating_sub(now) < 300
101            }
102            None => {
103                // Unparseable expires_at: treat as already expired to be safe.
104                true
105            }
106        }
107    }
108}
109
110/// Parse a subset of ISO 8601 UTC timestamps into Unix seconds.
111/// Handles the common AWS format `"YYYY-MM-DDTHH:MM:SSZ"`.
112fn parse_iso8601_secs(s: &str) -> Option<u64> {
113    // Expected: "2026-04-01T12:00:00Z" (20 chars, no sub-second, UTC only)
114    let s = s.trim_end_matches('Z');
115    let s = s.trim_end_matches("+00:00");
116    let nums: Vec<u64> = s
117        .split(|c: char| !c.is_ascii_digit())
118        .filter(|p| !p.is_empty())
119        .map(|p| p.parse().ok())
120        .collect::<Option<Vec<_>>>()?;
121    if nums.len() < 6 {
122        return None;
123    }
124    let (y, mo, d, h, mi, sec) = (nums[0], nums[1], nums[2], nums[3], nums[4], nums[5]);
125    // Approximate days since epoch (good enough for expires_at comparison).
126    let days = days_since_epoch(y, mo, d)?;
127    Some(days * 86400 + h * 3600 + mi * 60 + sec)
128}
129
/// Render Unix seconds as an ISO 8601 UTC string (`"YYYY-MM-DDTHH:MM:SSZ"`).
///
/// Applies the full Gregorian leap-year rule, stepping forward from
/// 1970-01-01 one whole year and then one whole month at a time.
fn unix_secs_to_iso8601(secs: u64) -> String {
    const MONTH_LEN: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let leap = |yr: u64| (yr % 4 == 0 && yr % 100 != 0) || yr % 400 == 0;

    let (hh, mm, ss) = {
        let clock = secs % 86400;
        (clock / 3600, clock % 3600 / 60, clock % 60)
    };
    let mut remaining = secs / 86400;

    let year_len = |yr: u64| if leap(yr) { 366 } else { 365 };
    let mut year = 1970u64;
    while remaining >= year_len(year) {
        remaining -= year_len(year);
        year += 1;
    }

    let month_len = |m: u64| {
        if m == 2 && leap(year) {
            29
        } else {
            MONTH_LEN[(m - 1) as usize]
        }
    };
    let mut month = 1u64;
    while remaining >= month_len(month) {
        remaining -= month_len(month);
        month += 1;
    }

    let day = remaining + 1;
    format!("{year:04}-{month:02}-{day:02}T{hh:02}:{mm:02}:{ss:02}Z")
}
168
169fn now_iso8601() -> String {
170    let secs = std::time::SystemTime::now()
171        .duration_since(std::time::UNIX_EPOCH)
172        .unwrap_or_default()
173        .as_secs();
174    unix_secs_to_iso8601(secs)
175}
176
/// Days from 1970-01-01 to the given proleptic Gregorian date.
///
/// Computed in constant time from a closed-form leap-year count, so an
/// absurdly large year cannot stall the caller.
///
/// Returns `None` when the year is before 1970, the month is outside
/// `1..=12`, the day is 0 or past the end of that month (e.g. 30 February or
/// 31 April, which would otherwise roll silently into the next month), or
/// the result would overflow `u64`.
fn days_since_epoch(y: u64, mo: u64, d: u64) -> Option<u64> {
    // Days per month in a common year; February is adjusted for leap years.
    const DAYS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let is_leap = (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
    let month_len = |m: u64| {
        if m == 2 && is_leap {
            29
        } else {
            DAYS[(m - 1) as usize]
        }
    };

    // The month range check short-circuits before `month_len` indexes DAYS.
    if y < 1970 || !(1..=12).contains(&mo) || d == 0 || d > month_len(mo) {
        return None;
    }

    // Number of Gregorian leap years in 1..=n.
    let leaps_through = |n: u64| n / 4 - n / 100 + n / 400;
    let year_days = (y - 1970)
        .checked_mul(365)?
        .checked_add(leaps_through(y - 1) - leaps_through(1969))?;
    let month_days: u64 = (1..mo).map(month_len).sum();
    year_days.checked_add(month_days + d - 1)
}
205
206// ---------------------------------------------------------------------------
207// Request payload types
208// ---------------------------------------------------------------------------
209
/// Body of `POST /runs`, serialized by `start_run`.
///
/// Python `register_run` sends a flat dict merging metadata, host_info, and
/// cloud_info.  `#[serde(flatten)]` on all three fields reproduces that shape.
///
/// NOTE(review): flattening merges the fields of all three parts into one
/// JSON object; if two parts share a field name the output will contain a
/// duplicate key -- confirm `HostInfo`, `CloudInfo` and `MetadataPayload`
/// field names are disjoint.
#[derive(Debug, Serialize)]
struct StartRunRequest<'a> {
    /// Job metadata, tags and optional command.
    #[serde(flatten)]
    metadata: MetadataPayload<'a>,
    /// Host description, borrowed from the caller.
    #[serde(flatten)]
    host: &'a HostInfo,
    /// Cloud environment description, borrowed from the caller.
    #[serde(flatten)]
    cloud: &'a CloudInfo,
}
221
/// Job-metadata portion of the `POST /runs` body (flattened into
/// `StartRunRequest`).
///
/// The string fields borrow from `JobMetadata`; each `None` field is left out
/// of the JSON object instead of being sent as `null`.
#[derive(Debug, Serialize)]
struct MetadataPayload<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    job_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    project_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    stage_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    task_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    team: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    env: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    language: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    orchestrator: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    executor: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    external_run_id: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    container_image: Option<&'a str>,
    /// Sent as a JSON array; omitted entirely when there are no tags.
    #[serde(skip_serializing_if = "slice_is_empty")]
    tags: &'a [String],
    // NOTE: `pid` was intentionally omitted.  The `RunCreate` schema at
    // https://api.sentinel.sparecores.net/openapi.json does not include a `pid`
    // field, so sending it caused Pydantic validation to reject it with HTTP 422.
    /// Shell-wrapper command encoded as a JSON string, e.g. `"[\"stress\",\"--cpu\",\"4\"]"`.
    /// The Sentinel API `command` field is `string | null` ("JSON array encoded in TEXT"),
    /// not a JSON array.  Omitted when not running in shell-wrapper mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
}
257
258// See https://api.sentinel.sparecores.net/docs#/Resource%20Tracker/finish_run_runs__run_id__finish_post
259// The /finish endpoint accepts two discriminated variants keyed on data_source:
260//   "inline" → RunFinishInline  (data_csv: raw CSV string, additionalProperties:false)
261//   "s3"     → RunFinishS3      (data_uris: [s3://...], additionalProperties:false)
262// run_id goes in the URL path only -- do not repeat in body.
263
/// Payload for RunFinishInline: remaining samples sent as a raw CSV string.
/// Used when no S3 batches were uploaded (short runs or all S3 failures).
#[derive(Debug, Serialize)]
struct CloseRunInlineRequest {
    /// Process exit code; omitted from the JSON when unknown (`None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `"finished"` or `"failed"`, chosen by `close_run` from the exit code.
    run_status: &'static str,
    /// Exact finish time in ISO 8601 UTC format, e.g. "2026-04-03T12:00:00Z".
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// Discriminator value -- must be exactly "inline".
    data_source: &'static str,
    /// Raw CSV string -- NOT base64-encoded.  The API schema type is plain string.
    data_csv: String,
}
279
/// Payload for RunFinishS3: data was already uploaded to S3 in batches.
/// Used when at least one S3 batch was successfully uploaded.
/// The final remaining samples must have been flushed to S3 before calling
/// close_run (the BatchUploader performs this flush on shutdown).
#[derive(Debug, Serialize)]
struct CloseRunS3Request {
    /// Process exit code; omitted from the JSON when unknown (`None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `"finished"` or `"failed"`, chosen by `close_run` from the exit code.
    run_status: &'static str,
    /// Exact finish time in ISO 8601 UTC format, e.g. "2026-04-03T12:00:00Z".
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// Discriminator value -- must be exactly "s3".
    data_source: &'static str,
    /// S3 URIs of all uploaded batch files (including the final flush).
    data_uris: Vec<String>,
}
297
298// ---------------------------------------------------------------------------
299// API calls
300// ---------------------------------------------------------------------------
301
302/// POST to `/runs` to register a new run with the Sentinel API.
303///
304/// On failure the caller should log a warning and disable streaming;
305/// local stdout output continues normally (Section 11 error handling).
306pub fn start_run(
307    agent: &ureq::Agent,
308    api_base: &str,
309    token: &str,
310    metadata: &JobMetadata,
311    pid: Option<i32>,
312    host: &HostInfo,
313    cloud: &CloudInfo,
314) -> Result<RunContext, String> {
315    // The API `command` field is `string | null` ("JSON array encoded in TEXT").
316    // Serialize the Vec<String> to a JSON string; send None when no command was given.
317    let command_json: Option<String> = if metadata.command.is_empty() {
318        None
319    } else {
320        serde_json::to_string(&metadata.command).ok()
321    };
322
323    // `pid` is tracked locally but is not part of the `RunCreate` API schema.
324    let _ = pid;
325
326    let payload = StartRunRequest {
327        metadata: MetadataPayload {
328            job_name: metadata.job_name.as_deref(),
329            project_name: metadata.project_name.as_deref(),
330            stage_name: metadata.stage_name.as_deref(),
331            task_name: metadata.task_name.as_deref(),
332            team: metadata.team.as_deref(),
333            env: metadata.env.as_deref(),
334            language: metadata.language.as_deref(),
335            orchestrator: metadata.orchestrator.as_deref(),
336            executor: metadata.executor.as_deref(),
337            external_run_id: metadata.external_run_id.as_deref(),
338            container_image: metadata.container_image.as_deref(),
339            tags: &metadata.tags,
340            command: command_json,
341        },
342        host,
343        cloud,
344    };
345
346    let url = format!("{api_base}/runs");
347    let body = serde_json::to_string(&payload)
348        .map_err(|e| format!("failed to serialize start_run payload: {e}"))?;
349
350    let mut response = agent
351        .post(&url)
352        .header("Authorization", &format!("Bearer {token}"))
353        .header("Content-Type", "application/json")
354        .send(body.as_bytes())
355        .map_err(|e| format!("start_run POST failed: {e}"))?;
356
357    let text = response
358        .body_mut()
359        .read_to_string()
360        .map_err(|e| format!("start_run read body failed: {e}"))?;
361
362    let resp: StartRunResponse = serde_json::from_str(&text).map_err(|e| {
363        format!(
364            "start_run parse response failed: {e} ({} bytes)",
365            text.len()
366        )
367    })?;
368
369    Ok(RunContext {
370        run_id: resp.run_id,
371        upload_uri_prefix: resp.upload_uri_prefix,
372        credentials: UploadCredentials {
373            access_key_id: resp.upload_credentials.access_key,
374            secret_access_key: resp.upload_credentials.secret_key,
375            session_token: resp.upload_credentials.session_token,
376            expires_at: resp
377                .upload_credentials
378                .expiration
379                .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
380        },
381    })
382}
383
384/// POST to `/runs/{run_id}/credentials/refresh` to obtain fresh STS credentials.
385///
386/// Updates `ctx.credentials` in place on success.
387pub fn refresh_credentials(
388    agent: &ureq::Agent,
389    api_base: &str,
390    token: &str,
391    ctx: &mut RunContext,
392) -> Result<(), String> {
393    let url = format!("{api_base}/runs/{}/refresh-credentials", ctx.run_id);
394    let mut response = agent
395        .post(&url)
396        .header("Authorization", &format!("Bearer {token}"))
397        .send(b"" as &[u8])
398        .map_err(|e| format!("credential refresh POST failed: {e}"))?;
399
400    let text = response
401        .body_mut()
402        .read_to_string()
403        .map_err(|e| format!("credential refresh read body failed: {e}"))?;
404
405    let resp: RefreshCredentialsResponse = serde_json::from_str(&text).map_err(|e| {
406        format!(
407            "credential refresh parse failed: {e} ({} bytes)",
408            text.len()
409        )
410    })?;
411
412    ctx.credentials = UploadCredentials {
413        access_key_id: resp.upload_credentials.access_key,
414        secret_access_key: resp.upload_credentials.secret_key,
415        session_token: resp.upload_credentials.session_token,
416        expires_at: resp
417            .upload_credentials
418            .expiration
419            .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
420    };
421    Ok(())
422}
423
424/// POST to `/runs/{run_id}/finish` to mark the run complete.
425///
426/// Dispatches to the correct schema variant based on whether S3 uploads occurred:
427///
428/// - `uploaded_uris` non-empty → `RunFinishS3` (`data_source="s3"`, `data_uris=[...]`).
429///   The `BatchUploader` already flushed the final remaining samples to S3 on
430///   shutdown before this function is called, so `uploaded_uris` contains every batch.
431///   `remaining_csv` is ignored in this path.
432///
433/// - `uploaded_uris` empty → `RunFinishInline` (`data_source="inline"`, `data_csv=...`).
434///   Used for short runs or when all S3 uploads failed.  `remaining_csv` is the
435///   raw CSV of all collected samples (never base64-encoded).
436pub fn close_run(
437    agent: &ureq::Agent,
438    api_base: &str,
439    token: &str,
440    ctx: &RunContext,
441    exit_code: Option<i32>,
442    remaining_csv: Option<String>,
443    uploaded_uris: &[String],
444) -> Result<(), String> {
445    // "finished" for success/clean exit (including SIGTERM), "failed" for non-zero.
446    let run_status = match exit_code {
447        Some(0) | None => "finished",
448        Some(_) => "failed",
449    };
450    let finished_at = Some(now_iso8601());
451
452    let url = format!("{api_base}/runs/{}/finish", ctx.run_id);
453    let body = if uploaded_uris.is_empty() {
454        // Inline route: no S3 batches uploaded.
455        // data_csv must be the raw CSV string (not base64) per the API schema.
456        let payload = CloseRunInlineRequest {
457            exit_code,
458            run_status,
459            finished_at,
460            data_source: "inline",
461            data_csv: remaining_csv.unwrap_or_default(),
462        };
463        serde_json::to_string(&payload)
464            .map_err(|e| format!("failed to serialize close_run inline payload: {e}"))?
465    } else {
466        // S3 route: all batches (including final flush) are in uploaded_uris.
467        let payload = CloseRunS3Request {
468            exit_code,
469            run_status,
470            finished_at,
471            data_source: "s3",
472            data_uris: uploaded_uris.to_vec(),
473        };
474        serde_json::to_string(&payload)
475            .map_err(|e| format!("failed to serialize close_run s3 payload: {e}"))?
476    };
477
478    // Send plain JSON -- no body-level gzip.  The Sentinel API (FastAPI) does
479    // not decompress Content-Encoding: gzip on incoming request bodies, so
480    // sending compressed bytes causes a 422.  This matches the Python reference
481    // which calls requests.post(url, json=payload) with no Content-Encoding.
482    let response = agent
483        .post(&url)
484        .header("Authorization", &format!("Bearer {token}"))
485        .header("Content-Type", "application/json")
486        .send(body.as_bytes())
487        .map_err(|e| format!("close_run POST failed: {e}"))?;
488
489    let status = response.status();
490    if status != 200 {
491        return Err(format!("close_run received HTTP {status}: expected 200"));
492    }
493
494    Ok(())
495}
496
497// ---------------------------------------------------------------------------
498// Unit tests
499// ---------------------------------------------------------------------------
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504
505    #[test]
506    fn test_creds_expiring_soon_far_future() {
507        let ctx = RunContext {
508            run_id: "test".to_string(),
509            upload_uri_prefix: "s3://b/p".to_string(),
510            credentials: UploadCredentials {
511                access_key_id: "k".to_string(),
512                secret_access_key: "s".to_string(),
513                session_token: "t".to_string(),
514                expires_at: "2099-01-01T00:00:00Z".to_string(),
515            },
516        };
517        assert!(!ctx.creds_expiring_soon());
518    }
519
520    #[test]
521    fn test_creds_expiring_soon_past() {
522        let ctx = RunContext {
523            run_id: "test".to_string(),
524            upload_uri_prefix: "s3://b/p".to_string(),
525            credentials: UploadCredentials {
526                access_key_id: "k".to_string(),
527                secret_access_key: "s".to_string(),
528                session_token: "t".to_string(),
529                expires_at: "1970-01-01T00:00:00Z".to_string(),
530            },
531        };
532        assert!(ctx.creds_expiring_soon());
533    }
534
535    #[test]
536    fn test_creds_expiring_soon_unparseable() {
537        let ctx = RunContext {
538            run_id: "test".to_string(),
539            upload_uri_prefix: "s3://b/p".to_string(),
540            credentials: UploadCredentials {
541                access_key_id: "k".to_string(),
542                secret_access_key: "s".to_string(),
543                session_token: "t".to_string(),
544                expires_at: "not-a-date".to_string(),
545            },
546        };
547        // Unparseable expires_at treated as already expired.
548        assert!(ctx.creds_expiring_soon());
549    }
550
551    #[test]
552    fn test_days_since_epoch_known_dates() {
553        assert_eq!(days_since_epoch(1970, 1, 1), Some(0));
554        assert_eq!(days_since_epoch(1970, 1, 2), Some(1));
555        assert_eq!(days_since_epoch(2026, 4, 1), Some(20544));
556    }
557
558    #[test]
559    fn test_parse_iso8601_secs_known() {
560        // 2026-04-01T00:00:00Z = 20544 days * 86400
561        assert_eq!(
562            parse_iso8601_secs("2026-04-01T00:00:00Z"),
563            Some(20544 * 86400)
564        );
565    }
566
567    // T-EOR-01: close_run POSTs to /runs/{run_id}/finish with the correct shape.
568    // Verifies: run_id in URL (not body), data_source=inline, data_csv present,
569    // no "s3" field, run_status and exit_code present.
570    #[test]
571    fn test_close_run_posts_to_finish_endpoint() {
572        // follow schema guidance of https://api.sentinel.sparecores.net/docs#/Resource%20Tracker/finish_run_runs__run_id__finish_post to PASS
573        use std::io::{Read as _, Write as _};
574        use std::sync::mpsc;
575        use std::time::Duration;
576
577        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
578        let port = listener.local_addr().unwrap().port();
579        let (tx, rx) = mpsc::channel::<Vec<u8>>();
580
581        std::thread::spawn(move || {
582            if let Ok((mut stream, _)) = listener.accept() {
583                // Read headers + body in a loop until Content-Length bytes are present.
584                let mut buf = Vec::<u8>::new();
585                let mut tmp = [0u8; 4096];
586                loop {
587                    let n = stream.read(&mut tmp).unwrap_or(0);
588                    if n == 0 {
589                        break;
590                    }
591                    buf.extend_from_slice(&tmp[..n]);
592                    // Find header/body separator.
593                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
594                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
595                        let cl = header_str
596                            .lines()
597                            .find_map(|l| {
598                                l.trim()
599                                    .strip_prefix("content-length:")
600                                    .and_then(|v| v.trim().parse::<usize>().ok())
601                            })
602                            .unwrap_or(0);
603                        if buf.len() >= sep + 4 + cl {
604                            break;
605                        }
606                    }
607                }
608                tx.send(buf).ok();
609                stream
610                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n{}")
611                    .ok();
612            }
613        });
614
615        let agent = ureq::config::Config::builder()
616            .timeout_global(Some(Duration::from_secs(30)))
617            .build()
618            .new_agent();
619
620        let ctx = RunContext {
621            run_id: "run-abc-999".to_string(),
622            upload_uri_prefix: "s3://b/p".to_string(),
623            credentials: UploadCredentials {
624                access_key_id: "k".to_string(),
625                secret_access_key: "s".to_string(),
626                session_token: "t".to_string(),
627                expires_at: "2099-01-01T00:00:00Z".to_string(),
628            },
629        };
630
631        let result = close_run(
632            &agent,
633            &format!("http://127.0.0.1:{port}"),
634            "test-token",
635            &ctx,
636            Some(0),
637            Some("header\nrow1\n".to_string()),
638            &[], // no S3 uploads → inline route
639        );
640        assert!(result.is_ok(), "close_run failed: {result:?}");
641
642        let raw = rx.recv().expect("mock server did not receive request");
643        let raw_str = String::from_utf8_lossy(&raw);
644
645        // run_id belongs in the URL path, not the JSON body.
646        assert!(
647            raw_str.contains("/runs/run-abc-999/finish"),
648            "URL must include /runs/{{run_id}}/finish: {raw_str}"
649        );
650        assert!(
651            !raw_str.contains("\"run_id\""),
652            "run_id must not appear in the JSON body: {raw_str}"
653        );
654
655        // data_source must be "inline"; no "s3" field anywhere.
656        assert!(
657            raw_str.contains("\"data_source\":\"inline\""),
658            "expected data_source=inline in body: {raw_str}"
659        );
660        assert!(
661            !raw_str.contains("\"s3\""),
662            "s3 must not appear in body: {raw_str}"
663        );
664
665        // data_csv must be present as a raw CSV string (not base64-encoded).
666        assert!(
667            raw_str.contains("\"data_csv\""),
668            "data_csv absent from body: {raw_str}"
669        );
670        assert!(
671            raw_str.contains("header"),
672            "data_csv must contain raw CSV content (not base64): {raw_str}"
673        );
674
675        // finished_at must be present as an ISO 8601 UTC timestamp.
676        assert!(
677            raw_str.contains("\"finished_at\""),
678            "finished_at absent from body: {raw_str}"
679        );
680
681        // run_status and exit_code must be present.
682        assert!(
683            raw_str.contains("\"run_status\":\"finished\""),
684            "run_status absent or wrong: {raw_str}"
685        );
686        assert!(
687            raw_str.contains("\"exit_code\":0"),
688            "exit_code absent or wrong: {raw_str}"
689        );
690    }
691
692    // T-EOR-02: close_run body does NOT contain run_id (it is already in the URL path).
693    #[test]
694    fn test_close_run_request_omits_run_id() {
695        let req = CloseRunInlineRequest {
696            exit_code: Some(0),
697            run_status: "finished",
698            finished_at: Some("2026-04-03T12:00:00Z".to_string()),
699            data_source: "inline",
700            data_csv: "timestamp,cpu\n1000,42\n".to_string(),
701        };
702        let json = serde_json::to_string(&req).expect("serialize failed");
703        assert!(
704            !json.contains("\"run_id\""),
705            "run_id must not appear in close_run body (it is in the URL): {json}"
706        );
707    }
708
709    // T-EOR-03: data_source is "inline", data_csv is raw CSV (not base64), finished_at present.
710    #[test]
711    fn test_close_run_data_source_inline() {
712        let raw_csv = "timestamp,cpu\n1000,42\n";
713        let req = CloseRunInlineRequest {
714            exit_code: Some(0),
715            run_status: "finished",
716            finished_at: Some("2026-04-03T12:00:00Z".to_string()),
717            data_source: "inline",
718            data_csv: raw_csv.to_string(),
719        };
720        let json = serde_json::to_string(&req).expect("serialize failed");
721        assert!(
722            json.contains("\"data_source\":\"inline\""),
723            "data_source is not 'inline': {json}"
724        );
725        // data_csv must be the raw string -- not base64.
726        assert!(
727            json.contains("timestamp"),
728            "data_csv must contain raw CSV content (not base64): {json}"
729        );
730        assert!(
731            json.contains("\"finished_at\":\"2026-04-03T12:00:00Z\""),
732            "finished_at absent or wrong: {json}"
733        );
734    }
735
736    // base64_encode: RFC 4648 test vectors (Section 10).
737    #[test]
738    fn test_base64_encode_rfc4648_vectors() {
739        assert_eq!(base64_encode(b""), "");
740        assert_eq!(base64_encode(b"f"), "Zg==");
741        assert_eq!(base64_encode(b"fo"), "Zm8=");
742        assert_eq!(base64_encode(b"foo"), "Zm9v");
743        assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
744        assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
745        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
746    }
747
748    // base64_encode: encoding and decoding round-trip is valid base64.
749    #[test]
750    fn test_base64_encode_csv_roundtrip() {
751        let csv = "timestamp,value\n1000,42\n1001,99\n";
752        let encoded = base64_encode(csv.as_bytes());
753        // All chars in the output must be valid base64 characters.
754        encoded.chars().for_each(|c| {
755            assert!(
756                c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=',
757                "invalid base64 char '{c}' in: {encoded}"
758            );
759        });
760    }
761
762    // days_since_epoch: invalid month/day/year return None.
763    #[test]
764    fn test_days_since_epoch_invalid_inputs() {
765        assert_eq!(days_since_epoch(1970, 0, 1), None, "month 0 is invalid");
766        assert_eq!(days_since_epoch(1970, 13, 1), None, "month 13 is invalid");
767        assert_eq!(days_since_epoch(1970, 1, 0), None, "day 0 is invalid");
768        assert_eq!(days_since_epoch(1970, 1, 32), None, "day 32 is invalid");
769        assert_eq!(
770            days_since_epoch(1969, 12, 31),
771            None,
772            "year before 1970 is invalid"
773        );
774    }
775
776    // parse_iso8601_secs: +00:00 suffix is handled the same as Z.
777    #[test]
778    fn test_parse_iso8601_secs_with_utc_offset() {
779        let with_z = parse_iso8601_secs("2026-04-01T00:00:00Z");
780        let with_plus = parse_iso8601_secs("2026-04-01T00:00:00+00:00");
781        assert_eq!(
782            with_z, with_plus,
783            "+00:00 and Z must parse to the same timestamp"
784        );
785    }
786
787    // parse_iso8601_secs: fewer than 6 numeric components returns None.
788    #[test]
789    fn test_parse_iso8601_secs_too_few_components() {
790        assert_eq!(
791            parse_iso8601_secs("2026-04-01"),
792            None,
793            "date-only string must return None"
794        );
795        assert_eq!(parse_iso8601_secs("not-a-date"), None);
796    }
797
798    // slice_is_empty helper is used by serde skip_serializing_if.
799    #[test]
800    fn test_slice_is_empty_helper() {
801        let empty: &[String] = &[];
802        let nonempty: &[String] = &["tag".to_string()];
803        assert!(slice_is_empty(&&*empty), "empty slice should return true");
804        assert!(
805            !slice_is_empty(&&*nonempty),
806            "nonempty slice should return false"
807        );
808    }
809
810    // unix_secs_to_iso8601: known Unix timestamps produce the expected ISO 8601 string.
811    #[test]
812    fn test_unix_secs_to_iso8601_known_values() {
813        // Unix epoch itself.
814        assert_eq!(unix_secs_to_iso8601(0), "1970-01-01T00:00:00Z");
815        // 2000-01-01T00:00:00Z = 946684800 s
816        assert_eq!(unix_secs_to_iso8601(946684800), "2000-01-01T00:00:00Z");
817        // 2026-04-03T00:00:00Z -- today's date from context; days = 56*365 + 14 leap days + 92
818        // Easier to round-trip via parse_iso8601_secs.
819        let secs = parse_iso8601_secs("2026-04-03T15:30:45Z").expect("parse failed");
820        assert_eq!(unix_secs_to_iso8601(secs), "2026-04-03T15:30:45Z");
821    }
822
823    // unix_secs_to_iso8601: leap-day boundary (2000-02-29 exists, 1900-02-29 did not
824    // but we only go back to 1970 so just verify 2000-02-29 round-trips).
825    #[test]
826    fn test_unix_secs_to_iso8601_leap_day() {
827        let secs = parse_iso8601_secs("2000-02-29T12:00:00Z").expect("parse failed");
828        assert_eq!(unix_secs_to_iso8601(secs), "2000-02-29T12:00:00Z");
829    }
830
831    // now_iso8601: returns a non-empty string that parses back successfully.
832    #[test]
833    fn test_now_iso8601_parses() {
834        let s = now_iso8601();
835        assert!(!s.is_empty(), "now_iso8601 must not be empty");
836        assert!(s.ends_with('Z'), "now_iso8601 must end with Z: {s}");
837        let secs = parse_iso8601_secs(&s);
838        assert!(secs.is_some(), "now_iso8601 output must parse back: {s}");
839    }
840
841    // T-EOR-06: finished_at is omitted from the JSON when set to None.
842    #[test]
843    fn test_close_run_finished_at_omitted_when_none() {
844        let req = CloseRunInlineRequest {
845            exit_code: None,
846            run_status: "finished",
847            finished_at: None,
848            data_source: "inline",
849            data_csv: "".to_string(),
850        };
851        let json = serde_json::to_string(&req).expect("serialize failed");
852        assert!(
853            !json.contains("\"finished_at\""),
854            "finished_at must be absent when None: {json}"
855        );
856    }
857
858    // ---------------------------------------------------------------------------
859    // Spec-driven tests for /runs/{run_id}/finish (RunFinishInline schema).
860    // See https://api.sentinel.sparecores.net/openapi.json -- RunFinishInline.
861    // Required fields: data_source (const "inline"), data_csv (raw string).
862    // Optional fields: exit_code (int|null), run_status ("finished"|"failed"),
863    //                  finished_at (date-time string|null).
864    // additionalProperties: false -- no extra fields allowed.
865    // ---------------------------------------------------------------------------
866
867    // Build a realistic RunFinishResponse JSON body, mirroring the real Sentinel API
868    // response shape (RunFinishResponse = {run: RunResponse, processing: RunFinishProcessing}).
869    // The real API's 200 response always has both fields; "processing.status" is required.
870    fn finish_response_json(run_id: &str, run_status: &str, exit_code: Option<i32>) -> String {
871        let exit_code_field = match exit_code {
872            Some(c) => format!(",\"exit_code\":{c}"),
873            None => String::new(),
874        };
875        format!(
876            r#"{{"run":{{"run_id":"{run_id}","created_at":"2026-04-03T10:00:00Z","heartbeat_at":"2026-04-03T10:00:30Z","finished_at":"2026-04-03T10:01:00Z","run_status":"{run_status}","tag_count":0,"tags":[]{exit_code_field}}},"processing":{{"status":"ok","rows":1,"files":null,"duration_ms":5.0,"error":null}}}}"#
877        )
878    }
879
880    // Helper: spin up a mock TCP server that mimics the real Sentinel /finish endpoint:
881    // reads the full request, validates presence of required fields, returns a proper
882    // RunFinishResponse JSON on success (200) or a 422 JSON error if required fields
883    // are missing.  Returns the parsed request body JSON for assertions.
884    fn capture_close_run_body(exit_code: Option<i32>, csv: Option<&str>) -> String {
885        use std::io::{Read as _, Write as _};
886        use std::sync::mpsc;
887        use std::time::Duration;
888
889        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
890        let port = listener.local_addr().unwrap().port();
891        let (tx, rx) = mpsc::channel::<Vec<u8>>();
892
893        std::thread::spawn(move || {
894            if let Ok((mut stream, _)) = listener.accept() {
895                // Prevent the read loop from blocking indefinitely: if the client
896                // keeps the connection half-open (send done, waiting for response)
897                // stream.read() would stall until ureq's 30-second timeout fires,
898                // causing rx.recv() to block for the same duration.  A 5-second
899                // read timeout unblocks the loop promptly under any failure mode.
900                stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
901                let mut buf = Vec::<u8>::new();
902                let mut tmp = [0u8; 4096];
903                loop {
904                    let n = stream.read(&mut tmp).unwrap_or(0);
905                    if n == 0 {
906                        break;
907                    }
908                    buf.extend_from_slice(&tmp[..n]);
909                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
910                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
911                        let cl = header_str
912                            .lines()
913                            .find_map(|l| {
914                                l.trim()
915                                    .strip_prefix("content-length:")
916                                    .and_then(|v| v.trim().parse::<usize>().ok())
917                            })
918                            .unwrap_or(0);
919                        if buf.len() >= sep + 4 + cl {
920                            break;
921                        }
922                    }
923                }
924                // Parse the body and decide: 200 with RunFinishResponse, or 422.
925                let body_start = buf
926                    .windows(4)
927                    .position(|w| w == b"\r\n\r\n")
928                    .map(|p| p + 4)
929                    .unwrap_or(buf.len());
930                let body_str = String::from_utf8_lossy(&buf[body_start..]);
931                let parsed: serde_json::Value =
932                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
933                // Real API requires data_source and data_csv for RunFinishInline.
934                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("inline")
935                    && parsed.get("data_csv").is_some();
936                let (status_line, resp_body) = if valid {
937                    let run_status = parsed
938                        .get("run_status")
939                        .and_then(|v| v.as_str())
940                        .unwrap_or("finished");
941                    let ec = parsed
942                        .get("exit_code")
943                        .and_then(|v| v.as_i64())
944                        .map(|v| v as i32);
945                    (
946                        "HTTP/1.1 200 OK",
947                        finish_response_json("run-spec-test", run_status, ec),
948                    )
949                } else {
950                    (
951                        "HTTP/1.1 422 Unprocessable Entity",
952                        r#"{"detail":[{"loc":["body"],"msg":"field required","type":"value_error.missing"}]}"#.to_string(),
953                    )
954                };
955                tx.send(buf).ok();
956                let http = format!(
957                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
958                    resp_body.len(),
959                    resp_body
960                );
961                stream.write_all(http.as_bytes()).ok();
962            }
963        });
964
965        let agent = ureq::config::Config::builder()
966            .timeout_global(Some(Duration::from_secs(30)))
967            .build()
968            .new_agent();
969        let ctx = RunContext {
970            run_id: "run-spec-test".to_string(),
971            upload_uri_prefix: "s3://b/p".to_string(),
972            credentials: UploadCredentials {
973                access_key_id: "k".to_string(),
974                secret_access_key: "s".to_string(),
975                session_token: "t".to_string(),
976                expires_at: "2099-01-01T00:00:00Z".to_string(),
977            },
978        };
979        let _ = close_run(
980            &agent,
981            &format!("http://127.0.0.1:{port}"),
982            "token",
983            &ctx,
984            exit_code,
985            csv.map(String::from),
986            &[], // no S3 uploads → inline route
987        );
988        let raw = rx.recv().expect("mock server did not capture request");
989        // Extract the JSON body (everything after \r\n\r\n).
990        let body_start = raw
991            .windows(4)
992            .position(|w| w == b"\r\n\r\n")
993            .map(|p| p + 4)
994            .unwrap_or(0);
995        String::from_utf8_lossy(&raw[body_start..]).to_string()
996    }
997
998    // T-FIN-01: run_status is "finished" when exit_code is 0 (clean success).
999    #[test]
1000    fn test_close_run_run_status_finished_for_zero_exit() {
1001        let body = capture_close_run_body(Some(0), None);
1002        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1003        assert_eq!(
1004            v["run_status"], "finished",
1005            "run_status must be 'finished' for exit_code=0: {body}"
1006        );
1007        assert_eq!(v["exit_code"], 0, "exit_code must be 0 in payload: {body}");
1008    }
1009
1010    // T-FIN-02: run_status is "finished" when exit_code is None (SIGTERM shutdown).
1011    #[test]
1012    fn test_close_run_run_status_finished_for_sigterm() {
1013        let body = capture_close_run_body(None, None);
1014        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1015        assert_eq!(
1016            v["run_status"], "finished",
1017            "run_status must be 'finished' when exit_code is None (SIGTERM): {body}"
1018        );
1019        // exit_code is skipped when None -- it must not appear in the payload.
1020        assert!(
1021            v.get("exit_code").is_none(),
1022            "exit_code must be absent when None (spec: optional integer): {body}"
1023        );
1024    }
1025
1026    // T-FIN-03: run_status is "failed" for any non-zero exit code.
1027    #[test]
1028    fn test_close_run_run_status_failed_for_nonzero_exit() {
1029        for code in [1, 2, 127, 130, 255] {
1030            let body = capture_close_run_body(Some(code), None);
1031            let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1032            assert_eq!(
1033                v["run_status"], "failed",
1034                "run_status must be 'failed' for exit_code={code}: {body}"
1035            );
1036        }
1037    }
1038
1039    // T-FIN-04: data_csv is a raw CSV string, not base64.
1040    // Spec: data_csv type is string, description "Raw CSV string".
1041    #[test]
1042    fn test_close_run_data_csv_is_raw_csv_not_base64() {
1043        let raw_csv = "timestamp,cpu_pct\n1743638400,42.5\n1743638401,44.0\n";
1044        let body = capture_close_run_body(Some(0), Some(raw_csv));
1045        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1046        let data_csv = v["data_csv"].as_str().expect("data_csv must be a string");
1047        assert!(
1048            data_csv.contains("timestamp"),
1049            "data_csv must be raw CSV (contains header row): {data_csv}"
1050        );
1051        assert!(
1052            data_csv.contains("42.5"),
1053            "data_csv must be raw CSV (contains data values): {data_csv}"
1054        );
1055        // A base64-encoded CSV would not contain commas or newlines.
1056        assert!(
1057            data_csv.contains(','),
1058            "data_csv must contain CSV commas (not base64): {data_csv}"
1059        );
1060    }
1061
1062    // T-FIN-05: finished_at is present and parses as a valid ISO 8601 UTC timestamp.
1063    // Spec: finished_at is an optional date-time field.
1064    #[test]
1065    fn test_close_run_finished_at_is_valid_iso8601() {
1066        let body = capture_close_run_body(Some(0), None);
1067        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1068        let fa = v["finished_at"]
1069            .as_str()
1070            .expect("finished_at must be a string");
1071        assert!(fa.ends_with('Z'), "finished_at must end with Z (UTC): {fa}");
1072        let secs = parse_iso8601_secs(fa);
1073        assert!(
1074            secs.is_some(),
1075            "finished_at must be a parseable ISO 8601 timestamp: {fa}"
1076        );
1077        // Must be recent -- within a few seconds of now.
1078        let now = std::time::SystemTime::now()
1079            .duration_since(std::time::UNIX_EPOCH)
1080            .unwrap_or_default()
1081            .as_secs();
1082        let diff = now.abs_diff(secs.unwrap());
1083        assert!(
1084            diff < 60,
1085            "finished_at must be close to current time (diff={diff}s): {fa}"
1086        );
1087    }
1088
1089    // T-FIN-06: close_run handles a realistic RunFinishResponse JSON without error.
1090    // Spec: 200 response body is RunFinishResponse {run, processing}.
1091    #[test]
1092    fn test_close_run_handles_valid_run_finish_response() {
1093        use std::io::{Read as _, Write as _};
1094        use std::time::Duration;
1095
1096        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1097        let port = listener.local_addr().unwrap().port();
1098
1099        // Minimal valid RunFinishResponse per OpenAPI spec.
1100        let response_body = r#"{
1101            "run": {
1102                "run_id": "01959e3a-0001-0000-0000-000000000000",
1103                "created_at": "2026-04-03T10:00:00Z",
1104                "heartbeat_at": "2026-04-03T10:00:30Z",
1105                "finished_at": "2026-04-03T10:01:00Z",
1106                "run_status": "finished",
1107                "tag_count": 0,
1108                "tags": []
1109            },
1110            "processing": {
1111                "status": "ok",
1112                "rows": 60,
1113                "files": null,
1114                "duration_ms": 12.5,
1115                "error": null
1116            }
1117        }"#;
1118        let http_response = format!(
1119            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1120            response_body.len(),
1121            response_body
1122        );
1123
1124        std::thread::spawn(move || {
1125            if let Ok((mut stream, _)) = listener.accept() {
1126                let mut tmp = [0u8; 4096];
1127                let _ = stream.read(&mut tmp);
1128                stream.write_all(http_response.as_bytes()).ok();
1129            }
1130        });
1131
1132        let agent = ureq::config::Config::builder()
1133            .timeout_global(Some(Duration::from_secs(30)))
1134            .build()
1135            .new_agent();
1136        let ctx = RunContext {
1137            run_id: "01959e3a-0001-0000-0000-000000000000".to_string(),
1138            upload_uri_prefix: "s3://b/p".to_string(),
1139            credentials: UploadCredentials {
1140                access_key_id: "k".to_string(),
1141                secret_access_key: "s".to_string(),
1142                session_token: "t".to_string(),
1143                expires_at: "2099-01-01T00:00:00Z".to_string(),
1144            },
1145        };
1146        let result = close_run(
1147            &agent,
1148            &format!("http://127.0.0.1:{port}"),
1149            "test-token",
1150            &ctx,
1151            Some(0),
1152            Some("timestamp,cpu_pct\n1743638400,42.5\n".to_string()),
1153            &[], // no S3 uploads → inline route
1154        );
1155        assert!(
1156            result.is_ok(),
1157            "close_run must succeed for a 200 response: {result:?}"
1158        );
1159    }
1160
1161    // T-FIN-07: no extra fields are sent beyond what the spec allows
1162    // (RunFinishInline has additionalProperties: false).
1163    #[test]
1164    fn test_close_run_no_extra_fields_in_payload() {
1165        let body = capture_close_run_body(Some(0), Some("ts,v\n1,2\n"));
1166        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1167        let obj = v.as_object().expect("payload must be a JSON object");
1168        let allowed: std::collections::HashSet<&str> = [
1169            "exit_code",
1170            "run_status",
1171            "finished_at",
1172            "data_source",
1173            "data_csv",
1174        ]
1175        .iter()
1176        .copied()
1177        .collect();
1178        for key in obj.keys() {
1179            assert!(
1180                allowed.contains(key.as_str()),
1181                "unexpected field '{key}' in payload -- not allowed by RunFinishInline schema (additionalProperties: false)"
1182            );
1183        }
1184        // Required fields must always be present.
1185        assert!(obj.contains_key("data_source"), "data_source is required");
1186        assert!(obj.contains_key("data_csv"), "data_csv is required");
1187    }
1188
1189    // ---------------------------------------------------------------------------
1190    // S3 route tests for close_run (RunFinishS3 schema variant).
1191    // ---------------------------------------------------------------------------
1192
1193    // Helper: call close_run with S3 URIs, return the captured request body JSON.
1194    fn capture_close_run_s3_body(exit_code: Option<i32>, uris: &[&str]) -> String {
1195        use std::io::{Read as _, Write as _};
1196        use std::sync::mpsc;
1197        use std::time::Duration;
1198
1199        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1200        let port = listener.local_addr().unwrap().port();
1201        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1202
1203        std::thread::spawn(move || {
1204            if let Ok((mut stream, _)) = listener.accept() {
1205                stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
1206                let mut buf = Vec::<u8>::new();
1207                let mut tmp = [0u8; 4096];
1208                loop {
1209                    let n = stream.read(&mut tmp).unwrap_or(0);
1210                    if n == 0 {
1211                        break;
1212                    }
1213                    buf.extend_from_slice(&tmp[..n]);
1214                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1215                        let hdr = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1216                        let cl = hdr
1217                            .lines()
1218                            .find_map(|l| {
1219                                l.trim()
1220                                    .strip_prefix("content-length:")
1221                                    .and_then(|v| v.trim().parse::<usize>().ok())
1222                            })
1223                            .unwrap_or(0);
1224                        if buf.len() >= sep + 4 + cl {
1225                            break;
1226                        }
1227                    }
1228                }
1229                // Real API: 200 RunFinishResponse for a valid S3 payload.
1230                let body_start = buf
1231                    .windows(4)
1232                    .position(|w| w == b"\r\n\r\n")
1233                    .map(|p| p + 4)
1234                    .unwrap_or(buf.len());
1235                let body_str = String::from_utf8_lossy(&buf[body_start..]);
1236                let parsed: serde_json::Value =
1237                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
1238                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("s3")
1239                    && parsed.get("data_uris").is_some();
1240                let run_status = parsed
1241                    .get("run_status")
1242                    .and_then(|v| v.as_str())
1243                    .unwrap_or("finished");
1244                let ec = parsed
1245                    .get("exit_code")
1246                    .and_then(|v| v.as_i64())
1247                    .map(|v| v as i32);
1248                let (status_line, resp_body) = if valid {
1249                    (
1250                        "HTTP/1.1 200 OK",
1251                        finish_response_json("run-s3-test", run_status, ec),
1252                    )
1253                } else {
1254                    (
1255                        "HTTP/1.1 422 Unprocessable Entity",
1256                        r#"{"detail":[{"msg":"field required","type":"value_error.missing"}]}"#
1257                            .to_string(),
1258                    )
1259                };
1260                tx.send(buf).ok();
1261                let http = format!(
1262                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1263                    resp_body.len(),
1264                    resp_body
1265                );
1266                stream.write_all(http.as_bytes()).ok();
1267            }
1268        });
1269
1270        let agent = ureq::config::Config::builder()
1271            .timeout_global(Some(Duration::from_secs(30)))
1272            .build()
1273            .new_agent();
1274        let ctx = RunContext {
1275            run_id: "run-s3-test".to_string(),
1276            upload_uri_prefix: "s3://b/p".to_string(),
1277            credentials: UploadCredentials {
1278                access_key_id: "k".to_string(),
1279                secret_access_key: "s".to_string(),
1280                session_token: "t".to_string(),
1281                expires_at: "2099-01-01T00:00:00Z".to_string(),
1282            },
1283        };
1284        let uploaded: Vec<String> = uris.iter().map(|s| (*s).to_string()).collect();
1285        let _ = close_run(
1286            &agent,
1287            &format!("http://127.0.0.1:{port}"),
1288            "token",
1289            &ctx,
1290            exit_code,
1291            None, // remaining_csv unused in S3 route
1292            &uploaded,
1293        );
1294        let raw = rx.recv().expect("mock server did not capture request");
1295        let body_start = raw
1296            .windows(4)
1297            .position(|w| w == b"\r\n\r\n")
1298            .map(|p| p + 4)
1299            .unwrap_or(0);
1300        String::from_utf8_lossy(&raw[body_start..]).to_string()
1301    }
1302
1303    // T-S3R-01: when uploaded_uris is non-empty, data_source is "s3" and
1304    // data_uris contains the URIs (RunFinishS3 schema variant).
1305    #[test]
1306    fn test_close_run_uses_s3_route_when_uris_present() {
1307        let uris = &[
1308            "s3://my-bucket/prefix/run-abc/000000.csv.gz",
1309            "s3://my-bucket/prefix/run-abc/000001.csv.gz",
1310        ];
1311        let body = capture_close_run_s3_body(Some(0), uris);
1312        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1313        assert_eq!(
1314            v["data_source"], "s3",
1315            "data_source must be 's3' when URIs present: {body}"
1316        );
1317        let arr = v["data_uris"]
1318            .as_array()
1319            .expect("data_uris must be a JSON array");
1320        assert_eq!(arr.len(), 2, "data_uris must have 2 elements: {body}");
1321        assert_eq!(arr[0], "s3://my-bucket/prefix/run-abc/000000.csv.gz");
1322        assert_eq!(arr[1], "s3://my-bucket/prefix/run-abc/000001.csv.gz");
1323        // data_csv must NOT be present in the S3 route (additionalProperties: false).
1324        assert!(
1325            !body.contains("\"data_csv\""),
1326            "data_csv must be absent in S3 route: {body}"
1327        );
1328    }
1329
1330    // T-S3R-02: S3 route payload contains no extra fields beyond RunFinishS3 schema.
1331    // RunFinishS3 has additionalProperties: false.
1332    #[test]
1333    fn test_close_run_s3_no_extra_fields() {
1334        let uris = &["s3://bucket/prefix/run/000000.csv.gz"];
1335        let body = capture_close_run_s3_body(Some(0), uris);
1336        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1337        let obj = v.as_object().expect("payload must be a JSON object");
1338        let allowed: std::collections::HashSet<&str> = [
1339            "exit_code",
1340            "run_status",
1341            "finished_at",
1342            "data_source",
1343            "data_uris",
1344        ]
1345        .iter()
1346        .copied()
1347        .collect();
1348        for key in obj.keys() {
1349            assert!(
1350                allowed.contains(key.as_str()),
1351                "unexpected field '{key}' in S3 route payload (additionalProperties: false): {body}"
1352            );
1353        }
1354        assert!(
1355            obj.contains_key("data_source"),
1356            "data_source is required in S3 route"
1357        );
1358        assert!(
1359            obj.contains_key("data_uris"),
1360            "data_uris is required in S3 route"
1361        );
1362    }
1363
1364    // T-S3R-03: when uploaded_uris is empty, data_source is "inline" (not "s3")
1365    // and data_csv is present.  Confirms route dispatch is based on uploaded_uris.
1366    #[test]
1367    fn test_close_run_uses_inline_route_when_no_uris() {
1368        let body = capture_close_run_body(Some(0), Some("ts,cpu\n1,2\n"));
1369        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1370        assert_eq!(
1371            v["data_source"], "inline",
1372            "data_source must be 'inline' when no URIs: {body}"
1373        );
1374        assert!(
1375            v.get("data_csv").is_some(),
1376            "data_csv must be present for inline route: {body}"
1377        );
1378        assert!(
1379            v.get("data_uris").is_none(),
1380            "data_uris must be absent for inline route: {body}"
1381        );
1382    }
1383
1384    // T-EOR-05: refresh_credentials updates ctx.credentials in place on success.
1385    #[test]
1386    fn test_refresh_credentials_updates_context() {
1387        use std::io::{Read as _, Write as _};
1388        use std::time::Duration;
1389
1390        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1391        let port = listener.local_addr().unwrap().port();
1392
1393        let response_body = r#"{"upload_credentials":{"access_key":"NEW_AK","secret_key":"NEW_SK","session_token":"NEW_ST","expiration":"2099-06-01T00:00:00Z"}}"#;
1394        let http_response = format!(
1395            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1396            response_body.len(),
1397            response_body
1398        );
1399
1400        std::thread::spawn(move || {
1401            if let Ok((mut stream, _)) = listener.accept() {
1402                let mut tmp = [0u8; 4096];
1403                let _ = stream.read(&mut tmp);
1404                stream.write_all(http_response.as_bytes()).ok();
1405            }
1406        });
1407
1408        let agent = ureq::config::Config::builder()
1409            .timeout_global(Some(Duration::from_secs(30)))
1410            .build()
1411            .new_agent();
1412
1413        let mut ctx = RunContext {
1414            run_id: "run-123".to_string(),
1415            upload_uri_prefix: "s3://b/p".to_string(),
1416            credentials: UploadCredentials {
1417                access_key_id: "OLD_AK".to_string(),
1418                secret_access_key: "OLD_SK".to_string(),
1419                session_token: "OLD_ST".to_string(),
1420                expires_at: "2099-01-01T00:00:00Z".to_string(),
1421            },
1422        };
1423
1424        let result = refresh_credentials(
1425            &agent,
1426            &format!("http://127.0.0.1:{port}"),
1427            "test-token",
1428            &mut ctx,
1429        );
1430        assert!(
1431            result.is_ok(),
1432            "refresh_credentials failed: {:?}",
1433            result.err()
1434        );
1435        assert_eq!(ctx.credentials.access_key_id, "NEW_AK");
1436        assert_eq!(ctx.credentials.secret_access_key, "NEW_SK");
1437        assert_eq!(ctx.credentials.session_token, "NEW_ST");
1438        assert_eq!(ctx.credentials.expires_at, "2099-06-01T00:00:00Z");
1439    }
1440
1441    // T-EOR-04: start_run POSTs to /runs and parses the response correctly.
1442    #[test]
1443    fn test_start_run_posts_to_runs_endpoint() {
1444        use std::io::{Read as _, Write as _};
1445        use std::sync::mpsc;
1446        use std::time::Duration;
1447
1448        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1449        let port = listener.local_addr().unwrap().port();
1450        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1451
1452        // Minimal valid StartRunResponse JSON.
1453        let response_body = r#"{"run_id":"run-xyz","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
1454        let http_response = format!(
1455            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1456            response_body.len(),
1457            response_body
1458        );
1459
1460        std::thread::spawn(move || {
1461            if let Ok((mut stream, _)) = listener.accept() {
1462                let mut buf = Vec::<u8>::new();
1463                let mut tmp = [0u8; 4096];
1464                loop {
1465                    let n = stream.read(&mut tmp).unwrap_or(0);
1466                    if n == 0 {
1467                        break;
1468                    }
1469                    buf.extend_from_slice(&tmp[..n]);
1470                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1471                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1472                        let cl = header_str
1473                            .lines()
1474                            .find_map(|l| {
1475                                l.trim()
1476                                    .strip_prefix("content-length:")
1477                                    .and_then(|v| v.trim().parse::<usize>().ok())
1478                            })
1479                            .unwrap_or(0);
1480                        if buf.len() >= sep + 4 + cl {
1481                            break;
1482                        }
1483                    }
1484                }
1485                tx.send(buf).ok();
1486                stream.write_all(http_response.as_bytes()).ok();
1487            }
1488        });
1489
1490        let agent = ureq::config::Config::builder()
1491            .timeout_global(Some(Duration::from_secs(30)))
1492            .build()
1493            .new_agent();
1494
1495        let meta = crate::config::JobMetadata {
1496            job_name: Some("test-job".to_string()),
1497            ..Default::default()
1498        };
1499        let host = crate::metrics::HostInfo::default();
1500        let cloud = crate::metrics::CloudInfo::default();
1501
1502        let result = start_run(
1503            &agent,
1504            &format!("http://127.0.0.1:{port}"),
1505            "test-token",
1506            &meta,
1507            Some(42),
1508            &host,
1509            &cloud,
1510        );
1511        assert!(result.is_ok(), "start_run failed: {:?}", result.err());
1512
1513        let ctx = result.unwrap();
1514        assert_eq!(ctx.run_id, "run-xyz");
1515        assert_eq!(ctx.upload_uri_prefix, "s3://b/p");
1516        assert_eq!(ctx.credentials.access_key_id, "AK");
1517
1518        let raw = rx.recv().expect("mock server did not receive request");
1519        let raw_str = String::from_utf8_lossy(&raw);
1520        assert!(
1521            raw_str.contains("POST /runs"),
1522            "must POST to /runs: {raw_str}"
1523        );
1524        assert!(
1525            raw_str.contains("\"job_name\":\"test-job\""),
1526            "job_name missing: {raw_str}"
1527        );
1528        // `pid` is intentionally absent -- the RunCreate schema does not accept it.
1529        assert!(
1530            !raw_str.contains("\"pid\""),
1531            "pid must not appear in the payload: {raw_str}"
1532        );
1533    }
1534
    // T-CMD-01: when start_run is called with a non-empty command (shell-wrapper mode),
    // the JSON body must contain a "command" field holding the wrapped command tokens,
    // encoded as a JSON array inside a string (RunCreate types `command` as text).
1537    // This mirrors: SENTINEL_API_TOKEN=... resource-tracker --output /tmp/log \
1538    //   stress --cpu 4 --vm 1 --vm-bytes 12024M --timeout 63s
1539    #[test]
1540    fn test_start_run_includes_command_array_in_payload() {
1541        use std::io::{Read as _, Write as _};
1542        use std::sync::mpsc;
1543        use std::time::Duration;
1544
1545        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1546        let port = listener.local_addr().unwrap().port();
1547        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1548
1549        let response_body = r#"{"run_id":"r1","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
1550        let http_response = format!(
1551            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1552            response_body.len(),
1553            response_body
1554        );
1555
1556        std::thread::spawn(move || {
1557            if let Ok((mut stream, _)) = listener.accept() {
1558                let mut buf = Vec::<u8>::new();
1559                let mut tmp = [0u8; 4096];
1560                loop {
1561                    let n = stream.read(&mut tmp).unwrap_or(0);
1562                    if n == 0 {
1563                        break;
1564                    }
1565                    buf.extend_from_slice(&tmp[..n]);
1566                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1567                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1568                        let cl = header_str
1569                            .lines()
1570                            .find_map(|l| {
1571                                l.trim()
1572                                    .strip_prefix("content-length:")
1573                                    .and_then(|v| v.trim().parse::<usize>().ok())
1574                            })
1575                            .unwrap_or(0);
1576                        if buf.len() >= sep + 4 + cl {
1577                            break;
1578                        }
1579                    }
1580                }
1581                tx.send(buf).ok();
1582                stream.write_all(http_response.as_bytes()).ok();
1583            }
1584        });
1585
1586        let agent = ureq::config::Config::builder()
1587            .timeout_global(Some(Duration::from_secs(30)))
1588            .build()
1589            .new_agent();
1590
1591        // Simulate: resource-tracker --output /tmp/resource-tracker-logs \
1592        //   stress --cpu 4 --vm 1 --vm-bytes 12024M --timeout 63s
1593        let wrapped_command: Vec<String> = vec![
1594            "stress",
1595            "--cpu",
1596            "4",
1597            "--vm",
1598            "1",
1599            "--vm-bytes",
1600            "12024M",
1601            "--timeout",
1602            "63s",
1603        ]
1604        .into_iter()
1605        .map(String::from)
1606        .collect();
1607
1608        let meta = crate::config::JobMetadata {
1609            command: wrapped_command.clone(),
1610            ..Default::default()
1611        };
1612        let host = crate::metrics::HostInfo::default();
1613        let cloud = crate::metrics::CloudInfo::default();
1614
1615        let result = start_run(
1616            &agent,
1617            &format!("http://127.0.0.1:{port}"),
1618            "test-token",
1619            &meta,
1620            None,
1621            &host,
1622            &cloud,
1623        );
1624        assert!(result.is_ok(), "start_run failed: {:?}", result.err());
1625
1626        let raw = rx.recv().expect("mock server did not receive request");
1627        let raw_str = String::from_utf8_lossy(&raw);
1628
1629        // The RunCreate schema declares `command` as `string | null` ("JSON array encoded
1630        // in TEXT"), so the body must contain the command as a JSON-encoded string, not a
1631        // bare JSON array.
1632        let expected = r#""command":"[\"stress\",\"--cpu\",\"4\",\"--vm\",\"1\",\"--vm-bytes\",\"12024M\",\"--timeout\",\"63s\"]""#;
1633        assert!(
1634            raw_str.contains(expected),
1635            "command JSON string not found in payload.\nExpected: {expected}\nGot body: {raw_str}"
1636        );
1637    }
1638
    // T-CMD-02: standalone mode (no wrapped command). This test serializes a MetadataPayload
    // with `command: None` directly, without calling start_run, and checks that the
    // "command" field is omitted from the resulting JSON.
1641    #[test]
1642    fn test_start_run_omits_command_when_standalone() {
1643        let req_payload = MetadataPayload {
1644            job_name: None,
1645            project_name: None,
1646            stage_name: None,
1647            task_name: None,
1648            team: None,
1649            env: None,
1650            language: None,
1651            orchestrator: None,
1652            executor: None,
1653            external_run_id: None,
1654            container_image: None,
1655            tags: &[],
1656            command: None, // empty = standalone mode
1657        };
1658        let json = serde_json::to_string(&req_payload).expect("serialize failed");
1659        assert!(
1660            !json.contains("\"command\""),
1661            "command must be absent from payload in standalone mode: {json}"
1662        );
1663    }
1664
1665    // ---------------------------------------------------------------------------
1666    // Integration test: hits the REAL Sentinel API.
1667    //
1668    // Requires:
1669    //   SENTINEL_API_TOKEN  -- a valid bearer token
1670    //   SENTINEL_API_BASE   -- optional; defaults to https://api.sentinel.sparecores.net
1671    //
1672    // Run explicitly (skipped in normal `cargo test`):
1673    //   cargo test test_real_api_finish_run -- --include-ignored
1674    //   SENTINEL_API_TOKEN=<token> cargo test test_real_api_finish_run -- --include-ignored
1675    // ---------------------------------------------------------------------------
1676
1677    // T-INT-01: start_run + close_run against the real API both return Ok(()).
1678    // Verifies end-to-end that:
1679    //   - start_run registers a new run and returns a RunContext with a run_id.
1680    //   - close_run POSTs the correct RunFinishInline payload and receives 200.
1681    // Runs automatically when SENTINEL_API_TOKEN is set; skips otherwise.
1682    #[test]
1683    fn test_real_api_finish_run_returns_ok() {
1684        use crate::config::JobMetadata;
1685        use crate::metrics::{CloudInfo, CpuMetrics, HostInfo, MemoryMetrics, Sample};
1686        use crate::sentinel::upload::samples_to_csv;
1687        use std::time::Duration;
1688
1689        let token = match std::env::var("SENTINEL_API_TOKEN") {
1690            Ok(t) if !t.is_empty() => t,
1691            _ => {
1692                eprintln!("skip: SENTINEL_API_TOKEN not set or empty");
1693                return;
1694            }
1695        };
1696        let api_base = std::env::var("SENTINEL_API_BASE")
1697            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
1698        eprintln!("T-INT-01: using api_base={api_base}");
1699
1700        let agent = ureq::config::Config::builder()
1701            .timeout_global(Some(Duration::from_secs(30)))
1702            .build()
1703            .new_agent();
1704
1705        let metadata = JobMetadata {
1706            job_name: Some("integration-test-close-run".to_string()),
1707            ..Default::default()
1708        };
1709        let host = HostInfo::default();
1710        let cloud = CloudInfo::default();
1711
1712        // Step 1: register a new run.
1713        eprintln!("T-INT-01: calling start_run...");
1714        let ctx = match start_run(&agent, &api_base, &token, &metadata, None, &host, &cloud) {
1715            Ok(c) => {
1716                eprintln!("T-INT-01: start_run ok -- run_id={}", c.run_id);
1717                c
1718            }
1719            Err(e) => panic!("start_run failed: {e}"),
1720        };
1721        assert!(!ctx.run_id.is_empty(), "run_id must be non-empty");
1722
1723        // Step 2: build a proper sample using the real CSV format so that
1724        // the column names match what the API expects.
1725        let timestamp_secs = std::time::SystemTime::now()
1726            .duration_since(std::time::UNIX_EPOCH)
1727            .unwrap_or_default()
1728            .as_secs();
1729        let sample = Sample {
1730            timestamp_secs,
1731            job_name: Some("integration-test-close-run".to_string()),
1732            tracked_pid: None,
1733            cpu: CpuMetrics::default(),
1734            memory: MemoryMetrics::default(),
1735            network: vec![],
1736            disk: vec![],
1737            gpu: vec![],
1738        };
1739        let csv = samples_to_csv(&[sample], 1);
1740        eprintln!(
1741            "T-INT-01: csv preview (first 120 chars): {}",
1742            &csv[..csv.len().min(120)]
1743        );
1744
1745        // Step 3: finish the run with the inline CSV (no S3 uploads in this test).
1746        eprintln!("T-INT-01: calling close_run...");
1747        let result = close_run(
1748            &agent,
1749            &api_base,
1750            &token,
1751            &ctx,
1752            Some(0),
1753            Some(csv),
1754            &[], // no S3 uploads → inline route
1755        );
1756        match &result {
1757            Ok(()) => eprintln!("T-INT-01: close_run ok -- 200 received"),
1758            Err(e) => eprintln!("T-INT-01: close_run FAILED: {e}"),
1759        }
1760        assert!(
1761            result.is_ok(),
1762            "close_run must return Ok (200) against the real API: {result:?}"
1763        );
1764    }
1765}