AirLibrary/Resilience/
Retry.rs1#![allow(unused_variables, dead_code, unused_imports)]
2
3use std::{
15 collections::HashMap,
16 sync::Arc,
17 time::{Duration, Instant},
18};
19
20use serde::{Deserialize, Serialize};
21use tokio::sync::{Mutex, broadcast};
22
23use crate::dev_log;
24
/// Coarse category of a failure, used to choose a retry-delay strategy.
///
/// `RetryManager::ClassifyError` produces it from an error message, falling back
/// to `Unknown` when no configured pattern matches.
/// `RetryManager::CalculateAdaptiveRetryDelay` maps each class to a delay.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
    /// Temporary failure expected to clear on its own. The default policy maps
    /// `timeout`, `connection_refused`, `connection_reset` and `gateway_timeout`
    /// here.
    Transient,
    /// Failure that retrying is not expected to fix. The default policy maps
    /// `authentication_failed`, `unauthorized` and `not_found` here.
    /// NOTE(review): no code visible here refuses to retry this class.
    /// `CalculateAdaptiveRetryDelay` still returns 100 ms for it, so callers
    /// presumably must stop retrying themselves — confirm.
    NonRetryable,
    /// The remote side asked us to slow down (default pattern: `rate_limit_exceeded`).
    RateLimited,
    /// Server-side failure. The default policy maps `server_error`,
    /// `internal_server_error` and `service_unavailable` here.
    ServerError,
    /// No configured pattern matched.
    Unknown,
}
39
/// Tunable configuration for [`RetryManager`]: backoff shape, per-service
/// retry budget, and the error-pattern classification table.
///
/// `RetryManager::ValidatePolicy` checks the constraints noted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retries; validated to be greater than 0.
    /// NOTE(review): not consulted by the delay/budget code visible here;
    /// presumably enforced by callers — confirm.
    pub MaxRetries:u32,
    /// Base delay in milliseconds for the first retry; must be > 0 and
    /// <= `MaxIntervalMs`.
    pub InitialIntervalMs:u64,
    /// Upper bound in milliseconds applied to the computed backoff (before
    /// jitter is added).
    pub MaxIntervalMs:u64,
    /// Growth factor per attempt in `CalculateRetryDelay`; must be > 1.0.
    pub BackoffMultiplier:f64,
    /// Fraction of the base delay that may be added as random jitter; must
    /// lie in `0.0..=1.0`.
    pub JitterFactor:f64,
    /// Number of retries granted per service within any rolling 60-second
    /// window (see `RetryManager::CanRetry`); must be > 0.
    pub BudgetPerMinute:u32,
    /// Error pattern → class table.
    ///
    /// `ClassifyError` matches keys as substrings of an error message.
    /// `CalculateAdaptiveRetryDelay` looks keys up exactly.
    pub ErrorClassification:HashMap<String, ErrorClass>,
}
52
53impl Default for RetryPolicy {
54 fn default() -> Self {
55 let mut ErrorClassification = HashMap::new();
56 ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
57 ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
58 ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
59 ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
60 ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
61 ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
62 ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
63 ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
64 ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
65 ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
66 ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);
67 Self {
68 MaxRetries:3,
69 InitialIntervalMs:1000,
70 MaxIntervalMs:32000,
71 BackoffMultiplier:2.0,
72 JitterFactor:0.1,
73 BudgetPerMinute:100,
74 ErrorClassification,
75 }
76 }
77}
78
/// Sliding-window limiter that caps how many retries may be granted within any
/// rolling 60-second window.
#[derive(Debug, Clone)]
struct RetryBudget {
    /// Timestamps of retries granted inside the current window. They are pruned
    /// on every check, so the length never exceeds `MaxPerMinute`.
    Attempts:Vec<Instant>,
    /// Maximum number of retries granted per rolling window.
    MaxPerMinute:u32,
}

impl RetryBudget {
    /// Length of the rolling window the budget is measured over.
    const WINDOW:Duration = Duration::from_secs(60);

    /// Creates an empty budget allowing `MaxPerMinute` retries per window.
    fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

    /// Returns `true` and records the attempt if a retry is still available in
    /// the current window; returns `false` (recording nothing) otherwise.
    fn can_retry(&mut self) -> bool { self.can_retry_at(Instant::now()) }

    /// Same as [`RetryBudget::can_retry`], evaluated at the instant `Now`.
    ///
    /// Attempts whose age is at least [`RetryBudget::WINDOW`] are discarded
    /// before the check.
    fn can_retry_at(&mut self, Now:Instant) -> bool {
        // `Now - WINDOW` may panic when the resulting instant is not
        // representable (e.g. a monotonic clock that started less than 60 s ago).
        // Measuring each attempt's age with a saturating subtraction keeps the
        // same cutoff without that panic path.
        self.Attempts
            .retain(|&Attempt| Now.saturating_duration_since(Attempt) < Self::WINDOW);
        if self.Attempts.len() < self.MaxPerMinute as usize {
            self.Attempts.push(Now);
            true
        } else {
            false
        }
    }
}
102
/// Notification broadcast by [`RetryManager::PublishRetryEvent`] to any
/// subscribers of the manager's event channel.
///
/// NOTE(review): no code visible here constructs this type, so the field
/// meanings below are inferred from names and should be confirmed at call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
    /// Presumably the service name the retry concerns (likely the same key as
    /// `RetryManager::CanRetry`) — confirm.
    pub Service:String,
    /// Attempt number — confirm whether 0- or 1-based.
    pub Attempt:u32,
    /// Classification of the failure that triggered the retry.
    pub ErrorClass:ErrorClass,
    /// Delay applied before the attempt, in milliseconds (by name) — confirm.
    pub DelayMs:u64,
    /// Presumably whether the attempt succeeded — confirm.
    pub Success:bool,
    /// Error text, when available.
    pub ErrorMessage:Option<String>,
}
113
/// Computes retry delays, classifies errors, enforces per-service retry budgets,
/// and broadcasts [`RetryEvent`]s according to a [`RetryPolicy`].
#[derive(Debug)]
pub struct RetryManager {
    /// Policy consulted by every delay, classification and budget method.
    Policy:RetryPolicy,
    /// Per-service sliding-window budgets, keyed by service name. Entries are
    /// created lazily by `CanRetry`. Held behind a shared tokio `Mutex`.
    Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,
    /// Sender side of the event broadcast channel (capacity set in `new`).
    EventTx:Arc<broadcast::Sender<RetryEvent>>,
}
122
123impl RetryManager {
124 pub fn new(policy:RetryPolicy) -> Self {
125 let (EventTx, _) = broadcast::channel(1000);
126 Self {
127 Policy:policy,
128 Budgets:Arc::new(Mutex::new(HashMap::new())),
129 EventTx:Arc::new(EventTx),
130 }
131 }
132
133 pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }
134
135 pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
138 if Attempt == 0 {
139 return Duration::from_millis(0);
140 }
141 let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
142 .min(self.Policy.MaxIntervalMs as f64) as u64;
143 let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
144 let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
145 Duration::from_millis(BaseDelay + RandomJitter)
146 }
147
148 pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
150 let Class = self
151 .Policy
152 .ErrorClassification
153 .get(ErrorType)
154 .copied()
155 .unwrap_or(ErrorClass::Unknown);
156 match Class {
157 ErrorClass::RateLimited => Duration::from_millis(((attempt + 1) * 5000) as u64),
158 ErrorClass::ServerError => {
159 let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);
160 Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
161 },
162 ErrorClass::Transient => self.CalculateRetryDelay(attempt),
163 ErrorClass::NonRetryable | ErrorClass::Unknown => Duration::from_millis(100),
164 }
165 }
166
167 pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
168 let Lower = ErrorMessage.to_lowercase();
169 for (pattern, class) in &self.Policy.ErrorClassification {
170 if Lower.contains(pattern) {
171 return *class;
172 }
173 }
174 ErrorClass::Unknown
175 }
176
177 pub async fn CanRetry(&self, service:&str) -> bool {
178 let mut budgets = self.Budgets.lock().await;
179 let budget = budgets
180 .entry(service.to_string())
181 .or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));
182 budget.can_retry()
183 }
184
185 pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }
186
187 pub fn ValidatePolicy(&self) -> Result<(), String> {
188 if self.Policy.MaxRetries == 0 {
189 return Err("MaxRetries must be greater than 0".to_string());
190 }
191 if self.Policy.InitialIntervalMs == 0 {
192 return Err("InitialIntervalMs must be greater than 0".to_string());
193 }
194 if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
195 return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
196 }
197 if self.Policy.BackoffMultiplier <= 1.0 {
198 return Err("BackoffMultiplier must be greater than 1.0".to_string());
199 }
200 if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
201 return Err("JitterFactor must be between 0 and 1".to_string());
202 }
203 if self.Policy.BudgetPerMinute == 0 {
204 return Err("BudgetPerMinute must be greater than 0".to_string());
205 }
206 Ok(())
207 }
208}