resource_tracker/collector/clouds/mod.rs
1use crate::metrics::CloudInfo;
2use std::time::Duration;
3use ureq::config::Config as UreqConfig;
4
5mod alicloud;
6mod aws;
7mod azure;
8mod gcp;
9mod hetzner;
10mod ovh;
11mod upcloud;
12mod vultr;
13
14// ---------------------------------------------------------------------------
15// Shared IMDS helpers (available to all cloud submodules via `super::`)
16// ---------------------------------------------------------------------------
17
/// Time budget for a single bounded phase of an IMDS HTTP request.
///
/// [`new_imds_agent`] installs this value as the per-phase limit on the agent
/// used by vendor probes, so it caps how long each such phase may take.
const IMDS_TIMEOUT: Duration = Duration::from_millis(1_000);
20
21fn new_imds_agent() -> ureq::Agent {
22 // Per-phase timeouts instead of timeout_global: timeout_global's thread-
23 // creation behavior is an undocumented ureq internal that could change
24 // across versions. Per-phase timeouts are a safer, more explicit contract:
25 // each phase (connect, recv_response) is bounded independently with no
26 // reliance on ureq internals. T-UREQ-01 verifies no extra threads are
27 // spawned per request (a regression guard against ureq version bumps).
28 UreqConfig::builder()
29 .timeout_connect(Some(IMDS_TIMEOUT))
30 .timeout_recv_response(Some(IMDS_TIMEOUT))
31 .build()
32 .new_agent()
33}
34
35fn imds_get(agent: &ureq::Agent, url: &str) -> Option<String> {
36 imds_get_headers(agent, url, &[])
37}
38
39fn imds_get_headers(agent: &ureq::Agent, url: &str, headers: &[(&str, &str)]) -> Option<String> {
40 let mut req = agent.get(url);
41 for (k, v) in headers {
42 req = req.header(*k, *v);
43 }
44 req.call()
45 .ok()
46 .and_then(|mut r| r.body_mut().read_to_string().ok())
47 .map(|s| s.trim().to_string())
48 .filter(|s| !s.is_empty())
49}
50
51// ---------------------------------------------------------------------------
52// Probe orchestration
53// ---------------------------------------------------------------------------
54
55/// Precedence order: AWS → GCP → Azure → Hetzner → UpCloud → Vultr → AliCloud → OVH.
56/// To add a new cloud: implement `pub fn probe() -> Option<CloudInfo>` in a new
57/// submodule, declare it above, and append it here.
58const PROBES: &[fn() -> Option<CloudInfo>] = &[
59 aws::probe,
60 gcp::probe,
61 azure::probe,
62 hetzner::probe,
63 upcloud::probe,
64 vultr::probe,
65 alicloud::probe,
66 ovh::probe,
67];
68
69/// Run vendor probes; parallel when threads are available, serial fallback otherwise.
70///
71/// Join order follows the `PROBES` precedence list: the first successful probe
72/// wins. Each HTTP call uses [`IMDS_TIMEOUT`]. Per-vendor threads are only used
73/// when [`crate::thread_util::spawn_named`] succeeds so EAGAIN under tight PID
74/// limits falls back to sequential probes on the caller thread.
75pub fn probe_cloud() -> CloudInfo {
76 let mut handles = Vec::new();
77 let mut deferred = Vec::new();
78
79 for &p in PROBES {
80 match crate::thread_util::spawn_named("cloud-probe", p) {
81 Some(h) => handles.push(h),
82 None => deferred.push(p),
83 }
84 }
85
86 for handle in handles {
87 if let Ok(Some(info)) = handle.join() {
88 return info;
89 }
90 }
91 for p in deferred {
92 if let Some(info) = p() {
93 return info;
94 }
95 }
96 CloudInfo::default()
97}
98
99/// Spawn a background thread that probes cloud IMDS endpoints.
100///
101/// Call this **before** the warm-up sleep so probes run **in parallel** with the
102/// main thread's warm-up (stateful collector priming + one `interval` sleep).
103/// Poll the returned [`Receiver`] with `try_recv()` after warm-up; if probes
104/// finished during the sleep the result is waiting immediately.
105///
106/// Returns `None` when no thread could be created (EAGAIN under PID limits); the
107/// caller should treat cloud info as permanently unavailable in that case.
108///
109/// [`Receiver`]: std::sync::mpsc::Receiver
110pub fn spawn_cloud_discovery() -> Option<std::sync::mpsc::Receiver<CloudInfo>> {
111 let (tx, rx) = std::sync::mpsc::channel();
112 // Drop the JoinHandle: the thread detaches and sends its result via tx.
113 // If spawn fails (EAGAIN under PID limits), `?` returns None to the caller.
114 crate::thread_util::spawn_named("cloud-discovery", move || {
115 let _ = tx.send(probe_cloud());
116 })?;
117 Some(rx)
118}
119
120// ---------------------------------------------------------------------------
121// Tests
122// ---------------------------------------------------------------------------
123
#[cfg(test)]
mod tests {
    use super::*;

    // T-CLOUD-01: spawn_cloud_discovery resolves without panic.
    // Each vendor's HTTP calls use IMDS_TIMEOUT; all vendor probes run in parallel.
    #[test]
    fn test_spawn_cloud_discovery_joins_without_panic() {
        // Result may be default (no cloud) or populated (running on a cloud VM).
        // Either outcome is valid; the test only checks for no panic.
        let _cloud = match spawn_cloud_discovery() {
            Some(rx) => rx.recv().unwrap_or_default(),
            None => probe_cloud(),
        };
    }

    // -----------------------------------------------------------------------
    // T-UREQ-01: ureq thread-count regression guard
    // -----------------------------------------------------------------------
    //
    // Verifies that requests made with new_imds_agent() (per-phase timeouts)
    // do not spawn extra threads beyond the request thread itself. Guards
    // against ureq version bumps that increase per-request thread creation,
    // consuming PID budget under tight pids.max limits.
    //
    // Linux-only: thread count is read from /proc/self/status.
    // A slow mock server (200 ms delay) holds the connection open so any
    // short-lived helper thread is alive when we sample mid-request.
    //
    // The URL uses 127.0.0.1 (numeric IP), NOT "localhost". Rust's
    // ToSocketAddrs resolves numeric IPs via IpAddr::from_str without calling
    // getaddrinfo -- no NSS stack, no systemd-resolved IPC, no resolver
    // plugin on any platform. Using "localhost" caused GitHub Actions CI
    // runners (both x86_64 and aarch64) to spawn a systemd-resolved NSS
    // worker thread (Ubuntu 24.04 default: hosts: files resolve dns), inflating
    // the count to baseline+2 and producing false CI failures. Local machines
    // with hosts: files dns (no resolve plugin) did not reproduce this.
    // Using 127.0.0.1 also matches production: all IMDS endpoints are
    // link-local IPs (169.254.169.254), never hostnames.
    //
    // Expected on all Linux configurations: during = baseline + 1.
    //
    // The panic message includes /proc/self/task/*/comm thread names, sampled
    // mid-request right after the `during` count, so any future CI failure is
    // self-diagnosing without a second investigation round.
    //
    // NOTE: A timeout_global negative control is not practical: ureq 3.x does
    // not spawn extra threads for timeout_global on HTTP connections to IPs.

    /// Read the `Threads:` field from /proc/self/status.
    ///
    /// Returns 0 when the file cannot be read or the field cannot be parsed.
    #[cfg(target_os = "linux")]
    fn thread_count() -> usize {
        std::fs::read_to_string("/proc/self/status")
            .unwrap_or_default()
            .lines()
            .find(|l| l.starts_with("Threads:"))
            .and_then(|l| l.split_whitespace().nth(1))
            .and_then(|n| n.parse().ok())
            .unwrap_or(0)
    }

    /// Collect the `comm` name of every thread currently listed under
    /// /proc/<pid>/task. Unreadable entries are skipped; an unreadable task
    /// directory yields an empty list.
    #[cfg(target_os = "linux")]
    fn live_thread_names() -> Vec<String> {
        let pid = std::process::id();
        std::fs::read_dir(format!("/proc/{pid}/task"))
            .into_iter()
            .flatten()
            .filter_map(|e| e.ok())
            .filter_map(|e| std::fs::read_to_string(e.path().join("comm")).ok())
            .map(|s| s.trim().to_string())
            .collect()
    }

    /// Bind a TCP listener on 127.0.0.1:0, spawn a thread that accepts one
    /// connection, sleeps `delay`, then sends a minimal HTTP 200 response.
    /// Returns the port number.
    #[cfg(target_os = "linux")]
    fn slow_mock_server(delay: std::time::Duration) -> u16 {
        use std::io::Write;
        use std::net::TcpListener;
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        std::thread::spawn(move || {
            if let Ok((mut s, _)) = listener.accept() {
                std::thread::sleep(delay);
                let _ = s.write_all(b"HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n");
            }
        });
        port
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_per_phase_timeout_no_helper_thread() {
        use std::sync::mpsc;
        let port = slow_mock_server(Duration::from_millis(200));
        let url = format!("http://127.0.0.1:{port}");
        // Baseline is taken after the mock server thread exists, so only the
        // request thread (and anything ureq adds) counts on top of it.
        let baseline = thread_count();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        std::thread::spawn(move || {
            let agent = new_imds_agent();
            let _ = agent.get(&url).call();
            let _ = done_tx.send(());
        });
        // Sample while the mock server is still holding the response back.
        std::thread::sleep(Duration::from_millis(50));
        let during = thread_count();
        // Capture names now, mid-request: once the request completes any
        // helper thread is gone and the diagnostic would no longer show it.
        let thread_names = live_thread_names();
        done_rx
            .recv()
            .expect("request thread exited without signalling completion");
        assert!(
            during <= baseline + 1,
            "ureq spawned extra thread(s) under per-phase timeout: \
             baseline={baseline} during={during} threads={thread_names:?}"
        );
    }
}
233}