1use crate::config::JobMetadata;
2use crate::metrics::{CloudInfo, HostInfo};
3use crate::sentinel::s3::UploadCredentials;
4use serde::{Deserialize, Serialize};
5
/// Predicate for `#[serde(skip_serializing_if = ...)]` on `&[String]` fields.
///
/// Serde hands the predicate a reference to the field, and the field is
/// already a borrowed slice, hence the double reference in the signature.
/// Returns `true` when the slice holds no elements.
fn slice_is_empty(v: &&[String]) -> bool {
    matches!(**v, [])
}
9
/// Encodes `input` with the standard base64 alphabet and `=` padding.
///
/// Only compiled for test builds.
#[cfg(test)]
fn base64_encode(input: &[u8]) -> String {
    const ALPHA: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const SHIFTS: [u32; 4] = [18, 12, 6, 0];

    let mut encoded = String::with_capacity((input.len() + 2) / 3 * 4);
    for group in input.chunks(3) {
        // Left-align the (possibly short) group inside a zero-filled 24-bit word.
        let mut triple = [0u8; 3];
        triple[..group.len()].copy_from_slice(group);
        let word = u32::from_be_bytes([0, triple[0], triple[1], triple[2]]);

        // A group of k bytes carries k + 1 significant symbols; the rest is padding.
        for (idx, &shift) in SHIFTS.iter().enumerate() {
            let symbol = if idx <= group.len() {
                char::from(ALPHA[((word >> shift) & 0x3f) as usize])
            } else {
                '='
            };
            encoded.push(symbol);
        }
    }
    encoded
}
46
/// Response body of `POST {api_base}/runs`, as parsed by `start_run`.
#[derive(Debug, Deserialize)]
struct StartRunResponse {
    /// Identifier of the created run; `start_run` copies it into `RunContext::run_id`.
    run_id: String,
    /// Copied verbatim into `RunContext::upload_uri_prefix`.
    upload_uri_prefix: String,
    /// Credentials in wire format; `start_run` maps them onto `UploadCredentials`.
    upload_credentials: RawCredentials,
}
57
/// Wire format of the credentials object embedded in API responses.
///
/// `start_run` and `refresh_credentials` map these fields onto `UploadCredentials`.
#[derive(Debug, Deserialize)]
struct RawCredentials {
    /// Mapped to `UploadCredentials::access_key_id`.
    access_key: String,
    /// Mapped to `UploadCredentials::secret_access_key`.
    secret_key: String,
    /// Mapped to `UploadCredentials::session_token`.
    session_token: String,
    /// Expiry timestamp, accepted under either the `expiration` or the
    /// `expires_at` key. `None` when the key is missing, in which case the
    /// callers in this file substitute `2099-01-01T00:00:00Z`.
    #[serde(alias = "expires_at", default)]
    expiration: Option<String>,
}
72
/// Response body of `POST {api_base}/runs/{run_id}/refresh-credentials`,
/// as parsed by `refresh_credentials`.
#[derive(Debug, Deserialize)]
struct RefreshCredentialsResponse {
    /// Replacement credentials in wire format.
    upload_credentials: RawCredentials,
}
77
/// State of a started run: what `start_run` returns and what
/// `refresh_credentials` / `close_run` operate on.
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Run identifier; interpolated into the refresh-credentials and finish URLs.
    pub run_id: String,
    /// Upload URI prefix exactly as returned by the start-run response.
    pub upload_uri_prefix: String,
    /// Current upload credentials; overwritten in place by `refresh_credentials`.
    pub credentials: UploadCredentials,
}
89
90impl RunContext {
91 pub fn creds_expiring_soon(&self) -> bool {
94 match parse_iso8601_secs(&self.credentials.expires_at) {
95 Some(expires_at_secs) => {
96 let now = std::time::SystemTime::now()
97 .duration_since(std::time::UNIX_EPOCH)
98 .unwrap_or_default()
99 .as_secs();
100 expires_at_secs.saturating_sub(now) < 300
101 }
102 None => {
103 true
105 }
106 }
107 }
108}
109
110fn parse_iso8601_secs(s: &str) -> Option<u64> {
113 let s = s.trim_end_matches('Z');
115 let s = s.trim_end_matches("+00:00");
116 let nums: Vec<u64> = s
117 .split(|c: char| !c.is_ascii_digit())
118 .filter(|p| !p.is_empty())
119 .map(|p| p.parse().ok())
120 .collect::<Option<Vec<_>>>()?;
121 if nums.len() < 6 {
122 return None;
123 }
124 let (y, mo, d, h, mi, sec) = (nums[0], nums[1], nums[2], nums[3], nums[4], nums[5]);
125 let days = days_since_epoch(y, mo, d)?;
127 Some(days * 86400 + h * 3600 + mi * 60 + sec)
128}
129
/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The calendar date is found by peeling whole years, then whole months, off
/// the day count, starting from 1970-01-01.
fn unix_secs_to_iso8601(secs: u64) -> String {
    const MONTH_LENGTHS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    fn leap(y: u64) -> bool {
        (y % 4 == 0 && y % 100 != 0) || y % 400 == 0
    }

    let (mut remaining, clock) = (secs / 86400, secs % 86400);
    let (hh, mm, ss) = (clock / 3600, clock % 3600 / 60, clock % 60);

    // Consume whole years.
    let mut year = 1970u64;
    loop {
        let year_len: u64 = if leap(year) { 366 } else { 365 };
        if remaining < year_len {
            break;
        }
        remaining -= year_len;
        year += 1;
    }

    // Consume whole months; `remaining` is now smaller than the year's length,
    // so the loop always stops at or before December.
    let mut month = 1u64;
    for (idx, &base_len) in MONTH_LENGTHS.iter().enumerate() {
        let month_len = if idx == 1 && leap(year) { 29 } else { base_len };
        if remaining < month_len {
            break;
        }
        remaining -= month_len;
        month += 1;
    }

    let day = remaining + 1;
    format!("{year:04}-{month:02}-{day:02}T{hh:02}:{mm:02}:{ss:02}Z")
}
168
169fn now_iso8601() -> String {
170 let secs = std::time::SystemTime::now()
171 .duration_since(std::time::UNIX_EPOCH)
172 .unwrap_or_default()
173 .as_secs();
174 unix_secs_to_iso8601(secs)
175}
176
/// Converts a Gregorian calendar date to the number of whole days since 1970-01-01.
///
/// Returns `None` when the year is before 1970, the month is outside `1..=12`,
/// the day does not exist in that month (leap years are honoured for
/// February), or the result does not fit in `u64`.
///
/// The year contribution is computed in closed form, so the cost does not
/// depend on how large `y` is.
fn days_since_epoch(y: u64, mo: u64, d: u64) -> Option<u64> {
    const DAYS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    // Leap years in 1..=1969: 1969/4 - 1969/100 + 1969/400 = 492 - 19 + 4.
    const LEAPS_BEFORE_1970: u64 = 477;

    if y < 1970 || !(1..=12).contains(&mo) {
        return None;
    }
    let is_leap = (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
    let month_len = |m: u64| -> u64 {
        if m == 2 && is_leap {
            29
        } else {
            DAYS[(m - 1) as usize]
        }
    };
    if d < 1 || d > month_len(mo) {
        return None;
    }

    // Leap years in 1970..y, counted without iterating over the years.
    let prev = y - 1;
    let leap_years = prev / 4 - prev / 100 + prev / 400 - LEAPS_BEFORE_1970;
    let year_days = (y - 1970).checked_mul(365)?.checked_add(leap_years)?;
    let month_days: u64 = (1..mo).map(month_len).sum();
    year_days.checked_add(month_days + d - 1)
}
205
/// JSON body sent by `start_run`.
///
/// All three members are `#[serde(flatten)]`ed, so their fields are emitted
/// side by side in one flat JSON object rather than as nested objects.
#[derive(Debug, Serialize)]
struct StartRunRequest<'a> {
    /// Job metadata fields borrowed from the caller's `JobMetadata`.
    #[serde(flatten)]
    metadata: MetadataPayload<'a>,
    /// Host description, serialized via `HostInfo`'s own `Serialize` impl.
    #[serde(flatten)]
    host: &'a HostInfo,
    /// Cloud description, serialized via `CloudInfo`'s own `Serialize` impl.
    #[serde(flatten)]
    cloud: &'a CloudInfo,
}
221
/// Borrowed view of `JobMetadata` used as the metadata part of the start-run body.
///
/// Every optional field is left out of the JSON when it is `None`, and `tags`
/// is left out when the slice is empty. `start_run` fills each field from the
/// `JobMetadata` field of the same name.
#[derive(Debug, Serialize)]
struct MetadataPayload<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    job_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    project_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    stage_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    task_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    team: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    env: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    language: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    orchestrator: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    executor: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    external_run_id: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    container_image: Option<&'a str>,
    /// Omitted from the JSON when empty (see `slice_is_empty`).
    #[serde(skip_serializing_if = "slice_is_empty")]
    tags: &'a [String],
    /// The command pre-serialized to a JSON string by `start_run`, i.e. it is
    /// sent as a string containing JSON; `None` when the command is empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
}
257
/// Finish-run body used by `close_run` when no uploaded URIs were supplied:
/// the remaining CSV travels inside the request itself.
#[derive(Debug, Serialize)]
struct CloseRunInlineRequest {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `close_run` sets this to `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Completion timestamp; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// `close_run` sets this to `"inline"` for this payload shape.
    data_source: &'static str,
    /// CSV text as given by the caller; always serialized, even when empty.
    data_csv: String,
}
279
/// Finish-run body used by `close_run` when at least one uploaded URI was
/// supplied: the request only references the uploaded objects.
#[derive(Debug, Serialize)]
struct CloseRunS3Request {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `close_run` sets this to `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Completion timestamp; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// `close_run` sets this to `"s3"` for this payload shape.
    data_source: &'static str,
    /// The caller's uploaded URIs, copied in the order given.
    data_uris: Vec<String>,
}
297
298pub fn start_run(
307 agent: &ureq::Agent,
308 api_base: &str,
309 token: &str,
310 metadata: &JobMetadata,
311 pid: Option<i32>,
312 host: &HostInfo,
313 cloud: &CloudInfo,
314) -> Result<RunContext, String> {
315 let command_json: Option<String> = if metadata.command.is_empty() {
318 None
319 } else {
320 serde_json::to_string(&metadata.command).ok()
321 };
322
323 let _ = pid;
325
326 let payload = StartRunRequest {
327 metadata: MetadataPayload {
328 job_name: metadata.job_name.as_deref(),
329 project_name: metadata.project_name.as_deref(),
330 stage_name: metadata.stage_name.as_deref(),
331 task_name: metadata.task_name.as_deref(),
332 team: metadata.team.as_deref(),
333 env: metadata.env.as_deref(),
334 language: metadata.language.as_deref(),
335 orchestrator: metadata.orchestrator.as_deref(),
336 executor: metadata.executor.as_deref(),
337 external_run_id: metadata.external_run_id.as_deref(),
338 container_image: metadata.container_image.as_deref(),
339 tags: &metadata.tags,
340 command: command_json,
341 },
342 host,
343 cloud,
344 };
345
346 let url = format!("{api_base}/runs");
347 let body = serde_json::to_string(&payload)
348 .map_err(|e| format!("failed to serialize start_run payload: {e}"))?;
349
350 let mut response = agent
351 .post(&url)
352 .header("Authorization", &format!("Bearer {token}"))
353 .header("Content-Type", "application/json")
354 .send(body.as_bytes())
355 .map_err(|e| format!("start_run POST failed: {e}"))?;
356
357 let text = response
358 .body_mut()
359 .read_to_string()
360 .map_err(|e| format!("start_run read body failed: {e}"))?;
361
362 let resp: StartRunResponse = serde_json::from_str(&text).map_err(|e| {
363 format!(
364 "start_run parse response failed: {e} ({} bytes)",
365 text.len()
366 )
367 })?;
368
369 Ok(RunContext {
370 run_id: resp.run_id,
371 upload_uri_prefix: resp.upload_uri_prefix,
372 credentials: UploadCredentials {
373 access_key_id: resp.upload_credentials.access_key,
374 secret_access_key: resp.upload_credentials.secret_key,
375 session_token: resp.upload_credentials.session_token,
376 expires_at: resp
377 .upload_credentials
378 .expiration
379 .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
380 },
381 })
382}
383
384pub fn refresh_credentials(
388 agent: &ureq::Agent,
389 api_base: &str,
390 token: &str,
391 ctx: &mut RunContext,
392) -> Result<(), String> {
393 let url = format!("{api_base}/runs/{}/refresh-credentials", ctx.run_id);
394 let mut response = agent
395 .post(&url)
396 .header("Authorization", &format!("Bearer {token}"))
397 .send(b"" as &[u8])
398 .map_err(|e| format!("credential refresh POST failed: {e}"))?;
399
400 let text = response
401 .body_mut()
402 .read_to_string()
403 .map_err(|e| format!("credential refresh read body failed: {e}"))?;
404
405 let resp: RefreshCredentialsResponse = serde_json::from_str(&text).map_err(|e| {
406 format!(
407 "credential refresh parse failed: {e} ({} bytes)",
408 text.len()
409 )
410 })?;
411
412 ctx.credentials = UploadCredentials {
413 access_key_id: resp.upload_credentials.access_key,
414 secret_access_key: resp.upload_credentials.secret_key,
415 session_token: resp.upload_credentials.session_token,
416 expires_at: resp
417 .upload_credentials
418 .expiration
419 .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
420 };
421 Ok(())
422}
423
424pub fn close_run(
437 agent: &ureq::Agent,
438 api_base: &str,
439 token: &str,
440 ctx: &RunContext,
441 exit_code: Option<i32>,
442 remaining_csv: Option<String>,
443 uploaded_uris: &[String],
444) -> Result<(), String> {
445 let run_status = match exit_code {
447 Some(0) | None => "finished",
448 Some(_) => "failed",
449 };
450 let finished_at = Some(now_iso8601());
451
452 let url = format!("{api_base}/runs/{}/finish", ctx.run_id);
453 let body = if uploaded_uris.is_empty() {
454 let payload = CloseRunInlineRequest {
457 exit_code,
458 run_status,
459 finished_at,
460 data_source: "inline",
461 data_csv: remaining_csv.unwrap_or_default(),
462 };
463 serde_json::to_string(&payload)
464 .map_err(|e| format!("failed to serialize close_run inline payload: {e}"))?
465 } else {
466 let payload = CloseRunS3Request {
468 exit_code,
469 run_status,
470 finished_at,
471 data_source: "s3",
472 data_uris: uploaded_uris.to_vec(),
473 };
474 serde_json::to_string(&payload)
475 .map_err(|e| format!("failed to serialize close_run s3 payload: {e}"))?
476 };
477
478 let response = agent
483 .post(&url)
484 .header("Authorization", &format!("Bearer {token}"))
485 .header("Content-Type", "application/json")
486 .send(body.as_bytes())
487 .map_err(|e| format!("close_run POST failed: {e}"))?;
488
489 let status = response.status();
490 if status != 200 {
491 return Err(format!("close_run received HTTP {status}: expected 200"));
492 }
493
494 Ok(())
495}
496
497#[cfg(test)]
502mod tests {
503 use super::*;
504
505 #[test]
506 fn test_creds_expiring_soon_far_future() {
507 let ctx = RunContext {
508 run_id: "test".to_string(),
509 upload_uri_prefix: "s3://b/p".to_string(),
510 credentials: UploadCredentials {
511 access_key_id: "k".to_string(),
512 secret_access_key: "s".to_string(),
513 session_token: "t".to_string(),
514 expires_at: "2099-01-01T00:00:00Z".to_string(),
515 },
516 };
517 assert!(!ctx.creds_expiring_soon());
518 }
519
520 #[test]
521 fn test_creds_expiring_soon_past() {
522 let ctx = RunContext {
523 run_id: "test".to_string(),
524 upload_uri_prefix: "s3://b/p".to_string(),
525 credentials: UploadCredentials {
526 access_key_id: "k".to_string(),
527 secret_access_key: "s".to_string(),
528 session_token: "t".to_string(),
529 expires_at: "1970-01-01T00:00:00Z".to_string(),
530 },
531 };
532 assert!(ctx.creds_expiring_soon());
533 }
534
535 #[test]
536 fn test_creds_expiring_soon_unparseable() {
537 let ctx = RunContext {
538 run_id: "test".to_string(),
539 upload_uri_prefix: "s3://b/p".to_string(),
540 credentials: UploadCredentials {
541 access_key_id: "k".to_string(),
542 secret_access_key: "s".to_string(),
543 session_token: "t".to_string(),
544 expires_at: "not-a-date".to_string(),
545 },
546 };
547 assert!(ctx.creds_expiring_soon());
549 }
550
551 #[test]
552 fn test_days_since_epoch_known_dates() {
553 assert_eq!(days_since_epoch(1970, 1, 1), Some(0));
554 assert_eq!(days_since_epoch(1970, 1, 2), Some(1));
555 assert_eq!(days_since_epoch(2026, 4, 1), Some(20544));
556 }
557
558 #[test]
559 fn test_parse_iso8601_secs_known() {
560 assert_eq!(
562 parse_iso8601_secs("2026-04-01T00:00:00Z"),
563 Some(20544 * 86400)
564 );
565 }
566
    /// End-to-end check of `close_run` against a one-shot local mock server:
    /// the request must target `/runs/{run_id}/finish` and carry the inline
    /// payload fields, with `run_id` absent from the JSON body.
    #[test]
    fn test_close_run_posts_to_finish_endpoint() {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS pick a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        // Channel used to hand the raw captured request back to the test thread.
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Read until the header terminator has been seen and at least
                // Content-Length body bytes have arrived (or the peer closes).
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        // Lowercased so the header name matches regardless of case.
                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = header_str
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // Publish the capture first, then answer with a minimal 200.
                tx.send(buf).ok();
                stream
                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n{}")
                    .ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let ctx = RunContext {
            run_id: "run-abc-999".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };

        // No uploaded URIs, so `close_run` must take the inline route.
        let result = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "test-token",
            &ctx,
            Some(0),
            Some("header\nrow1\n".to_string()),
            &[],
        );
        assert!(result.is_ok(), "close_run failed: {result:?}");

        let raw = rx.recv().expect("mock server did not receive request");
        let raw_str = String::from_utf8_lossy(&raw);

        // The run id belongs in the URL only.
        assert!(
            raw_str.contains("/runs/run-abc-999/finish"),
            "URL must include /runs/{{run_id}}/finish: {raw_str}"
        );
        assert!(
            !raw_str.contains("\"run_id\""),
            "run_id must not appear in the JSON body: {raw_str}"
        );

        // Inline route marker.
        assert!(
            raw_str.contains("\"data_source\":\"inline\""),
            "expected data_source=inline in body: {raw_str}"
        );
        assert!(
            !raw_str.contains("\"s3\""),
            "s3 must not appear in body: {raw_str}"
        );

        // The CSV is sent as plain text.
        assert!(
            raw_str.contains("\"data_csv\""),
            "data_csv absent from body: {raw_str}"
        );
        assert!(
            raw_str.contains("header"),
            "data_csv must contain raw CSV content (not base64): {raw_str}"
        );

        assert!(
            raw_str.contains("\"finished_at\""),
            "finished_at absent from body: {raw_str}"
        );

        // Exit code 0 maps to the "finished" status.
        assert!(
            raw_str.contains("\"run_status\":\"finished\""),
            "run_status absent or wrong: {raw_str}"
        );
        assert!(
            raw_str.contains("\"exit_code\":0"),
            "exit_code absent or wrong: {raw_str}"
        );
    }
691
692 #[test]
694 fn test_close_run_request_omits_run_id() {
695 let req = CloseRunInlineRequest {
696 exit_code: Some(0),
697 run_status: "finished",
698 finished_at: Some("2026-04-03T12:00:00Z".to_string()),
699 data_source: "inline",
700 data_csv: "timestamp,cpu\n1000,42\n".to_string(),
701 };
702 let json = serde_json::to_string(&req).expect("serialize failed");
703 assert!(
704 !json.contains("\"run_id\""),
705 "run_id must not appear in close_run body (it is in the URL): {json}"
706 );
707 }
708
709 #[test]
711 fn test_close_run_data_source_inline() {
712 let raw_csv = "timestamp,cpu\n1000,42\n";
713 let req = CloseRunInlineRequest {
714 exit_code: Some(0),
715 run_status: "finished",
716 finished_at: Some("2026-04-03T12:00:00Z".to_string()),
717 data_source: "inline",
718 data_csv: raw_csv.to_string(),
719 };
720 let json = serde_json::to_string(&req).expect("serialize failed");
721 assert!(
722 json.contains("\"data_source\":\"inline\""),
723 "data_source is not 'inline': {json}"
724 );
725 assert!(
727 json.contains("timestamp"),
728 "data_csv must contain raw CSV content (not base64): {json}"
729 );
730 assert!(
731 json.contains("\"finished_at\":\"2026-04-03T12:00:00Z\""),
732 "finished_at absent or wrong: {json}"
733 );
734 }
735
736 #[test]
738 fn test_base64_encode_rfc4648_vectors() {
739 assert_eq!(base64_encode(b""), "");
740 assert_eq!(base64_encode(b"f"), "Zg==");
741 assert_eq!(base64_encode(b"fo"), "Zm8=");
742 assert_eq!(base64_encode(b"foo"), "Zm9v");
743 assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
744 assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
745 assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
746 }
747
748 #[test]
750 fn test_base64_encode_csv_roundtrip() {
751 let csv = "timestamp,value\n1000,42\n1001,99\n";
752 let encoded = base64_encode(csv.as_bytes());
753 encoded.chars().for_each(|c| {
755 assert!(
756 c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=',
757 "invalid base64 char '{c}' in: {encoded}"
758 );
759 });
760 }
761
762 #[test]
764 fn test_days_since_epoch_invalid_inputs() {
765 assert_eq!(days_since_epoch(1970, 0, 1), None, "month 0 is invalid");
766 assert_eq!(days_since_epoch(1970, 13, 1), None, "month 13 is invalid");
767 assert_eq!(days_since_epoch(1970, 1, 0), None, "day 0 is invalid");
768 assert_eq!(days_since_epoch(1970, 1, 32), None, "day 32 is invalid");
769 assert_eq!(
770 days_since_epoch(1969, 12, 31),
771 None,
772 "year before 1970 is invalid"
773 );
774 }
775
776 #[test]
778 fn test_parse_iso8601_secs_with_utc_offset() {
779 let with_z = parse_iso8601_secs("2026-04-01T00:00:00Z");
780 let with_plus = parse_iso8601_secs("2026-04-01T00:00:00+00:00");
781 assert_eq!(
782 with_z, with_plus,
783 "+00:00 and Z must parse to the same timestamp"
784 );
785 }
786
787 #[test]
789 fn test_parse_iso8601_secs_too_few_components() {
790 assert_eq!(
791 parse_iso8601_secs("2026-04-01"),
792 None,
793 "date-only string must return None"
794 );
795 assert_eq!(parse_iso8601_secs("not-a-date"), None);
796 }
797
798 #[test]
800 fn test_slice_is_empty_helper() {
801 let empty: &[String] = &[];
802 let nonempty: &[String] = &["tag".to_string()];
803 assert!(slice_is_empty(&&*empty), "empty slice should return true");
804 assert!(
805 !slice_is_empty(&&*nonempty),
806 "nonempty slice should return false"
807 );
808 }
809
810 #[test]
812 fn test_unix_secs_to_iso8601_known_values() {
813 assert_eq!(unix_secs_to_iso8601(0), "1970-01-01T00:00:00Z");
815 assert_eq!(unix_secs_to_iso8601(946684800), "2000-01-01T00:00:00Z");
817 let secs = parse_iso8601_secs("2026-04-03T15:30:45Z").expect("parse failed");
820 assert_eq!(unix_secs_to_iso8601(secs), "2026-04-03T15:30:45Z");
821 }
822
823 #[test]
826 fn test_unix_secs_to_iso8601_leap_day() {
827 let secs = parse_iso8601_secs("2000-02-29T12:00:00Z").expect("parse failed");
828 assert_eq!(unix_secs_to_iso8601(secs), "2000-02-29T12:00:00Z");
829 }
830
831 #[test]
833 fn test_now_iso8601_parses() {
834 let s = now_iso8601();
835 assert!(!s.is_empty(), "now_iso8601 must not be empty");
836 assert!(s.ends_with('Z'), "now_iso8601 must end with Z: {s}");
837 let secs = parse_iso8601_secs(&s);
838 assert!(secs.is_some(), "now_iso8601 output must parse back: {s}");
839 }
840
841 #[test]
843 fn test_close_run_finished_at_omitted_when_none() {
844 let req = CloseRunInlineRequest {
845 exit_code: None,
846 run_status: "finished",
847 finished_at: None,
848 data_source: "inline",
849 data_csv: "".to_string(),
850 };
851 let json = serde_json::to_string(&req).expect("serialize failed");
852 assert!(
853 !json.contains("\"finished_at\""),
854 "finished_at must be absent when None: {json}"
855 );
856 }
857
858 fn finish_response_json(run_id: &str, run_status: &str, exit_code: Option<i32>) -> String {
871 let exit_code_field = match exit_code {
872 Some(c) => format!(",\"exit_code\":{c}"),
873 None => String::new(),
874 };
875 format!(
876 r#"{{"run":{{"run_id":"{run_id}","created_at":"2026-04-03T10:00:00Z","heartbeat_at":"2026-04-03T10:00:30Z","finished_at":"2026-04-03T10:01:00Z","run_status":"{run_status}","tag_count":0,"tags":[]{exit_code_field}}},"processing":{{"status":"ok","rows":1,"files":null,"duration_ms":5.0,"error":null}}}}"#
877 )
878 }
879
    /// Runs `close_run` on the inline route against a one-shot local mock
    /// server and returns the JSON body the server received.
    ///
    /// The mock answers 200 with a canned finish response when the body has
    /// `data_source == "inline"` and a `data_csv` member, and 422 otherwise.
    /// The result of `close_run` itself is deliberately ignored; callers
    /// inspect the captured request body only.
    fn capture_close_run_body(exit_code: Option<i32>, csv: Option<&str>) -> String {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS pick a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Read until the header terminator has been seen and at least
                // Content-Length body bytes have arrived (or the peer closes).
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = header_str
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // Everything after the blank line is the request body.
                let body_start = buf
                    .windows(4)
                    .position(|w| w == b"\r\n\r\n")
                    .map(|p| p + 4)
                    .unwrap_or(buf.len());
                let body_str = String::from_utf8_lossy(&buf[body_start..]);
                // An unparseable body becomes `Null`, which then fails validation.
                let parsed: serde_json::Value =
                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("inline")
                    && parsed.get("data_csv").is_some();
                let (status_line, resp_body) = if valid {
                    // Echo the request's status and exit code back in the response.
                    let run_status = parsed
                        .get("run_status")
                        .and_then(|v| v.as_str())
                        .unwrap_or("finished");
                    let ec = parsed
                        .get("exit_code")
                        .and_then(|v| v.as_i64())
                        .map(|v| v as i32);
                    (
                        "HTTP/1.1 200 OK",
                        finish_response_json("run-spec-test", run_status, ec),
                    )
                } else {
                    (
                        "HTTP/1.1 422 Unprocessable Entity",
                        r#"{"detail":[{"loc":["body"],"msg":"field required","type":"value_error.missing"}]}"#.to_string(),
                    )
                };
                // Publish the capture before replying so the test thread can
                // always receive it.
                tx.send(buf).ok();
                let http = format!(
                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                    resp_body.len(),
                    resp_body
                );
                stream.write_all(http.as_bytes()).ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();
        let ctx = RunContext {
            run_id: "run-spec-test".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };
        // Empty URI list selects the inline payload.
        let _ = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "token",
            &ctx,
            exit_code,
            csv.map(String::from),
            &[],
        );
        let raw = rx.recv().expect("mock server did not capture request");
        // Strip the request line and headers; return only the body.
        let body_start = raw
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .map(|p| p + 4)
            .unwrap_or(0);
        String::from_utf8_lossy(&raw[body_start..]).to_string()
    }
991
992 #[test]
994 fn test_close_run_run_status_finished_for_zero_exit() {
995 let body = capture_close_run_body(Some(0), None);
996 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
997 assert_eq!(
998 v["run_status"], "finished",
999 "run_status must be 'finished' for exit_code=0: {body}"
1000 );
1001 assert_eq!(v["exit_code"], 0, "exit_code must be 0 in payload: {body}");
1002 }
1003
1004 #[test]
1006 fn test_close_run_run_status_finished_for_sigterm() {
1007 let body = capture_close_run_body(None, None);
1008 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1009 assert_eq!(
1010 v["run_status"], "finished",
1011 "run_status must be 'finished' when exit_code is None (SIGTERM): {body}"
1012 );
1013 assert!(
1015 v.get("exit_code").is_none(),
1016 "exit_code must be absent when None (spec: optional integer): {body}"
1017 );
1018 }
1019
1020 #[test]
1022 fn test_close_run_run_status_failed_for_nonzero_exit() {
1023 for code in [1, 2, 127, 130, 255] {
1024 let body = capture_close_run_body(Some(code), None);
1025 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1026 assert_eq!(
1027 v["run_status"], "failed",
1028 "run_status must be 'failed' for exit_code={code}: {body}"
1029 );
1030 }
1031 }
1032
1033 #[test]
1036 fn test_close_run_data_csv_is_raw_csv_not_base64() {
1037 let raw_csv = "timestamp,cpu_pct\n1743638400,42.5\n1743638401,44.0\n";
1038 let body = capture_close_run_body(Some(0), Some(raw_csv));
1039 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1040 let data_csv = v["data_csv"].as_str().expect("data_csv must be a string");
1041 assert!(
1042 data_csv.contains("timestamp"),
1043 "data_csv must be raw CSV (contains header row): {data_csv}"
1044 );
1045 assert!(
1046 data_csv.contains("42.5"),
1047 "data_csv must be raw CSV (contains data values): {data_csv}"
1048 );
1049 assert!(
1051 data_csv.contains(','),
1052 "data_csv must contain CSV commas (not base64): {data_csv}"
1053 );
1054 }
1055
1056 #[test]
1059 fn test_close_run_finished_at_is_valid_iso8601() {
1060 let body = capture_close_run_body(Some(0), None);
1061 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1062 let fa = v["finished_at"]
1063 .as_str()
1064 .expect("finished_at must be a string");
1065 assert!(fa.ends_with('Z'), "finished_at must end with Z (UTC): {fa}");
1066 let secs = parse_iso8601_secs(fa);
1067 assert!(
1068 secs.is_some(),
1069 "finished_at must be a parseable ISO 8601 timestamp: {fa}"
1070 );
1071 let now = std::time::SystemTime::now()
1073 .duration_since(std::time::UNIX_EPOCH)
1074 .unwrap_or_default()
1075 .as_secs();
1076 let diff = now.abs_diff(secs.unwrap());
1077 assert!(
1078 diff < 60,
1079 "finished_at must be close to current time (diff={diff}s): {fa}"
1080 );
1081 }
1082
1083 #[test]
1086 fn test_close_run_handles_valid_run_finish_response() {
1087 use std::io::{Read as _, Write as _};
1088 use std::time::Duration;
1089
1090 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1091 let port = listener.local_addr().unwrap().port();
1092
1093 let response_body = r#"{
1095 "run": {
1096 "run_id": "01959e3a-0001-0000-0000-000000000000",
1097 "created_at": "2026-04-03T10:00:00Z",
1098 "heartbeat_at": "2026-04-03T10:00:30Z",
1099 "finished_at": "2026-04-03T10:01:00Z",
1100 "run_status": "finished",
1101 "tag_count": 0,
1102 "tags": []
1103 },
1104 "processing": {
1105 "status": "ok",
1106 "rows": 60,
1107 "files": null,
1108 "duration_ms": 12.5,
1109 "error": null
1110 }
1111 }"#;
1112 let http_response = format!(
1113 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1114 response_body.len(),
1115 response_body
1116 );
1117
1118 std::thread::spawn(move || {
1119 if let Ok((mut stream, _)) = listener.accept() {
1120 let mut tmp = [0u8; 4096];
1121 let _ = stream.read(&mut tmp);
1122 stream.write_all(http_response.as_bytes()).ok();
1123 }
1124 });
1125
1126 let agent = ureq::config::Config::builder()
1127 .timeout_global(Some(Duration::from_secs(30)))
1128 .build()
1129 .new_agent();
1130 let ctx = RunContext {
1131 run_id: "01959e3a-0001-0000-0000-000000000000".to_string(),
1132 upload_uri_prefix: "s3://b/p".to_string(),
1133 credentials: UploadCredentials {
1134 access_key_id: "k".to_string(),
1135 secret_access_key: "s".to_string(),
1136 session_token: "t".to_string(),
1137 expires_at: "2099-01-01T00:00:00Z".to_string(),
1138 },
1139 };
1140 let result = close_run(
1141 &agent,
1142 &format!("http://127.0.0.1:{port}"),
1143 "test-token",
1144 &ctx,
1145 Some(0),
1146 Some("timestamp,cpu_pct\n1743638400,42.5\n".to_string()),
1147 &[], );
1149 assert!(
1150 result.is_ok(),
1151 "close_run must succeed for a 200 response: {result:?}"
1152 );
1153 }
1154
1155 #[test]
1158 fn test_close_run_no_extra_fields_in_payload() {
1159 let body = capture_close_run_body(Some(0), Some("ts,v\n1,2\n"));
1160 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1161 let obj = v.as_object().expect("payload must be a JSON object");
1162 let allowed: std::collections::HashSet<&str> = [
1163 "exit_code",
1164 "run_status",
1165 "finished_at",
1166 "data_source",
1167 "data_csv",
1168 ]
1169 .iter()
1170 .copied()
1171 .collect();
1172 for key in obj.keys() {
1173 assert!(
1174 allowed.contains(key.as_str()),
1175 "unexpected field '{key}' in payload -- not allowed by RunFinishInline schema (additionalProperties: false)"
1176 );
1177 }
1178 assert!(obj.contains_key("data_source"), "data_source is required");
1180 assert!(obj.contains_key("data_csv"), "data_csv is required");
1181 }
1182
    /// Runs `close_run` on the S3 route against a one-shot local mock server
    /// and returns the JSON body the server received.
    ///
    /// The mock answers 200 with a canned finish response when the body has
    /// `data_source == "s3"` and a `data_uris` member, and 422 otherwise.
    /// The result of `close_run` itself is deliberately ignored; callers
    /// inspect the captured request body only.
    fn capture_close_run_s3_body(exit_code: Option<i32>, uris: &[&str]) -> String {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS pick a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Read until the header terminator has been seen and at least
                // Content-Length body bytes have arrived (or the peer closes).
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        let hdr = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = hdr
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // Everything after the blank line is the request body.
                let body_start = buf
                    .windows(4)
                    .position(|w| w == b"\r\n\r\n")
                    .map(|p| p + 4)
                    .unwrap_or(buf.len());
                let body_str = String::from_utf8_lossy(&buf[body_start..]);
                // An unparseable body becomes `Null`, which then fails validation.
                let parsed: serde_json::Value =
                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("s3")
                    && parsed.get("data_uris").is_some();
                // Echo the request's status and exit code back in the response.
                let run_status = parsed
                    .get("run_status")
                    .and_then(|v| v.as_str())
                    .unwrap_or("finished");
                let ec = parsed
                    .get("exit_code")
                    .and_then(|v| v.as_i64())
                    .map(|v| v as i32);
                let (status_line, resp_body) = if valid {
                    (
                        "HTTP/1.1 200 OK",
                        finish_response_json("run-s3-test", run_status, ec),
                    )
                } else {
                    (
                        "HTTP/1.1 422 Unprocessable Entity",
                        r#"{"detail":[{"msg":"field required","type":"value_error.missing"}]}"#
                            .to_string(),
                    )
                };
                // Publish the capture before replying so the test thread can
                // always receive it.
                tx.send(buf).ok();
                let http = format!(
                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                    resp_body.len(),
                    resp_body
                );
                stream.write_all(http.as_bytes()).ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();
        let ctx = RunContext {
            run_id: "run-s3-test".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };
        let uploaded: Vec<String> = uris.iter().map(|s| (*s).to_string()).collect();
        // No inline CSV; a non-empty URI list selects the S3 payload.
        let _ = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "token",
            &ctx,
            exit_code,
            None,
            &uploaded,
        );
        let raw = rx.recv().expect("mock server did not capture request");
        // Strip the request line and headers; return only the body.
        let body_start = raw
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .map(|p| p + 4)
            .unwrap_or(0);
        String::from_utf8_lossy(&raw[body_start..]).to_string()
    }
1295
/// With uploaded URIs present, the close-run body must declare `data_source: "s3"`,
/// list every URI in order under `data_uris`, and carry no `data_csv` key.
#[test]
fn test_close_run_uses_s3_route_when_uris_present() {
    let expected_uris = [
        "s3://my-bucket/prefix/run-abc/000000.csv.gz",
        "s3://my-bucket/prefix/run-abc/000001.csv.gz",
    ];
    let body = capture_close_run_s3_body(Some(0), &expected_uris);
    let payload: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");

    assert_eq!(
        payload["data_source"], "s3",
        "data_source must be 's3' when URIs present: {body}"
    );

    let sent = payload["data_uris"]
        .as_array()
        .expect("data_uris must be a JSON array");
    assert_eq!(sent.len(), 2, "data_uris must have 2 elements: {body}");
    // Lengths are already known to match, so zip compares every element pairwise.
    for (got, want) in sent.iter().zip(expected_uris.iter()) {
        assert_eq!(*got, *want);
    }

    let mentions_inline_csv = body.contains("\"data_csv\"");
    assert!(
        !mentions_inline_csv,
        "data_csv must be absent in S3 route: {body}"
    );
}
1322
/// The S3-route payload may only contain the allow-listed keys, and must include both
/// `data_source` and `data_uris`.
#[test]
fn test_close_run_s3_no_extra_fields() {
    // Every key the S3 close-run payload is permitted to carry.
    const ALLOWED: [&str; 5] = [
        "exit_code",
        "run_status",
        "finished_at",
        "data_source",
        "data_uris",
    ];

    let body = capture_close_run_s3_body(Some(0), &["s3://bucket/prefix/run/000000.csv.gz"]);
    let parsed: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
    let obj = parsed.as_object().expect("payload must be a JSON object");

    obj.keys().for_each(|key| {
        assert!(
            ALLOWED.contains(&key.as_str()),
            "unexpected field '{key}' in S3 route payload (additionalProperties: false): {body}"
        );
    });

    let has_source = obj.contains_key("data_source");
    assert!(has_source, "data_source is required in S3 route");
    let has_uris = obj.contains_key("data_uris");
    assert!(has_uris, "data_uris is required in S3 route");
}
1356
/// When the body is captured via `capture_close_run_body` with a CSV string, the
/// payload must be marked `inline`, include `data_csv`, and omit `data_uris`.
#[test]
fn test_close_run_uses_inline_route_when_no_uris() {
    let inline_csv = "ts,cpu\n1,2\n";
    let body = capture_close_run_body(Some(0), Some(inline_csv));
    let payload: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");

    assert_eq!(
        payload["data_source"], "inline",
        "data_source must be 'inline' when no URIs: {body}"
    );

    let has_csv = payload.get("data_csv").is_some();
    let has_uris = payload.get("data_uris").is_some();
    assert!(
        has_csv,
        "data_csv must be present for inline route: {body}"
    );
    assert!(
        !has_uris,
        "data_uris must be absent for inline route: {body}"
    );
}
1376
/// `refresh_credentials` must overwrite every credential field of the `RunContext`
/// with the values returned by the API.
///
/// A one-shot local mock server answers with a canned `upload_credentials` document;
/// the test then checks that all four fields of `ctx.credentials` carry the new values.
#[test]
fn test_refresh_credentials_updates_context() {
    use std::io::{Read as _, Write as _};
    use std::time::Duration;

    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    let response_body = r#"{"upload_credentials":{"access_key":"NEW_AK","secret_key":"NEW_SK","session_token":"NEW_ST","expiration":"2099-06-01T00:00:00Z"}}"#;
    let http_response = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        response_body.len(),
        response_body
    );

    std::thread::spawn(move || {
        if let Ok((mut stream, _)) = listener.accept() {
            // Drain the complete request (headers plus any declared body) before
            // replying. A single `read` may return only part of the request, and
            // closing the socket with unread bytes can reset the connection before
            // the client has read the response, which makes the test flaky.
            let mut buf = Vec::<u8>::new();
            let mut tmp = [0u8; 4096];
            loop {
                let n = stream.read(&mut tmp).unwrap_or(0);
                if n == 0 {
                    break;
                }
                buf.extend_from_slice(&tmp[..n]);
                if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                    let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                    // A request without Content-Length is treated as having no body.
                    let cl = header_str
                        .lines()
                        .find_map(|l| {
                            l.trim()
                                .strip_prefix("content-length:")
                                .and_then(|v| v.trim().parse::<usize>().ok())
                        })
                        .unwrap_or(0);
                    if buf.len() >= sep + 4 + cl {
                        break;
                    }
                }
            }
            stream.write_all(http_response.as_bytes()).ok();
        }
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    // Start from recognisably stale values so that any field left untouched is caught.
    let mut ctx = RunContext {
        run_id: "run-123".to_string(),
        upload_uri_prefix: "s3://b/p".to_string(),
        credentials: UploadCredentials {
            access_key_id: "OLD_AK".to_string(),
            secret_access_key: "OLD_SK".to_string(),
            session_token: "OLD_ST".to_string(),
            expires_at: "2099-01-01T00:00:00Z".to_string(),
        },
    };

    let result = refresh_credentials(
        &agent,
        &format!("http://127.0.0.1:{port}"),
        "test-token",
        &mut ctx,
    );
    assert!(
        result.is_ok(),
        "refresh_credentials failed: {:?}",
        result.err()
    );
    assert_eq!(ctx.credentials.access_key_id, "NEW_AK");
    assert_eq!(ctx.credentials.secret_access_key, "NEW_SK");
    assert_eq!(ctx.credentials.session_token, "NEW_ST");
    assert_eq!(ctx.credentials.expires_at, "2099-06-01T00:00:00Z");
}
1433
/// `start_run` must POST to `/runs`, include the job name in the request, keep any
/// `"pid"` key out of it, and map the response into a `RunContext`.
#[test]
fn test_start_run_posts_to_runs_endpoint() {
    use std::io::{Read as _, Write as _};
    use std::sync::mpsc;
    use std::time::Duration;

    let server = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = server.local_addr().unwrap().port();
    let (captured_tx, captured_rx) = mpsc::channel::<Vec<u8>>();

    let canned_body = r#"{"run_id":"run-xyz","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
    let canned_response = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        canned_body.len(),
        canned_body
    );

    // One-shot mock server: capture the full request, then send the canned reply.
    std::thread::spawn(move || {
        let Ok((mut conn, _)) = server.accept() else {
            return;
        };
        let mut request = Vec::<u8>::new();
        let mut chunk = [0u8; 4096];
        loop {
            match conn.read(&mut chunk) {
                Ok(got) if got > 0 => request.extend_from_slice(&chunk[..got]),
                // EOF and read errors both end the capture.
                _ => break,
            }
            let Some(sep) = request.windows(4).position(|w| w == b"\r\n\r\n") else {
                continue;
            };
            let headers = String::from_utf8_lossy(&request[..sep]).to_ascii_lowercase();
            let declared = headers
                .lines()
                .find_map(|line| {
                    line.trim()
                        .strip_prefix("content-length:")?
                        .trim()
                        .parse::<usize>()
                        .ok()
                })
                .unwrap_or(0);
            // Headers, the 4-byte separator and the declared body are all buffered.
            if request.len() >= sep + 4 + declared {
                break;
            }
        }
        captured_tx.send(request).ok();
        conn.write_all(canned_response.as_bytes()).ok();
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let host = crate::metrics::HostInfo::default();
    let cloud = crate::metrics::CloudInfo::default();
    let meta = crate::config::JobMetadata {
        job_name: Some("test-job".to_string()),
        ..Default::default()
    };
    let base_url = format!("http://127.0.0.1:{port}");

    // Some(42) is presumably the tracked pid; the last assertion below checks that no
    // "pid" key reaches the request.
    let result = start_run(
        &agent,
        &base_url,
        "test-token",
        &meta,
        Some(42),
        &host,
        &cloud,
    );
    assert!(result.is_ok(), "start_run failed: {:?}", result.err());

    let ctx = result.unwrap();
    assert_eq!(ctx.run_id, "run-xyz");
    assert_eq!(ctx.upload_uri_prefix, "s3://b/p");
    assert_eq!(ctx.credentials.access_key_id, "AK");

    let raw = captured_rx
        .recv()
        .expect("mock server did not receive request");
    let raw_str = String::from_utf8_lossy(&raw);

    let posts_to_runs = raw_str.contains("POST /runs");
    assert!(posts_to_runs, "must POST to /runs: {raw_str}");

    let has_job_name = raw_str.contains("\"job_name\":\"test-job\"");
    assert!(has_job_name, "job_name missing: {raw_str}");

    let has_pid = raw_str.contains("\"pid\"");
    assert!(!has_pid, "pid must not appear in the payload: {raw_str}");
}
1527
/// When `JobMetadata::command` is populated, the `start_run` request must contain a
/// `command` field whose value is the argument list encoded as a JSON string.
#[test]
fn test_start_run_includes_command_array_in_payload() {
    use std::io::{Read as _, Write as _};
    use std::sync::mpsc;
    use std::time::Duration;

    let server = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = server.local_addr().unwrap().port();
    let (captured_tx, captured_rx) = mpsc::channel::<Vec<u8>>();

    let canned_body = r#"{"run_id":"r1","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
    let canned_response = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        canned_body.len(),
        canned_body
    );

    // One-shot mock server: capture the full request, then send the canned reply.
    std::thread::spawn(move || {
        let Ok((mut conn, _)) = server.accept() else {
            return;
        };
        let mut request = Vec::<u8>::new();
        let mut chunk = [0u8; 4096];
        loop {
            match conn.read(&mut chunk) {
                Ok(got) if got > 0 => request.extend_from_slice(&chunk[..got]),
                // EOF and read errors both end the capture.
                _ => break,
            }
            let Some(sep) = request.windows(4).position(|w| w == b"\r\n\r\n") else {
                continue;
            };
            let headers = String::from_utf8_lossy(&request[..sep]).to_ascii_lowercase();
            let declared = headers
                .lines()
                .find_map(|line| {
                    line.trim()
                        .strip_prefix("content-length:")?
                        .trim()
                        .parse::<usize>()
                        .ok()
                })
                .unwrap_or(0);
            // Headers, the 4-byte separator and the declared body are all buffered.
            if request.len() >= sep + 4 + declared {
                break;
            }
        }
        captured_tx.send(request).ok();
        conn.write_all(canned_response.as_bytes()).ok();
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let wrapped_command: Vec<String> = [
        "stress",
        "--cpu",
        "4",
        "--vm",
        "1",
        "--vm-bytes",
        "12024M",
        "--timeout",
        "63s",
    ]
    .iter()
    .map(|arg| (*arg).to_owned())
    .collect();

    let host = crate::metrics::HostInfo::default();
    let cloud = crate::metrics::CloudInfo::default();
    let meta = crate::config::JobMetadata {
        command: wrapped_command,
        ..Default::default()
    };
    let base_url = format!("http://127.0.0.1:{port}");

    let result = start_run(
        &agent,
        &base_url,
        "test-token",
        &meta,
        None,
        &host,
        &cloud,
    );
    assert!(result.is_ok(), "start_run failed: {:?}", result.err());

    let raw = captured_rx
        .recv()
        .expect("mock server did not receive request");
    let raw_str = String::from_utf8_lossy(&raw);

    // The command travels as a JSON-encoded string, hence the escaped inner quotes.
    let expected = r#""command":"[\"stress\",\"--cpu\",\"4\",\"--vm\",\"1\",\"--vm-bytes\",\"12024M\",\"--timeout\",\"63s\"]""#;
    let found = raw_str.contains(expected);
    assert!(
        found,
        "command JSON string not found in payload.\nExpected: {expected}\nGot body: {raw_str}"
    );
}
1631
/// A `MetadataPayload` whose `command` is `None` must serialize without any
/// `"command"` key.
#[test]
fn test_start_run_omits_command_when_standalone() {
    let json = serde_json::to_string(&MetadataPayload {
        job_name: None,
        project_name: None,
        stage_name: None,
        task_name: None,
        team: None,
        env: None,
        language: None,
        orchestrator: None,
        executor: None,
        external_run_id: None,
        container_image: None,
        tags: &[],
        command: None,
    })
    .expect("serialize failed");

    let has_command_key = json.contains("\"command\"");
    assert!(
        !has_command_key,
        "command must be absent from payload in standalone mode: {json}"
    );
}
1657
/// Integration check against a real API: `start_run` followed by `close_run` with an
/// inline CSV must both succeed.
///
/// The test returns early (printing a "skip" note) unless `SENTINEL_API_TOKEN` is set
/// to a non-empty value. `SENTINEL_API_BASE` overrides the default base URL.
#[test]
fn test_real_api_finish_run_returns_ok() {
    use crate::config::JobMetadata;
    use crate::metrics::{CloudInfo, CpuMetrics, HostInfo, MemoryMetrics, Sample};
    use crate::sentinel::upload::samples_to_csv;
    use std::time::Duration;

    let token = match std::env::var("SENTINEL_API_TOKEN") {
        Ok(t) if !t.is_empty() => t,
        _ => {
            eprintln!("skip: SENTINEL_API_TOKEN not set or empty");
            return;
        }
    };
    let api_base = std::env::var("SENTINEL_API_BASE")
        .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
    eprintln!("T-INT-01: using api_base={api_base}");

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let metadata = JobMetadata {
        job_name: Some("integration-test-close-run".to_string()),
        ..Default::default()
    };
    let host = HostInfo::default();
    let cloud = CloudInfo::default();

    eprintln!("T-INT-01: calling start_run...");
    let ctx = match start_run(&agent, &api_base, &token, &metadata, None, &host, &cloud) {
        Ok(c) => {
            eprintln!("T-INT-01: start_run ok -- run_id={}", c.run_id);
            c
        }
        Err(e) => panic!("start_run failed: {e}"),
    };
    assert!(!ctx.run_id.is_empty(), "run_id must be non-empty");

    // Build a single sample stamped with the current time and default metrics.
    let timestamp_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let sample = Sample {
        timestamp_secs,
        job_name: Some("integration-test-close-run".to_string()),
        tracked_pid: None,
        cpu: CpuMetrics::default(),
        memory: MemoryMetrics::default(),
        network: vec![],
        disk: vec![],
        gpu: vec![],
    };
    let csv = samples_to_csv(&[sample], 1);
    // Truncate by characters rather than by byte offset: slicing a string at a byte
    // index that falls inside a multi-byte character panics.
    let csv_preview: String = csv.chars().take(120).collect();
    eprintln!(
        "T-INT-01: csv preview (first 120 chars): {}",
        csv_preview
    );

    eprintln!("T-INT-01: calling close_run...");
    let result = close_run(
        &agent,
        &api_base,
        &token,
        &ctx,
        Some(0),
        Some(csv),
        &[],
    );
    match &result {
        Ok(()) => eprintln!("T-INT-01: close_run ok -- 200 received"),
        Err(e) => eprintln!("T-INT-01: close_run FAILED: {e}"),
    }
    assert!(
        result.is_ok(),
        "close_run must return Ok (200) against the real API: {result:?}"
    );
}
1758}