Mountain/IPC/Enhanced/ConnectionPool/
Pool.rs1
2use std::{
10 collections::HashMap,
11 sync::Arc,
12 time::{Duration, Instant},
13};
14
15use tokio::{
16 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
17 time::{interval, timeout},
18};
19
20use crate::{
21 IPC::Enhanced::ConnectionPool::{
22 ConnectionHandle::Struct as ConnectionHandle,
23 HealthChecker::Struct as HealthChecker,
24 PoolConfig::Struct as PoolConfig,
25 PoolStats::Struct as PoolStats,
26 },
27 dev_log,
28};
29
30pub struct Struct {
31 pub config:PoolConfig,
32
33 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
34
35 pub semaphore:Arc<Semaphore>,
36
37 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
38
39 pub stats:Arc<RwLock<PoolStats>>,
40
41 pub health_checker:Arc<AsyncMutex<HealthChecker>>,
42
43 pub is_running:Arc<AsyncMutex<bool>>,
44}
45
46impl Struct {
47 pub fn new(config:PoolConfig) -> Self {
48 let max_connections = config.max_connections;
49
50 let min_connections = config.min_connections;
51
52 let pool = Self {
53 config:config.clone(),
54
55 connections:Arc::new(AsyncMutex::new(HashMap::new())),
56
57 semaphore:Arc::new(Semaphore::new(max_connections)),
58
59 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
60
61 stats:Arc::new(RwLock::new(PoolStats {
62 total_connections:0,
63 active_connections:0,
64 idle_connections:0,
65 healthy_connections:0,
66 max_connections,
67 min_connections,
68 wait_queue_size:0,
69 average_wait_time_ms:0.0,
70 total_operations:0,
71 successful_operations:0,
72 error_rate:0.0,
73 })),
74
75 health_checker:Arc::new(AsyncMutex::new(HealthChecker::new())),
76
77 is_running:Arc::new(AsyncMutex::new(false)),
78 };
79
80 dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
81
82 pool
83 }
84
85 pub async fn start(&self) -> Result<(), String> {
86 {
87 let mut running = self.is_running.lock().await;
88
89 if *running {
90 return Ok(());
91 }
92
93 *running = true;
94 }
95
96 self.start_health_monitoring().await;
97
98 self.start_connection_cleanup().await;
99
100 self.initialize_min_connections().await;
101
102 dev_log!("ipc", "[ConnectionPool] Started connection pool");
103
104 Ok(())
105 }
106
107 pub async fn stop(&self) -> Result<(), String> {
108 {
109 let mut running = self.is_running.lock().await;
110
111 if !*running {
112 return Ok(());
113 }
114
115 *running = false;
116 }
117
118 {
119 let mut connections = self.connections.lock().await;
120
121 connections.clear();
122 }
123
124 {
125 let mut wait_queue = self.wait_queue.lock().await;
126
127 for notifier in wait_queue.drain(..) {
128 notifier.notify_one();
129 }
130 }
131
132 dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
133
134 Ok(())
135 }
136
137 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
138 let start_time = Instant::now();
139
140 let _permit = timeout(
141 Duration::from_millis(self.config.connection_timeout_ms),
142 self.semaphore.acquire(),
143 )
144 .await
145 .map_err(|_| "Connection timeout".to_string())?
146 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
147
148 let wait_time = start_time.elapsed().as_millis() as f64;
149
150 {
151 let mut stats = self.stats.write().await;
152
153 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
154 / (stats.total_operations as f64 + 1.0);
155 }
156
157 let connection = self.find_or_create_connection().await?;
158
159 {
160 let mut stats = self.stats.write().await;
161
162 stats.active_connections += 1;
163
164 stats.total_operations += 1;
165 }
166
167 dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
168
169 Ok(connection)
170 }
171
172 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
173 let connection_id = handle.id.clone();
174
175 handle.last_used = Instant::now();
176
177 {
178 let mut connections = self.connections.lock().await;
179
180 connections.insert(handle.id.clone(), handle.clone());
181 }
182
183 {
184 let mut stats = self.stats.write().await;
185
186 stats.active_connections = stats.active_connections.saturating_sub(1);
187
188 stats.idle_connections += 1;
189 }
190
191 drop(handle);
192
193 dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
194 }
195
196 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
197 let mut connections = self.connections.lock().await;
198
199 for (_id, handle) in connections.iter_mut() {
200 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
201 handle.last_used = Instant::now();
202
203 return Ok(handle.clone());
204 }
205 }
206
207 let new_handle = ConnectionHandle::new();
208
209 connections.insert(new_handle.id.clone(), new_handle.clone());
210
211 {
212 let mut stats = self.stats.write().await;
213
214 stats.total_connections += 1;
215
216 stats.healthy_connections += 1;
217 }
218
219 Ok(new_handle)
220 }
221
222 async fn start_health_monitoring(&self) {
223 let pool = Arc::new(self.clone());
224
225 tokio::spawn(async move {
226 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
227
228 while *pool.is_running.lock().await {
229 interval.tick().await;
230
231 if let Err(e) = pool.check_connection_health().await {
232 dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
233 }
234 }
235 });
236 }
237
238 async fn start_connection_cleanup(&self) {
239 let pool = Arc::new(self.clone());
240
241 tokio::spawn(async move {
242 let mut interval = interval(Duration::from_secs(60));
243
244 while *pool.is_running.lock().await {
245 interval.tick().await;
246
247 let cleaned_count = pool.cleanup_stale_connections().await;
248 if cleaned_count > 0 {
249 dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
250 }
251 }
252 });
253 }
254
255 async fn initialize_min_connections(&self) {
256 let current_count = self.connections.lock().await.len();
257
258 if current_count < self.config.min_connections {
259 let needed = self.config.min_connections - current_count;
260
261 for _ in 0..needed {
262 let handle = ConnectionHandle::new();
263
264 let mut connections = self.connections.lock().await;
265
266 connections.insert(handle.id.clone(), handle);
267 }
268
269 dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
270 }
271 }
272
273 async fn check_connection_health(&self) -> Result<(), String> {
274 let mut connections = self.connections.lock().await;
275
276 let mut _health_checker = self.health_checker.lock().await;
277
278 let mut healthy_count = 0;
279
280 for (_id, handle) in connections.iter_mut() {
281 let is_healthy = _health_checker.check_connection_health(handle).await;
282
283 handle.update_health(is_healthy);
284
285 if handle.is_healthy() {
286 healthy_count += 1;
287 }
288 }
289
290 {
291 let mut stats = self.stats.write().await;
292
293 stats.healthy_connections = healthy_count;
294
295 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
296
297 if stats.total_operations > 0 {
298 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
299 }
300 }
301
302 Ok(())
303 }
304
305 pub async fn cleanup_stale_connections(&self) -> usize {
306 let mut connections = self.connections.lock().await;
307
308 let stale_ids:Vec<String> = connections
309 .iter()
310 .filter(|(_, handle)| {
311 handle.age().as_millis() > self.config.max_lifetime_ms as u128
312 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
313 || !handle.is_healthy()
314 })
315 .map(|(id, _)| id.clone())
316 .collect();
317
318 for id in &stale_ids {
319 connections.remove(id);
320 }
321
322 {
323 let mut stats = self.stats.write().await;
324
325 stats.total_connections = connections.len();
326
327 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
328 }
329
330 stale_ids.len()
331 }
332
333 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
334
335 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
336
337 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
338
339 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
340
341 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
342
343 pub fn high_performance_pool() -> Self {
344 Self::new(PoolConfig {
345 max_connections:50,
346 min_connections:10,
347 connection_timeout_ms:10000,
348 max_lifetime_ms:180000,
349 idle_timeout_ms:30000,
350 health_check_interval_ms:15000,
351 })
352 }
353
354 pub fn conservative_pool() -> Self {
355 Self::new(PoolConfig {
356 max_connections:5,
357 min_connections:1,
358 connection_timeout_ms:60000,
359 max_lifetime_ms:600000,
360 idle_timeout_ms:120000,
361 health_check_interval_ms:60000,
362 })
363 }
364
365 pub fn calculate_optimal_pool_size() -> usize {
366 let num_cpus = num_cpus::get();
367
368 (num_cpus * 2).max(4).min(50)
369 }
370}
371
372impl Clone for Struct {
373 fn clone(&self) -> Self {
374 Self {
375 config:self.config.clone(),
376
377 connections:self.connections.clone(),
378
379 semaphore:self.semaphore.clone(),
380
381 wait_queue:self.wait_queue.clone(),
382
383 stats:self.stats.clone(),
384
385 health_checker:self.health_checker.clone(),
386
387 is_running:self.is_running.clone(),
388 }
389 }
390}