// Mountain/Vine/Server/Notification/OutputChannelCoalesce.rs
2use std::{
40 collections::HashMap,
41 sync::{Mutex as StandardMutex, OnceLock},
42 time::Duration,
43};
44
45use serde_json::{Value, json};
46use tauri::{AppHandle, Emitter};
47use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
48
49use crate::dev_log;
50
/// How long the flush task waits after the first append of a batch before
/// emitting, so that output arriving in the meantime is merged into the same
/// event.
const COALESCE_WINDOW:Duration = Duration::from_millis(50);
56
/// Size threshold (64 KiB) for one channel's buffer: once the append that
/// opens a batch brings the buffer to this many bytes, the buffer is flushed
/// immediately instead of waiting out the coalescing window.
const MAX_BUFFERED_BYTES:usize = 1 << 16;
61
/// A single queued append: a piece of text waiting to be merged into the
/// buffer of the output channel it belongs to.
struct PendingAppend {
	/// Name of the output channel; used as the key of the per-channel buffer.
	Channel:String,
	/// Text to append to that channel's buffer.
	Value:String,
}
67
68struct CoalesceChannel {
69 Sender:UnboundedSender<(AppHandle, PendingAppend)>,
70}
71
72static COALESCE_CHANNEL:OnceLock<CoalesceChannel> = OnceLock::new();
73
/// Reports whether coalescing has been switched off through the environment.
///
/// Returns `true` only when the `OutputCoalesce` variable is set to exactly
/// `0` or `false`. An unset variable, a value that is not valid Unicode, or
/// any other value (including different casing) leaves coalescing enabled.
fn IsDisabled() -> bool {
	match std::env::var("OutputCoalesce") {
		Ok(Setting) => Setting == "0" || Setting == "false",
		Err(_) => false,
	}
}
75
76fn GetOrInitChannel() -> &'static CoalesceChannel {
77 COALESCE_CHANNEL.get_or_init(|| {
78 let (Tx, mut Rx) = unbounded_channel::<(AppHandle, PendingAppend)>();
79
80 tokio::spawn(async move {
81 let Buffers:StandardMutex<HashMap<String, String>> = StandardMutex::new(HashMap::new());
85
86 loop {
87 let Received = Rx.recv().await;
88
89 let (Handle, First) = match Received {
90 None => break,
91 Some(Pair) => Pair,
92 };
93
94 {
96 let mut Guard = match Buffers.lock() {
97 Ok(G) => G,
98 Err(_) => continue,
99 };
100
101 let Slot = Guard.entry(First.Channel.clone()).or_default();
102
103 Slot.push_str(&First.Value);
104
105 if Slot.len() >= MAX_BUFFERED_BYTES {
106 let Payload = std::mem::take(Slot);
107
108 drop(Guard);
109
110 FlushOne(&Handle, &First.Channel, &Payload);
111
112 continue;
113 }
114 }
115
116 let mut Drain:Vec<(AppHandle, PendingAppend)> = Vec::new();
118
119 let Drained = Rx.recv_many(&mut Drain, 4096).await;
120
121 let _ = Drained;
122
123 for (_, Pending) in Drain.drain(..) {
124 if let Ok(mut Guard) = Buffers.lock() {
125 let Slot = Guard.entry(Pending.Channel).or_default();
126 Slot.push_str(&Pending.Value);
127 }
128 }
129
130 tokio::time::sleep(COALESCE_WINDOW).await;
134
135 let mut LateDrain:Vec<(AppHandle, PendingAppend)> = Vec::new();
136
137 let _ = Rx.recv_many(&mut LateDrain, 4096).await;
138
139 for (_, Pending) in LateDrain.drain(..) {
140 if let Ok(mut Guard) = Buffers.lock() {
141 let Slot = Guard.entry(Pending.Channel).or_default();
142 Slot.push_str(&Pending.Value);
143 }
144 }
145
146 let HandleForFlush = Handle.clone();
148
149 let Snapshots = {
150 match Buffers.lock() {
151 Ok(mut Guard) => {
152 Guard
153 .iter_mut()
154 .filter(|(_, V)| !V.is_empty())
155 .map(|(K, V)| (K.clone(), std::mem::take(V)))
156 .collect::<Vec<_>>()
157 },
158 Err(_) => continue,
159 }
160 };
161
162 for (Channel, Payload) in Snapshots {
163 FlushOne(&HandleForFlush, &Channel, &Payload);
164 }
165 }
166 });
167
168 CoalesceChannel { Sender:Tx }
169 })
170}
171
172fn FlushOne(Handle:&AppHandle, Channel:&str, Payload:&str) {
173 let _ = Handle.emit(
174 "sky://output/append",
175 json!({
176 "channel": Channel,
177 "value": Payload,
178 }),
179 );
180
181 let IsGitFamily = Channel.eq_ignore_ascii_case("git")
185 || Channel.eq_ignore_ascii_case("source control")
186 || Channel.eq_ignore_ascii_case("scm");
187
188 let LineCount = Payload.matches('\n').count();
189
190 if IsGitFamily {
191 dev_log!(
192 "grpc",
193 "[OutputChannel:{}] flush bytes={} lines~{}",
194 Channel,
195 Payload.len(),
196 LineCount
197 );
198 } else {
199 dev_log!(
200 "output-verbose",
201 "[OutputChannel] flush channel={} bytes={} lines~{}",
202 Channel,
203 Payload.len(),
204 LineCount
205 );
206 }
207}
208
209pub fn TryEnqueue(Handle:&AppHandle, Channel:String, Value:String) -> bool {
214 if IsDisabled() {
215 return false;
216 }
217
218 let Ch = GetOrInitChannel();
219
220 let _ = Ch.Sender.send((Handle.clone(), PendingAppend { Channel, Value }));
221
222 true
223}