Skip to main content

CommonLibrary/Telemetry/
EmitOTLPSpan.rs

1
2//! Fire-and-forget OTLP span exporter. Lifted from Mountain's
3//! `IPC/DevLog/EmitOTLPSpan` so Air / Echo / Rest / Grove / Mist /
4//! SideCar all share the same raw HTTP path. Single failed POST flips
5//! `OTLP_AVAILABLE` to false so a missing collector doesn't tax every
6//! emit. Release builds compile out via `cfg!(debug_assertions)`.
7
8use std::{
9	collections::hash_map::DefaultHasher,
10	hash::{Hash, Hasher},
11	sync::{
12		OnceLock,
13		atomic::{AtomicBool, Ordering},
14	},
15	time::{SystemTime, UNIX_EPOCH},
16};
17
18use crate::Telemetry::{Client, IsAllowed};
19
20static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
21
22static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
23
24fn NowNano() -> u64 {
25	SystemTime::now()
26		.duration_since(UNIX_EPOCH)
27		.map(|D| D.as_nanos() as u64)
28		.unwrap_or(0)
29}
30
31fn TraceId() -> &'static str {
32	OTLP_TRACE_ID.get_or_init(|| {
33		let mut H = DefaultHasher::new();
34		std::process::id().hash(&mut H);
35		NowNano().hash(&mut H);
36		format!("{:032x}", H.finish() as u128)
37	})
38}
39
40fn RandU64() -> u64 {
41	let mut H = DefaultHasher::new();
42
43	std::thread::current().id().hash(&mut H);
44
45	NowNano().hash(&mut H);
46
47	H.finish()
48}
49
50fn ParseEndpoint(Endpoint:&str) -> (String, String) {
51	let WithoutScheme = Endpoint
52		.strip_prefix("http://")
53		.or_else(|| Endpoint.strip_prefix("https://"))
54		.unwrap_or(Endpoint);
55
56	let (HostPort, Path) = match WithoutScheme.split_once('/') {
57		Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
58
59		None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
60	};
61
62	let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
63
64	(HostPort, PathFinal)
65}
66
67/// Emit one span. `StartNano` / `EndNano` are wall-clock (not monotonic)
68/// nanosecond timestamps - use `NowNano()` from the caller's start.
69pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
70	if !IsAllowed::OTLP() {
71		return;
72	}
73
74	if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
75		return;
76	}
77
78	let Configuration = IsAllowed::Cached();
79
80	let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
81
82	let SpanId = format!("{:016x}", RandU64());
83
84	let TraceIdString = TraceId().to_string();
85
86	let SpanName = Name.to_string();
87
88	let AttributesJson:Vec<String> = Attributes
89		.iter()
90		.map(|(K, V)| {
91			format!(
92				r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
93				K,
94				V.replace('\\', "\\\\").replace('"', "\\\"")
95			)
96		})
97		.collect();
98
99	let IsError = SpanName.contains("error");
100
101	let StatusCode = if IsError { 2 } else { 1 };
102
103	let ServiceName = format!("land-editor-{}", TierStr);
104
105	let Payload = format!(
106		concat!(
107			r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
108			r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
109			r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
110			r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
111			r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
112			r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
113			r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
114			r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
115		),
116		ServiceName,
117		TierStr,
118		TierStr,
119		TraceIdString,
120		SpanId,
121		SpanName,
122		StartNano,
123		EndNano,
124		AttributesJson.join(","),
125		StatusCode,
126	);
127
128	let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.OTLPEndpoint);
129
130	std::thread::spawn(move || {
131		use std::{
132			io::{Read as IoRead, Write as IoWrite},
133			net::TcpStream,
134			time::Duration,
135		};
136
137		let Ok(SocketAddress) = HostAddress.parse() else {
138			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
139			return;
140		};
141		let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
142			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
143			return;
144		};
145		let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
146		let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
147
148		let HttpReq = format!(
149			"POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
150			 close\r\n\r\n",
151			PathSegment,
152			HostAddress,
153			Payload.len()
154		);
155		if Stream.write_all(HttpReq.as_bytes()).is_err() {
156			return;
157		}
158		if Stream.write_all(Payload.as_bytes()).is_err() {
159			return;
160		}
161		let mut Buf = [0u8; 32];
162		let _ = Stream.read(&mut Buf);
163		if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
164			OTLP_AVAILABLE.store(false, Ordering::Relaxed);
165		}
166	});
167}
168
169/// Helper exposed to callers that need a span window timestamp.
170pub fn NowNanoPub() -> u64 { NowNano() }