Skip to main content

resource_tracker/collector/clouds/
mod.rs

1use crate::metrics::CloudInfo;
2use std::time::Duration;
3use ureq::config::Config as UreqConfig;
4
5mod alicloud;
6mod aws;
7mod azure;
8mod gcp;
9mod hetzner;
10mod ovh;
11mod upcloud;
12mod vultr;
13
// ---------------------------------------------------------------------------
// Shared IMDS helpers (available to all cloud submodules via `super::`)
// ---------------------------------------------------------------------------
17
18/// Upper bound for each HTTP call made by a vendor probe.
19const IMDS_TIMEOUT: Duration = Duration::from_secs(1);
20
21fn new_imds_agent() -> ureq::Agent {
22    // Per-phase timeouts instead of timeout_global: timeout_global's thread-
23    // creation behavior is an undocumented ureq internal that could change
24    // across versions. Per-phase timeouts are a safer, more explicit contract:
25    // each phase (connect, recv_response) is bounded independently with no
26    // reliance on ureq internals. T-UREQ-01 verifies no extra threads are
27    // spawned per request (a regression guard against ureq version bumps).
28    UreqConfig::builder()
29        .timeout_connect(Some(IMDS_TIMEOUT))
30        .timeout_recv_response(Some(IMDS_TIMEOUT))
31        .build()
32        .new_agent()
33}
34
35fn imds_get(agent: &ureq::Agent, url: &str) -> Option<String> {
36    imds_get_headers(agent, url, &[])
37}
38
39fn imds_get_headers(agent: &ureq::Agent, url: &str, headers: &[(&str, &str)]) -> Option<String> {
40    let mut req = agent.get(url);
41    for (k, v) in headers {
42        req = req.header(*k, *v);
43    }
44    req.call()
45        .ok()
46        .and_then(|mut r| r.body_mut().read_to_string().ok())
47        .map(|s| s.trim().to_string())
48        .filter(|s| !s.is_empty())
49}
50
// ---------------------------------------------------------------------------
// Probe orchestration
// ---------------------------------------------------------------------------
54
55/// Precedence order: AWS → GCP → Azure → Hetzner → UpCloud → Vultr → AliCloud → OVH.
56/// To add a new cloud: implement `pub fn probe() -> Option<CloudInfo>` in a new
57/// submodule, declare it above, and append it here.
58const PROBES: &[fn() -> Option<CloudInfo>] = &[
59    aws::probe,
60    gcp::probe,
61    azure::probe,
62    hetzner::probe,
63    upcloud::probe,
64    vultr::probe,
65    alicloud::probe,
66    ovh::probe,
67];
68
69/// Run vendor probes; parallel when threads are available, serial fallback otherwise.
70///
71/// Join order follows the `PROBES` precedence list: the first successful probe
72/// wins. Each HTTP call uses [`IMDS_TIMEOUT`]. Per-vendor threads are only used
73/// when [`crate::thread_util::spawn_named`] succeeds so EAGAIN under tight PID
74/// limits falls back to sequential probes on the caller thread.
75pub fn probe_cloud() -> CloudInfo {
76    let mut handles = Vec::new();
77    let mut deferred = Vec::new();
78
79    for &p in PROBES {
80        match crate::thread_util::spawn_named("cloud-probe", p) {
81            Some(h) => handles.push(h),
82            None => deferred.push(p),
83        }
84    }
85
86    for handle in handles {
87        if let Ok(Some(info)) = handle.join() {
88            return info;
89        }
90    }
91    for p in deferred {
92        if let Some(info) = p() {
93            return info;
94        }
95    }
96    CloudInfo::default()
97}
98
99/// Spawn a background thread that probes cloud IMDS endpoints.
100///
101/// Call this **before** the warm-up sleep so probes run **in parallel** with the
102/// main thread's warm-up (stateful collector priming + one `interval` sleep).
103/// Poll the returned [`Receiver`] with `try_recv()` after warm-up; if probes
104/// finished during the sleep the result is waiting immediately.
105///
106/// Returns `None` when no thread could be created (EAGAIN under PID limits); the
107/// caller should treat cloud info as permanently unavailable in that case.
108///
109/// [`Receiver`]: std::sync::mpsc::Receiver
110pub fn spawn_cloud_discovery() -> Option<std::sync::mpsc::Receiver<CloudInfo>> {
111    let (tx, rx) = std::sync::mpsc::channel();
112    // Drop the JoinHandle: the thread detaches and sends its result via tx.
113    // If spawn fails (EAGAIN under PID limits), `?` returns None to the caller.
114    crate::thread_util::spawn_named("cloud-discovery", move || {
115        let _ = tx.send(probe_cloud());
116    })?;
117    Some(rx)
118}
119
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
123
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests in this module that create or count threads.
    ///
    /// libtest runs tests concurrently as threads of a single process, and
    /// T-UREQ-01 counts *every* thread in the process via /proc/self/status.
    /// Without this lock, the discovery and probe threads started by T-CLOUD-01
    /// can be created (or exit) between T-UREQ-01's baseline and mid-request
    /// samples, producing spurious failures. Limits: tests in other modules are
    /// not covered, and on a cloud VM `probe_cloud` may return while detached
    /// lower-precedence probe threads are still running. If either ever
    /// interferes, run with `--test-threads=1`.
    static THREAD_SENSITIVE: Mutex<()> = Mutex::new(());

    /// Acquire [`THREAD_SENSITIVE`], ignoring poisoning so one failing test
    /// does not cascade into failures of the others.
    fn serialize_thread_sensitive() -> MutexGuard<'static, ()> {
        THREAD_SENSITIVE
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    // T-CLOUD-01: spawn_cloud_discovery resolves without panic.
    // Each vendor's HTTP calls use IMDS_TIMEOUT; all vendor probes run in parallel.
    #[test]
    fn test_spawn_cloud_discovery_joins_without_panic() {
        let _serial = serialize_thread_sensitive();
        // Result may be default (no cloud) or populated (running on a cloud VM).
        // Either outcome is valid; the test only checks for no panic.
        let _cloud = match spawn_cloud_discovery() {
            Some(rx) => rx.recv().unwrap_or_default(),
            None => probe_cloud(),
        };
    }

    // -----------------------------------------------------------------------
    // T-UREQ-01: ureq thread-count regression guard
    // -----------------------------------------------------------------------
    //
    // Verifies that requests made with new_imds_agent() (per-phase timeouts)
    // do not spawn extra threads beyond the request thread itself. Guards
    // against ureq version bumps that increase per-request thread creation,
    // consuming PID budget under tight pids.max limits.
    //
    // Linux-only: thread count is read from /proc/self/status. Because that
    // count is process-wide, the test holds THREAD_SENSITIVE so T-CLOUD-01's
    // probe threads cannot skew it.
    // A slow mock server (200 ms delay) holds the connection open so any
    // short-lived helper thread is alive when we sample mid-request.
    //
    // The URL uses 127.0.0.1 (numeric IP), NOT "localhost". Rust's
    // ToSocketAddrs resolves numeric IPs via IpAddr::from_str without calling
    // getaddrinfo -- no NSS stack, no systemd-resolved IPC, no resolver
    // plugin on any platform. Using "localhost" caused GitHub Actions CI
    // runners (both x86_64 and aarch64) to spawn a systemd-resolved NSS
    // worker thread (Ubuntu 24.04 default: hosts: files resolve dns), inflating
    // the count to baseline+2 and producing false CI failures. Local machines
    // with hosts: files dns (no resolve plugin) did not reproduce this.
    // Using 127.0.0.1 also matches production: all IMDS endpoints are
    // link-local IPs (169.254.169.254), never hostnames.
    //
    // Expected on all Linux configurations: during = baseline + 1.
    //
    // The panic message includes /proc/self/task/*/comm thread names so any
    // future CI failure is self-diagnosing without a second investigation round.
    //
    // NOTE: A timeout_global negative control is not practical: ureq 3.x does
    // not spawn extra threads for timeout_global on HTTP connections to IPs.

    /// Read the `Threads:` field from /proc/self/status (0 if unreadable).
    #[cfg(target_os = "linux")]
    fn thread_count() -> usize {
        std::fs::read_to_string("/proc/self/status")
            .unwrap_or_default()
            .lines()
            .find(|l| l.starts_with("Threads:"))
            .and_then(|l| l.split_whitespace().nth(1))
            .and_then(|n| n.parse().ok())
            .unwrap_or(0)
    }

    /// Bind a TCP listener on 127.0.0.1:0, spawn a thread that accepts one
    /// connection, sleeps `delay`, then sends a minimal HTTP 200 response.
    /// Returns the port number.
    #[cfg(target_os = "linux")]
    fn slow_mock_server(delay: std::time::Duration) -> u16 {
        use std::io::Write;
        use std::net::TcpListener;
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        std::thread::spawn(move || {
            if let Ok((mut s, _)) = listener.accept() {
                std::thread::sleep(delay);
                let _ = s.write_all(b"HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n");
            }
        });
        port
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_per_phase_timeout_no_helper_thread() {
        use std::sync::mpsc;
        // Hold for the whole measurement so no sibling test changes the count.
        let _serial = serialize_thread_sensitive();
        let port = slow_mock_server(Duration::from_millis(200));
        let url = format!("http://127.0.0.1:{port}");
        let baseline = thread_count();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        std::thread::spawn(move || {
            let agent = new_imds_agent();
            let _ = agent.get(&url).call();
            let _ = done_tx.send(());
        });
        std::thread::sleep(Duration::from_millis(50));
        let during = thread_count();
        done_rx.recv().unwrap();
        let pid = std::process::id();
        let thread_names: Vec<String> = std::fs::read_dir(format!("/proc/{pid}/task"))
            .into_iter()
            .flatten()
            .filter_map(|e| e.ok())
            .filter_map(|e| std::fs::read_to_string(e.path().join("comm")).ok())
            .map(|s| s.trim().to_string())
            .collect();
        assert!(
            during <= baseline + 1,
            "ureq spawned extra thread(s) under per-phase timeout: \
             baseline={baseline} during={during} threads={thread_names:?}"
        );
    }
}