1pub mod RateLimit;
104
105pub mod Types;
106
107use std::{
108 collections::{HashMap, VecDeque},
109 path::{Path, PathBuf},
110 sync::Arc,
111 time::{Duration, Instant},
112};
113
114use serde::{Deserialize, Serialize};
115use tokio::sync::{RwLock, Semaphore};
116
117use crate::{
118 AirError,
119 ApplicationState::ApplicationState,
120 Configuration::ConfigurationManager,
121 Result,
122 Utility,
123 dev_log,
124};
125
/// Central download service: owns the HTTP client, the registry of tracked
/// downloads, throttling primitives and aggregate statistics.
pub struct DownloadManager {
	/// Shared application state; read for downloader configuration and used to
	/// report the "downloader" service status during construction.
	AppState:Arc<ApplicationState>,

	/// Registry of tracked downloads keyed by download id.
	ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,

	/// Queue of downloads waiting to be started.
	/// NOTE(review): not read or written by the code visible in this chunk.
	DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,

	/// Directory used as the destination when a request gives no explicit path.
	CacheDirectory:PathBuf,

	/// HTTP client used for every download request.
	client:reqwest::Client,

	/// Checksum helper used to compute SHA-256 digests of downloaded files.
	ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,

	/// Semaphore created with 100 permits in `new`.
	/// NOTE(review): no acquire site is visible in this chunk.
	BandwidthLimiter:Arc<Semaphore>,

	/// Token bucket shared by all downloads; each received chunk consumes its
	/// size in tokens before being written.
	TokenBucket:Arc<RwLock<TokenBucket>>,

	/// Semaphore (5 permits in `new`) held for the duration of each transfer
	/// attempt to cap concurrent downloads.
	ConcurrentLimiter:Arc<Semaphore>,

	/// Aggregate counters updated as downloads are registered and finish.
	statistics:Arc<RwLock<DownloadStatistics>>,
}
158
/// Snapshot of one tracked download as stored in the manager's registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatus {
	/// Identifier under which the download is registered.
	pub DownloadId:String,

	/// URL the download was registered with.
	pub url:String,

	/// Final path of the downloaded file.
	pub destination:PathBuf,

	/// Total size in bytes; initialised to 0 at registration.
	pub TotalSize:u64,

	/// Bytes received so far; initialised to 0 at registration.
	pub downloaded:u64,

	/// Progress as a percentage (the manager writes values such as 0.0 and 100.0).
	pub progress:f32,

	/// Current lifecycle state.
	pub status:DownloadState,

	/// Error message supplied with the most recent status update, if any.
	pub error:Option<String>,

	/// Time the download was registered.
	pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,

	/// Time the download reached a finished state, once it has.
	pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,

	/// Number of chunks finished; initialised to 0 at registration.
	pub ChunksCompleted:usize,

	/// Number of chunks expected; initialised to 1 at registration.
	pub TotalChunks:usize,

	/// Most recently computed transfer rate in bytes per second.
	pub DownloadRateBytesPerSec:u64,

	/// Checksum the caller expects, when one was supplied.
	pub ExpectedChecksum:Option<String>,

	/// Checksum computed from the downloaded file, once available.
	pub ActualChecksum:Option<String>,
}
192
/// Lifecycle states a tracked download can be in.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DownloadState {
	/// Registered but not yet started (the state assigned at registration).
	Pending,

	/// Waiting in the download queue.
	Queued,

	/// Transfer in progress.
	Downloading,

	/// Transfer finished; checksum verification in progress.
	Verifying,

	/// Finished successfully.
	Completed,

	/// Finished with an error.
	Failed,

	/// Cancelled on request; the transfer loop aborts when it sees this state.
	Cancelled,

	/// Paused on request; the transfer loop polls until the state changes.
	Paused,

	/// Transitional state set while a paused download is being resumed.
	Resuming,
}
214
/// Scheduling priority of a download.
///
/// The derived ordering follows the explicit discriminants, so
/// `Background < Low < Normal < High`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadPriority {
	/// Highest priority.
	High = 3,

	/// Default priority (used by `DownloadConfig::default`).
	Normal = 2,

	/// Below-normal priority.
	Low = 1,

	/// Lowest priority.
	Background = 0,
}
226
/// A download request waiting in the manager's queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedDownload {
	/// Identifier assigned to the queued download.
	DownloadId:String,

	/// Source URL.
	url:String,

	/// Path the file should be written to.
	destination:PathBuf,

	/// Expected checksum.
	/// NOTE(review): presumably empty means "no verification", as in `DownloadConfig` — confirm.
	checksum:String,

	/// Scheduling priority.
	priority:DownloadPriority,

	/// Time the request was queued.
	AddedAt:chrono::DateTime<chrono::Utc>,

	/// Optional upper bound on the file size in bytes.
	MaxFileSize:Option<u64>,

	/// Whether free disk space should be checked before downloading.
	ValidateDiskSpace:bool,
}
246
/// Outcome of a successful download.
#[derive(Debug, Clone)]
pub struct DownloadResult {
	/// Final path of the downloaded file (lossy UTF-8 rendering of the path).
	pub path:String,

	/// Number of bytes counted for the download.
	pub size:u64,

	/// Checksum computed from the file on disk.
	pub checksum:String,

	/// Wall-clock duration; the transfer step returns zero and the caller
	/// fills in the measured value.
	pub duration:Duration,

	/// Average transfer rate; the transfer step returns 0.
	pub AverageRate:u64,
}
260
/// Aggregate counters maintained by the download manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatistics {
	/// Number of downloads that finished, successfully or not.
	pub TotalDownloads:u64,

	/// Number of downloads that finished successfully.
	pub SuccessfulDownloads:u64,

	/// Number of downloads that finished with an error.
	pub FailedDownloads:u64,

	/// Number of cancel requests processed.
	pub CancelledDownloads:u64,

	/// Sum of bytes over all successful downloads.
	pub TotalBytesDownloaded:u64,

	/// Sum of wall-clock seconds over all successful downloads.
	pub TotalDownloadTimeSecs:f64,

	/// `TotalBytesDownloaded / TotalDownloadTimeSecs`, in bytes per second.
	pub AverageDownloadRate:f64,

	/// Highest per-download average rate seen so far, in bytes per second.
	pub PeakDownloadRate:u64,

	/// Downloads registered but not yet finished.
	pub ActiveDownloads:usize,

	/// Downloads waiting in the queue.
	/// NOTE(review): not updated by the code visible in this chunk.
	pub QueuedDownloads:usize,
}
284
/// Shareable, thread-safe callback that receives a `DownloadStatus` snapshot.
pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
287
/// Token-bucket state used to throttle download bandwidth; one token
/// corresponds to one byte.
#[derive(Debug)]
struct TokenBucket {
	/// Tokens currently available.
	tokens:f64,

	/// Maximum number of tokens the bucket can hold.
	capacity:f64,

	/// Tokens added per second.
	refill_rate:f64,

	/// Instant of the most recent refill.
	last_refill:Instant,
}
309
310impl TokenBucket {
311 fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
313 let refill_rate = bytes_per_sec as f64;
314
315 let capacity = refill_rate * capacity_factor; Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
318 }
319
320 fn refill(&mut self) {
322 let elapsed = self.last_refill.elapsed().as_secs_f64();
323
324 if elapsed > 0.0 {
325 let new_tokens = elapsed * self.refill_rate;
326
327 self.tokens = (self.tokens + new_tokens).min(self.capacity);
328
329 self.last_refill = Instant::now();
330 }
331 }
332
333 #[allow(dead_code)]
336 fn try_consume(&mut self, bytes:u64) -> u64 {
337 self.refill();
338
339 let bytes = bytes as f64;
340
341 if self.tokens >= bytes {
342 self.tokens -= bytes;
343
344 return bytes as u64;
345 }
346
347 let available = self.tokens;
349
350 self.tokens = 0.0;
351
352 available as u64
353 }
354
355 async fn consume(&mut self, bytes:u64) -> Result<()> {
357 let bytes_needed = bytes as f64;
358
359 loop {
360 self.refill();
361
362 if self.tokens >= bytes_needed {
363 self.tokens -= bytes_needed;
364
365 return Ok(());
366 }
367
368 let tokens_needed = bytes_needed - self.tokens;
370
371 let wait_duration = tokens_needed / self.refill_rate;
372
373 let sleep_duration = Duration::from_secs_f64(wait_duration.min(0.1));
375
376 tokio::time::sleep(sleep_duration).await;
377 }
378 }
379
380 fn set_rate(&mut self, bytes_per_sec:u64) {
382 self.refill_rate = bytes_per_sec as f64;
383
384 self.capacity = self.refill_rate * 5.0; }
386}
387
/// Per-request download options.
#[derive(Debug, Clone)]
pub struct DownloadConfig {
	/// Source URL; must be http or https.
	pub url:String,

	/// Destination path; when empty a file name derived from the URL is
	/// placed in the manager's cache directory.
	pub destination:String,

	/// Expected checksum; an empty string disables verification.
	pub checksum:String,

	/// Optional upper bound on the file size in bytes.
	pub MaxFileSize:Option<u64>,

	/// Chunk size in bytes.
	/// NOTE(review): not read by the code visible in this chunk.
	pub ChunkSize:usize,

	/// Maximum number of retries after the first attempt.
	pub MaxRetries:u32,

	/// Per-request timeout in seconds.
	pub TimeoutSecs:u64,

	/// Scheduling priority.
	pub priority:DownloadPriority,

	/// Whether free disk space is checked before downloading.
	pub ValidateDiskSpace:bool,
}
409
410impl Default for DownloadConfig {
411 fn default() -> Self {
412 Self {
413 url:String::new(),
414
415 destination:String::new(),
416
417 checksum:String::new(),
418
419 MaxFileSize:None,
420
421 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
423
424 TimeoutSecs:300,
425
426 priority:DownloadPriority::Normal,
427
428 ValidateDiskSpace:true,
429 }
430 }
431}
432
433impl DownloadManager {
434 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
436 let config = &AppState.Configuration.Downloader;
437
438 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
440
441 let CacheDirectoryClone = CacheDirectory.clone();
443
444 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
446
447 tokio::fs::create_dir_all(&CacheDirectory)
449 .await
450 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
451
452 let dns_port = Mist::dns_port();
454
455 let client = crate::HTTP::Client::secured_client_builder(dns_port)
456 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?
457 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
458 .connect_timeout(Duration::from_secs(30))
459 .pool_idle_timeout(Duration::from_secs(90))
460 .pool_max_idle_per_host(10)
461 .tcp_keepalive(Duration::from_secs(60))
462 .user_agent("Land-AirDownloader/0.1.0")
463 .build()
464 .map_err(|e| AirError::Network(format!("Failed to build HTTP client: {}", e)))?;
465
466 let BandwidthLimiter = Arc::new(Semaphore::new(100));
468
469 let TokenBucket = Arc::new(RwLock::new(TokenBucket::new(100 * 1024 * 1024, 5.0)));
471
472 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
474
475 let manager = Self {
476 AppState,
477
478 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
479
480 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
481
482 CacheDirectory:CacheDirectoryCloneForInit,
483
484 client,
485
486 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
487
488 BandwidthLimiter,
489
490 TokenBucket,
491
492 ConcurrentLimiter,
493
494 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
495 };
496
497 manager
499 .AppState
500 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
501 .await
502 .map_err(|e| AirError::Internal(e.to_string()))?;
503
504 dev_log!(
505 "update",
506 "[DownloadManager] Initialized with cache directory: {}",
507 CacheDirectory.display()
508 );
509
510 Ok(manager)
511 }
512
513 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
515 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
516 .await
517 }
518
519 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
521 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
523
524 let DownloadId = Utility::GenerateRequestId();
526
527 dev_log!(
528 "update",
529 "[DownloadManager] Starting download [ID: {}] - URL: {}",
530 DownloadId,
531 SanitizedUrl
532 );
533
534 if SanitizedUrl.is_empty() {
536 return Err(AirError::Network("URL cannot be empty".to_string()));
537 }
538
539 let Destination = if config.destination.is_empty() {
541 let Filename = SanitizedUrl
543 .split('/')
544 .last()
545 .and_then(|s| s.split('?').next())
546 .unwrap_or("download.bin");
547
548 self.CacheDirectory.join(Filename)
549 } else {
550 ConfigurationManager::ExpandPath(&config.destination)?
551 };
552
553 Utility::ValidateFilePath(
555 Destination
556 .to_str()
557 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
558 )?;
559
560 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
562
563 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
565 .await?;
566
567 if config.ValidateDiskSpace {
569 if let Some(MaxSize) = config.MaxFileSize {
570 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
571 } else {
572 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
574 }
575
576 if let Some(Parent) = Destination.parent() {
578 tokio::fs::create_dir_all(Parent)
579 .await
580 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
581 }
582
583 let StartTime = Instant::now();
584
585 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
587
588 let Duration = StartTime.elapsed();
589
590 match Result {
591 Ok(mut FileInfo) => {
592 FileInfo.duration = Duration;
593
594 self.UpdateStatistics(true, FileInfo.size, Duration).await;
596
597 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
598 .await?;
599
600 dev_log!(
601 "update",
602 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
603 DownloadId,
604 FileInfo.size,
605 Duration.as_secs_f64(),
606 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
607 );
608
609 Ok(FileInfo)
610 },
611
612 Err(E) => {
613 self.UpdateStatistics(false, 0, Duration).await;
615
616 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
617 .await?;
618
619 if Destination.exists() {
621 let _ = tokio::fs::remove_file(&Destination).await;
622
623 dev_log!(
624 "update",
625 "warn: [DownloadManager] Cleaned up failed download: {}",
626 Destination.display()
627 );
628 }
629
630 dev_log!(
631 "update",
632 "error: [DownloadManager] Download failed [ID: {}] - Error: {}",
633 DownloadId,
634 E
635 );
636
637 Err(E)
638 },
639 }
640 }
641
642 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
644 let url = url.trim();
645
646 if url.is_empty() {
648 return Err(AirError::Network("URL cannot be empty".to_string()));
649 }
650
651 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
653
654 match parsed.scheme() {
656 "http" | "https" => (),
657
658 scheme => {
659 return Err(AirError::Network(format!(
660 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
661 scheme
662 )));
663 },
664 }
665
666 if parsed.host().is_none() {
668 return Err(AirError::Network("URL must have a valid host".to_string()));
669 }
670
671 #[cfg(debug_assertions)]
673 {
674
675 }
677
678 #[cfg(not(debug_assertions))]
679 {
680 if let Some(host) = parsed.host_str() {
681 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
682 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
683 }
684
685 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
686 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
687 }
688 }
689 }
690
691 let mut sanitized = parsed.clone();
693
694 if sanitized.password().is_some() {
696 sanitized.set_password(Some("")).ok();
697 }
698
699 Ok(sanitized.to_string())
700 }
701
702 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
704 let DestPath = if destination.is_absolute() {
706 destination.to_path_buf()
707 } else {
708 std::env::current_dir()
709 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
710 .join(destination)
711 };
712
713 let MountPoint = self.FindMountPoint(&DestPath)?;
715
716 dev_log!(
718 "update",
719 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
720 url,
721 RequiredBytes,
722 MountPoint.display()
723 );
724
725 #[cfg(unix)]
726 {
727 match self.GetDiskStatvfs(&MountPoint) {
728 Ok((AvailableBytes, TotalBytes)) => {
729 if AvailableBytes < RequiredBytes {
730 dev_log!(
731 "update",
732 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
733 AvailableBytes,
734 RequiredBytes
735 );
736
737 return Err(AirError::FileSystem(format!(
738 "Insufficient disk space: {} bytes available, {} bytes required",
739 AvailableBytes, RequiredBytes
740 )));
741 }
742
743 dev_log!(
744 "update",
745 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
746 AvailableBytes,
747 RequiredBytes,
748 TotalBytes
749 );
750 },
751
752 Err(e) => {
753 dev_log!(
754 "update",
755 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
756 e
757 );
758 },
759 }
760 }
761
762 #[cfg(windows)]
763 {
764 match self.GetDiskSpaceWindows(&MountPoint) {
765 Ok(AvailableBytes) => {
766 if AvailableBytes < RequiredBytes {
767 dev_log!(
768 "update",
769 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
770 AvailableBytes,
771 RequiredBytes
772 );
773
774 return Err(AirError::FileSystem(format!(
775 "Insufficient disk space: {} bytes available, {} bytes required",
776 available_bytes, RequiredBytes
777 )));
778 }
779
780 dev_log!(
781 "update",
782 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
783 available_bytes,
784 RequiredBytes
785 );
786 },
787
788 Err(e) => {
789 dev_log!(
790 "update",
791 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
792 e
793 );
794 },
795 }
796 }
797
798 #[cfg(not(any(unix, windows)))]
799 {
800 dev_log!(
801 "update",
802 "warn: [DownloadManager] Disk space validation not available on this platform"
803 );
804 }
805
806 Ok(())
807 }
808
	/// Queries the filesystem containing `path` with `statvfs(3)` and returns
	/// `(available_bytes, total_bytes)`.
	///
	/// # Errors
	/// Returns a filesystem error when the path contains an interior NUL byte
	/// or when `statvfs` reports failure.
	#[cfg(unix)]
	fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
		use std::{ffi::CString, os::unix::ffi::OsStrExt};

		dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());

		let path_cstr = CString::new(path.as_os_str().as_bytes())
			.map_err(|e| AirError::FileSystem(format!("Failed to convert path to C string: {}", e)))?;

		// SAFETY: `libc::statvfs` is a plain C struct; the all-zero value is
		// only a placeholder that the call below overwrites.
		let mut stat:libc::statvfs = unsafe { std::mem::zeroed() };

		// SAFETY: `path_cstr` is a valid NUL-terminated string that outlives
		// the call, and `stat` is a valid, writable `statvfs` value.
		let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stat) };

		if result != 0 {
			// Read the OS error immediately, before any other call can overwrite it.
			let err = std::io::Error::last_os_error();

			return Err(AirError::FileSystem(format!("Failed to get disk stats: {}", err)));
		}

		// Block counts are expressed in units of the fragment size.
		let fragment_size = stat.f_frsize as u64;

		// `f_bavail` is the number of blocks available to unprivileged users.
		let available_bytes = fragment_size * stat.f_bavail as u64;

		let total_bytes = fragment_size * stat.f_blocks as u64;

		dev_log!(
			"update",
			"[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
			path.display(),
			available_bytes,
			total_bytes
		);

		Ok((available_bytes, total_bytes))
	}
848
	/// Queries the free space available to the caller on the volume
	/// containing `path` via `GetDiskFreeSpaceExW`.
	///
	/// # Errors
	/// Returns a filesystem error when the API call reports failure.
	#[cfg(windows)]
	fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
		use std::os::windows::ffi::OsStrExt;

		use windows::Win32::Storage::FileSystem::GetDiskFreeSpaceExW;

		dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());

		// Wide (UTF-16) copy of the path with a trailing NUL.
		let path_str:Vec<u16> = path.as_os_str().encode_wide().chain(std::iter::once(0)).collect();

		let mut free_bytes_available:u64 = 0;

		let mut total_bytes:u64 = 0;

		let mut total_free_bytes:u64 = 0;

		// SAFETY: `path_str` is NUL-terminated and outlives the call; the three
		// out-pointers refer to live, writable u64 locals.
		// NOTE(review): the pointer casts and `as_bool()` below depend on the
		// `windows` crate version in use — confirm against the pinned version.
		let result = unsafe {
			GetDiskFreeSpaceExW(
				windows::core::PCWSTR(path_str.as_ptr()),
				&mut free_bytes_available as *mut _ as _,
				&mut total_bytes as *mut _ as _,
				&mut total_free_bytes as *mut _ as _,
			)
		};

		if !result.as_bool() {
			// Read the OS error immediately, before any other call can overwrite it.
			let err = std::io::Error::last_os_error();

			return Err(AirError::FileSystem(format!("Failed to get disk space: {}", err)));
		}

		dev_log!(
			"update",
			"[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
			path.display(),
			free_bytes_available,
			total_bytes
		);

		Ok(free_bytes_available)
	}
892
893 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
895 #[cfg(unix)]
896 {
897 let mut current = path
898 .canonicalize()
899 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
900
901 loop {
902 if current.as_os_str().is_empty() || current == Path::new("/") {
903 return Ok(PathBuf::from("/"));
904 }
905
906 let metadata = std::fs::metadata(¤t)
907 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
908
909 #[cfg(unix)]
911 let CurrentDevice = {
912 use std::os::unix::fs::MetadataExt;
913
914 metadata.dev()
915 };
916
917 #[cfg(not(unix))]
918 let CurrentDevice = 0u64; let parent = current.parent();
921
922 if let Some(parent_path) = parent {
923 let ParentMetadata = std::fs::metadata(parent_path)
924 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
925
926 #[cfg(unix)]
927 let ParentDevice = {
928 use std::os::unix::fs::MetadataExt;
929
930 ParentMetadata.dev()
931 };
932
933 #[cfg(not(unix))]
934 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
937 return Ok(current);
938 }
939 } else {
940 return Ok(current);
941 }
942
943 current.pop();
944 }
945 }
946
947 #[cfg(windows)]
948 {
949 let PathStr = path.to_string_lossy();
951
952 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
953 return Ok(PathBuf::from(&PathStr[..3]));
954 }
955
956 Ok(PathBuf::from("C:\\"))
957 }
958
959 #[cfg(not(any(unix, windows)))]
960 {
961 Ok(path.to_path_buf())
962 }
963 }
964
	/// Runs the transfer with retries, checksum verification and a circuit
	/// breaker.
	///
	/// Each pass of the loop: bail out if the circuit breaker is open and
	/// cannot recover, bail out if the download was cancelled, then attempt
	/// the transfer. A failed transfer or a failed checksum counts as a
	/// failure and is retried with a computed delay while `attempt` is below
	/// `config.MaxRetries` and the retry manager permits it.
	///
	/// # Errors
	/// Returns a network error when the breaker stays open, when the download
	/// is cancelled, or when checksum verification keeps failing; otherwise
	/// the last transfer error once retries are exhausted.
	async fn DownloadWithRetry(
		&self,

		DownloadId:&str,

		url:&str,

		destination:&PathBuf,

		config:&DownloadConfig,
	) -> Result<DownloadResult> {
		let RetryPolicy = crate::Resilience::RetryPolicy {
			MaxRetries:config.MaxRetries,

			InitialIntervalMs:1000,

			MaxIntervalMs:32000,

			BackoffMultiplier:2.0,

			JitterFactor:0.1,

			BudgetPerMinute:100,

			ErrorClassification:std::collections::HashMap::new(),
		};

		let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());

		// The breaker is created per call, so its state is not shared between downloads.
		let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
			"downloader".to_string(),
			crate::Resilience::CircuitBreakerConfig::default(),
		);

		// Number of retries performed so far (0 during the first attempt).
		let mut attempt = 0;

		loop {
			if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
				if !CircuitBreaker.AttemptRecovery().await {
					return Err(AirError::Network(
						"Circuit breaker is open, too many recent failures".to_string(),
					));
				}
			}

			// Honour a cancellation requested between attempts.
			if let Some(status) = self.GetDownloadStatus(DownloadId).await {
				if status.status == DownloadState::Cancelled {
					return Err(AirError::Network("Download cancelled".to_string()));
				}
			}

			match self.PerformDownload(DownloadId, url, destination, config).await {
				Ok(file_info) => {
					// Verify only when the configuration carries an expected checksum.
					if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
						self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
							.await?;

						if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
							dev_log!(
								"update",
								"warn: [DownloadManager] Checksum verification failed [ID: {}]: {}",
								DownloadId,
								e
							);

							// A checksum mismatch is treated like a failed transfer.
							CircuitBreaker.RecordFailure().await;

							if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
								attempt += 1;

								let delay = RetryManager.CalculateRetryDelay(attempt);

								// Logged as 1-based attempt number out of MaxRetries + 1 total attempts.
								dev_log!(
									"update",
									"[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
									DownloadId,
									attempt + 1,
									config.MaxRetries + 1,
									delay
								);

								tokio::time::sleep(delay).await;

								continue;
							} else {
								return Err(AirError::Network(format!(
									"Checksum verification failed after {} retries: {}",
									attempt, e
								)));
							}
						}
					}

					CircuitBreaker.RecordSuccess().await;

					return Ok(file_info);
				},

				Err(e) => {
					CircuitBreaker.RecordFailure().await;

					if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
						attempt += 1;

						dev_log!(
							"update",
							"warn: [DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
							DownloadId,
							attempt + 1,
							config.MaxRetries + 1,
							e
						);

						let delay = RetryManager.CalculateRetryDelay(attempt);

						tokio::time::sleep(delay).await;
					} else {
						// Retries exhausted (or not permitted): surface the last error.
						return Err(e);
					}
				},
			}
		}
	}
1092
1093 async fn PerformDownload(
1095 &self,
1096
1097 DownloadId:&str,
1098
1099 url:&str,
1100
1101 destination:&PathBuf,
1102
1103 config:&DownloadConfig,
1104 ) -> Result<DownloadResult> {
1105 let _concurrent_permit = self
1107 .ConcurrentLimiter
1108 .acquire()
1109 .await
1110 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
1111
1112 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
1113 .await?;
1114
1115 let TempDestination = destination.with_extension("tmp");
1117
1118 let mut ExistingSize:u64 = 0;
1120
1121 if TempDestination.exists() {
1122 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
1123 ExistingSize = metadata.len();
1124
1125 dev_log!("update", "[DownloadManager] Resuming download from {} bytes", ExistingSize);
1126 }
1127 }
1128
1129 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
1131
1132 if ExistingSize > 0 {
1133 let RangeHeader = format!("bytes={}-", ExistingSize);
1134
1135 req = req.header(reqwest::header::RANGE, RangeHeader);
1136
1137 req = req.header(reqwest::header::IF_MATCH, "*"); }
1139
1140 let response = req
1141 .send()
1142 .await
1143 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
1144
1145 let FinalUrl = response.url().clone();
1147
1148 let response = if FinalUrl.as_str() != url {
1149 dev_log!("update", "[DownloadManager] Redirected to: {}", FinalUrl);
1150
1151 response
1152 } else {
1153 response
1154 };
1155
1156 let StatusCode = response.status();
1158
1159 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
1160 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
1161 }
1162
1163 let TotalSize = if let Some(cl) = response.content_length() {
1165 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
1166 cl + ExistingSize
1167 } else {
1168 cl
1169 }
1170 } else {
1171 0
1172 };
1173
1174 if let Some(max_size) = config.MaxFileSize {
1176 if TotalSize > 0 && TotalSize > max_size {
1177 return Err(AirError::Network(format!(
1178 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
1179 TotalSize, max_size
1180 )));
1181 }
1182 }
1183
1184 let mut file = tokio::fs::OpenOptions::new()
1186 .create(true)
1187 .append(true)
1188 .open(&TempDestination)
1189 .await
1190 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
1191
1192 use tokio::io::AsyncWriteExt;
1193 use futures_util::StreamExt;
1194
1195 let mut downloaded = ExistingSize;
1196
1197 let mut LastProgressUpdate = Instant::now();
1198
1199 let BytesStream = response.bytes_stream();
1200
1201 tokio::pin!(BytesStream);
1202
1203 while let Some(result) = BytesStream.next().await {
1204 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1206 match status.status {
1207 DownloadState::Cancelled => {
1208 let _ = tokio::fs::remove_file(&TempDestination).await;
1210
1211 return Err(AirError::Network("Download cancelled".to_string()));
1212 },
1213
1214 DownloadState::Paused => {
1215 loop {
1217 tokio::time::sleep(Duration::from_millis(250)).await;
1218
1219 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
1220 match s.status {
1221 DownloadState::Paused => continue,
1222
1223 DownloadState::Cancelled => {
1224 let _ = tokio::fs::remove_file(&TempDestination).await;
1225
1226 return Err(AirError::Network("Download cancelled".to_string()));
1227 },
1228
1229 _ => {
1230 dev_log!(
1231 "update",
1232 "[DownloadManager] Resuming paused download [ID: {}]",
1233 DownloadId
1234 );
1235
1236 break;
1237 },
1238 }
1239 } else {
1240 break;
1241 }
1242 }
1243 },
1244
1245 _ => {},
1246 }
1247 }
1248
1249 match result {
1250 Ok(chunk) => {
1251 let ChunkSize = chunk.len();
1253
1254 {
1255 let mut bucket = self.TokenBucket.write().await;
1256
1257 if let Err(e) = bucket.consume(ChunkSize as u64).await {
1258 dev_log!(
1259 "update",
1260 "warn: [DownloadManager] Bandwidth throttling error: {}, continuing anyway",
1261 e
1262 );
1263 }
1264 }
1265
1266 file.write_all(&chunk)
1267 .await
1268 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
1269
1270 downloaded += ChunkSize as u64;
1271
1272 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
1274 LastProgressUpdate = Instant::now();
1275
1276 if TotalSize > 0 {
1277 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
1278
1279 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
1280 .await?;
1281 }
1282
1283 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
1285
1286 self.UpdateDownloadRate(DownloadId, rate).await;
1287 }
1288 },
1289
1290 Err(e) => {
1291 if e.is_timeout() || e.is_connect() {
1293 dev_log!("update", "warn: [DownloadManager] Connection/timeout error, may retry: {}", e);
1294
1295 return Err(AirError::Network(format!("Network error: {}", e)));
1296 }
1297
1298 return Err(AirError::Network(format!("Failed to read response: {}", e)));
1299 },
1300 }
1301 }
1302
1303 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
1305 .await?;
1306
1307 file.flush()
1309 .await
1310 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1311
1312 tokio::fs::rename(&TempDestination, destination)
1314 .await
1315 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
1316
1317 let checksum = self.CalculateChecksum(destination).await?;
1319
1320 self.UpdateActualChecksum(DownloadId, &checksum).await;
1322
1323 Ok(DownloadResult {
1324 path:destination.to_string_lossy().to_string(),
1325 size:downloaded,
1326 checksum,
1327 duration:Duration::from_secs(0),
1328 AverageRate:0,
1329 })
1330 }
1331
1332 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
1334 if !FilePath.exists() {
1336 return Err(AirError::FileSystem(format!(
1337 "File not found for checksum verification: {}",
1338 FilePath.display()
1339 )));
1340 }
1341
1342 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
1343
1344 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
1346
1347 let NormalizedActual = ActualChecksum.trim().to_lowercase();
1348
1349 if NormalizedActual != NormalizedExpected {
1350 dev_log!(
1351 "update",
1352 "error: [DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1353 FilePath.display(),
1354 NormalizedExpected,
1355 NormalizedActual
1356 );
1357
1358 return Err(AirError::Network(format!(
1359 "Checksum verification failed: expected {}, got {}",
1360 NormalizedExpected, NormalizedActual
1361 )));
1362 }
1363
1364 dev_log!("update", "[DownloadManager] Checksum verified for file: {}", FilePath.display());
1365
1366 Ok(())
1367 }
1368
1369 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1371 if !FilePath.exists() {
1373 return Err(AirError::FileSystem(format!(
1374 "File not found for checksum calculation: {}",
1375 FilePath.display()
1376 )));
1377 }
1378
1379 self.ChecksumVerifier.CalculateSha256(FilePath).await
1380 }
1381
1382 async fn RegisterDownload(
1384 &self,
1385
1386 DownloadId:&str,
1387
1388 url:&str,
1389
1390 destination:&PathBuf,
1391
1392 ExpectedChecksum:Option<String>,
1393 ) -> Result<()> {
1394 let mut downloads = self.ActiveDownloads.write().await;
1395
1396 let mut stats = self.statistics.write().await;
1397
1398 stats.ActiveDownloads += 1;
1399
1400 downloads.insert(
1401 DownloadId.to_string(),
1402 DownloadStatus {
1403 DownloadId:DownloadId.to_string(),
1404 url:url.to_string(),
1405 destination:destination.clone(),
1406 TotalSize:0,
1407 downloaded:0,
1408 progress:0.0,
1409 status:DownloadState::Pending,
1410 error:None,
1411 StartedAt:Some(chrono::Utc::now()),
1412 CompletedAt:None,
1413 ChunksCompleted:0,
1414 TotalChunks:1,
1415 DownloadRateBytesPerSec:0,
1416 ExpectedChecksum:ExpectedChecksum.clone(),
1417 ActualChecksum:None,
1418 },
1419 );
1420
1421 Ok(())
1422 }
1423
1424 async fn UpdateDownloadStatus(
1426 &self,
1427
1428 DownloadId:&str,
1429
1430 status:DownloadState,
1431
1432 progress:Option<f32>,
1433
1434 error:Option<String>,
1435 ) -> Result<()> {
1436 let mut downloads = self.ActiveDownloads.write().await;
1437
1438 if let Some(download) = downloads.get_mut(DownloadId) {
1439 if status == DownloadState::Completed || status == DownloadState::Failed {
1440 download.CompletedAt = Some(chrono::Utc::now());
1441 }
1442
1443 download.status = status;
1444
1445 if let Some(progress) = progress {
1446 download.progress = progress;
1447 }
1448
1449 download.error = error;
1450 }
1451
1452 Ok(())
1453 }
1454
1455 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1457 let mut downloads = self.ActiveDownloads.write().await;
1458
1459 if let Some(download) = downloads.get_mut(DownloadId) {
1460 download.DownloadRateBytesPerSec = rate;
1461 }
1462 }
1463
1464 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1466 let mut downloads = self.ActiveDownloads.write().await;
1467
1468 if let Some(download) = downloads.get_mut(DownloadId) {
1469 download.ActualChecksum = Some(checksum.to_string());
1470 }
1471 }
1472
1473 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1475 let downloads = self.ActiveDownloads.read().await;
1476
1477 if let Some(download) = downloads.get(DownloadId) {
1478 if let Some(StartedAt) = download.StartedAt {
1479 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1480
1481 let ElapsedSecs = elapsed.num_seconds() as u64;
1482
1483 if ElapsedSecs > 0 {
1484 return CurrentBytes / ElapsedSecs;
1485 }
1486 }
1487 }
1488
1489 0
1490 }
1491
1492 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1494 let mut stats = self.statistics.write().await;
1495
1496 if success {
1497 stats.SuccessfulDownloads += 1;
1498
1499 stats.TotalBytesDownloaded += bytes;
1500
1501 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1502
1503 if stats.TotalDownloadTimeSecs > 0.0 {
1504 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1505 }
1506
1507 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1509 (bytes as f64 / duration.as_secs_f64()) as u64
1510 } else {
1511 0
1512 };
1513
1514 if CurrentRate > stats.PeakDownloadRate {
1515 stats.PeakDownloadRate = CurrentRate;
1516 }
1517 } else {
1518 stats.FailedDownloads += 1;
1519 }
1520
1521 stats.TotalDownloads += 1;
1522
1523 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1524 }
1525
1526 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1528 let downloads = self.ActiveDownloads.read().await;
1529
1530 downloads.get(DownloadId).cloned()
1531 }
1532
1533 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1535 let downloads = self.ActiveDownloads.read().await;
1536
1537 downloads.values().cloned().collect()
1538 }
1539
1540 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1542 dev_log!("update", "[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1543
1544 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1545 .await?;
1546
1547 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1549 let TempPath = status.destination.with_extension("tmp");
1550
1551 if TempPath.exists() {
1552 let _ = tokio::fs::remove_file(&TempPath).await;
1553 }
1554 }
1555
1556 {
1558 let mut stats = self.statistics.write().await;
1559
1560 stats.CancelledDownloads += 1;
1561
1562 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1563 }
1564
1565 Ok(())
1566 }
1567
1568 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1570 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1571
1572 dev_log!("update", "[DownloadManager] Download paused [ID: {}]", DownloadId);
1573
1574 Ok(())
1575 }
1576
1577 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1579 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1580 if status.status == DownloadState::Paused {
1581 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1582 .await?;
1583
1584 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1586 .await?;
1587
1588 dev_log!("update", "[DownloadManager] Download resumed [ID: {}]", DownloadId);
1589 } else {
1590 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1591 }
1592 } else {
1593 return Err(AirError::Network("Download not found".to_string()));
1594 }
1595
1596 Ok(())
1597 }
1598
1599 pub async fn GetActiveDownloadCount(&self) -> usize {
1601 let downloads = self.ActiveDownloads.read().await;
1602
1603 downloads
1604 .iter()
1605 .filter(|(_, s)| {
1606 matches!(
1607 s.status,
1608 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1609 )
1610 })
1611 .count()
1612 }
1613
1614 pub async fn GetStatistics(&self) -> DownloadStatistics {
1616 let stats = self.statistics.read().await;
1617
1618 stats.clone()
1619 }
1620
1621 pub async fn QueueDownload(
1623 &self,
1624
1625 url:String,
1626
1627 destination:String,
1628
1629 checksum:String,
1630
1631 priority:DownloadPriority,
1632 ) -> Result<String> {
1633 let DownloadId = Utility::GenerateRequestId();
1634
1635 let destination = if destination.is_empty() {
1636 let filename = url.split('/').last().unwrap_or("download.bin");
1637
1638 self.CacheDirectory.join(filename)
1639 } else {
1640 ConfigurationManager::ExpandPath(&destination)?
1641 };
1642
1643 let queued_download = QueuedDownload {
1644 DownloadId:DownloadId.clone(),
1645
1646 url,
1647
1648 destination,
1649
1650 checksum,
1651
1652 priority,
1653
1654 AddedAt:chrono::Utc::now(),
1655
1656 MaxFileSize:None,
1657
1658 ValidateDiskSpace:true,
1659 };
1660
1661 let mut queue = self.DownloadQueue.write().await;
1662
1663 queue.push_back(queued_download);
1664
1665 queue.make_contiguous().sort_by(|a, b| {
1667 match b.priority.cmp(&a.priority) {
1668 std::cmp::Ordering::Equal => {
1669 a.AddedAt.cmp(&b.AddedAt)
1671 },
1672 order => order,
1673 }
1674 });
1675
1676 {
1677 let mut stats = self.statistics.write().await;
1678
1679 stats.QueuedDownloads += 1;
1680 }
1681
1682 dev_log!(
1683 "update",
1684 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1685 DownloadId,
1686 priority
1687 );
1688
1689 Ok(DownloadId)
1690 }
1691
1692 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1694 let mut queue = self.DownloadQueue.write().await;
1695
1696 if let Some(queued) = queue.pop_front() {
1697 let download_id = queued.DownloadId.clone();
1698
1699 drop(queue); let config = DownloadConfig {
1702 url:queued.url.clone(),
1703
1704 destination:queued.destination.to_string_lossy().to_string(),
1705
1706 checksum:queued.checksum.clone(),
1707
1708 priority:queued.priority,
1709
1710 MaxFileSize:queued.MaxFileSize,
1711
1712 ValidateDiskSpace:queued.ValidateDiskSpace,
1713 ..Default::default()
1714 };
1715
1716 {
1717 let mut stats = self.statistics.write().await;
1718
1719 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1720 }
1721
1722 let manager = self.clone();
1724
1725 let download_id_clone = download_id.clone();
1726
1727 tokio::spawn(async move {
1728 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1729 dev_log!(
1730 "update",
1731 "error: [DownloadManager] Queued download failed [ID: {}]: {}",
1732 download_id_clone,
1733 e
1734 ); let _ = manager
1736 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1737 .await;
1738 }
1739 });
1740
1741 Ok(Some(download_id))
1742 } else {
1743 Ok(None)
1744 }
1745 }
1746
1747 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1749 let manager = self.clone();
1750
1751 let handle = tokio::spawn(async move {
1752 manager.BackgroundTaskLoop().await;
1753 });
1754
1755 dev_log!("update", "[DownloadManager] Background tasks started");
1756
1757 Ok(handle)
1758 }
1759
1760 async fn BackgroundTaskLoop(&self) {
1762 let mut interval = tokio::time::interval(Duration::from_secs(60));
1763
1764 loop {
1765 interval.tick().await;
1766
1767 if let Err(e) = self.ProcessQueue().await {
1769 dev_log!("update", "error: [DownloadManager] Queue processing error: {}", e);
1770 }
1771
1772 self.CleanupCompletedDownloads().await;
1774
1775 if let Err(e) = self.CleanupCache().await {
1777 dev_log!("update", "error: [DownloadManager] Cache cleanup failed: {}", e);
1778 }
1779 }
1780 }
1781
1782 async fn CleanupCompletedDownloads(&self) {
1784 let mut downloads = self.ActiveDownloads.write().await;
1785
1786 let mut cleaned_count = 0;
1787
1788 downloads.retain(|_, download| {
1789 let is_final = matches!(
1790 download.status,
1791 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1792 );
1793 if is_final {
1794 cleaned_count += 1;
1795 }
1796 !is_final
1797 });
1798
1799 if cleaned_count > 0 {
1800 dev_log!("update", "[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1801 }
1802 }
1803
1804 async fn CleanupCache(&self) -> Result<()> {
1806 let max_age_days = 7;
1807
1808 let now = chrono::Utc::now();
1809
1810 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1811 .await
1812 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1813
1814 let mut cleaned_count = 0;
1815
1816 while let Some(entry) = entries
1817 .next_entry()
1818 .await
1819 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1820 {
1821 let metadata = entry
1822 .metadata()
1823 .await
1824 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1825
1826 if metadata.is_file() {
1827 let path = entry.path();
1828
1829 let IsActive = {
1831 let downloads = self.ActiveDownloads.read().await;
1832
1833 downloads.values().any(|d| d.destination == path)
1834 };
1835
1836 if IsActive {
1837 continue;
1838 }
1839
1840 let modified = metadata
1841 .modified()
1842 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1843
1844 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1845
1846 let age = now.signed_duration_since(modified_time);
1847
1848 if age.num_days() > max_age_days {
1849 match tokio::fs::remove_file(&path).await {
1850 Ok(_) => {
1851 dev_log!(
1852 "update",
1853 "[DownloadManager] Removed old cache file: {}",
1854 entry.file_name().to_string_lossy()
1855 );
1856
1857 cleaned_count += 1;
1858 },
1859
1860 Err(e) => {
1861 dev_log!(
1862 "update",
1863 "warn: [DownloadManager] Failed to remove cache file {}: {}",
1864 entry.file_name().to_string_lossy(),
1865 e
1866 );
1867 },
1868 }
1869 }
1870 }
1871 }
1872
1873 if cleaned_count > 0 {
1874 dev_log!("update", "[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1875 }
1876
1877 Ok(())
1878 }
1879
1880 pub async fn StopBackgroundTasks(&self) {
1882 dev_log!("update", "[DownloadManager] Stopping background tasks");
1883
1884 let ids_to_cancel:Vec<String> = {
1886 let downloads = self.ActiveDownloads.read().await;
1887
1888 downloads
1889 .iter()
1890 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1891 .map(|(id, _)| id.clone())
1892 .collect()
1893 };
1894
1895 for id in ids_to_cancel {
1897 let _ = self.CancelDownload(&id).await;
1898 }
1899
1900 let _ = self
1902 .AppState
1903 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1904 .await;
1905 }
1906
1907 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1920 let bytes_per_sec = (mb_per_sec.max(1).min(1000) * 1024 * 1024) as u64;
1921
1922 {
1924 let mut bucket = self.TokenBucket.write().await;
1925
1926 bucket.set_rate(bytes_per_sec);
1927 }
1928
1929 let permits = mb_per_sec.max(1).min(1000);
1931
1932 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1933
1934 dev_log!(
1935 "update",
1936 "[DownloadManager] Bandwidth limit set to {} MB/s ({} bytes/s)",
1937 mb_per_sec,
1938 bytes_per_sec
1939 );
1940 }
1941
1942 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1946 let permits = max.max(1).min(20);
1947
1948 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1949
1950 dev_log!("update", "[DownloadManager] Max concurrent downloads set to {}", max);
1951 }
1952}
1953
1954impl Clone for DownloadManager {
1955 fn clone(&self) -> Self {
1956 Self {
1957 AppState:self.AppState.clone(),
1958
1959 ActiveDownloads:self.ActiveDownloads.clone(),
1960
1961 DownloadQueue:self.DownloadQueue.clone(),
1962
1963 CacheDirectory:self.CacheDirectory.clone(),
1964
1965 client:self.client.clone(),
1966
1967 ChecksumVerifier:self.ChecksumVerifier.clone(),
1968
1969 BandwidthLimiter:self.BandwidthLimiter.clone(),
1970
1971 TokenBucket:self.TokenBucket.clone(),
1972
1973 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1974
1975 statistics:self.statistics.clone(),
1976 }
1977 }
1978}
1979
1980impl Default for DownloadStatistics {
1981 fn default() -> Self {
1982 Self {
1983 TotalDownloads:0,
1984
1985 SuccessfulDownloads:0,
1986
1987 FailedDownloads:0,
1988
1989 CancelledDownloads:0,
1990
1991 TotalBytesDownloaded:0,
1992
1993 TotalDownloadTimeSecs:0.0,
1994
1995 AverageDownloadRate:0.0,
1996
1997 PeakDownloadRate:0,
1998
1999 ActiveDownloads:0,
2000
2001 QueuedDownloads:0,
2002 }
2003 }
2004}
2005
2006fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
2008 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
2009}
2010
/// One byte range of a chunked download and the temporary file it is
/// written to.
#[derive(Clone, Debug)]
struct ChunkInfo {
	/// First byte offset of the range (inclusive).
	start:u64,

	/// Last byte offset of the range (inclusive).
	end:u64,

	/// Byte counter for this chunk; initialised to 0 and not read by the
	/// code that builds chunks, hence the lint allowance.
	#[allow(dead_code)]
	downloaded:u64,

	/// File the chunk's bytes are written to before reassembly.
	temp_path:PathBuf,
}
2023
2024#[derive(Debug)]
2026#[allow(dead_code)]
2027struct ParallelDownloadResult {
2028 chunks:Vec<ChunkInfo>,
2029
2030 total_size:u64,
2031}
2032
2033impl DownloadManager {
2070 pub async fn DownloadFileWithChunks(
2080 &self,
2081
2082 url:String,
2083
2084 destination:String,
2085
2086 checksum:String,
2087
2088 chunk_size_mb:usize,
2089 ) -> Result<DownloadResult> {
2090 dev_log!(
2091 "update",
2092 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
2093 url,
2094 chunk_size_mb
2095 );
2096
2097 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
2099
2100 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
2102
2103 dev_log!("update", "[DownloadManager] Remote file size: {} bytes", total_size);
2104
2105 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
2108 dev_log!(
2109 "update",
2110 "[DownloadManager] File too small for chunked download, using normal download"
2111 );
2112
2113 return self.DownloadFile(url, destination, checksum).await;
2114 }
2115
2116 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
2118
2119 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
2120
2121 let num_concurrent = num_chunks.min(4); dev_log!(
2124 "update",
2125 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
2126 num_chunks,
2127 num_concurrent
2128 );
2129
2130 let DownloadId = Utility::GenerateRequestId();
2131
2132 let DestinationPath = if destination.is_empty() {
2133 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
2134
2135 self.CacheDirectory.join(filename)
2136 } else {
2137 ConfigurationManager::ExpandPath(&destination)?
2138 };
2139
2140 let temp_dir = DestinationPath.with_extension("chunks");
2142
2143 tokio::fs::create_dir_all(&temp_dir)
2144 .await
2145 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
2146
2147 let mut chunks = Vec::with_capacity(num_chunks);
2149
2150 for i in 0..num_chunks {
2151 let start = (i as u64) * chunk_size;
2152
2153 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
2154
2155 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
2156 }
2157
2158 let downloaded_tracker = Arc::new(RwLock::new(0u64));
2160
2161 let completed_tracker = Arc::new(RwLock::new(0usize));
2162
2163 let mut handles = Vec::new();
2165
2166 for (i, chunk) in chunks.iter().enumerate() {
2167 let manager = self.clone();
2168
2169 let url_clone = sanitized_url.clone();
2170
2171 let chunk_clone = chunk.clone();
2172
2173 let downloaded_tracker = downloaded_tracker.clone();
2174
2175 let completed_tracker = completed_tracker.clone();
2176
2177 let _Did = DownloadId.clone();
2178
2179 let handle = tokio::spawn(async move {
2180 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
2181
2182 {
2184 let mut downloaded = downloaded_tracker.write().await;
2185 let mut completed = completed_tracker.write().await;
2186 *downloaded += chunk_clone.end - chunk_clone.start + 1;
2187 *completed += 1;
2188
2189 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
2190 dev_log!(
2191 "update",
2192 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
2193 i + 1,
2194 *completed,
2195 num_chunks,
2196 progress
2197 );
2198 }
2199
2200 Ok::<_, AirError>(())
2201 });
2202
2203 if (i + 1) % num_concurrent == 0 {
2205 for handle in handles.drain(..) {
2206 handle.await??;
2207 }
2208 }
2209
2210 handles.push(handle);
2211 }
2212
2213 for handle in handles {
2215 handle.await??;
2216 }
2217
2218 dev_log!("update", "[DownloadManager] Reassembling chunks into final file");
2220
2221 self.ReassembleChunks(&chunks, &DestinationPath).await?;
2222
2223 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
2225 dev_log!("update", "warn: [DownloadManager] Failed to clean up temp directory: {}", e);
2226 AirError::FileSystem(e.to_string())
2227 })?;
2228
2229 if !checksum.is_empty() {
2231 self.VerifyChecksum(&DestinationPath, &checksum).await?;
2232 }
2233
2234 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
2235
2236 dev_log!("update", "[DownloadManager] Chunked download completed successfully");
2237
2238 Ok(DownloadResult {
2239 path:DestinationPath.to_string_lossy().to_string(),
2240 size:total_size,
2241 checksum:actual_checksum,
2242 duration:Duration::from_secs(0),
2243 AverageRate:0,
2244 })
2245 }
2246
2247 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
2249 let response = self
2250 .client
2251 .head(url)
2252 .timeout(Duration::from_secs(30))
2253 .send()
2254 .await
2255 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
2256
2257 if !response.status().is_success() {
2258 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
2259 }
2260
2261 response
2262 .content_length()
2263 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
2264 }
2265
2266 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
2268 dev_log!(
2269 "update",
2270 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
2271 chunk_index,
2272 chunk.start,
2273 chunk.end
2274 );
2275
2276 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
2277
2278 let response = self
2279 .client
2280 .get(url)
2281 .header(reqwest::header::RANGE, range_header)
2282 .timeout(Duration::from_secs(300))
2283 .send()
2284 .await
2285 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
2286
2287 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
2288 return Err(AirError::Network(format!(
2289 "Chunk download failed with status: {}",
2290 response.status()
2291 )));
2292 }
2293
2294 let bytes = response
2296 .bytes()
2297 .await
2298 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
2299
2300 tokio::fs::write(&chunk.temp_path, &bytes)
2301 .await
2302 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
2303
2304 dev_log!(
2305 "update",
2306 "[DownloadManager] Chunk {} downloaded: {} bytes",
2307 chunk_index,
2308 bytes.len()
2309 );
2310
2311 Ok(())
2312 }
2313
2314 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
2316 use tokio::io::AsyncWriteExt;
2317
2318 let mut file = tokio::fs::File::create(destination)
2319 .await
2320 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
2321
2322 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
2324
2325 sorted_chunks.sort_by_key(|c| c.start);
2326
2327 for chunk in sorted_chunks {
2328 let contents = tokio::fs::read(&chunk.temp_path)
2329 .await
2330 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
2331
2332 file.write_all(&contents)
2333 .await
2334 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
2335
2336 dev_log!(
2337 "update",
2338 "[DownloadManager] Reassembled chunk (bytes {}-{})",
2339 chunk.start,
2340 chunk.end
2341 );
2342 }
2343
2344 file.flush()
2345 .await
2346 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
2347
2348 dev_log!("update", "[DownloadManager] All chunks reassembled successfully");
2349
2350 Ok(())
2351 }
2352}