1pub mod Retry;
71
72pub use Retry::{ErrorClass, RetryEvent, RetryManager, RetryPolicy};
73
74pub mod Timeout;
75
76use std::{
77 collections::HashMap,
78 sync::Arc,
79 time::{Duration, Instant},
80};
81
82pub use Timeout::TimeoutManager;
83use tokio::sync::{RwLock, broadcast};
84use serde::{Deserialize, Serialize};
85
86use crate::dev_log;
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
92pub enum CircuitState {
93 Closed,
95
96 Open,
98
99 HalfOpen,
101}
102
103#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
105pub struct CircuitBreakerConfig {
106 pub FailureThreshold:u32,
108
109 pub SuccessThreshold:u32,
111
112 pub TimeoutSecs:u64,
114}
115
116impl Default for CircuitBreakerConfig {
117 fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct CircuitEvent {
123 pub name:String,
124
125 pub FromState:CircuitState,
126
127 pub ToState:CircuitState,
128
129 pub timestamp:u64,
130
131 pub reason:String,
132}
133
134#[derive(Debug)]
137pub struct CircuitBreaker {
138 Name:String,
139
140 State:Arc<RwLock<CircuitState>>,
141
142 Config:CircuitBreakerConfig,
143
144 FailureCount:Arc<RwLock<u32>>,
145
146 SuccessCount:Arc<RwLock<u32>>,
147
148 LastFailureTime:Arc<RwLock<Option<Instant>>>,
149
150 EventTx:Arc<broadcast::Sender<CircuitEvent>>,
151
152 StateTransitionCounter:Arc<RwLock<u32>>,
153}
154
155impl CircuitBreaker {
156 pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
158 let (EventTx, _) = broadcast::channel(1000);
159
160 Self {
161 Name:name.clone(),
162
163 State:Arc::new(RwLock::new(CircuitState::Closed)),
164
165 Config,
166
167 FailureCount:Arc::new(RwLock::new(0)),
168
169 SuccessCount:Arc::new(RwLock::new(0)),
170
171 LastFailureTime:Arc::new(RwLock::new(None)),
172
173 EventTx:Arc::new(EventTx),
174
175 StateTransitionCounter:Arc::new(RwLock::new(0)),
176 }
177 }
178
179 pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
181
182 pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
184
185 pub async fn ValidateState(&self) -> Result<(), String> {
187 let state = *self.State.read().await;
188
189 let failures = *self.FailureCount.read().await;
190
191 let successes = *self.SuccessCount.read().await;
192
193 match state {
194 CircuitState::Closed => {
195 if successes != 0 {
196 return Err(format!("Inconsistent state: Closed but has {} successes", successes));
197 }
198
199 if failures >= self.Config.FailureThreshold {
200 dev_log!(
201 "resilience",
202 "warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
203 failures,
204 self.Config.FailureThreshold
205 );
206 }
207 },
208
209 CircuitState::Open => {
210 if failures < self.Config.FailureThreshold {
211 dev_log!(
212 "resilience",
213 "warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
214 failures,
215 self.Config.FailureThreshold
216 );
217 }
218 },
219
220 CircuitState::HalfOpen => {
221 if successes >= self.Config.SuccessThreshold {
222 return Err(format!(
223 "Inconsistent state: HalfOpen but has {} successes (should be Closed)",
224 successes
225 ));
226 }
227 },
228 }
229
230 Ok(())
231 }
232
233 async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
235 let CurrentState = self.GetState().await;
236
237 if CurrentState == NewState {
238 return Ok(());
240 }
241
242 match (CurrentState, NewState) {
244 (CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
245
246 },
248
249 (CircuitState::Open, CircuitState::HalfOpen) => {
250
251 },
253
254 (CircuitState::HalfOpen, CircuitState::Closed) => {
255
256 },
258
259 _ => {
260 return Err(format!(
261 "Invalid state transition from {:?} to {:?} for {}",
262 CurrentState, NewState, self.Name
263 ));
264 },
265 }
266
267 let event = CircuitEvent {
269 name:self.Name.clone(),
270
271 FromState:CurrentState,
272
273 ToState:NewState,
274
275 timestamp:crate::Utility::CurrentTimestamp(),
276
277 reason:reason.to_string(),
278 };
279
280 let _ = self.EventTx.send(event);
281
282 *self.State.write().await = NewState;
284
285 *self.StateTransitionCounter.write().await += 1;
287
288 dev_log!(
289 "resilience",
290 "[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
291 self.Name,
292 CurrentState,
293 NewState,
294 reason
295 );
296
297 self.ValidateState().await.map_err(|e| {
299 dev_log!(
300 "resilience",
301 "error: [CircuitBreaker] State validation failed after transition: {}",
302 e
303 );
304 e
305 })?;
306
307 Ok(())
308 }
309
310 pub async fn RecordSuccess(&self) {
312 let state = self.GetState().await;
313
314 match state {
315 CircuitState::Closed => {
316 *self.FailureCount.write().await = 0;
318 },
319
320 CircuitState::HalfOpen => {
321 let mut SuccessCount = self.SuccessCount.write().await;
323
324 *SuccessCount += 1;
325
326 if *SuccessCount >= self.Config.SuccessThreshold {
327 let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
329
330 *self.FailureCount.write().await = 0;
331
332 *self.SuccessCount.write().await = 0;
333 }
334 },
335
336 _ => {},
337 }
338 }
339
340 pub async fn RecordFailure(&self) {
342 let State = self.GetState().await;
343
344 *self.LastFailureTime.write().await = Some(Instant::now());
345
346 match State {
347 CircuitState::Closed => {
348 let mut FailureCount = self.FailureCount.write().await;
350
351 *FailureCount += 1;
352
353 if *FailureCount >= self.Config.FailureThreshold {
354 let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
356
357 *self.SuccessCount.write().await = 0;
358 }
359 },
360
361 CircuitState::HalfOpen => {
362 let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
364
365 *self.SuccessCount.write().await = 0;
366 },
367
368 _ => {},
369 }
370 }
371
372 pub async fn AttemptRecovery(&self) -> bool {
375 let state = self.GetState().await;
376
377 if state != CircuitState::Open {
378 return state == CircuitState::HalfOpen;
379 }
380
381 if let Some(last_failure) = *self.LastFailureTime.read().await {
382 if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
383 let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
384
385 *self.SuccessCount.write().await = 0;
386
387 return true;
388 }
389 }
390
391 false
392 }
393
394 pub async fn GetStatistics(&self) -> CircuitStatistics {
396 CircuitStatistics {
397 Name:self.Name.clone(),
398
399 State:self.GetState().await,
400
401 Failures:*self.FailureCount.read().await,
402
403 Successes:*self.SuccessCount.read().await,
404
405 StateTransitions:*self.StateTransitionCounter.read().await,
406
407 LastFailureTime:*self.LastFailureTime.read().await,
408 }
409 }
410
411 pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
413 if config.FailureThreshold == 0 {
414 return Err("FailureThreshold must be greater than 0".to_string());
415 }
416
417 if config.SuccessThreshold == 0 {
418 return Err("SuccessThreshold must be greater than 0".to_string());
419 }
420
421 if config.TimeoutSecs == 0 {
422 return Err("TimeoutSecs must be greater than 0".to_string());
423 }
424
425 Ok(())
426 }
427}
428
429#[derive(Debug, Clone, Serialize)]
431pub struct CircuitStatistics {
432 pub Name:String,
433
434 pub State:CircuitState,
435
436 pub Failures:u32,
437
438 pub Successes:u32,
439
440 pub StateTransitions:u32,
441
442 #[serde(skip_serializing)]
443 pub LastFailureTime:Option<Instant>,
444}
445
446impl<'de> Deserialize<'de> for CircuitStatistics {
447 fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
448 where
449 D: serde::Deserializer<'de>, {
450 use serde::de::{self, Visitor};
451
452 struct CircuitStatisticsVisitor;
453
454 impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
455 type Value = CircuitStatistics;
456
457 fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
458 formatter.write_str("struct CircuitStatistics")
459 }
460
461 fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
462 where
463 A: de::MapAccess<'de>, {
464 let mut Name = None;
465
466 let mut State = None;
467
468 let mut Failures = None;
469
470 let mut Successes = None;
471
472 let mut StateTransitions = None;
473
474 while let Some(key) = map.next_key::<String>()? {
475 match key.as_str() {
476 "name" => Name = Some(map.next_value()?),
477
478 "state" => State = Some(map.next_value()?),
479
480 "failures" => Failures = Some(map.next_value()?),
481
482 "successes" => Successes = Some(map.next_value()?),
483
484 "state_transitions" => StateTransitions = Some(map.next_value()?),
485
486 _ => {
487 map.next_value::<de::IgnoredAny>()?;
488 },
489 }
490 }
491
492 Ok(CircuitStatistics {
493 Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
494
495 State:State.ok_or_else(|| de::Error::missing_field("state"))?,
496
497 Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
498
499 Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
500
501 StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
502
503 LastFailureTime:None,
504 })
505 }
506 }
507
508 const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
509
510 Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
511 }
512}
513
514impl Clone for CircuitBreaker {
515 fn clone(&self) -> Self {
516 Self {
517 Name:self.Name.clone(),
518
519 State:self.State.clone(),
520
521 Config:self.Config.clone(),
522
523 FailureCount:self.FailureCount.clone(),
524
525 SuccessCount:self.SuccessCount.clone(),
526
527 LastFailureTime:self.LastFailureTime.clone(),
528
529 EventTx:self.EventTx.clone(),
530
531 StateTransitionCounter:self.StateTransitionCounter.clone(),
532 }
533 }
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize)]
538pub struct BulkheadConfig {
539 pub max_concurrent:usize,
541
542 pub max_queue:usize,
544
545 pub timeout_secs:u64,
547}
548
549impl Default for BulkheadConfig {
550 fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
551}
552
553#[derive(Debug, Clone, Serialize, Deserialize)]
555pub struct BulkheadStatistics {
556 pub name:String,
557
558 pub current_concurrent:u32,
559
560 pub current_queue:u32,
561
562 pub max_concurrent:usize,
563
564 pub max_queue:usize,
565
566 pub total_rejected:u64,
567
568 pub total_completed:u64,
569
570 pub total_timed_out:u64,
571}
572
573#[derive(Debug)]
575pub struct BulkheadExecutor {
576 name:String,
577
578 semaphore:Arc<tokio::sync::Semaphore>,
579
580 config:BulkheadConfig,
581
582 current_requests:Arc<RwLock<u32>>,
583
584 queue_size:Arc<RwLock<u32>>,
585
586 total_rejected:Arc<RwLock<u64>>,
587
588 total_completed:Arc<RwLock<u64>>,
589
590 total_timed_out:Arc<RwLock<u64>>,
591}
592
593impl BulkheadExecutor {
594 pub fn new(name:String, config:BulkheadConfig) -> Self {
596 Self {
597 name:name.clone(),
598
599 semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
600
601 config,
602
603 current_requests:Arc::new(RwLock::new(0)),
604
605 queue_size:Arc::new(RwLock::new(0)),
606
607 total_rejected:Arc::new(RwLock::new(0)),
608
609 total_completed:Arc::new(RwLock::new(0)),
610
611 total_timed_out:Arc::new(RwLock::new(0)),
612 }
613 }
614
615 pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
617 if config.max_concurrent == 0 {
618 return Err("max_concurrent must be greater than 0".to_string());
619 }
620
621 if config.max_queue == 0 {
622 return Err("max_queue must be greater than 0".to_string());
623 }
624
625 if config.timeout_secs == 0 {
626 return Err("timeout_secs must be greater than 0".to_string());
627 }
628
629 Ok(())
630 }
631
632 pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
634 where
635 F: std::future::Future<Output = Result<R, String>>, {
636 async {
637 if self.config.timeout_secs == 0 {
639 return Err("Bulkhead timeout must be greater than 0".to_string());
640 }
641
642 let queue = *self.queue_size.read().await;
644
645 if queue >= self.config.max_queue as u32 {
646 *self.total_rejected.write().await += 1;
647
648 dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
649
650 return Err("Bulkhead queue full".to_string());
651 }
652
653 *self.queue_size.write().await += 1;
655
656 let _Permit =
658 match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
659 .await
660 {
661 Ok(Ok(_)) => {
662 *self.queue_size.write().await -= 1;
665 },
666
667 Ok(Err(e)) => {
668 *self.queue_size.write().await -= 1;
669
670 return Err(format!("Bulkhead semaphore error: {}", e));
671 },
672
673 Err(_) => {
674 *self.queue_size.write().await -= 1;
675
676 *self.total_timed_out.write().await += 1;
677
678 dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
679
680 return Err("Bulkhead timeout waiting for permit".to_string());
681 },
682 };
683
684 *self.queue_size.write().await -= 1;
686
687 *self.current_requests.write().await += 1;
688
689 let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
691
692 let execution_result:Result<R, String> = match execution_result {
693 Ok(Ok(value)) => Ok(value),
694
695 Ok(Err(e)) => Err(e),
696
697 Err(_) => {
698 *self.total_timed_out.write().await += 1;
699
700 Err("Bulkhead execution timeout".to_string())
701 },
702 };
703
704 if execution_result.is_ok() {
705 *self.total_completed.write().await += 1;
706 }
707
708 execution_result
709 }
710 .await
711 }
712
713 pub async fn GetLoad(&self) -> (u32, u32) {
715 async {
716 let current = *self.current_requests.read().await;
717
718 let queue = *self.queue_size.read().await;
719
720 (current, queue)
721 }
722 .await
723 }
724
725 pub async fn GetStatistics(&self) -> BulkheadStatistics {
727 async {
728 BulkheadStatistics {
729 name:self.name.clone(),
730
731 current_concurrent:*self.current_requests.read().await,
732
733 current_queue:*self.queue_size.read().await,
734
735 max_concurrent:self.config.max_concurrent,
736
737 max_queue:self.config.max_queue,
738
739 total_rejected:*self.total_rejected.read().await,
740
741 total_completed:*self.total_completed.read().await,
742
743 total_timed_out:*self.total_timed_out.read().await,
744 }
745 }
746 .await
747 }
748
749 pub async fn GetUtilization(&self) -> f64 {
751 let (current, _) = self.GetLoad().await;
752
753 if self.config.max_concurrent == 0 {
754 return 0.0;
755 }
756
757 (current as f64 / self.config.max_concurrent as f64) * 100.0
758 }
759}
760
761impl Clone for BulkheadExecutor {
762 fn clone(&self) -> Self {
763 Self {
764 name:self.name.clone(),
765
766 semaphore:self.semaphore.clone(),
767
768 config:self.config.clone(),
769
770 current_requests:self.current_requests.clone(),
771
772 queue_size:self.queue_size.clone(),
773
774 total_rejected:self.total_rejected.clone(),
775
776 total_completed:self.total_completed.clone(),
777
778 total_timed_out:self.total_timed_out.clone(),
779 }
780 }
781}
782
783#[derive(Debug)]
785pub struct ResilienceOrchestrator {
786 retry_manager:Arc<RetryManager>,
787
788 circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
789
790 bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
791}
792
793impl ResilienceOrchestrator {
794 pub fn new(retry_policy:RetryPolicy) -> Self {
796 Self {
797 retry_manager:Arc::new(RetryManager::new(retry_policy)),
798
799 circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
800
801 bulkheads:Arc::new(RwLock::new(HashMap::new())),
802 }
803 }
804
805 pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
807 let mut breakers = self.circuit_breakers.write().await;
808
809 Arc::new(
810 breakers
811 .entry(service.to_string())
812 .or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
813 .clone(),
814 )
815 }
816
817 pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
819 let mut bulkheads = self.bulkheads.write().await;
820
821 Arc::new(
822 bulkheads
823 .entry(service.to_string())
824 .or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
825 .clone(),
826 )
827 }
828
829 pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
831 let breakers = self.circuit_breakers.read().await;
832
833 let mut stats = Vec::new();
834
835 for breaker in breakers.values() {
836 stats.push(breaker.GetStatistics().await);
837 }
838
839 stats
840 }
841
842 pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
844 let bulkheads = self.bulkheads.read().await;
845
846 let mut stats = Vec::new();
847
848 for bulkhead in bulkheads.values() {
849 stats.push(bulkhead.GetStatistics().await);
850 }
851
852 stats
853 }
854
855 pub async fn ExecuteResilient<F, R>(
857 &self,
858
859 service:&str,
860
861 retry_policy:&RetryPolicy,
862
863 circuit_config:CircuitBreakerConfig,
864
865 bulkhead_config:BulkheadConfig,
866
867 f:F,
868 ) -> Result<R, String>
869 where
870 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
871 if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
873 return Err(format!("Invalid circuit breaker config: {}", e));
874 }
875
876 if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
877 return Err(format!("Invalid bulkhead config: {}", e));
878 }
879
880 let breaker = self.GetCircuitBreaker(service, circuit_config).await;
881
882 let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
883
884 if breaker.GetState().await == CircuitState::Open {
886 if !breaker.AttemptRecovery().await {
887 return Err("Circuit breaker is open".to_string());
888 }
889 }
890
891 let mut Attempt = 0;
893
894 let _LastError = "".to_string();
895
896 loop {
897 let result = bulkhead.Execute(f()).await;
898
899 match result {
900 Ok(Value) => {
901 breaker.RecordSuccess().await;
902
903 let Event = RetryEvent {
905 Service:service.to_string(),
906
907 Attempt,
908
909 ErrorClass:ErrorClass::Unknown,
910
911 DelayMs:0,
912
913 Success:true,
914
915 ErrorMessage:None,
916 };
917
918 self.retry_manager.PublishRetryEvent(Event);
919
920 return Ok(Value);
921 },
922
923 Err(E) => {
924 let ErrorClass = self.retry_manager.ClassifyError(&E);
925
926 breaker.RecordFailure().await;
927
928 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
930
931 let Event = RetryEvent {
932 Service:service.to_string(),
933
934 Attempt,
935
936 ErrorClass,
937
938 DelayMs:Delay.as_millis() as u64,
939
940 Success:false,
941
942 ErrorMessage:Some(self.redact_sensitive_data(&E)),
943 };
944
945 self.retry_manager.PublishRetryEvent(Event);
946
947 if Attempt < retry_policy.MaxRetries
948 && ErrorClass != ErrorClass::NonRetryable
949 && self.retry_manager.CanRetry(service).await
950 {
951 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
952
953 dev_log!(
954 "resilience",
955 "[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
956 service,
957 Attempt + 1,
958 retry_policy.MaxRetries,
959 Delay,
960 self.redact_sensitive_data(&E)
961 );
962
963 tokio::time::sleep(Delay).await;
964
965 Attempt += 1;
966 } else {
967 return Err(E);
968 }
969 },
970 }
971 }
972 }
973
974 fn redact_sensitive_data(&self, message:&str) -> String {
977 let mut redacted = message.to_string();
978
979 let patterns = vec![
981 (r"(?i)password[=:]\S+", "password=[REDACTED]"),
982 (r"(?i)token[=:]\S+", "token=[REDACTED]"),
983 (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
984 (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
985 (
986 r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
987 "Authorization: Bearer [REDACTED]",
988 ),
989 (r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
990 (r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
991 ];
992
993 for (pattern, replacement) in patterns {
994 if let Ok(re) = regex::Regex::new(pattern) {
995 redacted = re.replace_all(&redacted, replacement).to_string();
996 }
997 }
998
999 redacted
1000 }
1001
1002 pub fn ValidateConfigurations(
1004 &self,
1005
1006 _RetryPolicy:&RetryPolicy,
1007
1008 CircuitConfig:&CircuitBreakerConfig,
1009
1010 BulkheadConfig:&BulkheadConfig,
1011 ) -> Result<(), String> {
1012 self.retry_manager.ValidatePolicy()?;
1013
1014 CircuitBreaker::ValidateConfig(CircuitConfig)?;
1015
1016 BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1017
1018 TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1019
1020 Ok(())
1021 }
1022}
1023
1024impl Clone for ResilienceOrchestrator {
1025 fn clone(&self) -> Self {
1026 Self {
1027 retry_manager:self.retry_manager.clone(),
1028
1029 circuit_breakers:self.circuit_breakers.clone(),
1030
1031 bulkheads:self.bulkheads.clone(),
1032 }
1033 }
1034}
1035
#[cfg(test)]
mod tests {

	use super::*;

	#[test]
	fn test_retry_delay_calculation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let delay_1 = manager.CalculateRetryDelay(1);

		let delay_2 = manager.CalculateRetryDelay(2);

		assert!(delay_2 >= delay_1);
	}

	#[test]
	fn test_adaptive_retry_delay() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

		assert!(rate_limit_delay >= transient_delay);
	}

	#[test]
	fn test_error_classification() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

		assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

		assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
	}

	#[test]
	fn test_policy_validation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert!(manager.ValidatePolicy().is_ok());

		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

		let invalid_manager = RetryManager::new(invalid_policy);

		assert!(invalid_manager.ValidatePolicy().is_err());
	}

	#[tokio::test]
	async fn test_circuit_breaker_state_transitions() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Recovery requires `TimeoutSecs` since the last failure; backdate the failure
		// instead of sleeping for the timeout.
		let past = Instant::now()
			.checked_sub(Duration::from_secs(2))
			.expect("monotonic clock should be at least 2s past its origin");

		*breaker.LastFailureTime.write().await = Some(past);

		assert!(breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

		breaker.RecordSuccess().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);
	}

	#[tokio::test]
	async fn test_circuit_breaker_validation() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		assert!(breaker.ValidateState().await.is_ok());

		breaker.RecordFailure().await;

		breaker.RecordFailure().await;

		// Two failures with a threshold of two: the breaker is Open and consistent.
		assert_eq!(breaker.GetState().await, CircuitState::Open);

		assert!(breaker.ValidateState().await.is_ok());
	}

	#[test]
	fn test_circuit_breaker_config_validation() {
		let valid_config = CircuitBreakerConfig::default();

		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
	}

	#[tokio::test]
	async fn test_bulkhead_resource_isolation() {
		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let (current, queue) = bulkhead.GetLoad().await;

		assert_eq!(current, 0);

		assert_eq!(queue, 0);

		let stats = bulkhead.GetStatistics().await;

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);

		assert_eq!(stats.max_concurrent, 2);

		assert_eq!(stats.max_queue, 5);
	}

	#[tokio::test]
	async fn test_bulkhead_utilization() {
		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let utilization = bulkhead.GetUtilization().await;

		assert_eq!(utilization, 0.0);
	}

	#[test]
	fn test_bulkhead_config_validation() {
		let valid_config = BulkheadConfig::default();

		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
	}

	#[test]
	fn test_timeout_manager() {
		let manager = TimeoutManager::new(Duration::from_secs(30));

		assert!(!manager.IsExceeded());

		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
	}

	#[test]
	fn test_timeout_manager_with_deadline() {
		let deadline = Instant::now() + Duration::from_secs(60);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		let remaining = manager.Remaining();

		assert!(remaining.is_some());

		assert!(remaining.unwrap() <= Duration::from_secs(60));
	}
}