Skip to main content

resource_tracker/sentinel/
run.rs

1use crate::config::JobMetadata;
2use crate::metrics::{CloudInfo, HostInfo};
3use crate::sentinel::s3::UploadCredentials;
4use serde::{Deserialize, Serialize};
5
/// `skip_serializing_if` predicate that is `true` when the borrowed slice holds
/// no elements.
///
/// serde hands the predicate a reference to the field, and the field itself is
/// a `&[String]`, which is why the parameter is doubly referenced.
fn slice_is_empty(v: &&[String]) -> bool {
    let items: &[String] = v;
    items.is_empty()
}
9
10// ---------------------------------------------------------------------------
11// Base64 encoding (stdlib only -- avoids a crate dependency)
12// ---------------------------------------------------------------------------
13
/// Encode `input` as standard RFC 4648 base64 (alphabet `A-Za-z0-9+/`) with
/// `=` padding.
///
/// Hand-rolled so the crate does not need a base64 dependency; compiled only
/// for tests.
#[cfg(test)]
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Pick the 6-bit group that starts `shift` bits up from the bottom of `word`.
    let sextet = |word: u32, shift: u32| char::from(ALPHABET[((word >> shift) & 0x3f) as usize]);

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes big-endian into the low 24 bits; absent bytes are zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &byte)| acc | (u32::from(byte) << (16 - 8 * i as u32)));
        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        encoded.push(if group.len() >= 2 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() == 3 { sextet(word, 0) } else { '=' });
    }
    encoded
}
46
47// ---------------------------------------------------------------------------
48// API response types
49// ---------------------------------------------------------------------------
50
/// Response body of `POST /runs`, as parsed by `start_run`.
///
/// Only the fields declared here are read; other keys in the response are
/// ignored because `deny_unknown_fields` is not set.
#[derive(Debug, Deserialize)]
struct StartRunResponse {
    /// Run identifier; `refresh_credentials` and `close_run` interpolate it
    /// into their URL paths via `RunContext::run_id`.
    run_id: String,
    /// Upload URI prefix; copied unchanged into `RunContext::upload_uri_prefix`.
    upload_uri_prefix: String,
    /// Upload credentials; converted into `UploadCredentials` by `start_run`.
    upload_credentials: RawCredentials,
}
57
58/// Field names match the live Sentinel API response (Python reference:
59/// `sentinel_api.py` `register_run` docstring).
60/// `expiration` is the documented Python name; `expires_at` is accepted as an
61/// alias in case the live API uses a different casing.  The field is optional
62/// so a missing value does not abort the run -- it falls back to a far-future
63/// timestamp (credentials treated as always-fresh).
64#[derive(Debug, Deserialize)]
65struct RawCredentials {
66    access_key: String,
67    secret_key: String,
68    session_token: String,
69    #[serde(alias = "expires_at", default)]
70    expiration: Option<String>,
71}
72
/// Response body of the credential-refresh call made by `refresh_credentials`.
///
/// Only `upload_credentials` is read; other keys are ignored because
/// `deny_unknown_fields` is not set.
#[derive(Debug, Deserialize)]
struct RefreshCredentialsResponse {
    /// Replacement credentials; `refresh_credentials` converts them into
    /// `UploadCredentials` and stores them in the `RunContext`.
    upload_credentials: RawCredentials,
}
77
78// ---------------------------------------------------------------------------
79// RunContext
80// ---------------------------------------------------------------------------
81
/// State returned by `start_run` and referenced by all subsequent API calls.
///
/// NOTE(review): `Debug` is derived, so `{:?}` includes `credentials`.  Whether
/// that exposes the secret key / session token depends on how
/// `UploadCredentials` (in `sentinel::s3`) implements `Debug` -- confirm before
/// logging a `RunContext`.
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Run identifier from `POST /runs`; used in the URL path of the refresh
    /// and finish calls.
    pub run_id: String,
    /// Upload URI prefix from `POST /runs`.
    pub upload_uri_prefix: String,
    /// Current upload credentials; replaced in place by `refresh_credentials`.
    pub credentials: UploadCredentials,
}
89
90impl RunContext {
91    /// Returns `true` when the STS credentials expire within 5 minutes.
92    /// Satisfies T-STR-04.
93    pub fn creds_expiring_soon(&self) -> bool {
94        match parse_iso8601_secs(&self.credentials.expires_at) {
95            Some(expires_at_secs) => {
96                let now = std::time::SystemTime::now()
97                    .duration_since(std::time::UNIX_EPOCH)
98                    .unwrap_or_default()
99                    .as_secs();
100                expires_at_secs.saturating_sub(now) < 300
101            }
102            None => {
103                // Unparseable expires_at: treat as already expired to be safe.
104                true
105            }
106        }
107    }
108}
109
110/// Parse a subset of ISO 8601 UTC timestamps into Unix seconds.
111/// Handles the common AWS format `"YYYY-MM-DDTHH:MM:SSZ"`.
112fn parse_iso8601_secs(s: &str) -> Option<u64> {
113    // Expected: "2026-04-01T12:00:00Z" (20 chars, no sub-second, UTC only)
114    let s = s.trim_end_matches('Z');
115    let s = s.trim_end_matches("+00:00");
116    let nums: Vec<u64> = s
117        .split(|c: char| !c.is_ascii_digit())
118        .filter(|p| !p.is_empty())
119        .map(|p| p.parse().ok())
120        .collect::<Option<Vec<_>>>()?;
121    if nums.len() < 6 {
122        return None;
123    }
124    let (y, mo, d, h, mi, sec) = (nums[0], nums[1], nums[2], nums[3], nums[4], nums[5]);
125    // Approximate days since epoch (good enough for expires_at comparison).
126    let days = days_since_epoch(y, mo, d)?;
127    Some(days * 86400 + h * 3600 + mi * 60 + sec)
128}
129
/// Format Unix seconds as an ISO 8601 UTC timestamp (`"YYYY-MM-DDTHH:MM:SSZ"`).
///
/// Walks forward from 1970-01-01 one year, then one month, at a time using the
/// Gregorian leap-year rule.  The result is exact for any year reachable from a
/// `u64`, and the cost is proportional to the number of years since 1970.
fn unix_secs_to_iso8601(secs: u64) -> String {
    const MONTH_LEN: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let is_leap = |yr: u64| (yr % 4 == 0 && yr % 100 != 0) || yr % 400 == 0;
    let year_len = |yr: u64| if is_leap(yr) { 366 } else { 365 };

    let secs_of_day = secs % 86_400;
    let (hh, mm, ss) = (secs_of_day / 3_600, secs_of_day / 60 % 60, secs_of_day % 60);

    // Peel whole years, then whole months, off the day count.
    let mut days_left = secs / 86_400;
    let mut year = 1970u64;
    while days_left >= year_len(year) {
        days_left -= year_len(year);
        year += 1;
    }
    let month_len = |m: u64| {
        if m == 2 && is_leap(year) {
            29
        } else {
            MONTH_LEN[(m - 1) as usize]
        }
    };
    let mut month = 1u64;
    while days_left >= month_len(month) {
        days_left -= month_len(month);
        month += 1;
    }
    let day = days_left + 1;
    format!("{year:04}-{month:02}-{day:02}T{hh:02}:{mm:02}:{ss:02}Z")
}
168
169fn now_iso8601() -> String {
170    let secs = std::time::SystemTime::now()
171        .duration_since(std::time::UNIX_EPOCH)
172        .unwrap_or_default()
173        .as_secs();
174    unix_secs_to_iso8601(secs)
175}
176
/// Days from 1970-01-01 to the given (proleptic Gregorian) calendar date.
///
/// Returns `None` in these cases:
/// - the year is outside 1970..=9999;
/// - the month is outside 1..=12;
/// - the day does not exist in that month (e.g. Feb 30, Apr 31, or Feb 29 of a
///   non-leap year).
///
/// Runs in constant time, so an absurd year taken from an API response cannot
/// stall the caller.
fn days_since_epoch(y: u64, mo: u64, d: u64) -> Option<u64> {
    // Largest four-digit ISO 8601 year; also keeps the arithmetic far from overflow.
    const MAX_YEAR: u64 = 9999;
    // Days per month (non-leap); February is adjusted in `month_len`.
    const DAYS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    if !(1970..=MAX_YEAR).contains(&y) || !(1..=12).contains(&mo) {
        return None;
    }
    let is_leap = |yr: u64| (yr % 4 == 0 && yr % 100 != 0) || yr % 400 == 0;
    let month_len = |m: u64| {
        if m == 2 && is_leap(y) {
            29
        } else {
            DAYS[(m - 1) as usize]
        }
    };
    if !(1..=month_len(mo)).contains(&d) {
        return None;
    }
    // Number of Gregorian leap years in 1..=n.
    let leaps_through = |n: u64| n / 4 - n / 100 + n / 400;
    let year_days = 365 * (y - 1970) + leaps_through(y - 1) - leaps_through(1969);
    let month_days: u64 = (1..mo).map(month_len).sum();
    Some(year_days + month_days + d - 1)
}
205
206// ---------------------------------------------------------------------------
207// Request payload types
208// ---------------------------------------------------------------------------
209
/// Request body of `POST /runs`, built by `start_run`.
///
/// Python `register_run` sends a flat dict merging metadata, host_info, and
/// cloud_info.  `#[serde(flatten)]` on all three fields reproduces that shape:
/// the keys of each part are emitted at the top level of a single JSON object.
#[derive(Debug, Serialize)]
struct StartRunRequest<'a> {
    /// Job metadata and the JSON-encoded `command`.
    #[serde(flatten)]
    metadata: MetadataPayload<'a>,
    /// Host description, borrowed from the caller.
    #[serde(flatten)]
    host: &'a HostInfo,
    /// Cloud description, borrowed from the caller.
    #[serde(flatten)]
    cloud: &'a CloudInfo,
}
221
/// Job-metadata portion of the `POST /runs` body, flattened into the top level
/// by `StartRunRequest`.
///
/// Fields are borrowed from the caller's `JobMetadata` (see `start_run`).
/// Every `None` field and an empty `tags` slice are omitted from the JSON
/// entirely rather than sent as `null` / `[]`.
#[derive(Debug, Serialize)]
struct MetadataPayload<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    job_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    project_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    stage_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    task_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    team: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    env: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    language: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    orchestrator: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    executor: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    external_run_id: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    container_image: Option<&'a str>,
    /// Tags as a JSON array of strings; omitted when empty.
    #[serde(skip_serializing_if = "slice_is_empty")]
    tags: &'a [String],
    // NOTE: `pid` was intentionally omitted.  The `RunCreate` schema at
    // https://api.sentinel.sparecores.net/openapi.json does not include a `pid`
    // field, so sending it caused Pydantic validation to reject it with HTTP 422.
    /// Shell-wrapper command encoded as a JSON string, e.g. `"[\"stress\",\"--cpu\",\"4\"]"`.
    /// The Sentinel API `command` field is `string | null` ("JSON array encoded in TEXT"),
    /// not a JSON array.  Omitted when not running in shell-wrapper mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
}
257
// See https://api.sentinel.sparecores.net/docs#/Resource%20Tracker/finish_run_runs__run_id__finish_post
// The /finish endpoint accepts two discriminated variants keyed on data_source:
//   "inline" → RunFinishInline  (data_csv: raw CSV string, additionalProperties:false)
//   "s3"     → RunFinishS3      (data_uris: [s3://...], additionalProperties:false)
// run_id goes in the URL path only -- do not repeat in body.
// Because both variants set additionalProperties:false, each request struct
// below must serialize only the keys its own variant allows.

/// Payload for RunFinishInline: remaining samples sent as a raw CSV string.
/// Used when no S3 batches were uploaded (short runs or all S3 failures).
#[derive(Debug, Serialize)]
struct CloseRunInlineRequest {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// Run outcome; `close_run` sets `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Exact finish time in ISO 8601 UTC format, e.g. "2026-04-03T12:00:00Z".
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// Discriminator value -- must be exactly "inline".
    data_source: &'static str,
    /// Raw CSV string -- NOT base64-encoded.  The API schema type is plain string.
    data_csv: String,
}
279
/// Payload for RunFinishS3: data was already uploaded to S3 in batches.
/// Used when at least one S3 batch was successfully uploaded.
/// The final remaining samples must have been flushed to S3 before calling
/// close_run (the BatchUploader performs this flush on shutdown).
#[derive(Debug, Serialize)]
struct CloseRunS3Request {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// Run outcome; `close_run` sets `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Exact finish time in ISO 8601 UTC format, e.g. "2026-04-03T12:00:00Z".
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// Discriminator value -- must be exactly "s3".
    data_source: &'static str,
    /// S3 URIs of all uploaded batch files (including the final flush).
    data_uris: Vec<String>,
}
297
298// ---------------------------------------------------------------------------
299// API calls
300// ---------------------------------------------------------------------------
301
302/// POST to `/runs` to register a new run with the Sentinel API.
303///
304/// On failure the caller should log a warning and disable streaming;
305/// local stdout output continues normally (Section 11 error handling).
306pub fn start_run(
307    agent: &ureq::Agent,
308    api_base: &str,
309    token: &str,
310    metadata: &JobMetadata,
311    pid: Option<i32>,
312    host: &HostInfo,
313    cloud: &CloudInfo,
314) -> Result<RunContext, String> {
315    // The API `command` field is `string | null` ("JSON array encoded in TEXT").
316    // Serialize the Vec<String> to a JSON string; send None when no command was given.
317    let command_json: Option<String> = if metadata.command.is_empty() {
318        None
319    } else {
320        serde_json::to_string(&metadata.command).ok()
321    };
322
323    // `pid` is tracked locally but is not part of the `RunCreate` API schema.
324    let _ = pid;
325
326    let payload = StartRunRequest {
327        metadata: MetadataPayload {
328            job_name: metadata.job_name.as_deref(),
329            project_name: metadata.project_name.as_deref(),
330            stage_name: metadata.stage_name.as_deref(),
331            task_name: metadata.task_name.as_deref(),
332            team: metadata.team.as_deref(),
333            env: metadata.env.as_deref(),
334            language: metadata.language.as_deref(),
335            orchestrator: metadata.orchestrator.as_deref(),
336            executor: metadata.executor.as_deref(),
337            external_run_id: metadata.external_run_id.as_deref(),
338            container_image: metadata.container_image.as_deref(),
339            tags: &metadata.tags,
340            command: command_json,
341        },
342        host,
343        cloud,
344    };
345
346    let url = format!("{api_base}/runs");
347    let body = serde_json::to_string(&payload)
348        .map_err(|e| format!("failed to serialize start_run payload: {e}"))?;
349
350    let mut response = agent
351        .post(&url)
352        .header("Authorization", &format!("Bearer {token}"))
353        .header("Content-Type", "application/json")
354        .send(body.as_bytes())
355        .map_err(|e| format!("start_run POST failed: {e}"))?;
356
357    let text = response
358        .body_mut()
359        .read_to_string()
360        .map_err(|e| format!("start_run read body failed: {e}"))?;
361
362    let resp: StartRunResponse = serde_json::from_str(&text).map_err(|e| {
363        format!(
364            "start_run parse response failed: {e} ({} bytes)",
365            text.len()
366        )
367    })?;
368
369    Ok(RunContext {
370        run_id: resp.run_id,
371        upload_uri_prefix: resp.upload_uri_prefix,
372        credentials: UploadCredentials {
373            access_key_id: resp.upload_credentials.access_key,
374            secret_access_key: resp.upload_credentials.secret_key,
375            session_token: resp.upload_credentials.session_token,
376            expires_at: resp
377                .upload_credentials
378                .expiration
379                .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
380        },
381    })
382}
383
384/// POST to `/runs/{run_id}/credentials/refresh` to obtain fresh STS credentials.
385///
386/// Updates `ctx.credentials` in place on success.
387pub fn refresh_credentials(
388    agent: &ureq::Agent,
389    api_base: &str,
390    token: &str,
391    ctx: &mut RunContext,
392) -> Result<(), String> {
393    let url = format!("{api_base}/runs/{}/refresh-credentials", ctx.run_id);
394    let mut response = agent
395        .post(&url)
396        .header("Authorization", &format!("Bearer {token}"))
397        .send(b"" as &[u8])
398        .map_err(|e| format!("credential refresh POST failed: {e}"))?;
399
400    let text = response
401        .body_mut()
402        .read_to_string()
403        .map_err(|e| format!("credential refresh read body failed: {e}"))?;
404
405    let resp: RefreshCredentialsResponse = serde_json::from_str(&text).map_err(|e| {
406        format!(
407            "credential refresh parse failed: {e} ({} bytes)",
408            text.len()
409        )
410    })?;
411
412    ctx.credentials = UploadCredentials {
413        access_key_id: resp.upload_credentials.access_key,
414        secret_access_key: resp.upload_credentials.secret_key,
415        session_token: resp.upload_credentials.session_token,
416        expires_at: resp
417            .upload_credentials
418            .expiration
419            .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
420    };
421    Ok(())
422}
423
424/// POST to `/runs/{run_id}/finish` to mark the run complete.
425///
426/// Dispatches to the correct schema variant based on whether S3 uploads occurred:
427///
428/// - `uploaded_uris` non-empty → `RunFinishS3` (`data_source="s3"`, `data_uris=[...]`).
429///   The `BatchUploader` already flushed the final remaining samples to S3 on
430///   shutdown before this function is called, so `uploaded_uris` contains every batch.
431///   `remaining_csv` is ignored in this path.
432///
433/// - `uploaded_uris` empty → `RunFinishInline` (`data_source="inline"`, `data_csv=...`).
434///   Used for short runs or when all S3 uploads failed.  `remaining_csv` is the
435///   raw CSV of all collected samples (never base64-encoded).
436pub fn close_run(
437    agent: &ureq::Agent,
438    api_base: &str,
439    token: &str,
440    ctx: &RunContext,
441    exit_code: Option<i32>,
442    remaining_csv: Option<String>,
443    uploaded_uris: &[String],
444) -> Result<(), String> {
445    // "finished" for success/clean exit (including SIGTERM), "failed" for non-zero.
446    let run_status = match exit_code {
447        Some(0) | None => "finished",
448        Some(_) => "failed",
449    };
450    let finished_at = Some(now_iso8601());
451
452    let url = format!("{api_base}/runs/{}/finish", ctx.run_id);
453    let body = if uploaded_uris.is_empty() {
454        // Inline route: no S3 batches uploaded.
455        // data_csv must be the raw CSV string (not base64) per the API schema.
456        let payload = CloseRunInlineRequest {
457            exit_code,
458            run_status,
459            finished_at,
460            data_source: "inline",
461            data_csv: remaining_csv.unwrap_or_default(),
462        };
463        serde_json::to_string(&payload)
464            .map_err(|e| format!("failed to serialize close_run inline payload: {e}"))?
465    } else {
466        // S3 route: all batches (including final flush) are in uploaded_uris.
467        let payload = CloseRunS3Request {
468            exit_code,
469            run_status,
470            finished_at,
471            data_source: "s3",
472            data_uris: uploaded_uris.to_vec(),
473        };
474        serde_json::to_string(&payload)
475            .map_err(|e| format!("failed to serialize close_run s3 payload: {e}"))?
476    };
477
478    // Send plain JSON -- no body-level gzip.  The Sentinel API (FastAPI) does
479    // not decompress Content-Encoding: gzip on incoming request bodies, so
480    // sending compressed bytes causes a 422.  This matches the Python reference
481    // which calls requests.post(url, json=payload) with no Content-Encoding.
482    let response = agent
483        .post(&url)
484        .header("Authorization", &format!("Bearer {token}"))
485        .header("Content-Type", "application/json")
486        .send(body.as_bytes())
487        .map_err(|e| format!("close_run POST failed: {e}"))?;
488
489    let status = response.status();
490    if status != 200 {
491        return Err(format!("close_run received HTTP {status}: expected 200"));
492    }
493
494    Ok(())
495}
496
497// ---------------------------------------------------------------------------
498// Unit tests
499// ---------------------------------------------------------------------------
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504
505    #[test]
506    fn test_creds_expiring_soon_far_future() {
507        let ctx = RunContext {
508            run_id: "test".to_string(),
509            upload_uri_prefix: "s3://b/p".to_string(),
510            credentials: UploadCredentials {
511                access_key_id: "k".to_string(),
512                secret_access_key: "s".to_string(),
513                session_token: "t".to_string(),
514                expires_at: "2099-01-01T00:00:00Z".to_string(),
515            },
516        };
517        assert!(!ctx.creds_expiring_soon());
518    }
519
520    #[test]
521    fn test_creds_expiring_soon_past() {
522        let ctx = RunContext {
523            run_id: "test".to_string(),
524            upload_uri_prefix: "s3://b/p".to_string(),
525            credentials: UploadCredentials {
526                access_key_id: "k".to_string(),
527                secret_access_key: "s".to_string(),
528                session_token: "t".to_string(),
529                expires_at: "1970-01-01T00:00:00Z".to_string(),
530            },
531        };
532        assert!(ctx.creds_expiring_soon());
533    }
534
535    #[test]
536    fn test_creds_expiring_soon_unparseable() {
537        let ctx = RunContext {
538            run_id: "test".to_string(),
539            upload_uri_prefix: "s3://b/p".to_string(),
540            credentials: UploadCredentials {
541                access_key_id: "k".to_string(),
542                secret_access_key: "s".to_string(),
543                session_token: "t".to_string(),
544                expires_at: "not-a-date".to_string(),
545            },
546        };
547        // Unparseable expires_at treated as already expired.
548        assert!(ctx.creds_expiring_soon());
549    }
550
551    #[test]
552    fn test_days_since_epoch_known_dates() {
553        assert_eq!(days_since_epoch(1970, 1, 1), Some(0));
554        assert_eq!(days_since_epoch(1970, 1, 2), Some(1));
555        assert_eq!(days_since_epoch(2026, 4, 1), Some(20544));
556    }
557
558    #[test]
559    fn test_parse_iso8601_secs_known() {
560        // 2026-04-01T00:00:00Z = 20544 days * 86400
561        assert_eq!(
562            parse_iso8601_secs("2026-04-01T00:00:00Z"),
563            Some(20544 * 86400)
564        );
565    }
566
567    // T-EOR-01: close_run POSTs to /runs/{run_id}/finish with the correct shape.
568    // Verifies: run_id in URL (not body), data_source=inline, data_csv present,
569    // no "s3" field, run_status and exit_code present.
570    #[test]
571    fn test_close_run_posts_to_finish_endpoint() {
572        // follow schema guidance of https://api.sentinel.sparecores.net/docs#/Resource%20Tracker/finish_run_runs__run_id__finish_post to PASS
573        use std::io::{Read as _, Write as _};
574        use std::sync::mpsc;
575        use std::time::Duration;
576
577        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
578        let port = listener.local_addr().unwrap().port();
579        let (tx, rx) = mpsc::channel::<Vec<u8>>();
580
581        std::thread::spawn(move || {
582            if let Ok((mut stream, _)) = listener.accept() {
583                // Read headers + body in a loop until Content-Length bytes are present.
584                let mut buf = Vec::<u8>::new();
585                let mut tmp = [0u8; 4096];
586                loop {
587                    let n = stream.read(&mut tmp).unwrap_or(0);
588                    if n == 0 {
589                        break;
590                    }
591                    buf.extend_from_slice(&tmp[..n]);
592                    // Find header/body separator.
593                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
594                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
595                        let cl = header_str
596                            .lines()
597                            .find_map(|l| {
598                                l.trim()
599                                    .strip_prefix("content-length:")
600                                    .and_then(|v| v.trim().parse::<usize>().ok())
601                            })
602                            .unwrap_or(0);
603                        if buf.len() >= sep + 4 + cl {
604                            break;
605                        }
606                    }
607                }
608                tx.send(buf).ok();
609                stream
610                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n{}")
611                    .ok();
612            }
613        });
614
615        let agent = ureq::config::Config::builder()
616            .timeout_global(Some(Duration::from_secs(30)))
617            .build()
618            .new_agent();
619
620        let ctx = RunContext {
621            run_id: "run-abc-999".to_string(),
622            upload_uri_prefix: "s3://b/p".to_string(),
623            credentials: UploadCredentials {
624                access_key_id: "k".to_string(),
625                secret_access_key: "s".to_string(),
626                session_token: "t".to_string(),
627                expires_at: "2099-01-01T00:00:00Z".to_string(),
628            },
629        };
630
631        let result = close_run(
632            &agent,
633            &format!("http://127.0.0.1:{port}"),
634            "test-token",
635            &ctx,
636            Some(0),
637            Some("header\nrow1\n".to_string()),
638            &[], // no S3 uploads → inline route
639        );
640        assert!(result.is_ok(), "close_run failed: {result:?}");
641
642        let raw = rx.recv().expect("mock server did not receive request");
643        let raw_str = String::from_utf8_lossy(&raw);
644
645        // run_id belongs in the URL path, not the JSON body.
646        assert!(
647            raw_str.contains("/runs/run-abc-999/finish"),
648            "URL must include /runs/{{run_id}}/finish: {raw_str}"
649        );
650        assert!(
651            !raw_str.contains("\"run_id\""),
652            "run_id must not appear in the JSON body: {raw_str}"
653        );
654
655        // data_source must be "inline"; no "s3" field anywhere.
656        assert!(
657            raw_str.contains("\"data_source\":\"inline\""),
658            "expected data_source=inline in body: {raw_str}"
659        );
660        assert!(
661            !raw_str.contains("\"s3\""),
662            "s3 must not appear in body: {raw_str}"
663        );
664
665        // data_csv must be present as a raw CSV string (not base64-encoded).
666        assert!(
667            raw_str.contains("\"data_csv\""),
668            "data_csv absent from body: {raw_str}"
669        );
670        assert!(
671            raw_str.contains("header"),
672            "data_csv must contain raw CSV content (not base64): {raw_str}"
673        );
674
675        // finished_at must be present as an ISO 8601 UTC timestamp.
676        assert!(
677            raw_str.contains("\"finished_at\""),
678            "finished_at absent from body: {raw_str}"
679        );
680
681        // run_status and exit_code must be present.
682        assert!(
683            raw_str.contains("\"run_status\":\"finished\""),
684            "run_status absent or wrong: {raw_str}"
685        );
686        assert!(
687            raw_str.contains("\"exit_code\":0"),
688            "exit_code absent or wrong: {raw_str}"
689        );
690    }
691
692    // T-EOR-02: close_run body does NOT contain run_id (it is already in the URL path).
693    #[test]
694    fn test_close_run_request_omits_run_id() {
695        let req = CloseRunInlineRequest {
696            exit_code: Some(0),
697            run_status: "finished",
698            finished_at: Some("2026-04-03T12:00:00Z".to_string()),
699            data_source: "inline",
700            data_csv: "timestamp,cpu\n1000,42\n".to_string(),
701        };
702        let json = serde_json::to_string(&req).expect("serialize failed");
703        assert!(
704            !json.contains("\"run_id\""),
705            "run_id must not appear in close_run body (it is in the URL): {json}"
706        );
707    }
708
709    // T-EOR-03: data_source is "inline", data_csv is raw CSV (not base64), finished_at present.
710    #[test]
711    fn test_close_run_data_source_inline() {
712        let raw_csv = "timestamp,cpu\n1000,42\n";
713        let req = CloseRunInlineRequest {
714            exit_code: Some(0),
715            run_status: "finished",
716            finished_at: Some("2026-04-03T12:00:00Z".to_string()),
717            data_source: "inline",
718            data_csv: raw_csv.to_string(),
719        };
720        let json = serde_json::to_string(&req).expect("serialize failed");
721        assert!(
722            json.contains("\"data_source\":\"inline\""),
723            "data_source is not 'inline': {json}"
724        );
725        // data_csv must be the raw string -- not base64.
726        assert!(
727            json.contains("timestamp"),
728            "data_csv must contain raw CSV content (not base64): {json}"
729        );
730        assert!(
731            json.contains("\"finished_at\":\"2026-04-03T12:00:00Z\""),
732            "finished_at absent or wrong: {json}"
733        );
734    }
735
736    // base64_encode: RFC 4648 test vectors (Section 10).
737    #[test]
738    fn test_base64_encode_rfc4648_vectors() {
739        assert_eq!(base64_encode(b""), "");
740        assert_eq!(base64_encode(b"f"), "Zg==");
741        assert_eq!(base64_encode(b"fo"), "Zm8=");
742        assert_eq!(base64_encode(b"foo"), "Zm9v");
743        assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
744        assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
745        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
746    }
747
748    // base64_encode: encoding and decoding round-trip is valid base64.
749    #[test]
750    fn test_base64_encode_csv_roundtrip() {
751        let csv = "timestamp,value\n1000,42\n1001,99\n";
752        let encoded = base64_encode(csv.as_bytes());
753        // All chars in the output must be valid base64 characters.
754        encoded.chars().for_each(|c| {
755            assert!(
756                c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=',
757                "invalid base64 char '{c}' in: {encoded}"
758            );
759        });
760    }
761
762    // days_since_epoch: invalid month/day/year return None.
763    #[test]
764    fn test_days_since_epoch_invalid_inputs() {
765        assert_eq!(days_since_epoch(1970, 0, 1), None, "month 0 is invalid");
766        assert_eq!(days_since_epoch(1970, 13, 1), None, "month 13 is invalid");
767        assert_eq!(days_since_epoch(1970, 1, 0), None, "day 0 is invalid");
768        assert_eq!(days_since_epoch(1970, 1, 32), None, "day 32 is invalid");
769        assert_eq!(
770            days_since_epoch(1969, 12, 31),
771            None,
772            "year before 1970 is invalid"
773        );
774    }
775
776    // parse_iso8601_secs: +00:00 suffix is handled the same as Z.
777    #[test]
778    fn test_parse_iso8601_secs_with_utc_offset() {
779        let with_z = parse_iso8601_secs("2026-04-01T00:00:00Z");
780        let with_plus = parse_iso8601_secs("2026-04-01T00:00:00+00:00");
781        assert_eq!(
782            with_z, with_plus,
783            "+00:00 and Z must parse to the same timestamp"
784        );
785    }
786
787    // parse_iso8601_secs: fewer than 6 numeric components returns None.
788    #[test]
789    fn test_parse_iso8601_secs_too_few_components() {
790        assert_eq!(
791            parse_iso8601_secs("2026-04-01"),
792            None,
793            "date-only string must return None"
794        );
795        assert_eq!(parse_iso8601_secs("not-a-date"), None);
796    }
797
798    // slice_is_empty helper is used by serde skip_serializing_if.
799    #[test]
800    fn test_slice_is_empty_helper() {
801        let empty: &[String] = &[];
802        let nonempty: &[String] = &["tag".to_string()];
803        assert!(slice_is_empty(&&*empty), "empty slice should return true");
804        assert!(
805            !slice_is_empty(&&*nonempty),
806            "nonempty slice should return false"
807        );
808    }
809
810    // unix_secs_to_iso8601: known Unix timestamps produce the expected ISO 8601 string.
811    #[test]
812    fn test_unix_secs_to_iso8601_known_values() {
813        // Unix epoch itself.
814        assert_eq!(unix_secs_to_iso8601(0), "1970-01-01T00:00:00Z");
815        // 2000-01-01T00:00:00Z = 946684800 s
816        assert_eq!(unix_secs_to_iso8601(946684800), "2000-01-01T00:00:00Z");
817        // 2026-04-03T00:00:00Z -- today's date from context; days = 56*365 + 14 leap days + 92
818        // Easier to round-trip via parse_iso8601_secs.
819        let secs = parse_iso8601_secs("2026-04-03T15:30:45Z").expect("parse failed");
820        assert_eq!(unix_secs_to_iso8601(secs), "2026-04-03T15:30:45Z");
821    }
822
823    // unix_secs_to_iso8601: leap-day boundary (2000-02-29 exists, 1900-02-29 did not
824    // but we only go back to 1970 so just verify 2000-02-29 round-trips).
825    #[test]
826    fn test_unix_secs_to_iso8601_leap_day() {
827        let secs = parse_iso8601_secs("2000-02-29T12:00:00Z").expect("parse failed");
828        assert_eq!(unix_secs_to_iso8601(secs), "2000-02-29T12:00:00Z");
829    }
830
831    // now_iso8601: returns a non-empty string that parses back successfully.
832    #[test]
833    fn test_now_iso8601_parses() {
834        let s = now_iso8601();
835        assert!(!s.is_empty(), "now_iso8601 must not be empty");
836        assert!(s.ends_with('Z'), "now_iso8601 must end with Z: {s}");
837        let secs = parse_iso8601_secs(&s);
838        assert!(secs.is_some(), "now_iso8601 output must parse back: {s}");
839    }
840
841    // T-EOR-06: finished_at is omitted from the JSON when set to None.
842    #[test]
843    fn test_close_run_finished_at_omitted_when_none() {
844        let req = CloseRunInlineRequest {
845            exit_code: None,
846            run_status: "finished",
847            finished_at: None,
848            data_source: "inline",
849            data_csv: "".to_string(),
850        };
851        let json = serde_json::to_string(&req).expect("serialize failed");
852        assert!(
853            !json.contains("\"finished_at\""),
854            "finished_at must be absent when None: {json}"
855        );
856    }
857
858    // ---------------------------------------------------------------------------
859    // Spec-driven tests for /runs/{run_id}/finish (RunFinishInline schema).
860    // See https://api.sentinel.sparecores.net/openapi.json -- RunFinishInline.
861    // Required fields: data_source (const "inline"), data_csv (raw string).
862    // Optional fields: exit_code (int|null), run_status ("finished"|"failed"),
863    //                  finished_at (date-time string|null).
864    // additionalProperties: false -- no extra fields allowed.
865    // ---------------------------------------------------------------------------
866
867    // Build a realistic RunFinishResponse JSON body, mirroring the real Sentinel API
868    // response shape (RunFinishResponse = {run: RunResponse, processing: RunFinishProcessing}).
869    // The real API's 200 response always has both fields; "processing.status" is required.
870    fn finish_response_json(run_id: &str, run_status: &str, exit_code: Option<i32>) -> String {
871        let exit_code_field = match exit_code {
872            Some(c) => format!(",\"exit_code\":{c}"),
873            None => String::new(),
874        };
875        format!(
876            r#"{{"run":{{"run_id":"{run_id}","created_at":"2026-04-03T10:00:00Z","heartbeat_at":"2026-04-03T10:00:30Z","finished_at":"2026-04-03T10:01:00Z","run_status":"{run_status}","tag_count":0,"tags":[]{exit_code_field}}},"processing":{{"status":"ok","rows":1,"files":null,"duration_ms":5.0,"error":null}}}}"#
877        )
878    }
879
880    // Helper: spin up a mock TCP server that mimics the real Sentinel /finish endpoint:
881    // reads the full request, validates presence of required fields, returns a proper
882    // RunFinishResponse JSON on success (200) or a 422 JSON error if required fields
883    // are missing.  Returns the parsed request body JSON for assertions.
884    fn capture_close_run_body(exit_code: Option<i32>, csv: Option<&str>) -> String {
885        use std::io::{Read as _, Write as _};
886        use std::sync::mpsc;
887        use std::time::Duration;
888
889        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
890        let port = listener.local_addr().unwrap().port();
891        let (tx, rx) = mpsc::channel::<Vec<u8>>();
892
893        std::thread::spawn(move || {
894            if let Ok((mut stream, _)) = listener.accept() {
895                let mut buf = Vec::<u8>::new();
896                let mut tmp = [0u8; 4096];
897                loop {
898                    let n = stream.read(&mut tmp).unwrap_or(0);
899                    if n == 0 {
900                        break;
901                    }
902                    buf.extend_from_slice(&tmp[..n]);
903                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
904                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
905                        let cl = header_str
906                            .lines()
907                            .find_map(|l| {
908                                l.trim()
909                                    .strip_prefix("content-length:")
910                                    .and_then(|v| v.trim().parse::<usize>().ok())
911                            })
912                            .unwrap_or(0);
913                        if buf.len() >= sep + 4 + cl {
914                            break;
915                        }
916                    }
917                }
918                // Parse the body and decide: 200 with RunFinishResponse, or 422.
919                let body_start = buf
920                    .windows(4)
921                    .position(|w| w == b"\r\n\r\n")
922                    .map(|p| p + 4)
923                    .unwrap_or(buf.len());
924                let body_str = String::from_utf8_lossy(&buf[body_start..]);
925                let parsed: serde_json::Value =
926                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
927                // Real API requires data_source and data_csv for RunFinishInline.
928                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("inline")
929                    && parsed.get("data_csv").is_some();
930                let (status_line, resp_body) = if valid {
931                    let run_status = parsed
932                        .get("run_status")
933                        .and_then(|v| v.as_str())
934                        .unwrap_or("finished");
935                    let ec = parsed
936                        .get("exit_code")
937                        .and_then(|v| v.as_i64())
938                        .map(|v| v as i32);
939                    (
940                        "HTTP/1.1 200 OK",
941                        finish_response_json("run-spec-test", run_status, ec),
942                    )
943                } else {
944                    (
945                        "HTTP/1.1 422 Unprocessable Entity",
946                        r#"{"detail":[{"loc":["body"],"msg":"field required","type":"value_error.missing"}]}"#.to_string(),
947                    )
948                };
949                tx.send(buf).ok();
950                let http = format!(
951                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
952                    resp_body.len(),
953                    resp_body
954                );
955                stream.write_all(http.as_bytes()).ok();
956            }
957        });
958
959        let agent = ureq::config::Config::builder()
960            .timeout_global(Some(Duration::from_secs(30)))
961            .build()
962            .new_agent();
963        let ctx = RunContext {
964            run_id: "run-spec-test".to_string(),
965            upload_uri_prefix: "s3://b/p".to_string(),
966            credentials: UploadCredentials {
967                access_key_id: "k".to_string(),
968                secret_access_key: "s".to_string(),
969                session_token: "t".to_string(),
970                expires_at: "2099-01-01T00:00:00Z".to_string(),
971            },
972        };
973        let _ = close_run(
974            &agent,
975            &format!("http://127.0.0.1:{port}"),
976            "token",
977            &ctx,
978            exit_code,
979            csv.map(String::from),
980            &[], // no S3 uploads → inline route
981        );
982        let raw = rx.recv().expect("mock server did not capture request");
983        // Extract the JSON body (everything after \r\n\r\n).
984        let body_start = raw
985            .windows(4)
986            .position(|w| w == b"\r\n\r\n")
987            .map(|p| p + 4)
988            .unwrap_or(0);
989        String::from_utf8_lossy(&raw[body_start..]).to_string()
990    }
991
992    // T-FIN-01: run_status is "finished" when exit_code is 0 (clean success).
993    #[test]
994    fn test_close_run_run_status_finished_for_zero_exit() {
995        let body = capture_close_run_body(Some(0), None);
996        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
997        assert_eq!(
998            v["run_status"], "finished",
999            "run_status must be 'finished' for exit_code=0: {body}"
1000        );
1001        assert_eq!(v["exit_code"], 0, "exit_code must be 0 in payload: {body}");
1002    }
1003
1004    // T-FIN-02: run_status is "finished" when exit_code is None (SIGTERM shutdown).
1005    #[test]
1006    fn test_close_run_run_status_finished_for_sigterm() {
1007        let body = capture_close_run_body(None, None);
1008        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1009        assert_eq!(
1010            v["run_status"], "finished",
1011            "run_status must be 'finished' when exit_code is None (SIGTERM): {body}"
1012        );
1013        // exit_code is skipped when None -- it must not appear in the payload.
1014        assert!(
1015            v.get("exit_code").is_none(),
1016            "exit_code must be absent when None (spec: optional integer): {body}"
1017        );
1018    }
1019
1020    // T-FIN-03: run_status is "failed" for any non-zero exit code.
1021    #[test]
1022    fn test_close_run_run_status_failed_for_nonzero_exit() {
1023        for code in [1, 2, 127, 130, 255] {
1024            let body = capture_close_run_body(Some(code), None);
1025            let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1026            assert_eq!(
1027                v["run_status"], "failed",
1028                "run_status must be 'failed' for exit_code={code}: {body}"
1029            );
1030        }
1031    }
1032
1033    // T-FIN-04: data_csv is a raw CSV string, not base64.
1034    // Spec: data_csv type is string, description "Raw CSV string".
1035    #[test]
1036    fn test_close_run_data_csv_is_raw_csv_not_base64() {
1037        let raw_csv = "timestamp,cpu_pct\n1743638400,42.5\n1743638401,44.0\n";
1038        let body = capture_close_run_body(Some(0), Some(raw_csv));
1039        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1040        let data_csv = v["data_csv"].as_str().expect("data_csv must be a string");
1041        assert!(
1042            data_csv.contains("timestamp"),
1043            "data_csv must be raw CSV (contains header row): {data_csv}"
1044        );
1045        assert!(
1046            data_csv.contains("42.5"),
1047            "data_csv must be raw CSV (contains data values): {data_csv}"
1048        );
1049        // A base64-encoded CSV would not contain commas or newlines.
1050        assert!(
1051            data_csv.contains(','),
1052            "data_csv must contain CSV commas (not base64): {data_csv}"
1053        );
1054    }
1055
1056    // T-FIN-05: finished_at is present and parses as a valid ISO 8601 UTC timestamp.
1057    // Spec: finished_at is an optional date-time field.
1058    #[test]
1059    fn test_close_run_finished_at_is_valid_iso8601() {
1060        let body = capture_close_run_body(Some(0), None);
1061        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1062        let fa = v["finished_at"]
1063            .as_str()
1064            .expect("finished_at must be a string");
1065        assert!(fa.ends_with('Z'), "finished_at must end with Z (UTC): {fa}");
1066        let secs = parse_iso8601_secs(fa);
1067        assert!(
1068            secs.is_some(),
1069            "finished_at must be a parseable ISO 8601 timestamp: {fa}"
1070        );
1071        // Must be recent -- within a few seconds of now.
1072        let now = std::time::SystemTime::now()
1073            .duration_since(std::time::UNIX_EPOCH)
1074            .unwrap_or_default()
1075            .as_secs();
1076        let diff = now.abs_diff(secs.unwrap());
1077        assert!(
1078            diff < 60,
1079            "finished_at must be close to current time (diff={diff}s): {fa}"
1080        );
1081    }
1082
1083    // T-FIN-06: close_run handles a realistic RunFinishResponse JSON without error.
1084    // Spec: 200 response body is RunFinishResponse {run, processing}.
1085    #[test]
1086    fn test_close_run_handles_valid_run_finish_response() {
1087        use std::io::{Read as _, Write as _};
1088        use std::time::Duration;
1089
1090        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1091        let port = listener.local_addr().unwrap().port();
1092
1093        // Minimal valid RunFinishResponse per OpenAPI spec.
1094        let response_body = r#"{
1095            "run": {
1096                "run_id": "01959e3a-0001-0000-0000-000000000000",
1097                "created_at": "2026-04-03T10:00:00Z",
1098                "heartbeat_at": "2026-04-03T10:00:30Z",
1099                "finished_at": "2026-04-03T10:01:00Z",
1100                "run_status": "finished",
1101                "tag_count": 0,
1102                "tags": []
1103            },
1104            "processing": {
1105                "status": "ok",
1106                "rows": 60,
1107                "files": null,
1108                "duration_ms": 12.5,
1109                "error": null
1110            }
1111        }"#;
1112        let http_response = format!(
1113            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1114            response_body.len(),
1115            response_body
1116        );
1117
1118        std::thread::spawn(move || {
1119            if let Ok((mut stream, _)) = listener.accept() {
1120                let mut tmp = [0u8; 4096];
1121                let _ = stream.read(&mut tmp);
1122                stream.write_all(http_response.as_bytes()).ok();
1123            }
1124        });
1125
1126        let agent = ureq::config::Config::builder()
1127            .timeout_global(Some(Duration::from_secs(30)))
1128            .build()
1129            .new_agent();
1130        let ctx = RunContext {
1131            run_id: "01959e3a-0001-0000-0000-000000000000".to_string(),
1132            upload_uri_prefix: "s3://b/p".to_string(),
1133            credentials: UploadCredentials {
1134                access_key_id: "k".to_string(),
1135                secret_access_key: "s".to_string(),
1136                session_token: "t".to_string(),
1137                expires_at: "2099-01-01T00:00:00Z".to_string(),
1138            },
1139        };
1140        let result = close_run(
1141            &agent,
1142            &format!("http://127.0.0.1:{port}"),
1143            "test-token",
1144            &ctx,
1145            Some(0),
1146            Some("timestamp,cpu_pct\n1743638400,42.5\n".to_string()),
1147            &[], // no S3 uploads → inline route
1148        );
1149        assert!(
1150            result.is_ok(),
1151            "close_run must succeed for a 200 response: {result:?}"
1152        );
1153    }
1154
1155    // T-FIN-07: no extra fields are sent beyond what the spec allows
1156    // (RunFinishInline has additionalProperties: false).
1157    #[test]
1158    fn test_close_run_no_extra_fields_in_payload() {
1159        let body = capture_close_run_body(Some(0), Some("ts,v\n1,2\n"));
1160        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1161        let obj = v.as_object().expect("payload must be a JSON object");
1162        let allowed: std::collections::HashSet<&str> = [
1163            "exit_code",
1164            "run_status",
1165            "finished_at",
1166            "data_source",
1167            "data_csv",
1168        ]
1169        .iter()
1170        .copied()
1171        .collect();
1172        for key in obj.keys() {
1173            assert!(
1174                allowed.contains(key.as_str()),
1175                "unexpected field '{key}' in payload -- not allowed by RunFinishInline schema (additionalProperties: false)"
1176            );
1177        }
1178        // Required fields must always be present.
1179        assert!(obj.contains_key("data_source"), "data_source is required");
1180        assert!(obj.contains_key("data_csv"), "data_csv is required");
1181    }
1182
1183    // ---------------------------------------------------------------------------
1184    // S3 route tests for close_run (RunFinishS3 schema variant).
1185    // ---------------------------------------------------------------------------
1186
1187    // Helper: call close_run with S3 URIs, return the captured request body JSON.
1188    fn capture_close_run_s3_body(exit_code: Option<i32>, uris: &[&str]) -> String {
1189        use std::io::{Read as _, Write as _};
1190        use std::sync::mpsc;
1191        use std::time::Duration;
1192
1193        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1194        let port = listener.local_addr().unwrap().port();
1195        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1196
1197        std::thread::spawn(move || {
1198            if let Ok((mut stream, _)) = listener.accept() {
1199                let mut buf = Vec::<u8>::new();
1200                let mut tmp = [0u8; 4096];
1201                loop {
1202                    let n = stream.read(&mut tmp).unwrap_or(0);
1203                    if n == 0 {
1204                        break;
1205                    }
1206                    buf.extend_from_slice(&tmp[..n]);
1207                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1208                        let hdr = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1209                        let cl = hdr
1210                            .lines()
1211                            .find_map(|l| {
1212                                l.trim()
1213                                    .strip_prefix("content-length:")
1214                                    .and_then(|v| v.trim().parse::<usize>().ok())
1215                            })
1216                            .unwrap_or(0);
1217                        if buf.len() >= sep + 4 + cl {
1218                            break;
1219                        }
1220                    }
1221                }
1222                // Real API: 200 RunFinishResponse for a valid S3 payload.
1223                let body_start = buf
1224                    .windows(4)
1225                    .position(|w| w == b"\r\n\r\n")
1226                    .map(|p| p + 4)
1227                    .unwrap_or(buf.len());
1228                let body_str = String::from_utf8_lossy(&buf[body_start..]);
1229                let parsed: serde_json::Value =
1230                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
1231                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("s3")
1232                    && parsed.get("data_uris").is_some();
1233                let run_status = parsed
1234                    .get("run_status")
1235                    .and_then(|v| v.as_str())
1236                    .unwrap_or("finished");
1237                let ec = parsed
1238                    .get("exit_code")
1239                    .and_then(|v| v.as_i64())
1240                    .map(|v| v as i32);
1241                let (status_line, resp_body) = if valid {
1242                    (
1243                        "HTTP/1.1 200 OK",
1244                        finish_response_json("run-s3-test", run_status, ec),
1245                    )
1246                } else {
1247                    (
1248                        "HTTP/1.1 422 Unprocessable Entity",
1249                        r#"{"detail":[{"msg":"field required","type":"value_error.missing"}]}"#
1250                            .to_string(),
1251                    )
1252                };
1253                tx.send(buf).ok();
1254                let http = format!(
1255                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1256                    resp_body.len(),
1257                    resp_body
1258                );
1259                stream.write_all(http.as_bytes()).ok();
1260            }
1261        });
1262
1263        let agent = ureq::config::Config::builder()
1264            .timeout_global(Some(Duration::from_secs(30)))
1265            .build()
1266            .new_agent();
1267        let ctx = RunContext {
1268            run_id: "run-s3-test".to_string(),
1269            upload_uri_prefix: "s3://b/p".to_string(),
1270            credentials: UploadCredentials {
1271                access_key_id: "k".to_string(),
1272                secret_access_key: "s".to_string(),
1273                session_token: "t".to_string(),
1274                expires_at: "2099-01-01T00:00:00Z".to_string(),
1275            },
1276        };
1277        let uploaded: Vec<String> = uris.iter().map(|s| (*s).to_string()).collect();
1278        let _ = close_run(
1279            &agent,
1280            &format!("http://127.0.0.1:{port}"),
1281            "token",
1282            &ctx,
1283            exit_code,
1284            None, // remaining_csv unused in S3 route
1285            &uploaded,
1286        );
1287        let raw = rx.recv().expect("mock server did not capture request");
1288        let body_start = raw
1289            .windows(4)
1290            .position(|w| w == b"\r\n\r\n")
1291            .map(|p| p + 4)
1292            .unwrap_or(0);
1293        String::from_utf8_lossy(&raw[body_start..]).to_string()
1294    }
1295
1296    // T-S3R-01: when uploaded_uris is non-empty, data_source is "s3" and
1297    // data_uris contains the URIs (RunFinishS3 schema variant).
1298    #[test]
1299    fn test_close_run_uses_s3_route_when_uris_present() {
1300        let uris = &[
1301            "s3://my-bucket/prefix/run-abc/000000.csv.gz",
1302            "s3://my-bucket/prefix/run-abc/000001.csv.gz",
1303        ];
1304        let body = capture_close_run_s3_body(Some(0), uris);
1305        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1306        assert_eq!(
1307            v["data_source"], "s3",
1308            "data_source must be 's3' when URIs present: {body}"
1309        );
1310        let arr = v["data_uris"]
1311            .as_array()
1312            .expect("data_uris must be a JSON array");
1313        assert_eq!(arr.len(), 2, "data_uris must have 2 elements: {body}");
1314        assert_eq!(arr[0], "s3://my-bucket/prefix/run-abc/000000.csv.gz");
1315        assert_eq!(arr[1], "s3://my-bucket/prefix/run-abc/000001.csv.gz");
1316        // data_csv must NOT be present in the S3 route (additionalProperties: false).
1317        assert!(
1318            !body.contains("\"data_csv\""),
1319            "data_csv must be absent in S3 route: {body}"
1320        );
1321    }
1322
1323    // T-S3R-02: S3 route payload contains no extra fields beyond RunFinishS3 schema.
1324    // RunFinishS3 has additionalProperties: false.
1325    #[test]
1326    fn test_close_run_s3_no_extra_fields() {
1327        let uris = &["s3://bucket/prefix/run/000000.csv.gz"];
1328        let body = capture_close_run_s3_body(Some(0), uris);
1329        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1330        let obj = v.as_object().expect("payload must be a JSON object");
1331        let allowed: std::collections::HashSet<&str> = [
1332            "exit_code",
1333            "run_status",
1334            "finished_at",
1335            "data_source",
1336            "data_uris",
1337        ]
1338        .iter()
1339        .copied()
1340        .collect();
1341        for key in obj.keys() {
1342            assert!(
1343                allowed.contains(key.as_str()),
1344                "unexpected field '{key}' in S3 route payload (additionalProperties: false): {body}"
1345            );
1346        }
1347        assert!(
1348            obj.contains_key("data_source"),
1349            "data_source is required in S3 route"
1350        );
1351        assert!(
1352            obj.contains_key("data_uris"),
1353            "data_uris is required in S3 route"
1354        );
1355    }
1356
1357    // T-S3R-03: when uploaded_uris is empty, data_source is "inline" (not "s3")
1358    // and data_csv is present.  Confirms route dispatch is based on uploaded_uris.
1359    #[test]
1360    fn test_close_run_uses_inline_route_when_no_uris() {
1361        let body = capture_close_run_body(Some(0), Some("ts,cpu\n1,2\n"));
1362        let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1363        assert_eq!(
1364            v["data_source"], "inline",
1365            "data_source must be 'inline' when no URIs: {body}"
1366        );
1367        assert!(
1368            v.get("data_csv").is_some(),
1369            "data_csv must be present for inline route: {body}"
1370        );
1371        assert!(
1372            v.get("data_uris").is_none(),
1373            "data_uris must be absent for inline route: {body}"
1374        );
1375    }
1376
1377    // T-EOR-05: refresh_credentials updates ctx.credentials in place on success.
1378    #[test]
1379    fn test_refresh_credentials_updates_context() {
1380        use std::io::{Read as _, Write as _};
1381        use std::time::Duration;
1382
1383        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1384        let port = listener.local_addr().unwrap().port();
1385
1386        let response_body = r#"{"upload_credentials":{"access_key":"NEW_AK","secret_key":"NEW_SK","session_token":"NEW_ST","expiration":"2099-06-01T00:00:00Z"}}"#;
1387        let http_response = format!(
1388            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1389            response_body.len(),
1390            response_body
1391        );
1392
1393        std::thread::spawn(move || {
1394            if let Ok((mut stream, _)) = listener.accept() {
1395                let mut tmp = [0u8; 4096];
1396                let _ = stream.read(&mut tmp);
1397                stream.write_all(http_response.as_bytes()).ok();
1398            }
1399        });
1400
1401        let agent = ureq::config::Config::builder()
1402            .timeout_global(Some(Duration::from_secs(30)))
1403            .build()
1404            .new_agent();
1405
1406        let mut ctx = RunContext {
1407            run_id: "run-123".to_string(),
1408            upload_uri_prefix: "s3://b/p".to_string(),
1409            credentials: UploadCredentials {
1410                access_key_id: "OLD_AK".to_string(),
1411                secret_access_key: "OLD_SK".to_string(),
1412                session_token: "OLD_ST".to_string(),
1413                expires_at: "2099-01-01T00:00:00Z".to_string(),
1414            },
1415        };
1416
1417        let result = refresh_credentials(
1418            &agent,
1419            &format!("http://127.0.0.1:{port}"),
1420            "test-token",
1421            &mut ctx,
1422        );
1423        assert!(
1424            result.is_ok(),
1425            "refresh_credentials failed: {:?}",
1426            result.err()
1427        );
1428        assert_eq!(ctx.credentials.access_key_id, "NEW_AK");
1429        assert_eq!(ctx.credentials.secret_access_key, "NEW_SK");
1430        assert_eq!(ctx.credentials.session_token, "NEW_ST");
1431        assert_eq!(ctx.credentials.expires_at, "2099-06-01T00:00:00Z");
1432    }
1433
1434    // T-EOR-04: start_run POSTs to /runs and parses the response correctly.
1435    #[test]
1436    fn test_start_run_posts_to_runs_endpoint() {
1437        use std::io::{Read as _, Write as _};
1438        use std::sync::mpsc;
1439        use std::time::Duration;
1440
1441        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1442        let port = listener.local_addr().unwrap().port();
1443        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1444
1445        // Minimal valid StartRunResponse JSON.
1446        let response_body = r#"{"run_id":"run-xyz","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
1447        let http_response = format!(
1448            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1449            response_body.len(),
1450            response_body
1451        );
1452
1453        std::thread::spawn(move || {
1454            if let Ok((mut stream, _)) = listener.accept() {
1455                let mut buf = Vec::<u8>::new();
1456                let mut tmp = [0u8; 4096];
1457                loop {
1458                    let n = stream.read(&mut tmp).unwrap_or(0);
1459                    if n == 0 {
1460                        break;
1461                    }
1462                    buf.extend_from_slice(&tmp[..n]);
1463                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1464                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1465                        let cl = header_str
1466                            .lines()
1467                            .find_map(|l| {
1468                                l.trim()
1469                                    .strip_prefix("content-length:")
1470                                    .and_then(|v| v.trim().parse::<usize>().ok())
1471                            })
1472                            .unwrap_or(0);
1473                        if buf.len() >= sep + 4 + cl {
1474                            break;
1475                        }
1476                    }
1477                }
1478                tx.send(buf).ok();
1479                stream.write_all(http_response.as_bytes()).ok();
1480            }
1481        });
1482
1483        let agent = ureq::config::Config::builder()
1484            .timeout_global(Some(Duration::from_secs(30)))
1485            .build()
1486            .new_agent();
1487
1488        let meta = crate::config::JobMetadata {
1489            job_name: Some("test-job".to_string()),
1490            ..Default::default()
1491        };
1492        let host = crate::metrics::HostInfo::default();
1493        let cloud = crate::metrics::CloudInfo::default();
1494
1495        let result = start_run(
1496            &agent,
1497            &format!("http://127.0.0.1:{port}"),
1498            "test-token",
1499            &meta,
1500            Some(42),
1501            &host,
1502            &cloud,
1503        );
1504        assert!(result.is_ok(), "start_run failed: {:?}", result.err());
1505
1506        let ctx = result.unwrap();
1507        assert_eq!(ctx.run_id, "run-xyz");
1508        assert_eq!(ctx.upload_uri_prefix, "s3://b/p");
1509        assert_eq!(ctx.credentials.access_key_id, "AK");
1510
1511        let raw = rx.recv().expect("mock server did not receive request");
1512        let raw_str = String::from_utf8_lossy(&raw);
1513        assert!(
1514            raw_str.contains("POST /runs"),
1515            "must POST to /runs: {raw_str}"
1516        );
1517        assert!(
1518            raw_str.contains("\"job_name\":\"test-job\""),
1519            "job_name missing: {raw_str}"
1520        );
1521        // `pid` is intentionally absent -- the RunCreate schema does not accept it.
1522        assert!(
1523            !raw_str.contains("\"pid\""),
1524            "pid must not appear in the payload: {raw_str}"
1525        );
1526    }
1527
1528    // T-CMD-01: when start_run is called with a non-empty command (shell-wrapper mode),
1529    // the JSON body must contain a "command" array matching the wrapped command tokens.
1530    // This mirrors: SENTINEL_API_TOKEN=... resource-tracker --output /tmp/log \
1531    //   stress --cpu 4 --vm 1 --vm-bytes 12024M --timeout 63s
1532    #[test]
1533    fn test_start_run_includes_command_array_in_payload() {
1534        use std::io::{Read as _, Write as _};
1535        use std::sync::mpsc;
1536        use std::time::Duration;
1537
1538        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1539        let port = listener.local_addr().unwrap().port();
1540        let (tx, rx) = mpsc::channel::<Vec<u8>>();
1541
1542        let response_body = r#"{"run_id":"r1","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
1543        let http_response = format!(
1544            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1545            response_body.len(),
1546            response_body
1547        );
1548
1549        std::thread::spawn(move || {
1550            if let Ok((mut stream, _)) = listener.accept() {
1551                let mut buf = Vec::<u8>::new();
1552                let mut tmp = [0u8; 4096];
1553                loop {
1554                    let n = stream.read(&mut tmp).unwrap_or(0);
1555                    if n == 0 {
1556                        break;
1557                    }
1558                    buf.extend_from_slice(&tmp[..n]);
1559                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
1560                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
1561                        let cl = header_str
1562                            .lines()
1563                            .find_map(|l| {
1564                                l.trim()
1565                                    .strip_prefix("content-length:")
1566                                    .and_then(|v| v.trim().parse::<usize>().ok())
1567                            })
1568                            .unwrap_or(0);
1569                        if buf.len() >= sep + 4 + cl {
1570                            break;
1571                        }
1572                    }
1573                }
1574                tx.send(buf).ok();
1575                stream.write_all(http_response.as_bytes()).ok();
1576            }
1577        });
1578
1579        let agent = ureq::config::Config::builder()
1580            .timeout_global(Some(Duration::from_secs(30)))
1581            .build()
1582            .new_agent();
1583
1584        // Simulate: resource-tracker --output /tmp/resource-tracker-logs \
1585        //   stress --cpu 4 --vm 1 --vm-bytes 12024M --timeout 63s
1586        let wrapped_command: Vec<String> = vec![
1587            "stress",
1588            "--cpu",
1589            "4",
1590            "--vm",
1591            "1",
1592            "--vm-bytes",
1593            "12024M",
1594            "--timeout",
1595            "63s",
1596        ]
1597        .into_iter()
1598        .map(String::from)
1599        .collect();
1600
1601        let meta = crate::config::JobMetadata {
1602            command: wrapped_command.clone(),
1603            ..Default::default()
1604        };
1605        let host = crate::metrics::HostInfo::default();
1606        let cloud = crate::metrics::CloudInfo::default();
1607
1608        let result = start_run(
1609            &agent,
1610            &format!("http://127.0.0.1:{port}"),
1611            "test-token",
1612            &meta,
1613            None,
1614            &host,
1615            &cloud,
1616        );
1617        assert!(result.is_ok(), "start_run failed: {:?}", result.err());
1618
1619        let raw = rx.recv().expect("mock server did not receive request");
1620        let raw_str = String::from_utf8_lossy(&raw);
1621
1622        // The RunCreate schema declares `command` as `string | null` ("JSON array encoded
1623        // in TEXT"), so the body must contain the command as a JSON-encoded string, not a
1624        // bare JSON array.
1625        let expected = r#""command":"[\"stress\",\"--cpu\",\"4\",\"--vm\",\"1\",\"--vm-bytes\",\"12024M\",\"--timeout\",\"63s\"]""#;
1626        assert!(
1627            raw_str.contains(expected),
1628            "command JSON string not found in payload.\nExpected: {expected}\nGot body: {raw_str}"
1629        );
1630    }
1631
1632    // T-CMD-02: when start_run is called without a wrapped command (standalone mode),
1633    // the "command" field must be absent from the JSON body.
1634    #[test]
1635    fn test_start_run_omits_command_when_standalone() {
1636        let req_payload = MetadataPayload {
1637            job_name: None,
1638            project_name: None,
1639            stage_name: None,
1640            task_name: None,
1641            team: None,
1642            env: None,
1643            language: None,
1644            orchestrator: None,
1645            executor: None,
1646            external_run_id: None,
1647            container_image: None,
1648            tags: &[],
1649            command: None, // empty = standalone mode
1650        };
1651        let json = serde_json::to_string(&req_payload).expect("serialize failed");
1652        assert!(
1653            !json.contains("\"command\""),
1654            "command must be absent from payload in standalone mode: {json}"
1655        );
1656    }
1657
1658    // ---------------------------------------------------------------------------
1659    // Integration test: hits the REAL Sentinel API.
1660    //
1661    // Requires:
1662    //   SENTINEL_API_TOKEN  -- a valid bearer token
1663    //   SENTINEL_API_BASE   -- optional; defaults to https://api.sentinel.sparecores.net
1664    //
1665    // Run explicitly (skipped in normal `cargo test`):
1666    //   cargo test test_real_api_finish_run -- --include-ignored
1667    //   SENTINEL_API_TOKEN=<token> cargo test test_real_api_finish_run -- --include-ignored
1668    // ---------------------------------------------------------------------------
1669
1670    // T-INT-01: start_run + close_run against the real API both return Ok(()).
1671    // Verifies end-to-end that:
1672    //   - start_run registers a new run and returns a RunContext with a run_id.
1673    //   - close_run POSTs the correct RunFinishInline payload and receives 200.
1674    // Runs automatically when SENTINEL_API_TOKEN is set; skips otherwise.
1675    #[test]
1676    fn test_real_api_finish_run_returns_ok() {
1677        use crate::config::JobMetadata;
1678        use crate::metrics::{CloudInfo, CpuMetrics, HostInfo, MemoryMetrics, Sample};
1679        use crate::sentinel::upload::samples_to_csv;
1680        use std::time::Duration;
1681
1682        let token = match std::env::var("SENTINEL_API_TOKEN") {
1683            Ok(t) if !t.is_empty() => t,
1684            _ => {
1685                eprintln!("skip: SENTINEL_API_TOKEN not set or empty");
1686                return;
1687            }
1688        };
1689        let api_base = std::env::var("SENTINEL_API_BASE")
1690            .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
1691        eprintln!("T-INT-01: using api_base={api_base}");
1692
1693        let agent = ureq::config::Config::builder()
1694            .timeout_global(Some(Duration::from_secs(30)))
1695            .build()
1696            .new_agent();
1697
1698        let metadata = JobMetadata {
1699            job_name: Some("integration-test-close-run".to_string()),
1700            ..Default::default()
1701        };
1702        let host = HostInfo::default();
1703        let cloud = CloudInfo::default();
1704
1705        // Step 1: register a new run.
1706        eprintln!("T-INT-01: calling start_run...");
1707        let ctx = match start_run(&agent, &api_base, &token, &metadata, None, &host, &cloud) {
1708            Ok(c) => {
1709                eprintln!("T-INT-01: start_run ok -- run_id={}", c.run_id);
1710                c
1711            }
1712            Err(e) => panic!("start_run failed: {e}"),
1713        };
1714        assert!(!ctx.run_id.is_empty(), "run_id must be non-empty");
1715
1716        // Step 2: build a proper sample using the real CSV format so that
1717        // the column names match what the API expects.
1718        let timestamp_secs = std::time::SystemTime::now()
1719            .duration_since(std::time::UNIX_EPOCH)
1720            .unwrap_or_default()
1721            .as_secs();
1722        let sample = Sample {
1723            timestamp_secs,
1724            job_name: Some("integration-test-close-run".to_string()),
1725            tracked_pid: None,
1726            cpu: CpuMetrics::default(),
1727            memory: MemoryMetrics::default(),
1728            network: vec![],
1729            disk: vec![],
1730            gpu: vec![],
1731        };
1732        let csv = samples_to_csv(&[sample], 1);
1733        eprintln!(
1734            "T-INT-01: csv preview (first 120 chars): {}",
1735            &csv[..csv.len().min(120)]
1736        );
1737
1738        // Step 3: finish the run with the inline CSV (no S3 uploads in this test).
1739        eprintln!("T-INT-01: calling close_run...");
1740        let result = close_run(
1741            &agent,
1742            &api_base,
1743            &token,
1744            &ctx,
1745            Some(0),
1746            Some(csv),
1747            &[], // no S3 uploads → inline route
1748        );
1749        match &result {
1750            Ok(()) => eprintln!("T-INT-01: close_run ok -- 200 received"),
1751            Err(e) => eprintln!("T-INT-01: close_run FAILED: {e}"),
1752        }
1753        assert!(
1754            result.is_ok(),
1755            "close_run must return Ok (200) against the real API: {result:?}"
1756        );
1757    }
1758}