// AirLibrary/Downloader/RateLimit.rs

1#![allow(unused_variables, dead_code, unused_imports)]
2
//! Token-bucket rate limiter for per-download bandwidth throttling.
//!
//! Tokens represent bytes that can be consumed. They refill at `refill_rate`
//! bytes/second up to `capacity`. Downloads call `consume(bytes).await`, which
//! parks the task until enough tokens are available, keeping the observed
//! throughput at or below the configured limit while still allowing short
//! bursts of up to `capacity_factor` seconds' worth of data.
10
11use std::time::{Duration, Instant};
12
13use crate::Result;
14
/// Token-bucket rate limiter state.
///
/// The bucket holds a fractional byte balance (`tokens`) that grows at
/// `refill_rate` bytes per second and is capped at `capacity`. The type does
/// no locking of its own and its mutating methods take `&mut self`; wrap it in
/// `Arc<RwLock<_>>` to share one limiter across concurrent download tasks.
#[derive(Debug)]
pub struct TokenBucket {
	/// Bytes that may be consumed right now. Kept as `f64` so that fractions
	/// of a byte accrued between refills are not lost.
	tokens:f64,
	/// Burst capacity in bytes: the ceiling `tokens` is clamped to on refill.
	capacity:f64,
	/// Sustained refill rate in bytes per second.
	refill_rate:f64,
	/// Monotonic timestamp of the last refill; elapsed time is measured from
	/// here.
	last_refill:Instant,
}
28
29impl TokenBucket {
30	/// Create a bucket with `bytes_per_sec` sustained throughput and
31	/// a burst buffer of `capacity_factor` seconds' worth of tokens.
32	pub fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
33		let refill_rate = bytes_per_sec as f64;
34		let capacity = refill_rate * capacity_factor;
35		Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
36	}
37
38	/// Replenish tokens based on elapsed wall time. Call before every consume.
39	pub fn refill(&mut self) {
40		let elapsed = self.last_refill.elapsed().as_secs_f64();
41		if elapsed > 0.0 {
42			self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
43			self.last_refill = Instant::now();
44		}
45	}
46
47	/// Consume up to `bytes` tokens immediately. Returns how many were
48	/// consumed. Does not block - the caller decides what to do with remaining
49	/// deficit.
50	pub fn try_consume(&mut self, bytes:u64) -> u64 {
51		self.refill();
52		let bytes = bytes as f64;
53		if self.tokens >= bytes {
54			self.tokens -= bytes;
55			bytes as u64
56		} else {
57			let available = self.tokens;
58			self.tokens = 0.0;
59			available as u64
60		}
61	}
62
63	/// Async-wait until `bytes` tokens are available, then consume them.
64	/// Polls at most every 100 ms so Tokio's timer wheel stays responsive.
65	pub async fn consume(&mut self, bytes:u64) -> Result<()> {
66		let bytes_needed = bytes as f64;
67		loop {
68			self.refill();
69			if self.tokens >= bytes_needed {
70				self.tokens -= bytes_needed;
71				return Ok(());
72			}
73			let tokens_needed = bytes_needed - self.tokens;
74			let wait_secs = (tokens_needed / self.refill_rate).min(0.1);
75			tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
76		}
77	}
78
79	/// Adjust the sustained rate. Burst capacity is reset to 5× the new rate.
80	pub fn set_rate(&mut self, bytes_per_sec:u64) {
81		self.refill_rate = bytes_per_sec as f64;
82		self.capacity = self.refill_rate * 5.0;
83	}
84}