1use crate::config::JobMetadata;
2use crate::metrics::{CloudInfo, HostInfo};
3use crate::sentinel::s3::UploadCredentials;
4use serde::{Deserialize, Serialize};
5
/// Predicate for `#[serde(skip_serializing_if = "...")]` on a `&[String]` field.
///
/// Returns `true` when the referenced slice has no elements. The double reference
/// is required because serde hands the predicate a reference to the field, and the
/// field itself is already a borrowed slice.
fn slice_is_empty(v: &&[String]) -> bool {
    <[String]>::is_empty(v)
}
9
/// Encodes `input` with the standard base64 alphabet (`A-Z a-z 0-9 + /`) and `=` padding.
///
/// Compiled only for tests.
#[cfg(test)]
fn base64_encode(input: &[u8]) -> String {
    const ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the 6-bit group starting at `shift` and maps it to its alphabet character.
    let sextet = |word: u32, shift: u32| char::from(ALPHA[((word >> shift) & 0x3f) as usize]);

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes into a 24-bit word; bytes missing from a short
        // final group contribute zero bits.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &byte)| acc | (u32::from(byte) << (16 - 8 * i)));

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        // The third and fourth characters become padding when the group is short.
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
46
/// JSON document that `start_run` deserializes from the response to its `POST {api_base}/runs`.
#[derive(Debug, Deserialize)]
struct StartRunResponse {
    /// Identifier of the run; copied into `RunContext::run_id`.
    run_id: String,
    /// Upload URI prefix; copied into `RunContext::upload_uri_prefix`.
    upload_uri_prefix: String,
    /// Credentials as sent by the server; converted to `UploadCredentials` by `start_run`.
    upload_credentials: RawCredentials,
}
57
/// Credentials object as it appears in API responses, before being mapped onto
/// `UploadCredentials` (whose field names differ).
#[derive(Debug, Deserialize)]
struct RawCredentials {
    /// Mapped to `UploadCredentials::access_key_id`.
    access_key: String,
    /// Mapped to `UploadCredentials::secret_access_key`.
    secret_key: String,
    /// Mapped to `UploadCredentials::session_token`.
    session_token: String,
    /// Expiry timestamp string. Accepted under either the `expiration` or the
    /// `expires_at` key; `None` when the key is absent.
    #[serde(alias = "expires_at", default)]
    expiration: Option<String>,
}
72
/// JSON document that `refresh_credentials` deserializes from the response to its
/// `POST {api_base}/runs/{run_id}/refresh-credentials`.
#[derive(Debug, Deserialize)]
struct RefreshCredentialsResponse {
    /// Replacement credentials; written over `RunContext::credentials`.
    upload_credentials: RawCredentials,
}
77
/// State of an open run: returned by `start_run`, updated in place by
/// `refresh_credentials`, and read by `close_run`.
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Run identifier; interpolated into the refresh and finish endpoint URLs.
    pub run_id: String,
    /// Upload URI prefix as returned by the start-run response.
    pub upload_uri_prefix: String,
    /// Current upload credentials, including their `expires_at` timestamp string.
    pub credentials: UploadCredentials,
}
89
90impl RunContext {
91 pub fn creds_expiring_soon(&self) -> bool {
94 match parse_iso8601_secs(&self.credentials.expires_at) {
95 Some(expires_at_secs) => {
96 let now = std::time::SystemTime::now()
97 .duration_since(std::time::UNIX_EPOCH)
98 .unwrap_or_default()
99 .as_secs();
100 expires_at_secs.saturating_sub(now) < 300
101 }
102 None => {
103 true
105 }
106 }
107 }
108}
109
110fn parse_iso8601_secs(s: &str) -> Option<u64> {
113 let s = s.trim_end_matches('Z');
115 let s = s.trim_end_matches("+00:00");
116 let nums: Vec<u64> = s
117 .split(|c: char| !c.is_ascii_digit())
118 .filter(|p| !p.is_empty())
119 .map(|p| p.parse().ok())
120 .collect::<Option<Vec<_>>>()?;
121 if nums.len() < 6 {
122 return None;
123 }
124 let (y, mo, d, h, mi, sec) = (nums[0], nums[1], nums[2], nums[3], nums[4], nums[5]);
125 let days = days_since_epoch(y, mo, d)?;
127 Some(days * 86400 + h * 3600 + mi * 60 + sec)
128}
129
/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The calendar date is found by subtracting whole years, then whole months, from
/// the day count, starting at 1970-01-01.
fn unix_secs_to_iso8601(secs: u64) -> String {
    const MDAYS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    let leap = |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let year_len = |y: u64| if leap(y) { 366u64 } else { 365u64 };

    // Split into whole days and the time of day within the final day.
    let (mut days, tod) = (secs / 86400, secs % 86400);
    let (hh, mm, ss) = (tod / 3600, (tod % 3600) / 60, tod % 60);

    // Peel off complete years.
    let mut year = 1970u64;
    while days >= year_len(year) {
        days -= year_len(year);
        year += 1;
    }

    // Peel off complete months of the resulting year.
    let leap_year = leap(year);
    let month_len = |m: u64| {
        if m == 2 && leap_year {
            29u64
        } else {
            MDAYS[(m - 1) as usize]
        }
    };
    let mut month = 1u64;
    while days >= month_len(month) {
        days -= month_len(month);
        month += 1;
    }

    // What is left is a zero-based day of the month.
    let day = days + 1;
    format!("{year:04}-{month:02}-{day:02}T{hh:02}:{mm:02}:{ss:02}Z")
}
168
169fn now_iso8601() -> String {
170 let secs = std::time::SystemTime::now()
171 .duration_since(std::time::UNIX_EPOCH)
172 .unwrap_or_default()
173 .as_secs();
174 unix_secs_to_iso8601(secs)
175}
176
/// Returns the number of days between 1970-01-01 and the given Gregorian date.
///
/// `y` is the full year, `mo` is 1-based (1..=12) and `d` is the 1-based day of the month.
///
/// Returns `None` when the year is before 1970, the month is outside 1..=12, the day
/// does not exist in that month (leap years taken into account), or the result does
/// not fit in a `u64`.
///
/// Runs in constant time with respect to the year: leap days are counted with a
/// closed-form expression rather than by visiting every year since 1970.
fn days_since_epoch(y: u64, mo: u64, d: u64) -> Option<u64> {
    const DAYS: [u64; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    /// Gregorian leap-year rule.
    fn is_leap(year: u64) -> bool {
        (year % 4 == 0 && year % 100 != 0) || year % 400 == 0
    }

    /// Number of leap years in `1..year`; `year` must be at least 1.
    fn leap_years_before(year: u64) -> u64 {
        let prev = year - 1;
        prev / 4 - prev / 100 + prev / 400
    }

    if y < 1970 || !(1..=12).contains(&mo) {
        return None;
    }

    let leap = is_leap(y);
    let month_len = |m: u64| {
        if m == 2 && leap {
            29
        } else {
            DAYS[(m - 1) as usize]
        }
    };
    // Validate the day against the real length of the month, not a flat 31.
    if !(1..=month_len(mo)).contains(&d) {
        return None;
    }

    // 365 days per elapsed year plus one extra day per leap year in 1970..y.
    let leap_days = leap_years_before(y) - leap_years_before(1970);
    let year_days = (y - 1970).checked_mul(365)?.checked_add(leap_days)?;
    let month_days: u64 = (1..mo).map(month_len).sum();
    year_days.checked_add(month_days + d - 1)
}
205
/// JSON body that `start_run` posts to `{api_base}/runs`.
///
/// Every part is `#[serde(flatten)]`ed, so the fields of the metadata, host and
/// cloud values all appear as keys of one JSON object rather than as nested objects.
#[derive(Debug, Serialize)]
struct StartRunRequest<'a> {
    /// Job metadata fields borrowed from the caller's `JobMetadata`.
    #[serde(flatten)]
    metadata: MetadataPayload<'a>,
    /// Host description supplied by the caller.
    #[serde(flatten)]
    host: &'a HostInfo,
    /// Cloud description supplied by the caller.
    #[serde(flatten)]
    cloud: &'a CloudInfo,
}
221
/// Borrowed view of the job metadata, serialized as part of `StartRunRequest`.
///
/// Every `Option` field is left out of the JSON when it is `None`, and `tags` is
/// left out when the slice is empty, so absent metadata produces no keys at all.
#[derive(Debug, Serialize)]
struct MetadataPayload<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    job_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    project_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    stage_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    task_name: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    team: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    env: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    language: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    orchestrator: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    executor: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    external_run_id: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    container_image: Option<&'a str>,
    /// Omitted when empty (see `slice_is_empty`).
    #[serde(skip_serializing_if = "slice_is_empty")]
    tags: &'a [String],
    /// Owned, unlike the other fields: `start_run` fills this with the JSON encoding
    /// of `JobMetadata::command`, so the command is sent as a string containing JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
}
257
/// JSON body that `close_run` posts to the finish endpoint when no URIs were
/// uploaded: the CSV data travels inside the request itself.
///
/// The run id is not part of the body; `close_run` places it in the URL.
#[derive(Debug, Serialize)]
struct CloseRunInlineRequest {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `close_run` sets this to `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Completion timestamp string; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// `close_run` sets this to `"inline"` for this request shape.
    data_source: &'static str,
    /// CSV text; always serialized, even when empty.
    data_csv: String,
}
279
/// JSON body that `close_run` posts to the finish endpoint when at least one URI
/// was uploaded: the request lists the uploaded objects instead of carrying CSV data.
#[derive(Debug, Serialize)]
struct CloseRunS3Request {
    /// Process exit code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    exit_code: Option<i32>,
    /// `close_run` sets this to `"finished"` or `"failed"`.
    run_status: &'static str,
    /// Completion timestamp string; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    finished_at: Option<String>,
    /// `close_run` sets this to `"s3"` for this request shape.
    data_source: &'static str,
    /// URIs passed to `close_run` as `uploaded_uris`.
    data_uris: Vec<String>,
}
297
298pub fn start_run(
307 agent: &ureq::Agent,
308 api_base: &str,
309 token: &str,
310 metadata: &JobMetadata,
311 pid: Option<i32>,
312 host: &HostInfo,
313 cloud: &CloudInfo,
314) -> Result<RunContext, String> {
315 let command_json: Option<String> = if metadata.command.is_empty() {
318 None
319 } else {
320 serde_json::to_string(&metadata.command).ok()
321 };
322
323 let _ = pid;
325
326 let payload = StartRunRequest {
327 metadata: MetadataPayload {
328 job_name: metadata.job_name.as_deref(),
329 project_name: metadata.project_name.as_deref(),
330 stage_name: metadata.stage_name.as_deref(),
331 task_name: metadata.task_name.as_deref(),
332 team: metadata.team.as_deref(),
333 env: metadata.env.as_deref(),
334 language: metadata.language.as_deref(),
335 orchestrator: metadata.orchestrator.as_deref(),
336 executor: metadata.executor.as_deref(),
337 external_run_id: metadata.external_run_id.as_deref(),
338 container_image: metadata.container_image.as_deref(),
339 tags: &metadata.tags,
340 command: command_json,
341 },
342 host,
343 cloud,
344 };
345
346 let url = format!("{api_base}/runs");
347 let body = serde_json::to_string(&payload)
348 .map_err(|e| format!("failed to serialize start_run payload: {e}"))?;
349
350 let mut response = agent
351 .post(&url)
352 .header("Authorization", &format!("Bearer {token}"))
353 .header("Content-Type", "application/json")
354 .send(body.as_bytes())
355 .map_err(|e| format!("start_run POST failed: {e}"))?;
356
357 let text = response
358 .body_mut()
359 .read_to_string()
360 .map_err(|e| format!("start_run read body failed: {e}"))?;
361
362 let resp: StartRunResponse = serde_json::from_str(&text).map_err(|e| {
363 format!(
364 "start_run parse response failed: {e} ({} bytes)",
365 text.len()
366 )
367 })?;
368
369 Ok(RunContext {
370 run_id: resp.run_id,
371 upload_uri_prefix: resp.upload_uri_prefix,
372 credentials: UploadCredentials {
373 access_key_id: resp.upload_credentials.access_key,
374 secret_access_key: resp.upload_credentials.secret_key,
375 session_token: resp.upload_credentials.session_token,
376 expires_at: resp
377 .upload_credentials
378 .expiration
379 .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
380 },
381 })
382}
383
384pub fn refresh_credentials(
388 agent: &ureq::Agent,
389 api_base: &str,
390 token: &str,
391 ctx: &mut RunContext,
392) -> Result<(), String> {
393 let url = format!("{api_base}/runs/{}/refresh-credentials", ctx.run_id);
394 let mut response = agent
395 .post(&url)
396 .header("Authorization", &format!("Bearer {token}"))
397 .send(b"" as &[u8])
398 .map_err(|e| format!("credential refresh POST failed: {e}"))?;
399
400 let text = response
401 .body_mut()
402 .read_to_string()
403 .map_err(|e| format!("credential refresh read body failed: {e}"))?;
404
405 let resp: RefreshCredentialsResponse = serde_json::from_str(&text).map_err(|e| {
406 format!(
407 "credential refresh parse failed: {e} ({} bytes)",
408 text.len()
409 )
410 })?;
411
412 ctx.credentials = UploadCredentials {
413 access_key_id: resp.upload_credentials.access_key,
414 secret_access_key: resp.upload_credentials.secret_key,
415 session_token: resp.upload_credentials.session_token,
416 expires_at: resp
417 .upload_credentials
418 .expiration
419 .unwrap_or_else(|| "2099-01-01T00:00:00Z".to_string()),
420 };
421 Ok(())
422}
423
424pub fn close_run(
437 agent: &ureq::Agent,
438 api_base: &str,
439 token: &str,
440 ctx: &RunContext,
441 exit_code: Option<i32>,
442 remaining_csv: Option<String>,
443 uploaded_uris: &[String],
444) -> Result<(), String> {
445 let run_status = match exit_code {
447 Some(0) | None => "finished",
448 Some(_) => "failed",
449 };
450 let finished_at = Some(now_iso8601());
451
452 let url = format!("{api_base}/runs/{}/finish", ctx.run_id);
453 let body = if uploaded_uris.is_empty() {
454 let payload = CloseRunInlineRequest {
457 exit_code,
458 run_status,
459 finished_at,
460 data_source: "inline",
461 data_csv: remaining_csv.unwrap_or_default(),
462 };
463 serde_json::to_string(&payload)
464 .map_err(|e| format!("failed to serialize close_run inline payload: {e}"))?
465 } else {
466 let payload = CloseRunS3Request {
468 exit_code,
469 run_status,
470 finished_at,
471 data_source: "s3",
472 data_uris: uploaded_uris.to_vec(),
473 };
474 serde_json::to_string(&payload)
475 .map_err(|e| format!("failed to serialize close_run s3 payload: {e}"))?
476 };
477
478 let response = agent
483 .post(&url)
484 .header("Authorization", &format!("Bearer {token}"))
485 .header("Content-Type", "application/json")
486 .send(body.as_bytes())
487 .map_err(|e| format!("close_run POST failed: {e}"))?;
488
489 let status = response.status();
490 if status != 200 {
491 return Err(format!("close_run received HTTP {status}: expected 200"));
492 }
493
494 Ok(())
495}
496
497#[cfg(test)]
502mod tests {
503 use super::*;
504
505 #[test]
506 fn test_creds_expiring_soon_far_future() {
507 let ctx = RunContext {
508 run_id: "test".to_string(),
509 upload_uri_prefix: "s3://b/p".to_string(),
510 credentials: UploadCredentials {
511 access_key_id: "k".to_string(),
512 secret_access_key: "s".to_string(),
513 session_token: "t".to_string(),
514 expires_at: "2099-01-01T00:00:00Z".to_string(),
515 },
516 };
517 assert!(!ctx.creds_expiring_soon());
518 }
519
520 #[test]
521 fn test_creds_expiring_soon_past() {
522 let ctx = RunContext {
523 run_id: "test".to_string(),
524 upload_uri_prefix: "s3://b/p".to_string(),
525 credentials: UploadCredentials {
526 access_key_id: "k".to_string(),
527 secret_access_key: "s".to_string(),
528 session_token: "t".to_string(),
529 expires_at: "1970-01-01T00:00:00Z".to_string(),
530 },
531 };
532 assert!(ctx.creds_expiring_soon());
533 }
534
535 #[test]
536 fn test_creds_expiring_soon_unparseable() {
537 let ctx = RunContext {
538 run_id: "test".to_string(),
539 upload_uri_prefix: "s3://b/p".to_string(),
540 credentials: UploadCredentials {
541 access_key_id: "k".to_string(),
542 secret_access_key: "s".to_string(),
543 session_token: "t".to_string(),
544 expires_at: "not-a-date".to_string(),
545 },
546 };
547 assert!(ctx.creds_expiring_soon());
549 }
550
551 #[test]
552 fn test_days_since_epoch_known_dates() {
553 assert_eq!(days_since_epoch(1970, 1, 1), Some(0));
554 assert_eq!(days_since_epoch(1970, 1, 2), Some(1));
555 assert_eq!(days_since_epoch(2026, 4, 1), Some(20544));
556 }
557
558 #[test]
559 fn test_parse_iso8601_secs_known() {
560 assert_eq!(
562 parse_iso8601_secs("2026-04-01T00:00:00Z"),
563 Some(20544 * 86400)
564 );
565 }
566
    /// End-to-end check of `close_run` against a one-shot local TCP server.
    ///
    /// The server captures the raw request bytes and always answers `200 OK`; the test
    /// then asserts on the captured text: the request line must target
    /// `/runs/{run_id}/finish` and the body must be the inline payload.
    #[test]
    fn test_close_run_posts_to_finish_endpoint() {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS choose a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Keep reading until the blank line ending the headers has been seen and
                // at least `Content-Length` body bytes follow it. A read error or a
                // closed connection (0 bytes) also ends the loop.
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        // Lower-cased so the header name matches regardless of case.
                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = header_str
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        // `sep + 4` is the offset of the first body byte.
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // Hand the captured request to the test thread, then reply.
                tx.send(buf).ok();
                stream
                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n{}")
                    .ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();

        let ctx = RunContext {
            run_id: "run-abc-999".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };

        // No uploaded URIs, so `close_run` takes the inline route.
        let result = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "test-token",
            &ctx,
            Some(0),
            Some("header\nrow1\n".to_string()),
            &[],
        );
        assert!(result.is_ok(), "close_run failed: {result:?}");

        // `raw` is the whole request (request line, headers and body), so the
        // assertions below search all of it.
        let raw = rx.recv().expect("mock server did not receive request");
        let raw_str = String::from_utf8_lossy(&raw);

        // The run id belongs in the URL, not in the body.
        assert!(
            raw_str.contains("/runs/run-abc-999/finish"),
            "URL must include /runs/{{run_id}}/finish: {raw_str}"
        );
        assert!(
            !raw_str.contains("\"run_id\""),
            "run_id must not appear in the JSON body: {raw_str}"
        );

        assert!(
            raw_str.contains("\"data_source\":\"inline\""),
            "expected data_source=inline in body: {raw_str}"
        );
        assert!(
            !raw_str.contains("\"s3\""),
            "s3 must not appear in body: {raw_str}"
        );

        assert!(
            raw_str.contains("\"data_csv\""),
            "data_csv absent from body: {raw_str}"
        );
        assert!(
            raw_str.contains("header"),
            "data_csv must contain raw CSV content (not base64): {raw_str}"
        );

        assert!(
            raw_str.contains("\"finished_at\""),
            "finished_at absent from body: {raw_str}"
        );

        assert!(
            raw_str.contains("\"run_status\":\"finished\""),
            "run_status absent or wrong: {raw_str}"
        );
        assert!(
            raw_str.contains("\"exit_code\":0"),
            "exit_code absent or wrong: {raw_str}"
        );
    }
691
692 #[test]
694 fn test_close_run_request_omits_run_id() {
695 let req = CloseRunInlineRequest {
696 exit_code: Some(0),
697 run_status: "finished",
698 finished_at: Some("2026-04-03T12:00:00Z".to_string()),
699 data_source: "inline",
700 data_csv: "timestamp,cpu\n1000,42\n".to_string(),
701 };
702 let json = serde_json::to_string(&req).expect("serialize failed");
703 assert!(
704 !json.contains("\"run_id\""),
705 "run_id must not appear in close_run body (it is in the URL): {json}"
706 );
707 }
708
709 #[test]
711 fn test_close_run_data_source_inline() {
712 let raw_csv = "timestamp,cpu\n1000,42\n";
713 let req = CloseRunInlineRequest {
714 exit_code: Some(0),
715 run_status: "finished",
716 finished_at: Some("2026-04-03T12:00:00Z".to_string()),
717 data_source: "inline",
718 data_csv: raw_csv.to_string(),
719 };
720 let json = serde_json::to_string(&req).expect("serialize failed");
721 assert!(
722 json.contains("\"data_source\":\"inline\""),
723 "data_source is not 'inline': {json}"
724 );
725 assert!(
727 json.contains("timestamp"),
728 "data_csv must contain raw CSV content (not base64): {json}"
729 );
730 assert!(
731 json.contains("\"finished_at\":\"2026-04-03T12:00:00Z\""),
732 "finished_at absent or wrong: {json}"
733 );
734 }
735
736 #[test]
738 fn test_base64_encode_rfc4648_vectors() {
739 assert_eq!(base64_encode(b""), "");
740 assert_eq!(base64_encode(b"f"), "Zg==");
741 assert_eq!(base64_encode(b"fo"), "Zm8=");
742 assert_eq!(base64_encode(b"foo"), "Zm9v");
743 assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
744 assert_eq!(base64_encode(b"fooba"), "Zm9vYmE=");
745 assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
746 }
747
748 #[test]
750 fn test_base64_encode_csv_roundtrip() {
751 let csv = "timestamp,value\n1000,42\n1001,99\n";
752 let encoded = base64_encode(csv.as_bytes());
753 encoded.chars().for_each(|c| {
755 assert!(
756 c.is_ascii_alphanumeric() || c == '+' || c == '/' || c == '=',
757 "invalid base64 char '{c}' in: {encoded}"
758 );
759 });
760 }
761
762 #[test]
764 fn test_days_since_epoch_invalid_inputs() {
765 assert_eq!(days_since_epoch(1970, 0, 1), None, "month 0 is invalid");
766 assert_eq!(days_since_epoch(1970, 13, 1), None, "month 13 is invalid");
767 assert_eq!(days_since_epoch(1970, 1, 0), None, "day 0 is invalid");
768 assert_eq!(days_since_epoch(1970, 1, 32), None, "day 32 is invalid");
769 assert_eq!(
770 days_since_epoch(1969, 12, 31),
771 None,
772 "year before 1970 is invalid"
773 );
774 }
775
776 #[test]
778 fn test_parse_iso8601_secs_with_utc_offset() {
779 let with_z = parse_iso8601_secs("2026-04-01T00:00:00Z");
780 let with_plus = parse_iso8601_secs("2026-04-01T00:00:00+00:00");
781 assert_eq!(
782 with_z, with_plus,
783 "+00:00 and Z must parse to the same timestamp"
784 );
785 }
786
787 #[test]
789 fn test_parse_iso8601_secs_too_few_components() {
790 assert_eq!(
791 parse_iso8601_secs("2026-04-01"),
792 None,
793 "date-only string must return None"
794 );
795 assert_eq!(parse_iso8601_secs("not-a-date"), None);
796 }
797
798 #[test]
800 fn test_slice_is_empty_helper() {
801 let empty: &[String] = &[];
802 let nonempty: &[String] = &["tag".to_string()];
803 assert!(slice_is_empty(&&*empty), "empty slice should return true");
804 assert!(
805 !slice_is_empty(&&*nonempty),
806 "nonempty slice should return false"
807 );
808 }
809
810 #[test]
812 fn test_unix_secs_to_iso8601_known_values() {
813 assert_eq!(unix_secs_to_iso8601(0), "1970-01-01T00:00:00Z");
815 assert_eq!(unix_secs_to_iso8601(946684800), "2000-01-01T00:00:00Z");
817 let secs = parse_iso8601_secs("2026-04-03T15:30:45Z").expect("parse failed");
820 assert_eq!(unix_secs_to_iso8601(secs), "2026-04-03T15:30:45Z");
821 }
822
823 #[test]
826 fn test_unix_secs_to_iso8601_leap_day() {
827 let secs = parse_iso8601_secs("2000-02-29T12:00:00Z").expect("parse failed");
828 assert_eq!(unix_secs_to_iso8601(secs), "2000-02-29T12:00:00Z");
829 }
830
831 #[test]
833 fn test_now_iso8601_parses() {
834 let s = now_iso8601();
835 assert!(!s.is_empty(), "now_iso8601 must not be empty");
836 assert!(s.ends_with('Z'), "now_iso8601 must end with Z: {s}");
837 let secs = parse_iso8601_secs(&s);
838 assert!(secs.is_some(), "now_iso8601 output must parse back: {s}");
839 }
840
841 #[test]
843 fn test_close_run_finished_at_omitted_when_none() {
844 let req = CloseRunInlineRequest {
845 exit_code: None,
846 run_status: "finished",
847 finished_at: None,
848 data_source: "inline",
849 data_csv: "".to_string(),
850 };
851 let json = serde_json::to_string(&req).expect("serialize failed");
852 assert!(
853 !json.contains("\"finished_at\""),
854 "finished_at must be absent when None: {json}"
855 );
856 }
857
858 fn finish_response_json(run_id: &str, run_status: &str, exit_code: Option<i32>) -> String {
871 let exit_code_field = match exit_code {
872 Some(c) => format!(",\"exit_code\":{c}"),
873 None => String::new(),
874 };
875 format!(
876 r#"{{"run":{{"run_id":"{run_id}","created_at":"2026-04-03T10:00:00Z","heartbeat_at":"2026-04-03T10:00:30Z","finished_at":"2026-04-03T10:01:00Z","run_status":"{run_status}","tag_count":0,"tags":[]{exit_code_field}}},"processing":{{"status":"ok","rows":1,"files":null,"duration_ms":5.0,"error":null}}}}"#
877 )
878 }
879
    /// Runs `close_run` on the inline route against a one-shot mock server and returns
    /// the JSON body text of the request it sent.
    ///
    /// The mock answers `200` with a `finish_response_json` document when the body has
    /// `data_source == "inline"` and a `data_csv` key, and `422` otherwise. The result
    /// of `close_run` itself is discarded; callers assert on the returned body.
    ///
    /// Panics if the mock server thread never delivers a captured request.
    fn capture_close_run_body(exit_code: Option<i32>, csv: Option<&str>) -> String {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS choose a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                // A stalled client turns into a read error, which ends the loop below.
                stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Read until the headers are complete and at least `Content-Length`
                // body bytes have arrived, or until EOF / a read error.
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        // Lower-cased so the header name matches regardless of case.
                        let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = header_str
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        // `sep + 4` is the offset of the first body byte.
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // With no header terminator the body is treated as empty.
                let body_start = buf
                    .windows(4)
                    .position(|w| w == b"\r\n\r\n")
                    .map(|p| p + 4)
                    .unwrap_or(buf.len());
                let body_str = String::from_utf8_lossy(&buf[body_start..]);
                // Unparseable bodies become `Null`, which fails the validity check below.
                let parsed: serde_json::Value =
                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("inline")
                    && parsed.get("data_csv").is_some();
                let (status_line, resp_body) = if valid {
                    // Echo the request's status and exit code back in the response.
                    let run_status = parsed
                        .get("run_status")
                        .and_then(|v| v.as_str())
                        .unwrap_or("finished");
                    let ec = parsed
                        .get("exit_code")
                        .and_then(|v| v.as_i64())
                        .map(|v| v as i32);
                    (
                        "HTTP/1.1 200 OK",
                        finish_response_json("run-spec-test", run_status, ec),
                    )
                } else {
                    (
                        "HTTP/1.1 422 Unprocessable Entity",
                        r#"{"detail":[{"loc":["body"],"msg":"field required","type":"value_error.missing"}]}"#.to_string(),
                    )
                };
                // Hand the captured request to the test thread, then reply.
                tx.send(buf).ok();
                let http = format!(
                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                    resp_body.len(),
                    resp_body
                );
                stream.write_all(http.as_bytes()).ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();
        let ctx = RunContext {
            run_id: "run-spec-test".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };
        // An empty URI list selects the inline request shape.
        let _ = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "token",
            &ctx,
            exit_code,
            csv.map(String::from),
            &[],
        );
        let raw = rx.recv().expect("mock server did not capture request");
        // Unlike the server side, a missing terminator here returns the whole capture.
        let body_start = raw
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .map(|p| p + 4)
            .unwrap_or(0);
        String::from_utf8_lossy(&raw[body_start..]).to_string()
    }
997
998 #[test]
1000 fn test_close_run_run_status_finished_for_zero_exit() {
1001 let body = capture_close_run_body(Some(0), None);
1002 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1003 assert_eq!(
1004 v["run_status"], "finished",
1005 "run_status must be 'finished' for exit_code=0: {body}"
1006 );
1007 assert_eq!(v["exit_code"], 0, "exit_code must be 0 in payload: {body}");
1008 }
1009
1010 #[test]
1012 fn test_close_run_run_status_finished_for_sigterm() {
1013 let body = capture_close_run_body(None, None);
1014 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1015 assert_eq!(
1016 v["run_status"], "finished",
1017 "run_status must be 'finished' when exit_code is None (SIGTERM): {body}"
1018 );
1019 assert!(
1021 v.get("exit_code").is_none(),
1022 "exit_code must be absent when None (spec: optional integer): {body}"
1023 );
1024 }
1025
1026 #[test]
1028 fn test_close_run_run_status_failed_for_nonzero_exit() {
1029 for code in [1, 2, 127, 130, 255] {
1030 let body = capture_close_run_body(Some(code), None);
1031 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1032 assert_eq!(
1033 v["run_status"], "failed",
1034 "run_status must be 'failed' for exit_code={code}: {body}"
1035 );
1036 }
1037 }
1038
1039 #[test]
1042 fn test_close_run_data_csv_is_raw_csv_not_base64() {
1043 let raw_csv = "timestamp,cpu_pct\n1743638400,42.5\n1743638401,44.0\n";
1044 let body = capture_close_run_body(Some(0), Some(raw_csv));
1045 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1046 let data_csv = v["data_csv"].as_str().expect("data_csv must be a string");
1047 assert!(
1048 data_csv.contains("timestamp"),
1049 "data_csv must be raw CSV (contains header row): {data_csv}"
1050 );
1051 assert!(
1052 data_csv.contains("42.5"),
1053 "data_csv must be raw CSV (contains data values): {data_csv}"
1054 );
1055 assert!(
1057 data_csv.contains(','),
1058 "data_csv must contain CSV commas (not base64): {data_csv}"
1059 );
1060 }
1061
1062 #[test]
1065 fn test_close_run_finished_at_is_valid_iso8601() {
1066 let body = capture_close_run_body(Some(0), None);
1067 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1068 let fa = v["finished_at"]
1069 .as_str()
1070 .expect("finished_at must be a string");
1071 assert!(fa.ends_with('Z'), "finished_at must end with Z (UTC): {fa}");
1072 let secs = parse_iso8601_secs(fa);
1073 assert!(
1074 secs.is_some(),
1075 "finished_at must be a parseable ISO 8601 timestamp: {fa}"
1076 );
1077 let now = std::time::SystemTime::now()
1079 .duration_since(std::time::UNIX_EPOCH)
1080 .unwrap_or_default()
1081 .as_secs();
1082 let diff = now.abs_diff(secs.unwrap());
1083 assert!(
1084 diff < 60,
1085 "finished_at must be close to current time (diff={diff}s): {fa}"
1086 );
1087 }
1088
1089 #[test]
1092 fn test_close_run_handles_valid_run_finish_response() {
1093 use std::io::{Read as _, Write as _};
1094 use std::time::Duration;
1095
1096 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1097 let port = listener.local_addr().unwrap().port();
1098
1099 let response_body = r#"{
1101 "run": {
1102 "run_id": "01959e3a-0001-0000-0000-000000000000",
1103 "created_at": "2026-04-03T10:00:00Z",
1104 "heartbeat_at": "2026-04-03T10:00:30Z",
1105 "finished_at": "2026-04-03T10:01:00Z",
1106 "run_status": "finished",
1107 "tag_count": 0,
1108 "tags": []
1109 },
1110 "processing": {
1111 "status": "ok",
1112 "rows": 60,
1113 "files": null,
1114 "duration_ms": 12.5,
1115 "error": null
1116 }
1117 }"#;
1118 let http_response = format!(
1119 "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
1120 response_body.len(),
1121 response_body
1122 );
1123
1124 std::thread::spawn(move || {
1125 if let Ok((mut stream, _)) = listener.accept() {
1126 let mut tmp = [0u8; 4096];
1127 let _ = stream.read(&mut tmp);
1128 stream.write_all(http_response.as_bytes()).ok();
1129 }
1130 });
1131
1132 let agent = ureq::config::Config::builder()
1133 .timeout_global(Some(Duration::from_secs(30)))
1134 .build()
1135 .new_agent();
1136 let ctx = RunContext {
1137 run_id: "01959e3a-0001-0000-0000-000000000000".to_string(),
1138 upload_uri_prefix: "s3://b/p".to_string(),
1139 credentials: UploadCredentials {
1140 access_key_id: "k".to_string(),
1141 secret_access_key: "s".to_string(),
1142 session_token: "t".to_string(),
1143 expires_at: "2099-01-01T00:00:00Z".to_string(),
1144 },
1145 };
1146 let result = close_run(
1147 &agent,
1148 &format!("http://127.0.0.1:{port}"),
1149 "test-token",
1150 &ctx,
1151 Some(0),
1152 Some("timestamp,cpu_pct\n1743638400,42.5\n".to_string()),
1153 &[], );
1155 assert!(
1156 result.is_ok(),
1157 "close_run must succeed for a 200 response: {result:?}"
1158 );
1159 }
1160
1161 #[test]
1164 fn test_close_run_no_extra_fields_in_payload() {
1165 let body = capture_close_run_body(Some(0), Some("ts,v\n1,2\n"));
1166 let v: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
1167 let obj = v.as_object().expect("payload must be a JSON object");
1168 let allowed: std::collections::HashSet<&str> = [
1169 "exit_code",
1170 "run_status",
1171 "finished_at",
1172 "data_source",
1173 "data_csv",
1174 ]
1175 .iter()
1176 .copied()
1177 .collect();
1178 for key in obj.keys() {
1179 assert!(
1180 allowed.contains(key.as_str()),
1181 "unexpected field '{key}' in payload -- not allowed by RunFinishInline schema (additionalProperties: false)"
1182 );
1183 }
1184 assert!(obj.contains_key("data_source"), "data_source is required");
1186 assert!(obj.contains_key("data_csv"), "data_csv is required");
1187 }
1188
    /// Runs `close_run` with uploaded URIs against a one-shot mock server and returns
    /// the JSON body text of the request it sent.
    ///
    /// The mock answers `200` with a `finish_response_json` document when the body has
    /// `data_source == "s3"` and a `data_uris` key, and `422` otherwise. The result of
    /// `close_run` itself is discarded; callers assert on the returned body.
    ///
    /// Panics if the mock server thread never delivers a captured request.
    fn capture_close_run_s3_body(exit_code: Option<i32>, uris: &[&str]) -> String {
        use std::io::{Read as _, Write as _};
        use std::sync::mpsc;
        use std::time::Duration;

        // Port 0 lets the OS choose a free port for the mock server.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let (tx, rx) = mpsc::channel::<Vec<u8>>();

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                // A stalled client turns into a read error, which ends the loop below.
                stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
                let mut buf = Vec::<u8>::new();
                let mut tmp = [0u8; 4096];
                // Read until the headers are complete and at least `Content-Length`
                // body bytes have arrived, or until EOF / a read error.
                loop {
                    let n = stream.read(&mut tmp).unwrap_or(0);
                    if n == 0 {
                        break;
                    }
                    buf.extend_from_slice(&tmp[..n]);
                    if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                        // Lower-cased so the header name matches regardless of case.
                        let hdr = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                        let cl = hdr
                            .lines()
                            .find_map(|l| {
                                l.trim()
                                    .strip_prefix("content-length:")
                                    .and_then(|v| v.trim().parse::<usize>().ok())
                            })
                            .unwrap_or(0);
                        // `sep + 4` is the offset of the first body byte.
                        if buf.len() >= sep + 4 + cl {
                            break;
                        }
                    }
                }
                // With no header terminator the body is treated as empty.
                let body_start = buf
                    .windows(4)
                    .position(|w| w == b"\r\n\r\n")
                    .map(|p| p + 4)
                    .unwrap_or(buf.len());
                let body_str = String::from_utf8_lossy(&buf[body_start..]);
                // Unparseable bodies become `Null`, which fails the validity check below.
                let parsed: serde_json::Value =
                    serde_json::from_str(&body_str).unwrap_or(serde_json::Value::Null);
                let valid = parsed.get("data_source").and_then(|v| v.as_str()) == Some("s3")
                    && parsed.get("data_uris").is_some();
                // Echo the request's status and exit code back in a 200 response.
                let run_status = parsed
                    .get("run_status")
                    .and_then(|v| v.as_str())
                    .unwrap_or("finished");
                let ec = parsed
                    .get("exit_code")
                    .and_then(|v| v.as_i64())
                    .map(|v| v as i32);
                let (status_line, resp_body) = if valid {
                    (
                        "HTTP/1.1 200 OK",
                        finish_response_json("run-s3-test", run_status, ec),
                    )
                } else {
                    (
                        "HTTP/1.1 422 Unprocessable Entity",
                        r#"{"detail":[{"msg":"field required","type":"value_error.missing"}]}"#
                            .to_string(),
                    )
                };
                // Hand the captured request to the test thread, then reply.
                tx.send(buf).ok();
                let http = format!(
                    "{status_line}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                    resp_body.len(),
                    resp_body
                );
                stream.write_all(http.as_bytes()).ok();
            }
        });

        let agent = ureq::config::Config::builder()
            .timeout_global(Some(Duration::from_secs(30)))
            .build()
            .new_agent();
        let ctx = RunContext {
            run_id: "run-s3-test".to_string(),
            upload_uri_prefix: "s3://b/p".to_string(),
            credentials: UploadCredentials {
                access_key_id: "k".to_string(),
                secret_access_key: "s".to_string(),
                session_token: "t".to_string(),
                expires_at: "2099-01-01T00:00:00Z".to_string(),
            },
        };
        // `close_run` takes `&[String]`, so the borrowed strs are copied into owned ones.
        let uploaded: Vec<String> = uris.iter().map(|s| (*s).to_string()).collect();
        let _ = close_run(
            &agent,
            &format!("http://127.0.0.1:{port}"),
            "token",
            &ctx,
            exit_code,
            None,
            &uploaded,
        );
        let raw = rx.recv().expect("mock server did not capture request");
        // Unlike the server side, a missing terminator here returns the whole capture.
        let body_start = raw
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .map(|p| p + 4)
            .unwrap_or(0);
        String::from_utf8_lossy(&raw[body_start..]).to_string()
    }
1302
/// When uploaded URIs are supplied, the close-run payload must declare
/// `data_source = "s3"`, list every URI in order under `data_uris`, and carry
/// no `data_csv` field.
#[test]
fn test_close_run_uses_s3_route_when_uris_present() {
    let first_uri = "s3://my-bucket/prefix/run-abc/000000.csv.gz";
    let second_uri = "s3://my-bucket/prefix/run-abc/000001.csv.gz";

    let body = capture_close_run_s3_body(Some(0), &[first_uri, second_uri]);
    let payload: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");

    assert_eq!(
        payload["data_source"], "s3",
        "data_source must be 's3' when URIs present: {body}"
    );

    let data_uris = payload["data_uris"]
        .as_array()
        .expect("data_uris must be a JSON array");
    assert_eq!(data_uris.len(), 2, "data_uris must have 2 elements: {body}");
    assert_eq!(data_uris[0], first_uri);
    assert_eq!(data_uris[1], second_uri);

    // Checked on the raw text so the key cannot hide anywhere in the payload.
    let mentions_csv = body.contains("\"data_csv\"");
    assert!(
        !mentions_csv,
        "data_csv must be absent in S3 route: {body}"
    );
}
1329
/// The S3-route payload may only contain the allow-listed top-level keys, and
/// must contain both `data_source` and `data_uris`.
#[test]
fn test_close_run_s3_no_extra_fields() {
    let body = capture_close_run_s3_body(Some(0), &["s3://bucket/prefix/run/000000.csv.gz"]);
    let payload: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");
    let fields = payload.as_object().expect("payload must be a JSON object");

    // Every top-level key has to be one of the five permitted names.
    for key in fields.keys() {
        let permitted = matches!(
            key.as_str(),
            "exit_code" | "run_status" | "finished_at" | "data_source" | "data_uris"
        );
        assert!(
            permitted,
            "unexpected field '{key}' in S3 route payload (additionalProperties: false): {body}"
        );
    }

    let has_source = fields.contains_key("data_source");
    assert!(has_source, "data_source is required in S3 route");

    let has_uris = fields.contains_key("data_uris");
    assert!(has_uris, "data_uris is required in S3 route");
}
1363
/// Without uploaded URIs, the close-run payload must declare
/// `data_source = "inline"`, include `data_csv`, and omit `data_uris`.
#[test]
fn test_close_run_uses_inline_route_when_no_uris() {
    let inline_csv = "ts,cpu\n1,2\n";
    let body = capture_close_run_body(Some(0), Some(inline_csv));
    let payload: serde_json::Value = serde_json::from_str(&body).expect("body is not JSON");

    assert_eq!(
        payload["data_source"], "inline",
        "data_source must be 'inline' when no URIs: {body}"
    );

    let has_csv = payload.get("data_csv").is_some();
    assert!(
        has_csv,
        "data_csv must be present for inline route: {body}"
    );

    let has_uris = payload.get("data_uris").is_some();
    assert!(
        !has_uris,
        "data_uris must be absent for inline route: {body}"
    );
}
1383
/// `refresh_credentials` must overwrite every credential field of the
/// `RunContext` with the values returned by the server.
///
/// A one-shot loopback mock server answers with a canned
/// `upload_credentials` JSON document. The mock drains the complete request
/// (headers plus `Content-Length` bytes) before replying: answering after a
/// single partial `read` and then closing the socket with unread data can
/// reset the connection before the client has read the response.
#[test]
fn test_refresh_credentials_updates_context() {
    use std::io::{Read as _, Write as _};
    use std::time::Duration;

    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    let response_body = r#"{"upload_credentials":{"access_key":"NEW_AK","secret_key":"NEW_SK","session_token":"NEW_ST","expiration":"2099-06-01T00:00:00Z"}}"#;
    let http_response = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        response_body.len(),
        response_body
    );

    std::thread::spawn(move || {
        if let Ok((mut stream, _)) = listener.accept() {
            // Bound the wait so a stalled client cannot park this thread forever.
            stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
            let mut buf = Vec::<u8>::new();
            let mut tmp = [0u8; 4096];
            loop {
                let n = stream.read(&mut tmp).unwrap_or(0);
                if n == 0 {
                    break;
                }
                buf.extend_from_slice(&tmp[..n]);
                if let Some(sep) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
                    let header_str = String::from_utf8_lossy(&buf[..sep]).to_ascii_lowercase();
                    // A missing Content-Length is treated as an empty body.
                    let cl = header_str
                        .lines()
                        .find_map(|l| {
                            l.trim()
                                .strip_prefix("content-length:")
                                .and_then(|v| v.trim().parse::<usize>().ok())
                        })
                        .unwrap_or(0);
                    if buf.len() >= sep + 4 + cl {
                        break;
                    }
                }
            }
            stream.write_all(http_response.as_bytes()).ok();
        }
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    // Start from clearly distinguishable "OLD_*" values so any field that is
    // not refreshed is caught by the assertions below.
    let mut ctx = RunContext {
        run_id: "run-123".to_string(),
        upload_uri_prefix: "s3://b/p".to_string(),
        credentials: UploadCredentials {
            access_key_id: "OLD_AK".to_string(),
            secret_access_key: "OLD_SK".to_string(),
            session_token: "OLD_ST".to_string(),
            expires_at: "2099-01-01T00:00:00Z".to_string(),
        },
    };

    let result = refresh_credentials(
        &agent,
        &format!("http://127.0.0.1:{port}"),
        "test-token",
        &mut ctx,
    );
    assert!(
        result.is_ok(),
        "refresh_credentials failed: {:?}",
        result.err()
    );
    assert_eq!(ctx.credentials.access_key_id, "NEW_AK");
    assert_eq!(ctx.credentials.secret_access_key, "NEW_SK");
    assert_eq!(ctx.credentials.session_token, "NEW_ST");
    assert_eq!(ctx.credentials.expires_at, "2099-06-01T00:00:00Z");
}
1440
/// `start_run` must POST to `/runs`, include the job name in the request, and
/// never emit a `"pid"` key, even though `Some(42)` is passed as its fifth
/// argument (presumably the tracked PID; confirm against `start_run`).
/// The returned context must mirror the canned server response.
#[test]
fn test_start_run_posts_to_runs_endpoint() {
    use std::io::{Read as _, Write as _};
    use std::sync::mpsc;
    use std::time::Duration;

    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    let (request_tx, request_rx) = mpsc::channel::<Vec<u8>>();

    let canned_json = r#"{"run_id":"run-xyz","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
    let canned_reply = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        canned_json.len(),
        canned_json
    );

    // One-shot mock server: capture the whole request, then send the canned reply.
    std::thread::spawn(move || {
        if let Ok((mut stream, _)) = listener.accept() {
            let mut request = Vec::<u8>::new();
            let mut chunk = [0u8; 4096];
            loop {
                // A read error or EOF both end the capture.
                match stream.read(&mut chunk) {
                    Ok(n) if n > 0 => request.extend_from_slice(&chunk[..n]),
                    _ => break,
                }
                let header_end = request.windows(4).position(|w| w == b"\r\n\r\n");
                if let Some(header_end) = header_end {
                    let headers =
                        String::from_utf8_lossy(&request[..header_end]).to_ascii_lowercase();
                    let content_length = headers
                        .lines()
                        .find_map(|line| {
                            let value = line.trim().strip_prefix("content-length:")?;
                            value.trim().parse::<usize>().ok()
                        })
                        .unwrap_or(0);
                    if request.len() >= header_end + 4 + content_length {
                        break;
                    }
                }
            }
            request_tx.send(request).ok();
            stream.write_all(canned_reply.as_bytes()).ok();
        }
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let meta = crate::config::JobMetadata {
        job_name: Some("test-job".to_string()),
        ..Default::default()
    };
    let host = crate::metrics::HostInfo::default();
    let cloud = crate::metrics::CloudInfo::default();
    let base_url = format!("http://127.0.0.1:{port}");

    let result = start_run(
        &agent,
        &base_url,
        "test-token",
        &meta,
        Some(42),
        &host,
        &cloud,
    );
    assert!(result.is_ok(), "start_run failed: {:?}", result.err());

    let ctx = result.unwrap();
    assert_eq!(ctx.run_id, "run-xyz");
    assert_eq!(ctx.upload_uri_prefix, "s3://b/p");
    assert_eq!(ctx.credentials.access_key_id, "AK");

    let raw = request_rx
        .recv()
        .expect("mock server did not receive request");
    let raw_str = String::from_utf8_lossy(&raw);

    let posts_to_runs = raw_str.contains("POST /runs");
    assert!(posts_to_runs, "must POST to /runs: {raw_str}");

    let has_job_name = raw_str.contains("\"job_name\":\"test-job\"");
    assert!(has_job_name, "job_name missing: {raw_str}");

    let has_pid = raw_str.contains("\"pid\"");
    assert!(
        !has_pid,
        "pid must not appear in the payload: {raw_str}"
    );
}
1534
/// When `JobMetadata::command` is populated, `start_run` must send it as a
/// `"command"` field whose value is a JSON-encoded string of the argument
/// array (note the escaped quotes in the expected text).
#[test]
fn test_start_run_includes_command_array_in_payload() {
    use std::io::{Read as _, Write as _};
    use std::sync::mpsc;
    use std::time::Duration;

    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    let (request_tx, request_rx) = mpsc::channel::<Vec<u8>>();

    let canned_json = r#"{"run_id":"r1","upload_uri_prefix":"s3://b/p","upload_credentials":{"access_key":"AK","secret_key":"SK","session_token":"ST","expiration":"2099-01-01T00:00:00Z"}}"#;
    let canned_reply = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
        canned_json.len(),
        canned_json
    );

    // One-shot mock server: capture the whole request, then send the canned reply.
    std::thread::spawn(move || {
        if let Ok((mut stream, _)) = listener.accept() {
            let mut request = Vec::<u8>::new();
            let mut chunk = [0u8; 4096];
            loop {
                // A read error or EOF both end the capture.
                match stream.read(&mut chunk) {
                    Ok(n) if n > 0 => request.extend_from_slice(&chunk[..n]),
                    _ => break,
                }
                let header_end = request.windows(4).position(|w| w == b"\r\n\r\n");
                if let Some(header_end) = header_end {
                    let headers =
                        String::from_utf8_lossy(&request[..header_end]).to_ascii_lowercase();
                    let content_length = headers
                        .lines()
                        .find_map(|line| {
                            let value = line.trim().strip_prefix("content-length:")?;
                            value.trim().parse::<usize>().ok()
                        })
                        .unwrap_or(0);
                    if request.len() >= header_end + 4 + content_length {
                        break;
                    }
                }
            }
            request_tx.send(request).ok();
            stream.write_all(canned_reply.as_bytes()).ok();
        }
    });

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let wrapped_command: Vec<String> = [
        "stress",
        "--cpu",
        "4",
        "--vm",
        "1",
        "--vm-bytes",
        "12024M",
        "--timeout",
        "63s",
    ]
    .iter()
    .map(|arg| String::from(*arg))
    .collect();

    let meta = crate::config::JobMetadata {
        command: wrapped_command,
        ..Default::default()
    };
    let host = crate::metrics::HostInfo::default();
    let cloud = crate::metrics::CloudInfo::default();
    let base_url = format!("http://127.0.0.1:{port}");

    let result = start_run(
        &agent,
        &base_url,
        "test-token",
        &meta,
        None,
        &host,
        &cloud,
    );
    assert!(result.is_ok(), "start_run failed: {:?}", result.err());

    let raw = request_rx
        .recv()
        .expect("mock server did not receive request");
    let raw_str = String::from_utf8_lossy(&raw);

    let expected = r#""command":"[\"stress\",\"--cpu\",\"4\",\"--vm\",\"1\",\"--vm-bytes\",\"12024M\",\"--timeout\",\"63s\"]""#;
    let found = raw_str.contains(expected);
    assert!(
        found,
        "command JSON string not found in payload.\nExpected: {expected}\nGot body: {raw_str}"
    );
}
1638
/// A `MetadataPayload` whose `command` is `None` must serialize without any
/// `"command"` key.
#[test]
fn test_start_run_omits_command_when_standalone() {
    // Every optional field is left unset; only the absence of `command` is checked.
    let empty_payload = MetadataPayload {
        job_name: None,
        project_name: None,
        stage_name: None,
        task_name: None,
        team: None,
        env: None,
        language: None,
        orchestrator: None,
        executor: None,
        external_run_id: None,
        container_image: None,
        tags: &[],
        command: None,
    };

    let json = serde_json::to_string(&empty_payload).expect("serialize failed");
    let mentions_command = json.contains("\"command\"");
    assert!(
        !mentions_command,
        "command must be absent from payload in standalone mode: {json}"
    );
}
1664
/// Integration test (T-INT-01): opens a run with `start_run` and closes it
/// with `close_run` against a real API, using a single inline CSV sample.
///
/// The token is read from `SENTINEL_API_TOKEN`; when it is unset or empty the
/// test prints a skip notice and returns early (i.e. passes). The base URL is
/// read from `SENTINEL_API_BASE` and falls back to the hard-coded default.
///
/// # Panics
///
/// Panics if `start_run` fails, if the returned `run_id` is empty, or if
/// `close_run` does not return `Ok`.
#[test]
fn test_real_api_finish_run_returns_ok() {
    use crate::config::JobMetadata;
    use crate::metrics::{CloudInfo, CpuMetrics, HostInfo, MemoryMetrics, Sample};
    use crate::sentinel::upload::samples_to_csv;
    use std::time::Duration;

    let token = match std::env::var("SENTINEL_API_TOKEN") {
        Ok(t) if !t.is_empty() => t,
        _ => {
            eprintln!("skip: SENTINEL_API_TOKEN not set or empty");
            return;
        }
    };
    let api_base = std::env::var("SENTINEL_API_BASE")
        .unwrap_or_else(|_| "https://api.sentinel.sparecores.net".to_string());
    eprintln!("T-INT-01: using api_base={api_base}");

    let agent = ureq::config::Config::builder()
        .timeout_global(Some(Duration::from_secs(30)))
        .build()
        .new_agent();

    let metadata = JobMetadata {
        job_name: Some("integration-test-close-run".to_string()),
        ..Default::default()
    };
    let host = HostInfo::default();
    let cloud = CloudInfo::default();

    eprintln!("T-INT-01: calling start_run...");
    let ctx = match start_run(&agent, &api_base, &token, &metadata, None, &host, &cloud) {
        Ok(c) => {
            eprintln!("T-INT-01: start_run ok -- run_id={}", c.run_id);
            c
        }
        Err(e) => panic!("start_run failed: {e}"),
    };
    assert!(!ctx.run_id.is_empty(), "run_id must be non-empty");

    // A clock set before the Unix epoch degrades to timestamp 0 instead of panicking.
    let timestamp_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let sample = Sample {
        timestamp_secs,
        job_name: Some("integration-test-close-run".to_string()),
        tracked_pid: None,
        cpu: CpuMetrics::default(),
        memory: MemoryMetrics::default(),
        network: vec![],
        disk: vec![],
        gpu: vec![],
    };
    let csv = samples_to_csv(&[sample], 1);
    // Take the preview by characters: a byte-index slice (`&csv[..120]`) panics
    // when byte 120 falls inside a multi-byte UTF-8 sequence.
    let preview: String = csv.chars().take(120).collect();
    eprintln!("T-INT-01: csv preview (first 120 chars): {preview}");

    eprintln!("T-INT-01: calling close_run...");
    let result = close_run(
        &agent,
        &api_base,
        &token,
        &ctx,
        Some(0),
        Some(csv),
        &[],
    );
    match &result {
        Ok(()) => eprintln!("T-INT-01: close_run ok -- 200 received"),
        Err(e) => eprintln!("T-INT-01: close_run FAILED: {e}"),
    }
    assert!(
        result.is_ok(),
        "close_run must return Ok (200) against the real API: {result:?}"
    );
}
1765}