Skip to main content

AirLibrary/Resilience/
Retry.rs

1#![allow(unused_variables, dead_code, unused_imports)]
2
3//! Exponential-backoff retry logic with jitter and budget management.
4//!
5//! Three cooperating types:
6//! - `ErrorClass` - classifies errors so `RetryManager` can pick the right
7//!   delay strategy (transient, rate-limited, server error, non-retryable).
8//! - `RetryPolicy` - configurable max-attempts, intervals, backoff multiplier,
9//!   jitter factor, and a per-service call budget.
10//! - `RetryManager` - applies the policy: computes delays, tracks per-service
11//!   budgets, classifies errors, and publishes `RetryEvent` to a broadcast
12//!   channel for telemetry subscribers.
13
14use std::{
15	collections::HashMap,
16	sync::Arc,
17	time::{Duration, Instant},
18};
19
20use serde::{Deserialize, Serialize};
21use tokio::sync::{Mutex, broadcast};
22
23use crate::dev_log;
24
/// Error classification for adaptive retry policies.
///
/// `RetryManager::CalculateAdaptiveRetryDelay` selects a delay strategy per
/// variant, and `RetryManager::ClassifyError` falls back to `Unknown` when no
/// configured pattern matches.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
	/// Transient: network timeouts, temporary failures. The adaptive delay
	/// for this class is the exponential backoff with jitter computed by
	/// `RetryManager::CalculateRetryDelay`.
	Transient,
	/// Non-retryable: authentication errors, invalid requests. The adaptive
	/// delay for this class is a fixed 100 ms.
	NonRetryable,
	/// Rate-limited: 429 Too Many Requests. The adaptive delay grows
	/// linearly in 5-second steps with the attempt number.
	RateLimited,
	/// Server errors: 500-599. The adaptive delay doubles per attempt and is
	/// capped at `RetryPolicy::MaxIntervalMs`.
	ServerError,
	/// Unrecognised error pattern. Shares the fixed 100 ms adaptive delay
	/// with `NonRetryable`.
	Unknown,
}
39
/// Retry policy configuration - controls all delay and budget parameters.
///
/// The numeric fields are checked by `RetryManager::ValidatePolicy`; nothing
/// in this type itself enforces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
	/// Retry count limit; `ValidatePolicy` rejects 0.
	/// NOTE(review): no method visible here reads this besides validation -
	/// presumably the retry loop in the caller enforces it; confirm.
	pub MaxRetries:u32,
	/// Base delay in milliseconds, i.e. the pre-jitter delay of attempt 1.
	pub InitialIntervalMs:u64,
	/// Upper bound in milliseconds applied to the delay before jitter is
	/// added.
	pub MaxIntervalMs:u64,
	/// Factor the base delay is multiplied by for each additional attempt
	/// (`InitialIntervalMs * BackoffMultiplier^(attempt - 1)`).
	pub BackoffMultiplier:f64,
	/// Jitter fraction 0-1 added on top of the base delay: a random extra of
	/// up to `base * JitterFactor` milliseconds.
	pub JitterFactor:f64,
	/// Maximum attempts granted per service inside a rolling 60-second
	/// window.
	pub BudgetPerMinute:u32,
	/// Error pattern -> class table. `ClassifyError` lower-cases the message
	/// and substring-matches it against the keys, so keys should be
	/// lower-case; `CalculateAdaptiveRetryDelay` looks keys up exactly.
	pub ErrorClassification:HashMap<String, ErrorClass>,
}
52
53impl Default for RetryPolicy {
54	fn default() -> Self {
55		let mut ErrorClassification = HashMap::new();
56		ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
57		ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
58		ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
59		ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
60		ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
61		ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
62		ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
63		ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
64		ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
65		ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
66		ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);
67		Self {
68			MaxRetries:3,
69			InitialIntervalMs:1000,
70			MaxIntervalMs:32000,
71			BackoffMultiplier:2.0,
72			JitterFactor:0.1,
73			BudgetPerMinute:100,
74			ErrorClassification,
75		}
76	}
77}
78
/// Per-service retry budget: tracks attempt timestamps and enforces the
/// calls-per-minute cap from `RetryPolicy::BudgetPerMinute`.
#[derive(Debug, Clone)]
struct RetryBudget {
	/// Timestamps of the attempts granted inside the current window.
	Attempts:Vec<Instant>,
	/// Maximum number of attempts allowed inside one window.
	MaxPerMinute:u32,
}

impl RetryBudget {
	/// Length of the rolling window the cap applies to.
	const WINDOW:Duration = Duration::from_secs(60);

	/// Creates an empty budget allowing `MaxPerMinute` attempts per window.
	fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

	/// Reports whether another attempt fits in the budget and, if so,
	/// records it.
	///
	/// Attempts older than the window are dropped first. Returns `true` and
	/// stores the current instant when fewer than `MaxPerMinute` attempts
	/// remain in the window; otherwise returns `false` and records nothing.
	fn can_retry(&mut self) -> bool {
		let Now = Instant::now();
		// Compare elapsed time instead of computing `Now - 60s`: subtracting
		// a `Duration` from an `Instant` panics when the result is not
		// representable (e.g. a monotonic clock that started under a minute
		// ago), whereas the saturating elapsed-time form cannot panic.
		self.Attempts
			.retain(|&attempt| Now.saturating_duration_since(attempt) < Self::WINDOW);
		if self.Attempts.len() < self.MaxPerMinute as usize {
			self.Attempts.push(Now);
			true
		} else {
			false
		}
	}
}
102
/// Telemetry event published after every retry attempt.
///
/// Instances are sent to subscribers through
/// `RetryManager::PublishRetryEvent`; the fields are filled in by whoever
/// builds the event, which is not shown alongside this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
	/// Service the attempt was made against.
	/// NOTE(review): presumably the same name passed to
	/// `RetryManager::CanRetry` - confirm with publishers.
	pub Service:String,
	/// Attempt number the event describes.
	/// NOTE(review): whether counting starts at 0 or 1 is decided by the
	/// publisher - confirm.
	pub Attempt:u32,
	/// Classification assigned to the error for this attempt.
	pub ErrorClass:ErrorClass,
	/// Delay associated with the attempt, in milliseconds.
	pub DelayMs:u64,
	/// Whether the attempt succeeded.
	pub Success:bool,
	/// Error text for the attempt, when there is one.
	pub ErrorMessage:Option<String>,
}
113
/// Applies `RetryPolicy`: computes delays, tracks budgets per service,
/// classifies errors, and publishes `RetryEvent`s.
#[derive(Debug)]
pub struct RetryManager {
	/// Policy supplied to `new`; read by the delay, classification, budget,
	/// and validation methods.
	Policy:RetryPolicy,
	/// Per-service budgets keyed by service name, created on first use in
	/// `CanRetry` and guarded by an async mutex.
	Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,
	/// Sending half of the broadcast channel `PublishRetryEvent` writes to.
	EventTx:Arc<broadcast::Sender<RetryEvent>>,
}
122
123impl RetryManager {
124	pub fn new(policy:RetryPolicy) -> Self {
125		let (EventTx, _) = broadcast::channel(1000);
126		Self {
127			Policy:policy,
128			Budgets:Arc::new(Mutex::new(HashMap::new())),
129			EventTx:Arc::new(EventTx),
130		}
131	}
132
133	pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }
134
135	/// Exponential backoff with jitter: `base * multiplier^(attempt-1) +
136	/// jitter`.
137	pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
138		if Attempt == 0 {
139			return Duration::from_millis(0);
140		}
141		let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
142			.min(self.Policy.MaxIntervalMs as f64) as u64;
143		let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
144		let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
145		Duration::from_millis(BaseDelay + RandomJitter)
146	}
147
148	/// Choose delay strategy based on classified error type.
149	pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
150		let Class = self
151			.Policy
152			.ErrorClassification
153			.get(ErrorType)
154			.copied()
155			.unwrap_or(ErrorClass::Unknown);
156		match Class {
157			ErrorClass::RateLimited => Duration::from_millis(((attempt + 1) * 5000) as u64),
158			ErrorClass::ServerError => {
159				let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);
160				Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
161			},
162			ErrorClass::Transient => self.CalculateRetryDelay(attempt),
163			ErrorClass::NonRetryable | ErrorClass::Unknown => Duration::from_millis(100),
164		}
165	}
166
167	pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
168		let Lower = ErrorMessage.to_lowercase();
169		for (pattern, class) in &self.Policy.ErrorClassification {
170			if Lower.contains(pattern) {
171				return *class;
172			}
173		}
174		ErrorClass::Unknown
175	}
176
177	pub async fn CanRetry(&self, service:&str) -> bool {
178		let mut budgets = self.Budgets.lock().await;
179		let budget = budgets
180			.entry(service.to_string())
181			.or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));
182		budget.can_retry()
183	}
184
185	pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }
186
187	pub fn ValidatePolicy(&self) -> Result<(), String> {
188		if self.Policy.MaxRetries == 0 {
189			return Err("MaxRetries must be greater than 0".to_string());
190		}
191		if self.Policy.InitialIntervalMs == 0 {
192			return Err("InitialIntervalMs must be greater than 0".to_string());
193		}
194		if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
195			return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
196		}
197		if self.Policy.BackoffMultiplier <= 1.0 {
198			return Err("BackoffMultiplier must be greater than 1.0".to_string());
199		}
200		if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
201			return Err("JitterFactor must be between 0 and 1".to_string());
202		}
203		if self.Policy.BudgetPerMinute == 0 {
204			return Err("BudgetPerMinute must be greater than 0".to_string());
205		}
206		Ok(())
207	}
208}