Skip to main content

AirLibrary/Resilience/
mod.rs

1//! # Resilience Patterns Module
2//!
3//! Provides robust resilience patterns for external service calls:
4//! - Exponential backoff retry logic with jitter
5//! - Circuit breaker pattern for fault isolation
6//! - Bulkhead pattern for resource isolation
7//! - Timeout management with cascading deadlines
8//!
9//! ## Responsibilities
10//!
11//! ### Retry Patterns
12//! - Exponential backoff with jitter for distributed systems
13//! - Adaptive retry policies based on error classification
14//! - Retry budget management for service rate limiting
15//! - Panic recovery for background retry tasks
16//!
17//! ### Circuit Breaker
18//! - Automatic fault detection and isolation
19//! - State consistency validation across transitions
20//! - Event publishing for telemetry integration
21//! - Half-open state monitoring for recovery testing
22//!
23//! ### Bulkhead Pattern
24//! - Concurrent request limiting for resource protection
25//! - Queue management with overflow protection
26//! - Load monitoring and metrics collection
27//! - Timeout validation for all operations
28//!
29//! ### Timeout Management
30//! - Cascading deadline propagation
31//! - Global deadline coordination
32//! - Operation timeout enforcement
33//! - Panic-safe timeout cancellation
34//!
35//! ## Integration with Mountain
36//!
37//! Resilience patterns directly support Mountain's stability by:
38//! - preventing cascading failures through circuit breaker isolation
39//! - managing load through bulkhead resource limits
40//! - providing event publishing for Mountain's telemetry dashboard
41//! - enabling adaptive retry behavior for improved service availability
42//!
43//! ## VSCode Stability References
44//!
45//! Similar patterns used in VSCode for:
46//! - External service resilience (telemetry, updates, extensions)
47//! - Editor process isolation and recovery
48//! - Background task fault tolerance
49//!
50//! Reference:
51//! vs/base/common/errors
52//!
//! ## Future Enhancements
//!
//! - [DISTRIBUTED TRACING] Integrate with the Tracing module for retry/circuit
//!   span correlation
//! - [CUSTOM METRICS] Add detailed bulkhead load metrics to the Metrics module
//! - [EVENT PUBLISHING] Extend circuit breaker events with OpenTelemetry
//!   support
//! - [ADAPTIVE POLICIES] Enhance retry policies with machine learning-based
//!   error prediction
//! - [METRICS INTEGRATION] Export resilience metrics to Mountain's telemetry UI
//!
//! ## Sensitive Data Handling
64//!
65//! This module does not process sensitive data directly but should:
66//! - Redact error messages before logging/event publishing
67//! - Avoid including request payloads in resilience events
68//! - Sanitize service names before publishing to telemetry
69
70pub mod Retry;
71
72pub use Retry::{ErrorClass, RetryEvent, RetryManager, RetryPolicy};
73
74pub mod Timeout;
75
76use std::{
77	collections::HashMap,
78	sync::Arc,
79	time::{Duration, Instant},
80};
81
82pub use Timeout::TimeoutManager;
83use tokio::sync::{RwLock, broadcast};
84use serde::{Deserialize, Serialize};
85
86use crate::dev_log;
87
88// Retry types (ErrorClass, RetryPolicy, RetryManager, RetryEvent) → Retry.rs
89
90/// Circuit breaker states
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
92pub enum CircuitState {
93	/// Circuit is closed (normal operation)
94	Closed,
95
96	/// Circuit is open (failing fast)
97	Open,
98
99	/// Circuit is half-open (testing recovery)
100	HalfOpen,
101}
102
103/// Circuit breaker configuration
104#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
105pub struct CircuitBreakerConfig {
106	/// Failure threshold before tripping
107	pub FailureThreshold:u32,
108
109	/// Success threshold before closing
110	pub SuccessThreshold:u32,
111
112	/// Timeout before attempting recovery (in seconds)
113	pub TimeoutSecs:u64,
114}
115
116impl Default for CircuitBreakerConfig {
117	fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
118}
119
120/// Circuit breaker events for metrics and telemetry integration
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct CircuitEvent {
123	pub name:String,
124
125	pub FromState:CircuitState,
126
127	pub ToState:CircuitState,
128
129	pub timestamp:u64,
130
131	pub reason:String,
132}
133
134/// Circuit breaker for fault isolation with state consistency validation and
135/// event publishing
136#[derive(Debug)]
137pub struct CircuitBreaker {
138	Name:String,
139
140	State:Arc<RwLock<CircuitState>>,
141
142	Config:CircuitBreakerConfig,
143
144	FailureCount:Arc<RwLock<u32>>,
145
146	SuccessCount:Arc<RwLock<u32>>,
147
148	LastFailureTime:Arc<RwLock<Option<Instant>>>,
149
150	EventTx:Arc<broadcast::Sender<CircuitEvent>>,
151
152	StateTransitionCounter:Arc<RwLock<u32>>,
153}
154
155impl CircuitBreaker {
156	/// Create a new circuit breaker with event publishing
157	pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
158		let (EventTx, _) = broadcast::channel(1000);
159
160		Self {
161			Name:name.clone(),
162
163			State:Arc::new(RwLock::new(CircuitState::Closed)),
164
165			Config,
166
167			FailureCount:Arc::new(RwLock::new(0)),
168
169			SuccessCount:Arc::new(RwLock::new(0)),
170
171			LastFailureTime:Arc::new(RwLock::new(None)),
172
173			EventTx:Arc::new(EventTx),
174
175			StateTransitionCounter:Arc::new(RwLock::new(0)),
176		}
177	}
178
179	/// Get the circuit breaker event transmitter for subscription
180	pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
181
182	/// Get current state with panic recovery
183	pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
184
185	/// Validate state consistency across all counters
186	pub async fn ValidateState(&self) -> Result<(), String> {
187		let state = *self.State.read().await;
188
189		let failures = *self.FailureCount.read().await;
190
191		let successes = *self.SuccessCount.read().await;
192
193		match state {
194			CircuitState::Closed => {
195				if successes != 0 {
196					return Err(format!("Inconsistent state: Closed but has {} successes", successes));
197				}
198
199				if failures >= self.Config.FailureThreshold {
200					dev_log!(
201						"resilience",
202						"warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
203						failures,
204						self.Config.FailureThreshold
205					);
206				}
207			},
208
209			CircuitState::Open => {
210				if failures < self.Config.FailureThreshold {
211					dev_log!(
212						"resilience",
213						"warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
214						failures,
215						self.Config.FailureThreshold
216					);
217				}
218			},
219
220			CircuitState::HalfOpen => {
221				if successes >= self.Config.SuccessThreshold {
222					return Err(format!(
223						"Inconsistent state: HalfOpen but has {} successes (should be Closed)",
224						successes
225					));
226				}
227			},
228		}
229
230		Ok(())
231	}
232
233	/// Transition state with validation and event publishing
234	async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
235		let CurrentState = self.GetState().await;
236
237		if CurrentState == NewState {
238			// No transition needed
239			return Ok(());
240		}
241
242		// Validate the proposed transition
243		match (CurrentState, NewState) {
244			(CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
245
246				// Valid transitions
247			},
248
249			(CircuitState::Open, CircuitState::HalfOpen) => {
250
251				// Valid transition through recovery
252			},
253
254			(CircuitState::HalfOpen, CircuitState::Closed) => {
255
256				// Valid recovery transition
257			},
258
259			_ => {
260				return Err(format!(
261					"Invalid state transition from {:?} to {:?} for {}",
262					CurrentState, NewState, self.Name
263				));
264			},
265		}
266
267		// Publish state transition event
268		let event = CircuitEvent {
269			name:self.Name.clone(),
270
271			FromState:CurrentState,
272
273			ToState:NewState,
274
275			timestamp:crate::Utility::CurrentTimestamp(),
276
277			reason:reason.to_string(),
278		};
279
280		let _ = self.EventTx.send(event);
281
282		// Transition state
283		*self.State.write().await = NewState;
284
285		// Increment transition counter
286		*self.StateTransitionCounter.write().await += 1;
287
288		dev_log!(
289			"resilience",
290			"[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
291			self.Name,
292			CurrentState,
293			NewState,
294			reason
295		);
296
297		// Validate new state consistency
298		self.ValidateState().await.map_err(|e| {
299			dev_log!(
300				"resilience",
301				"error: [CircuitBreaker] State validation failed after transition: {}",
302				e
303			);
304			e
305		})?;
306
307		Ok(())
308	}
309
310	/// Record a successful call with panic recovery
311	pub async fn RecordSuccess(&self) {
312		let state = self.GetState().await;
313
314		match state {
315			CircuitState::Closed => {
316				// Reset counters
317				*self.FailureCount.write().await = 0;
318			},
319
320			CircuitState::HalfOpen => {
321				// Increment success count
322				let mut SuccessCount = self.SuccessCount.write().await;
323
324				*SuccessCount += 1;
325
326				if *SuccessCount >= self.Config.SuccessThreshold {
327					// Close the circuit
328					let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
329
330					*self.FailureCount.write().await = 0;
331
332					*self.SuccessCount.write().await = 0;
333				}
334			},
335
336			_ => {},
337		}
338	}
339
340	/// Record a failed call with panic recovery
341	pub async fn RecordFailure(&self) {
342		let State = self.GetState().await;
343
344		*self.LastFailureTime.write().await = Some(Instant::now());
345
346		match State {
347			CircuitState::Closed => {
348				// Increment failure count
349				let mut FailureCount = self.FailureCount.write().await;
350
351				*FailureCount += 1;
352
353				if *FailureCount >= self.Config.FailureThreshold {
354					// Open the circuit
355					let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
356
357					*self.SuccessCount.write().await = 0;
358				}
359			},
360
361			CircuitState::HalfOpen => {
362				// Return to open state
363				let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
364
365				*self.SuccessCount.write().await = 0;
366			},
367
368			_ => {},
369		}
370	}
371
372	/// Attempt to transition to half-open if timeout has elapsed with panic
373	/// recovery
374	pub async fn AttemptRecovery(&self) -> bool {
375		let state = self.GetState().await;
376
377		if state != CircuitState::Open {
378			return state == CircuitState::HalfOpen;
379		}
380
381		if let Some(last_failure) = *self.LastFailureTime.read().await {
382			if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
383				let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
384
385				*self.SuccessCount.write().await = 0;
386
387				return true;
388			}
389		}
390
391		false
392	}
393
394	/// Get circuit breaker statistics for metrics
395	pub async fn GetStatistics(&self) -> CircuitStatistics {
396		CircuitStatistics {
397			Name:self.Name.clone(),
398
399			State:self.GetState().await,
400
401			Failures:*self.FailureCount.read().await,
402
403			Successes:*self.SuccessCount.read().await,
404
405			StateTransitions:*self.StateTransitionCounter.read().await,
406
407			LastFailureTime:*self.LastFailureTime.read().await,
408		}
409	}
410
411	/// Validate circuit breaker configuration
412	pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
413		if config.FailureThreshold == 0 {
414			return Err("FailureThreshold must be greater than 0".to_string());
415		}
416
417		if config.SuccessThreshold == 0 {
418			return Err("SuccessThreshold must be greater than 0".to_string());
419		}
420
421		if config.TimeoutSecs == 0 {
422			return Err("TimeoutSecs must be greater than 0".to_string());
423		}
424
425		Ok(())
426	}
427}
428
429/// Circuit breaker statistics for metrics export
430#[derive(Debug, Clone, Serialize)]
431pub struct CircuitStatistics {
432	pub Name:String,
433
434	pub State:CircuitState,
435
436	pub Failures:u32,
437
438	pub Successes:u32,
439
440	pub StateTransitions:u32,
441
442	#[serde(skip_serializing)]
443	pub LastFailureTime:Option<Instant>,
444}
445
446impl<'de> Deserialize<'de> for CircuitStatistics {
447	fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
448	where
449		D: serde::Deserializer<'de>, {
450		use serde::de::{self, Visitor};
451
452		struct CircuitStatisticsVisitor;
453
454		impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
455			type Value = CircuitStatistics;
456
457			fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
458				formatter.write_str("struct CircuitStatistics")
459			}
460
461			fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
462			where
463				A: de::MapAccess<'de>, {
464				let mut Name = None;
465
466				let mut State = None;
467
468				let mut Failures = None;
469
470				let mut Successes = None;
471
472				let mut StateTransitions = None;
473
474				while let Some(key) = map.next_key::<String>()? {
475					match key.as_str() {
476						"name" => Name = Some(map.next_value()?),
477
478						"state" => State = Some(map.next_value()?),
479
480						"failures" => Failures = Some(map.next_value()?),
481
482						"successes" => Successes = Some(map.next_value()?),
483
484						"state_transitions" => StateTransitions = Some(map.next_value()?),
485
486						_ => {
487							map.next_value::<de::IgnoredAny>()?;
488						},
489					}
490				}
491
492				Ok(CircuitStatistics {
493					Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
494
495					State:State.ok_or_else(|| de::Error::missing_field("state"))?,
496
497					Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
498
499					Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
500
501					StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
502
503					LastFailureTime:None,
504				})
505			}
506		}
507
508		const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
509
510		Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
511	}
512}
513
514impl Clone for CircuitBreaker {
515	fn clone(&self) -> Self {
516		Self {
517			Name:self.Name.clone(),
518
519			State:self.State.clone(),
520
521			Config:self.Config.clone(),
522
523			FailureCount:self.FailureCount.clone(),
524
525			SuccessCount:self.SuccessCount.clone(),
526
527			LastFailureTime:self.LastFailureTime.clone(),
528
529			EventTx:self.EventTx.clone(),
530
531			StateTransitionCounter:self.StateTransitionCounter.clone(),
532		}
533	}
534}
535
536/// Bulkhead configuration
537#[derive(Debug, Clone, Serialize, Deserialize)]
538pub struct BulkheadConfig {
539	/// Maximum concurrent requests
540	pub max_concurrent:usize,
541
542	/// Maximum queue size
543	pub max_queue:usize,
544
545	/// Request timeout (in seconds)
546	pub timeout_secs:u64,
547}
548
549impl Default for BulkheadConfig {
550	fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
551}
552
553/// Bulkhead statistics for metrics export
554#[derive(Debug, Clone, Serialize, Deserialize)]
555pub struct BulkheadStatistics {
556	pub name:String,
557
558	pub current_concurrent:u32,
559
560	pub current_queue:u32,
561
562	pub max_concurrent:usize,
563
564	pub max_queue:usize,
565
566	pub total_rejected:u64,
567
568	pub total_completed:u64,
569
570	pub total_timed_out:u64,
571}
572
573/// Bulkhead semaphore for resource isolation with metrics and panic recovery
574#[derive(Debug)]
575pub struct BulkheadExecutor {
576	name:String,
577
578	semaphore:Arc<tokio::sync::Semaphore>,
579
580	config:BulkheadConfig,
581
582	current_requests:Arc<RwLock<u32>>,
583
584	queue_size:Arc<RwLock<u32>>,
585
586	total_rejected:Arc<RwLock<u64>>,
587
588	total_completed:Arc<RwLock<u64>>,
589
590	total_timed_out:Arc<RwLock<u64>>,
591}
592
593impl BulkheadExecutor {
594	/// Create a new bulkhead executor with metrics tracking
595	pub fn new(name:String, config:BulkheadConfig) -> Self {
596		Self {
597			name:name.clone(),
598
599			semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
600
601			config,
602
603			current_requests:Arc::new(RwLock::new(0)),
604
605			queue_size:Arc::new(RwLock::new(0)),
606
607			total_rejected:Arc::new(RwLock::new(0)),
608
609			total_completed:Arc::new(RwLock::new(0)),
610
611			total_timed_out:Arc::new(RwLock::new(0)),
612		}
613	}
614
615	/// Validate bulkhead configuration
616	pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
617		if config.max_concurrent == 0 {
618			return Err("max_concurrent must be greater than 0".to_string());
619		}
620
621		if config.max_queue == 0 {
622			return Err("max_queue must be greater than 0".to_string());
623		}
624
625		if config.timeout_secs == 0 {
626			return Err("timeout_secs must be greater than 0".to_string());
627		}
628
629		Ok(())
630	}
631
632	/// Execute with bulkhead protection and panic recovery
633	pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
634	where
635		F: std::future::Future<Output = Result<R, String>>, {
636		async {
637			// Validate timeout
638			if self.config.timeout_secs == 0 {
639				return Err("Bulkhead timeout must be greater than 0".to_string());
640			}
641
642			// Check queue size
643			let queue = *self.queue_size.read().await;
644
645			if queue >= self.config.max_queue as u32 {
646				*self.total_rejected.write().await += 1;
647
648				dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
649
650				return Err("Bulkhead queue full".to_string());
651			}
652
653			// Increment queue size
654			*self.queue_size.write().await += 1;
655
656			// Acquire permit with timeout
657			let _Permit =
658				match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
659					.await
660				{
661					Ok(Ok(_)) => {
662						// Permit acquired, proceed with execution
663						// Decrement queue size
664						*self.queue_size.write().await -= 1;
665					},
666
667					Ok(Err(e)) => {
668						*self.queue_size.write().await -= 1;
669
670						return Err(format!("Bulkhead semaphore error: {}", e));
671					},
672
673					Err(_) => {
674						*self.queue_size.write().await -= 1;
675
676						*self.total_timed_out.write().await += 1;
677
678						dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
679
680						return Err("Bulkhead timeout waiting for permit".to_string());
681					},
682				};
683
684			// Decrement queue size, increment current requests
685			*self.queue_size.write().await -= 1;
686
687			*self.current_requests.write().await += 1;
688
689			// Execute with timeout (no catch_unwind to avoid interior mutability issues)
690			let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
691
692			let execution_result:Result<R, String> = match execution_result {
693				Ok(Ok(value)) => Ok(value),
694
695				Ok(Err(e)) => Err(e),
696
697				Err(_) => {
698					*self.total_timed_out.write().await += 1;
699
700					Err("Bulkhead execution timeout".to_string())
701				},
702			};
703
704			if execution_result.is_ok() {
705				*self.total_completed.write().await += 1;
706			}
707
708			execution_result
709		}
710		.await
711	}
712
713	/// Get current load with panic recovery
714	pub async fn GetLoad(&self) -> (u32, u32) {
715		async {
716			let current = *self.current_requests.read().await;
717
718			let queue = *self.queue_size.read().await;
719
720			(current, queue)
721		}
722		.await
723	}
724
725	/// Get bulkhead statistics for metrics
726	pub async fn GetStatistics(&self) -> BulkheadStatistics {
727		async {
728			BulkheadStatistics {
729				name:self.name.clone(),
730
731				current_concurrent:*self.current_requests.read().await,
732
733				current_queue:*self.queue_size.read().await,
734
735				max_concurrent:self.config.max_concurrent,
736
737				max_queue:self.config.max_queue,
738
739				total_rejected:*self.total_rejected.read().await,
740
741				total_completed:*self.total_completed.read().await,
742
743				total_timed_out:*self.total_timed_out.read().await,
744			}
745		}
746		.await
747	}
748
749	/// Calculate utilization percentage
750	pub async fn GetUtilization(&self) -> f64 {
751		let (current, _) = self.GetLoad().await;
752
753		if self.config.max_concurrent == 0 {
754			return 0.0;
755		}
756
757		(current as f64 / self.config.max_concurrent as f64) * 100.0
758	}
759}
760
761impl Clone for BulkheadExecutor {
762	fn clone(&self) -> Self {
763		Self {
764			name:self.name.clone(),
765
766			semaphore:self.semaphore.clone(),
767
768			config:self.config.clone(),
769
770			current_requests:self.current_requests.clone(),
771
772			queue_size:self.queue_size.clone(),
773
774			total_rejected:self.total_rejected.clone(),
775
776			total_completed:self.total_completed.clone(),
777
778			total_timed_out:self.total_timed_out.clone(),
779		}
780	}
781}
782
783/// Resilience orchestrator combining all patterns
784#[derive(Debug)]
785pub struct ResilienceOrchestrator {
786	retry_manager:Arc<RetryManager>,
787
788	circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
789
790	bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
791}
792
793impl ResilienceOrchestrator {
794	/// Create a new resilience orchestrator
795	pub fn new(retry_policy:RetryPolicy) -> Self {
796		Self {
797			retry_manager:Arc::new(RetryManager::new(retry_policy)),
798
799			circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
800
801			bulkheads:Arc::new(RwLock::new(HashMap::new())),
802		}
803	}
804
805	/// Get or create circuit breaker with configuration validation
806	pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
807		let mut breakers = self.circuit_breakers.write().await;
808
809		Arc::new(
810			breakers
811				.entry(service.to_string())
812				.or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
813				.clone(),
814		)
815	}
816
817	/// Get or create bulkhead with configuration validation
818	pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
819		let mut bulkheads = self.bulkheads.write().await;
820
821		Arc::new(
822			bulkheads
823				.entry(service.to_string())
824				.or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
825				.clone(),
826		)
827	}
828
829	/// Get all circuit breaker statistics
830	pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
831		let breakers = self.circuit_breakers.read().await;
832
833		let mut stats = Vec::new();
834
835		for breaker in breakers.values() {
836			stats.push(breaker.GetStatistics().await);
837		}
838
839		stats
840	}
841
842	/// Get all bulkhead statistics
843	pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
844		let bulkheads = self.bulkheads.read().await;
845
846		let mut stats = Vec::new();
847
848		for bulkhead in bulkheads.values() {
849			stats.push(bulkhead.GetStatistics().await);
850		}
851
852		stats
853	}
854
855	/// Execute with full resilience and event publishing
856	pub async fn ExecuteResilient<F, R>(
857		&self,
858
859		service:&str,
860
861		retry_policy:&RetryPolicy,
862
863		circuit_config:CircuitBreakerConfig,
864
865		bulkhead_config:BulkheadConfig,
866
867		f:F,
868	) -> Result<R, String>
869	where
870		F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
871		// Validate configurations
872		if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
873			return Err(format!("Invalid circuit breaker config: {}", e));
874		}
875
876		if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
877			return Err(format!("Invalid bulkhead config: {}", e));
878		}
879
880		let breaker = self.GetCircuitBreaker(service, circuit_config).await;
881
882		let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
883
884		// Check circuit state
885		if breaker.GetState().await == CircuitState::Open {
886			if !breaker.AttemptRecovery().await {
887				return Err("Circuit breaker is open".to_string());
888			}
889		}
890
891		// Execute with bulkhead protection and retry logic
892		let mut Attempt = 0;
893
894		let _LastError = "".to_string();
895
896		loop {
897			let result = bulkhead.Execute(f()).await;
898
899			match result {
900				Ok(Value) => {
901					breaker.RecordSuccess().await;
902
903					// Publish retry success event
904					let Event = RetryEvent {
905						Service:service.to_string(),
906
907						Attempt,
908
909						ErrorClass:ErrorClass::Unknown,
910
911						DelayMs:0,
912
913						Success:true,
914
915						ErrorMessage:None,
916					};
917
918					self.retry_manager.PublishRetryEvent(Event);
919
920					return Ok(Value);
921				},
922
923				Err(E) => {
924					let ErrorClass = self.retry_manager.ClassifyError(&E);
925
926					breaker.RecordFailure().await;
927
928					// Publish retry failure event
929					let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
930
931					let Event = RetryEvent {
932						Service:service.to_string(),
933
934						Attempt,
935
936						ErrorClass,
937
938						DelayMs:Delay.as_millis() as u64,
939
940						Success:false,
941
942						ErrorMessage:Some(self.redact_sensitive_data(&E)),
943					};
944
945					self.retry_manager.PublishRetryEvent(Event);
946
947					if Attempt < retry_policy.MaxRetries
948						&& ErrorClass != ErrorClass::NonRetryable
949						&& self.retry_manager.CanRetry(service).await
950					{
951						let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
952
953						dev_log!(
954							"resilience",
955							"[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
956							service,
957							Attempt + 1,
958							retry_policy.MaxRetries,
959							Delay,
960							self.redact_sensitive_data(&E)
961						);
962
963						tokio::time::sleep(Delay).await;
964
965						Attempt += 1;
966					} else {
967						return Err(E);
968					}
969				},
970			}
971		}
972	}
973
974	/// Redact sensitive data from error messages before logging/event
975	/// publishing
976	fn redact_sensitive_data(&self, message:&str) -> String {
977		let mut redacted = message.to_string();
978
979		// Redact common patterns - simplified to avoid escaping issues
980		let patterns = vec![
981			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
982			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
983			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
984			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
985			(
986				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
987				"Authorization: Bearer [REDACTED]",
988			),
989			(r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
990			(r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
991		];
992
993		for (pattern, replacement) in patterns {
994			if let Ok(re) = regex::Regex::new(pattern) {
995				redacted = re.replace_all(&redacted, replacement).to_string();
996			}
997		}
998
999		redacted
1000	}
1001
1002	/// Validate all configurations
1003	pub fn ValidateConfigurations(
1004		&self,
1005
1006		_RetryPolicy:&RetryPolicy,
1007
1008		CircuitConfig:&CircuitBreakerConfig,
1009
1010		BulkheadConfig:&BulkheadConfig,
1011	) -> Result<(), String> {
1012		self.retry_manager.ValidatePolicy()?;
1013
1014		CircuitBreaker::ValidateConfig(CircuitConfig)?;
1015
1016		BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1017
1018		TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1019
1020		Ok(())
1021	}
1022}
1023
1024impl Clone for ResilienceOrchestrator {
1025	fn clone(&self) -> Self {
1026		Self {
1027			retry_manager:self.retry_manager.clone(),
1028
1029			circuit_breakers:self.circuit_breakers.clone(),
1030
1031			bulkheads:self.bulkheads.clone(),
1032		}
1033	}
1034}
1035
#[cfg(test)]
mod tests {

	use super::*;

	/// A later retry attempt must not get a shorter delay than an earlier one.
	#[test]
	fn test_retry_delay_calculation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let delay_1 = manager.CalculateRetryDelay(1);

		let delay_2 = manager.CalculateRetryDelay(2);

		// delay_2 should be roughly double delay_1 (with some jitter)
		assert!(delay_2 >= delay_1);
	}

	/// Rate-limit errors must back off at least as long as transient ones.
	#[test]
	fn test_adaptive_retry_delay() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

		assert!(rate_limit_delay >= transient_delay);
	}

	/// Error messages are mapped to the expected error classes.
	#[test]
	fn test_error_classification() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

		assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

		assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
	}

	/// The default policy validates; a policy with zero retries does not.
	#[test]
	fn test_policy_validation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert!(manager.ValidatePolicy().is_ok());

		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

		let invalid_manager = RetryManager::new(invalid_policy);

		assert!(invalid_manager.ValidatePolicy().is_err());
	}

	/// Walk the full cycle: Closed -> Open -> HalfOpen -> Closed.
	#[tokio::test]
	async fn test_circuit_breaker_state_transitions() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Recovery must not be offered before the timeout has elapsed.
		assert!(!breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// TimeoutSecs is 1, so wait slightly longer than one second.
		tokio::time::sleep(Duration::from_millis(1100)).await;

		assert!(breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

		breaker.RecordSuccess().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);
	}

	/// Counters stay consistent with the state before and after tripping.
	#[tokio::test]
	async fn test_circuit_breaker_validation() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		// Validate initial state
		assert!(breaker.ValidateState().await.is_ok());

		// Trigger state transition to open
		breaker.RecordFailure().await;

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Open with the failure count at the threshold is a consistent state.
		assert!(breaker.ValidateState().await.is_ok());
	}

	/// The default config validates; a zero failure threshold does not.
	#[test]
	fn test_circuit_breaker_config_validation() {
		let valid_config = CircuitBreakerConfig::default();

		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
	}

	/// A fresh bulkhead reports no load and echoes its configured limits.
	#[tokio::test]
	async fn test_bulkhead_resource_isolation() {
		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let (current, queue) = bulkhead.GetLoad().await;

		assert_eq!(current, 0);

		assert_eq!(queue, 0);

		let stats = bulkhead.GetStatistics().await;

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);

		assert_eq!(stats.max_concurrent, 2);

		assert_eq!(stats.max_queue, 5);
	}

	/// A fresh bulkhead is 0% utilized.
	#[tokio::test]
	async fn test_bulkhead_utilization() {
		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let utilization = bulkhead.GetUtilization().await;

		assert_eq!(utilization, 0.0);
	}

	/// The default config validates; zero concurrency does not.
	#[test]
	fn test_bulkhead_config_validation() {
		let valid_config = BulkheadConfig::default();

		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
	}

	/// A new timeout manager is not exceeded and rejects a zero timeout.
	#[test]
	fn test_timeout_manager() {
		let manager = TimeoutManager::new(Duration::from_secs(30));

		assert!(!manager.IsExceeded());

		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
	}

	/// The remaining time never exceeds the distance to the deadline.
	#[test]
	fn test_timeout_manager_with_deadline() {
		let deadline = Instant::now() + Duration::from_secs(60);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		let remaining = manager.Remaining();

		assert!(remaining.is_some());

		assert!(remaining.unwrap() <= Duration::from_secs(60));
	}
}
1225}