Skip to main content

Mountain/Vine/
Multiplexer.rs

1
2//! Bidirectional streaming multiplexer for the Vine gRPC bus.
3//!
4//! Owns one bidirectional h2 stream per sidecar. Inbound notifications
5//! fan out to the process-wide broadcast
6//! (`Vine::Client::SubscribeNotifications`); inbound responses route to the
7//! matching pending-request `oneshot` sender. Inbound reverse-RPC requests and
8//! cancellations are TODO for a follow-up phase.
9//!
//! This is the P14.1 foundation of Patch 14: it lands the `Open()`,
//! `Notify()`, `Request()`, and `ReadPump` skeleton so subsequent phases
//! can wire `SendNotification` / `SendRequest` to consult the multiplexer
//! when `LAND_VINE_STREAMING=1` is set.
14
15use std::{
16	collections::HashMap,
17	sync::{
18		Arc,
19		atomic::{AtomicBool, AtomicU64, Ordering},
20	},
21	time::Duration,
22};
23
24use dashmap::DashMap;
25use lazy_static::lazy_static;
26use parking_lot::Mutex;
27use serde_json::Value;
28use tokio::sync::{mpsc, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30use tonic::Streaming;
31
32use super::{
33	Error::VineError,
34	Generated::{
35		CancelOperationRequest,
36		Envelope,
37		GenericNotification,
38		GenericRequest,
39		GenericResponse,
40		RpcError,
41		cocoon_service_client::CocoonServiceClient,
42		envelope::Payload,
43	},
44};
45use crate::dev_log;
46
/// Capacity of each multiplexer's outbound `mpsc` queue (1024 frames).
/// The queue is bounded so that a sidecar which stops draining frames
/// pushes back on producers rather than letting queued frames pile up
/// without limit.
const SINK_CAPACITY:usize = 1 << 10;
51
52/// One multiplexer per sidecar connection. Holds the outbound sink,
53/// the pending-request correlation map, and a shared-state shutdown
54/// flag.
55pub struct Multiplexer {
56	SideCarIdentifier:String,
57
58	Sink:mpsc::Sender<Envelope>,
59
60	Pending:Arc<DashMap<u64, oneshot::Sender<GenericResponse>>>,
61
62	NextRequestIdentifier:AtomicU64,
63
64	Closed:AtomicBool,
65}
66
67lazy_static! {
68
69	/// Process-wide registry, one entry per sidecar identifier.
70	/// Lookup site for `SendNotification` / `SendRequest` to consult
71	/// when `LAND_VINE_STREAMING=1`.
72	static ref MULTIPLEXERS:Arc<Mutex<HashMap<String, Arc<Multiplexer>>>> = Arc::new(Mutex::new(HashMap::new()));
73}
74
75impl Multiplexer {
76	/// Open a bidirectional streaming channel against an existing
77	/// `CocoonServiceClient`. Spawns the read pump as a detached
78	/// tokio task and registers the multiplexer in the global
79	/// registry. Returns once the stream is established.
80	pub async fn Open(
81		SideCarIdentifier:String,
82
83		mut Client:CocoonServiceClient<tonic::transport::Channel>,
84	) -> Result<Arc<Self>, VineError> {
85		let (Sink, OutboundReceiver) = mpsc::channel::<Envelope>(SINK_CAPACITY);
86
87		let OutboundStream = ReceiverStream::new(OutboundReceiver);
88
89		let Response = Client
90			.open_channel_from_mountain(OutboundStream)
91			.await
92			.map_err(|S| VineError::RPCError(format!("OpenChannelFromMountain failed: {}", S)))?;
93
94		let InboundStream:Streaming<Envelope> = Response.into_inner();
95
96		let SelfReference = Arc::new(Self {
97			SideCarIdentifier:SideCarIdentifier.clone(),
98			Sink,
99			Pending:Arc::new(DashMap::new()),
100			NextRequestIdentifier:AtomicU64::new(1),
101			Closed:AtomicBool::new(false),
102		});
103
104		// Spawn the read pump.
105		let SelfForReadPump = SelfReference.clone();
106
107		tokio::spawn(async move {
108			ReadPump(InboundStream, SelfForReadPump).await;
109		});
110
111		// Register globally so consumers can look us up.
112		MULTIPLEXERS.lock().insert(SideCarIdentifier, SelfReference.clone());
113
114		Ok(SelfReference)
115	}
116
117	/// Look up the multiplexer for a sidecar. Returns `None` if no
118	/// streaming connection has been opened for that sidecar (the
119	/// caller should fall back to the unary path).
120	pub fn Lookup(SideCarIdentifier:&str) -> Option<Arc<Self>> { MULTIPLEXERS.lock().get(SideCarIdentifier).cloned() }
121
122	/// Drop the registry entry. Called by the read-pump when the
123	/// stream closes.
124	pub fn Deregister(SideCarIdentifier:&str) { MULTIPLEXERS.lock().remove(SideCarIdentifier); }
125
126	/// Send a notification frame (fire-and-forget). Non-blocking
127	/// modulo Sink backpressure (capacity `SINK_CAPACITY`).
128	pub async fn Notify(&self, Method:String, Parameters:Value) -> Result<(), VineError> {
129		if self.Closed.load(Ordering::Relaxed) {
130			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
131		}
132
133		let Bytes = serde_json::to_vec(&Parameters)?;
134
135		let Frame = Envelope {
136			payload:Some(Payload::Notification(GenericNotification { method:Method, parameter:Bytes })),
137		};
138
139		self.Sink
140			.send(Frame)
141			.await
142			.map_err(|_| VineError::RPCError(format!("Sink closed for sidecar {}", self.SideCarIdentifier)))
143	}
144
145	/// Send a request and await the matching response. Cancels the
146	/// pending entry on timeout. The future is `Send + 'static`-clean
147	/// so callers can drive it inside `tokio::select!` for finer-
148	/// grained cancellation.
149	pub async fn Request(&self, Method:String, Parameters:Value, Timeout:Duration) -> Result<Value, VineError> {
150		if self.Closed.load(Ordering::Relaxed) {
151			return Err(VineError::ClientNotConnected(self.SideCarIdentifier.clone()));
152		}
153
154		let Identifier = self.NextRequestIdentifier.fetch_add(1, Ordering::Relaxed);
155
156		let (Tx, Rx) = oneshot::channel();
157
158		self.Pending.insert(Identifier, Tx);
159
160		let Bytes = serde_json::to_vec(&Parameters)?;
161
162		let MethodForError = Method.clone();
163
164		let Frame = Envelope {
165			payload:Some(Payload::Request(GenericRequest {
166				request_identifier:Identifier,
167				method:Method,
168				parameter:Bytes,
169			})),
170		};
171
172		if self.Sink.send(Frame).await.is_err() {
173			self.Pending.remove(&Identifier);
174
175			return Err(VineError::RPCError(format!(
176				"Sink closed for sidecar {}",
177				self.SideCarIdentifier
178			)));
179		}
180
181		match tokio::time::timeout(Timeout, Rx).await {
182			Ok(Ok(Response)) => {
183				if let Some(Error) = Response.error {
184					return Err(VineError::RPCError(format!("code={} message={}", Error.code, Error.message)));
185				}
186
187				if Response.result.is_empty() {
188					return Ok(Value::Null);
189				}
190
191				serde_json::from_slice::<Value>(&Response.result).map_err(|E| VineError::SerializationError(E))
192			},
193
194			Ok(Err(_)) => {
195				self.Pending.remove(&Identifier);
196
197				Err(VineError::RPCError(
198					"response sender closed (peer disconnect mid-request)".into(),
199				))
200			},
201
202			Err(_) => {
203				self.Pending.remove(&Identifier);
204
205				Err(VineError::RequestTimeout {
206					SideCarIdentifier:self.SideCarIdentifier.clone(),
207					MethodName:MethodForError,
208					TimeoutMilliseconds:Timeout.as_millis() as u64,
209				})
210			},
211		}
212	}
213
214	/// Send a Cancel frame asking the peer to abort an in-flight
215	/// request matching `RequestIdentifier`. Best-effort; the peer
216	/// chooses whether to honour it.
217	pub async fn Cancel(&self, RequestIdentifier:u64) -> Result<(), VineError> {
218		if self.Closed.load(Ordering::Relaxed) {
219			return Ok(());
220		}
221
222		let Frame = Envelope {
223			payload:Some(Payload::Cancel(CancelOperationRequest {
224				request_identifier_to_cancel:RequestIdentifier,
225			})),
226		};
227
228		let _ = self.Sink.send(Frame).await;
229
230		Ok(())
231	}
232
233	pub fn IsClosed(&self) -> bool { self.Closed.load(Ordering::Relaxed) }
234
235	pub fn SideCarIdentifierBorrow(&self) -> &str { &self.SideCarIdentifier }
236}
237
238/// Drain the inbound side of the bidirectional stream. Notifications
239/// fan out to the process-wide broadcast; responses wake the parked
240/// `Request` future. Reverse-RPC requests and cancellations are
241/// recorded for a follow-up phase.
242async fn ReadPump(mut Stream:Streaming<Envelope>, State:Arc<Multiplexer>) {
243	use futures_util::StreamExt;
244
245	while let Some(FrameResult) = Stream.next().await {
246		let Frame = match FrameResult {
247			Ok(F) => F,
248
249			Err(Status) => {
250				dev_log!(
251					"grpc",
252					"[Vine::Multiplexer] read err on {}: {}",
253					State.SideCarIdentifier,
254					Status
255				);
256
257				break;
258			},
259		};
260
261		let Payload = match Frame.payload {
262			Some(P) => P,
263
264			None => continue,
265		};
266
267		match Payload {
268			Payload::Notification(N) => {
269				let Parameters:Value = if N.parameter.is_empty() {
270					Value::Null
271				} else {
272					serde_json::from_slice(&N.parameter).unwrap_or(Value::Null)
273				};
274
275				super::Client::PublishNotificationFromMux::Fn(&State.SideCarIdentifier, &N.method, &Parameters);
276			},
277
278			Payload::Response(R) => {
279				let Identifier = R.request_identifier;
280
281				if let Some((_, Sender)) = State.Pending.remove(&Identifier) {
282					let _ = Sender.send(R);
283				}
284
285				// A Response with no matching pending entry is a
286				// duplicate or post-cancel arrival; drop silently.
287			},
288
289			Payload::Request(_) => {
290
291				// TODO P14.1.1: dispatch the inbound (reverse-RPC)
292				// request to the same handler tree the unary path
293				// uses, then enqueue the GenericResponse onto Sink.
294				// For now we drop, which is safe: the unary path is
295				// still authoritative until phase P14.4 lands the
296				// streaming handler tree on Cocoon side.
297			},
298
299			Payload::Cancel(_) => {
300
301				// TODO P14.1.2: signal abort for the in-flight
302				// handler. For now no-op (the unary path doesn't
303				// support cancel either).
304			},
305		}
306	}
307
308	State.Closed.store(true, Ordering::Relaxed);
309
310	// Drain pending senders with disconnect errors so awaiting
311	// fibers don't hang forever.
312	let Keys:Vec<u64> = State.Pending.iter().map(|R| *R.key()).collect();
313
314	for Key in Keys {
315		if let Some((_, Sender)) = State.Pending.remove(&Key) {
316			let _ = Sender.send(GenericResponse {
317				request_identifier:Key,
318				result:Vec::new(),
319				error:Some(RpcError { code:-32099, message:"stream closed".into(), data:Vec::new() }),
320			});
321		}
322	}
323
324	Multiplexer::Deregister(&State.SideCarIdentifier);
325
326	dev_log!("grpc", "[Vine::Multiplexer] closed sidecar={}", State.SideCarIdentifier);
327}