1
2use std::{
16 collections::HashMap,
17 sync::{
18 Arc,
19 atomic::{AtomicBool, AtomicU64, Ordering},
20 },
21 time::Duration,
22};
23
24use dashmap::DashMap;
25use lazy_static::lazy_static;
26use parking_lot::Mutex;
27use serde_json::Value;
28use tokio::sync::{mpsc, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::Streaming;
31
32use super::{
33 Error::VineError,
34 Generated::{
35 CancelOperationRequest,
36 Envelope,
37 GenericNotification,
38 GenericRequest,
39 GenericResponse,
40 RpcError,
41 cocoon_service_client::CocoonServiceClient,
42 envelope::Payload,
43 },
44};
45use crate::dev_log;
46
/// Capacity, in frames, of the bounded outbound `Envelope` channel created
/// by `Multiplexer::Open`.
const SINK_CAPACITY:usize = 1024;
51
52pub struct Multiplexer {
56 SideCarIdentifier:String,
57
58 Sink:mpsc::Sender<Envelope>,
59
60 Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
61
62 NextRequestIdentifier:AtomicU64,
63
64 Closed:AtomicBool,
65}
66
67lazy_static! {
68
69 static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
73}
74
75impl Multiplexer {
76 pub async fn Open(
81 SideCarIdentifier:String,
82
83 mut Client:CocoonServiceClient<tonic::transport::Channel>,
84 ) -> Result<Arc<Self>, VineError> {
85 let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
86
87 let OutboundStream = ReceiverStream::new(OutboundReceiver);
88
89 let Response = Client
90 .open_channel_from_mountain(OutboundStream)
91 .await
92 .map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
93
94 let InboundStream:Streaming<Envelope> = Response.into_inner();
95
96 let SelfReference = Arc::new(Self {
97 SideCarIdentifier:SideCarIdentifier.clone(),
98 Sink,
99 Pending:Arc::new(DashMap::new()),
100 NextRequestIdentifier:AtomicU64::new(1),
101 Closed:AtomicBool::new(false),
102 });
103
104 let SelfForReadPump = SelfReference.clone();
106
107 tokio::spawn(async move {
108 ReadPump(InboundStream, SelfForReadPump).await;
109 });
110
111 MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
113
114 Ok(SelfReference)
115 }
116
117 pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
121
122 pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
125
126 pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
129 if self.Closed.load(Ordering::Relaxed) {
130 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
131 }
132
133 let Bytes = serde_json::to_vec(&Parameters)?;
134
135 let Frame = Envelope {
136 payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
137 };
138
139 self.Sink
140 .send(Frame)
141 .await
142 .map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
143 }
144
145 pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
150 if self.Closed.load(Ordering::Relaxed) {
151 return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
152 }
153
154 let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
155
156 let (Tx, Rx) = oneshot::channel();
157
158 self.Pending.insert(Identifier, Tx);
159
160 let Bytes = serde_json::to_vec(&Parameters)?;
161
162 let MethodForError = Method.clone();
163
164 let Frame = Envelope {
165 payload:Some(Payload::Request(GenericRequest {
166 request_identifier:Identifier,
167 method:Method,
168 parameter:Bytes,
169 })),
170 };
171
172 if self.Sink.send(Frame).await.is_err() {
173 self.Pending.remove(&Identifier);
174
175 return Err(VineError::RPCError(format!(
176 "Sink closed for sidecar {}",
177 self.SideCarIdentifier
178 )));
179 }
180
181 match tokio::time::timeout(Timeout, Rx).await {
182 Ok(Ok(Response)) => {
183 if let Some(Error) = Response.error {
184 return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
185 }
186
187 if Response.result.is_empty() {
188 return Ok(Value::Null);
189 }
190
191 serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
192 },
193
194 Ok(Err(_)) => {
195 self.Pending.remove(&Identifier);
196
197 Err(VineError::RPCError(
198 "response sender closed (peer disconnect mid-request)".into(),
199 ))
200 },
201
202 Err(_) => {
203 self.Pending.remove(&Identifier);
204
205 Err(VineError::RequestTimeout {
206 SideCarIdentifier:self.SideCarIdentifier.clone(),
207 MethodName:MethodForError,
208 TimeoutMilliseconds:Timeout.as_millis() as u64,
209 })
210 },
211 }
212 }
213
214 pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
218 if self.Closed.load(Ordering::Relaxed) {
219 return Ok(());
220 }
221
222 let Frame = Envelope {
223 payload:Some(Payload::Cancel(CancelOperationRequest {
224 request_identifier_to_cancel:RequestIdentifier,
225 })),
226 };
227
228 let _ = self.Sink.send(Frame).await;
229
230 Ok(())
231 }
232
233 pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
234
235 pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
236}
237
238async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
243 use futures_util::StreamExt;
244
245 while let Some(FrameResult) = Stream.next().await {
246 let Frame = match FrameResult {
247 Ok(F) => F,
248
249 Err(Status) => {
250 dev_log!(
251 "grpc",
252 "[Vine::Multiplexer] read err on {}: {}",
253 State.SideCarIdentifier,
254 Status
255 );
256
257 break;
258 },
259 };
260
261 let Payload = match Frame.payload {
262 Some(P) => P,
263
264 None => continue,
265 };
266
267 match Payload {
268 Payload::Notification(N) => {
269 let Parameters:Value = if N.parameter.is_empty() {
270 Value::Null
271 } else {
272 serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
273 };
274
275 super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
276 },
277
278 Payload::Response(R) => {
279 let Identifier = R.request_identifier;
280
281 if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
282 let _ = Sender.send(R);
283 }
284
285 },
288
289 Payload::Request(_) => {
290
291 },
298
299 Payload::Cancel(_) => {
300
301 },
305 }
306 }
307
308 State.Closed.store(true, Ordering::Relaxed);
309
310 let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
313
314 for Key in Keys {
315 if let Some((_, Sender)) = State.Pending.remove(&Key) {
316 let _ = Sender.send(GenericResponse {
317 request_identifier:Key,
318 result:Vec::new(),
319 error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
320 });
321 }
322 }
323
324 Multiplexer::Deregister(&State.SideCarIdentifier);
325
326 dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
327}