// CommonLibrary/Telemetry/EmitOTLPSpan.rs
2use std::{
9 collections::hash_map::DefaultHasher,
10 hash::{Hash, Hasher},
11 sync::{
12 OnceLock,
13 atomic::{AtomicBool, Ordering},
14 },
15 time::{SystemTime, UNIX_EPOCH},
16};
17
18use crate::Telemetry::{Client, IsAllowed};
19
/// Process-wide switch consulted by `Fn` before it builds a span. It starts as `true` and is
/// stored as `false` when a send attempt cannot reach the endpoint or the response does not
/// start with an HTTP 2xx status line; nothing in this file stores `true` again.
static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);

/// Cache for the trace identifier string: filled once by `TraceId` on first use and returned
/// unchanged by every later call.
static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
23
/// Returns the current wall-clock time as nanoseconds elapsed since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch. The nanosecond
/// count is narrowed from `u128` to `u64` with a plain cast.
fn NowNano() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(Elapsed) => Elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
30
31fn TraceId() -> &'static str {
32 OTLP_TRACE_ID.get_or_init(|| {
33 let mut H = DefaultHasher::new();
34 std::process::id().hash(&mut H);
35 NowNano().hash(&mut H);
36 format!("{:032x}", H.finish() as u128)
37 })
38}
39
40fn RandU64() -> u64 {
41 let mut H = DefaultHasher::new();
42
43 std::thread::current().id().hash(&mut H);
44
45 NowNano().hash(&mut H);
46
47 H.finish()
48}
49
/// Splits an endpoint string into a `(host:port, request path)` pair.
///
/// A leading `http://` or `https://` is dropped. Everything up to the first `/` is returned as
/// the first element; the remainder, normalised to start with exactly one `/`, is the second.
/// When there is no path, or the path is only slashes, the path defaults to `/v1/traces`.
fn ParseEndpoint(Endpoint:&str) -> (String, String) {
    const FALLBACK_PATH:&str = "/v1/traces";

    // Try the schemes in order; an endpoint without a known scheme is used as-is.
    let Bare = ["http://", "https://"]
        .iter()
        .find_map(|Scheme| Endpoint.strip_prefix(*Scheme))
        .unwrap_or(Endpoint);

    // `Tail` is `None` when there is no slash at all, otherwise the path without leading slashes.
    let (Authority, Tail) = match Bare.split_once('/') {
        Some((Before, After)) => (Before, Some(After.trim_start_matches('/'))),
        None => (Bare, None),
    };

    let Route = match Tail {
        Some(Segment) if !Segment.is_empty() => format!("/{}", Segment),
        _ => FALLBACK_PATH.to_owned(),
    };

    (Authority.to_owned(), Route)
}
66
67pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
70 if !IsAllowed::OTLP() {
71 return;
72 }
73
74 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
75 return;
76 }
77
78 let Configuration = IsAllowed::Cached();
79
80 let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
81
82 let SpanId = format!("{:016x}", RandU64());
83
84 let TraceIdString = TraceId().to_string();
85
86 let SpanName = Name.to_string();
87
88 let AttributesJson:Vec<String> = Attributes
89 .iter()
90 .map(|(K, V)| {
91 format!(
92 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
93 K,
94 V.replace('\\', "\\\\").replace('"', "\\\"")
95 )
96 })
97 .collect();
98
99 let IsError = SpanName.contains("error");
100
101 let StatusCode = if IsError { 2 } else { 1 };
102
103 let ServiceName = format!("land-editor-{}", TierStr);
104
105 let Payload = format!(
106 concat!(
107 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
108 r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
109 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
110 r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
111 r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
112 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
113 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
114 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
115 ),
116 ServiceName,
117 TierStr,
118 TierStr,
119 TraceIdString,
120 SpanId,
121 SpanName,
122 StartNano,
123 EndNano,
124 AttributesJson.join(","),
125 StatusCode,
126 );
127
128 let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.OTLPEndpoint);
129
130 std::thread::spawn(move || {
131 use std::{
132 io::{Read as IoRead, Write as IoWrite},
133 net::TcpStream,
134 time::Duration,
135 };
136
137 let Ok(SocketAddress) = HostAddress.parse() else {
138 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
139 return;
140 };
141 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
142 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
143 return;
144 };
145 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
146 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
147
148 let HttpReq = format!(
149 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
150 close\r\n\r\n",
151 PathSegment,
152 HostAddress,
153 Payload.len()
154 );
155 if Stream.write_all(HttpReq.as_bytes()).is_err() {
156 return;
157 }
158 if Stream.write_all(Payload.as_bytes()).is_err() {
159 return;
160 }
161 let mut Buf = [0u8; 32];
162 let _ = Stream.read(&mut Buf);
163 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
164 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
165 }
166 });
167}
168
/// Public wrapper around the private `NowNano`: returns the current wall-clock time as
/// nanoseconds since the Unix epoch (`0` if the clock reads earlier than the epoch).
pub fn NowNanoPub() -> u64 { NowNano() }