""" File-centric download worker. Processes individual file downloads from the downloads table. Architecture: - Each download record = 1 file - Worker picks up pending files and downloads them one by one - Progress tracked per-file, not per-batch - Uses aria2c for fast multi-connection downloads """ import time import logging import signal import re from datetime import datetime from typing import Optional, Dict, Any, List import os from concurrent.futures import ThreadPoolExecutor, as_completed from .services import downloads_service from .services import nas_service from .services import mongodb_service from .common import get_download_filename logger = logging.getLogger(__name__) # Worker configuration - Load from environment variables POLL_INTERVAL = int(os.getenv('WORKER_POLL_INTERVAL', '3') ) # seconds (min interval) # seconds (max when idle) POLL_INTERVAL_MAX = int(os.getenv('WORKER_POLL_INTERVAL_MAX', '30')) # Process up to N files concurrently MAX_CONCURRENT_DOWNLOADS = int( os.getenv('WORKER_MAX_CONCURRENT_DOWNLOADS', '5')) BACKGROUND_DOWNLOAD_MAX_SPEED = os.getenv( # Bandwidth limit for background queue 'BACKGROUND_DOWNLOAD_MAX_SPEED', '100K') _shutdown_requested = False def natural_sort_key(text: str) -> List: """ Generate key for natural sorting (e.g., file1, file2, file10, not file1, file10, file2). Args: text: String to generate sort key for Returns: List of mixed strings and integers for natural sorting """ def convert(part): return int(part) if part.isdigit() else part.lower() return [convert(c) for c in re.split('([0-9]+)', text)] def signal_handler(sig, frame): """Handle graceful shutdown on SIGINT/SIGTERM.""" global _shutdown_requested logger.debug(f"Received signal {sig}, initiating graceful shutdown...") _shutdown_requested = True signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def process_file_download(download: Dict[str, Any], sid: str, max_speed: Optional[str] = None) -> bool: """ Process a single file download. 

def signal_handler(sig, frame):
    """Handle graceful shutdown on SIGINT/SIGTERM."""
    global _shutdown_requested
    logger.debug(f"Received signal {sig}, initiating graceful shutdown...")
    _shutdown_requested = True


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def process_file_download(download: Dict[str, Any], sid: str,
                          max_speed: Optional[str] = None) -> bool:
    """
    Process a single file download.

    Args:
        download: Download record from the database
        sid: NAS session ID for authentication
        max_speed: Optional bandwidth limit (e.g., '100K') for throttling

    Returns:
        True if successful, False if failed
    """
    download_id = download["id"]
    file_name = download["file_name"]
    file_path = download["file_path"]
    ge_id = download["ge_id"]
    lang = download["lang"]
    mode = download["mode"]

    # IMPORTANT: Uppercase lang code for folder naming (1000_DE, not 1000_de)
    lang_upper = lang.upper()

    logger.debug(
        f"[Download {download_id}] Processing: {file_name} "
        f"(mode: {mode}, GE={ge_id}, lang={lang_upper})")

    try:
        # Update to downloading status
        downloads_service.update_download_status(
            download_id=download_id,
            status='downloading',
            progress_percent=0.0
        )

        # Determine source and destination paths
        if mode == 'api':
            # API mode: download from NAS FileStation
            source_path = file_path  # Relative path on the NAS
            base_dest = nas_service.DESTINATION_PATH

            # Get the final filename (adds .zip for folders automatically)
            dest_filename = get_download_filename(file_path, file_name)

            # Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
            dest_path = os.path.join(
                base_dest, f"{ge_id}_{lang_upper}", dest_filename)

            # Get the MongoDB path
            mongodb_path = mongodb_service.get_path_from_tms_data(ge_id, lang)

            # Download using the NAS service
            success, final_path, error_msg = _download_api_file(
                download_id=download_id,
                source_path=source_path,
                dest_path=dest_path,
                file_name=file_name,
                sid=sid,
                max_speed=max_speed
            )
        elif mode == 'sharing':
            # Sharing mode: download from a sharing link
            sharing_id = download.get('sharing_id')
            if not sharing_id:
                raise Exception("Missing sharing_id for sharing mode download")

            base_dest = nas_service.DESTINATION_PATH

            # Get the final filename (adds .zip for folders automatically)
            dest_filename = get_download_filename(file_path, file_name)

            # Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
            dest_path = os.path.join(
                base_dest, f"{ge_id}_{lang_upper}", dest_filename)

            # Download using the sharing service
            success, final_path, error_msg = _download_sharing_file(
                download_id=download_id,
                sharing_id=sharing_id,
                file_path=file_path,
                dest_path=dest_path,
                file_name=file_name,
                max_speed=max_speed
            )
            mongodb_path = None
        else:
            raise Exception(f"Unknown download mode: {mode}")

        # Update the final status
        if success:
            downloads_service.update_download_status(
                download_id=download_id,
                status='completed',
                progress_percent=100.0,
                destination_path=final_path
            )
            logger.debug(f"[Download {download_id}] ✅ Completed: {file_name}")
            return True
        else:
            downloads_service.update_download_status(
                download_id=download_id,
                status='failed',
                error_message=error_msg
            )
            logger.error(f"[Download {download_id}] ❌ Failed: {error_msg}")
            return False

    except Exception as e:
        error_msg = str(e)
        logger.error(
            f"[Download {download_id}] Exception: {error_msg}", exc_info=True)
        downloads_service.update_download_status(
            download_id=download_id,
            status='failed',
            error_message=error_msg
        )
        return False
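
# For reference, a minimal download record as consumed by
# process_file_download above. The field values are illustrative only,
# inferred from the key accesses in this module, not real data:
#
#   example_download = {
#       "id": 42,
#       "file_name": "report.pdf",
#       "file_path": "/shared/project/report.pdf",
#       "ge_id": "1000",
#       "lang": "de",          # uppercased to "DE" for the destination folder
#       "mode": "api",         # or "sharing" (then "sharing_id" is required)
#       "status": "pending",
#   }
#   process_file_download(example_download, sid="...")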

def _download_api_file(
    download_id: int,
    source_path: str,
    dest_path: str,
    file_name: str,
    sid: str,
    max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
    """
    Download a single file via the NAS FileStation API.

    Args:
        max_speed: Optional bandwidth limit (e.g., '100K')

    Returns:
        (success, final_path, error_message)
    """
    try:
        # Progress callback
        def progress_callback(downloaded: int, total: int):
            # Always update downloaded_size, even if total is unknown (folders)
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                progress_percent=round(
                    (downloaded / total) * 100, 2) if total > 0 else None,
                downloaded_size=downloaded,
                file_size=total if total > 0 else None
            )

        # Create the destination directory
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)

        # Download the file via the NAS API with aria2
        success, error_msg, gid = nas_service.download_single_file_aria2(
            sid=sid,
            remote_path=source_path,
            local_save_path=dest_path,
            is_dir=False,
            progress_callback=progress_callback,
            max_speed=max_speed
        )

        # Save the GID to the database for cancellation support
        if gid:
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                aria2_gid=gid
            )

        if success:
            return True, dest_path, None
        else:
            return False, None, error_msg or "Download failed"

    except Exception as e:
        return False, None, str(e)


def _download_sharing_file(
    download_id: int,
    sharing_id: str,
    file_path: str,
    dest_path: str,
    file_name: str,
    max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
    """
    Download a single file from a sharing link.

    Args:
        max_speed: Optional bandwidth limit (e.g., '100K')

    Returns:
        (success, final_path, error_message)
    """
    try:
        from .services import nas_sharing_service

        # Progress callback
        def progress_callback(downloaded: int, total: int):
            # Always update downloaded_size, even if total is unknown (folders)
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                progress_percent=round(
                    (downloaded / total) * 100, 2) if total > 0 else None,
                downloaded_size=downloaded,
                file_size=total if total > 0 else None
            )

        # Create the destination directory
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)

        # Get the sharing worker
        worker = nas_sharing_service.get_sharing_worker()
        if not worker:
            return False, None, "Sharing worker not available"

        # Determine whether this is a folder (check the file extension)
        is_folder = not bool(os.path.splitext(file_path)[1])

        # Download the file from the sharing link using aria2
        result = nas_sharing_service.download_file(
            sharing_id=sharing_id,
            file_path=file_path,
            save_path=dest_path,
            is_folder=is_folder,
            progress_callback=progress_callback,
            max_speed=max_speed
        )

        # ✅ Save the GID to the database for cancellation support (same as API mode)
        gid = result.get('aria2_gid')
        if gid:
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                aria2_gid=gid
            )

        if result['status'] == 'success':
            logger.debug(
                f"[Sharing Download {download_id}] ✅ Downloaded: {file_name}")
            return True, result['save_path'], None
        else:
            logger.error(
                f"[Sharing Download {download_id}] ❌ Failed: {result['message']}")
            return False, None, result['message']

    except Exception as e:
        logger.error(
            f"[Sharing Download {download_id}] Exception: {str(e)}",
            exc_info=True)
        return False, None, str(e)
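
# Note on the is_folder heuristic in _download_sharing_file: a path is
# treated as a folder when it has no extension. Illustration (assumes
# folder names never contain a dot):
#
#   os.path.splitext("/share/docs")[1]    # -> ""      -> is_folder = True
#   os.path.splitext("/share/a.pdf")[1]   # -> ".pdf"  -> is_folder = False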

def recover_orphaned_downloads():
    """
    Recovery logic: find downloads stuck in the 'downloading' state.

    This happens when:
    - The server crashed while downloading
    - The worker was killed
    - An aria2 task completed but the DB wasn't updated

    For each orphaned download:
    - Check whether its aria2 GID is still active
    - If not active -> mark it as failed
    """
    logger.debug("🔍 Checking for orphaned downloads...")

    try:
        # Get all downloads stuck in the 'downloading' state
        orphaned = downloads_service.get_all_downloads(
            status='downloading', limit=1000)

        if not orphaned:
            logger.debug("No orphaned downloads found")
            return

        logger.debug(f"Found {len(orphaned)} downloads in 'downloading' state")

        # Try to get the aria2 manager
        try:
            from .services.aria2.download_manager import get_aria2_manager
            manager = get_aria2_manager()
        except Exception as e:
            logger.warning(f"Could not get aria2 manager: {e}")
            manager = None

        recovered_count = 0
        failed_count = 0

        for download in orphaned:
            gid = download.get('aria2_gid')
            download_id = download['id']
            file_name = download.get('file_name', 'unknown')

            # Check whether the aria2 task is still active
            is_active = False
            if gid and manager:
                try:
                    status = manager.get_status(gid)
                    is_active = status.get('status') in [
                        'active', 'waiting', 'paused']
                except Exception as e:
                    logger.debug(f"Could not get status for GID {gid}: {e}")

            if is_active:
                # aria2 is still downloading - let the worker handle it
                logger.debug(
                    f"♻️ Download {download_id} ({file_name}) is still active in aria2")
                recovered_count += 1
            else:
                # The aria2 task doesn't exist - mark the download as failed
                logger.warning(
                    f"❌ Download {download_id} ({file_name}) has no active aria2 task")
                downloads_service.update_download_status(
                    download_id=download_id,
                    status='failed',
                    error_message='Download was interrupted (server crash or restart)'
                )
                failed_count += 1

        logger.debug(
            f"✅ Recovery complete: {recovered_count} recovered, "
            f"{failed_count} marked as failed")

    except Exception as e:
        logger.error(
            f"Error during orphaned downloads recovery: {e}", exc_info=True)
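
# Recovery decision summary, per stuck record:
#
#   aria2 status is 'active'/'waiting'/'paused'  -> leave as 'downloading'
#   no GID, no manager, or any other status      -> mark as 'failed'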

def worker_loop():
    """
    Main worker loop. Continuously polls for pending downloads and
    processes them.

    LOGIC:
    - API mode: download immediately, with no limit on concurrency
    - Sharing mode:
      * Main queue (first MAX_CONCURRENT_DOWNLOADS): full speed,
        gets priority for the slots
      * Background queue (the rest): throttled, ALWAYS downloaded in
        parallel to keep the sharing links alive
    """
    logger.debug("🚀 File Download Worker started")
    logger.debug(f" - Poll interval: {POLL_INTERVAL}s")
    logger.debug(
        f" - Max concurrent (Sharing only): {MAX_CONCURRENT_DOWNLOADS}")
    logger.debug(
        f" - Background speed (Sharing only): {BACKGROUND_DOWNLOAD_MAX_SPEED}")

    # Recovery: check for orphaned downloads from previous crashes
    recover_orphaned_downloads()

    # Get the NAS session - use the saved SID
    sid = nas_service.load_sid()
    if not sid:
        logger.error("❌ No NAS session found. Please login via OTP first.")
        return

    logger.debug(f"✅ Loaded NAS session (SID: {sid[:20]}...)")

    # Create the thread pool - effectively no limit (API mode needs
    # unlimited parallelism; sharing mode self-limits via the queue logic)
    executor = ThreadPoolExecutor(max_workers=50, thread_name_prefix="DL-")
    active_futures = {}  # Maps future -> download_id

    # Adaptive polling: start at the minimum interval, increase when idle
    current_poll_interval = POLL_INTERVAL

    while not _shutdown_requested:
        try:
            # Get active downloads (pending or downloading)
            active_downloads = downloads_service.get_active_downloads()

            # Separate API and Sharing downloads
            api_downloads = [d for d in active_downloads if d['mode'] == 'api']
            sharing_downloads = [
                d for d in active_downloads if d['mode'] == 'sharing']

            # ========== API MODE: IMMEDIATE DOWNLOAD (NO QUEUE) ==========
            api_pending = [
                d for d in api_downloads if d['status'] == 'pending']
            for download in api_pending:
                try:
                    future = executor.submit(
                        process_file_download,
                        download,
                        sid,
                        max_speed=None  # API mode: always full speed
                    )
                    active_futures[future] = download['id']
                    logger.debug(
                        f"🚀 [API] Started download {download['id']}: {download['file_name']}")
                except Exception as e:
                    logger.error(
                        f"Error submitting API download {download['id']}: {e}")

            # ========== SHARING MODE: QUEUE-BASED WITH THROTTLING ==========
            # Get pending sharing downloads
            sharing_pending = [
                d for d in sharing_downloads if d['status'] == 'pending']

            if sharing_pending:
                # Natural sort by file name within each batch
                from collections import defaultdict
                batches = defaultdict(list)
                for d in sharing_pending:
                    batch_id = d.get('batch_id', 'default')
                    batches[batch_id].append(d)

                # Sort each batch naturally and flatten
                sorted_pending = []
                for batch_files in batches.values():
                    sorted_batch = sorted(
                        batch_files,
                        key=lambda x: natural_sort_key(x['file_name']))
                    sorted_pending.extend(sorted_batch)

                # Split into the main queue (first N) and background queue (rest)
                main_queue = sorted_pending[:MAX_CONCURRENT_DOWNLOADS]
                background_queue = sorted_pending[MAX_CONCURRENT_DOWNLOADS:]

                # Count currently downloading sharing files (for slot calculation)
                sharing_downloading_count = len(
                    [d for d in sharing_downloads if d['status'] == 'downloading'])
                available_slots = MAX_CONCURRENT_DOWNLOADS - sharing_downloading_count

                # Submit main-queue files (full speed, use the available slots)
                if available_slots > 0:
                    for download in main_queue[:available_slots]:
                        try:
                            future = executor.submit(
                                process_file_download,
                                download,
                                sid,
                                max_speed=None  # Full speed
                            )
                            active_futures[future] = download['id']
                            logger.debug(
                                f"🚀 [SHARING-MAIN] Started download {download['id']}: {download['file_name']}")
                        except Exception as e:
                            logger.error(
                                f"Error submitting sharing download {download['id']}: {e}")

                # Submit background-queue files (throttled, ALWAYS, to keep
                # the sharing links alive). Background files do NOT consume
                # slots - they download in parallel at a low speed.
                for download in background_queue:
                    # Skip anything that is already downloading
                    if download['status'] == 'pending':
                        try:
                            future = executor.submit(
                                process_file_download,
                                download,
                                sid,
                                max_speed=BACKGROUND_DOWNLOAD_MAX_SPEED  # Throttled
                            )
                            active_futures[future] = download['id']
                            logger.debug(
                                f"🐢 [SHARING-BACKGROUND] Started download {download['id']}: "
                                f"{download['file_name']} (limited to {BACKGROUND_DOWNLOAD_MAX_SPEED})")
                        except Exception as e:
                            logger.error(
                                f"Error submitting background download {download['id']}: {e}")
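
            # Worked example of the queue split (with the default
            # MAX_CONCURRENT_DOWNLOADS = 5): given 8 pending sharing files,
            # files 1-5 form the main queue (full speed, limited by free
            # slots) and files 6-8 form the background queue (always
            # submitted, throttled to BACKGROUND_DOWNLOAD_MAX_SPEED,
            # e.g. '100K').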

            # Check for completed futures (non-blocking)
            done_futures = [f for f in active_futures if f.done()]
            for future in done_futures:
                download_id = active_futures.pop(future)
                try:
                    success = future.result()
                    if success:
                        logger.debug(
                            f"✅ Thread completed download {download_id}")
                    else:
                        logger.warning(
                            f"⚠️ Thread failed download {download_id}")
                except Exception as e:
                    logger.error(
                        f"❌ Thread exception for download {download_id}: {e}")

            # Log status
            if active_downloads:
                api_pending_count = len(
                    [d for d in api_downloads if d['status'] == 'pending'])
                api_downloading_count = len(
                    [d for d in api_downloads if d['status'] == 'downloading'])
                sharing_pending_count = len(
                    [d for d in sharing_downloads if d['status'] == 'pending'])
                sharing_downloading_count = len(
                    [d for d in sharing_downloads if d['status'] == 'downloading'])
                logger.debug(
                    f"📊 API: {len(api_downloads)} (Downloading: {api_downloading_count}, "
                    f"Pending: {api_pending_count}) | "
                    f"Sharing: {len(sharing_downloads)} (Downloading: {sharing_downloading_count}, "
                    f"Pending: {sharing_pending_count}) | "
                    f"Threads: {len(active_futures)}"
                )

            # Adaptive polling: fast when active, slow when idle
            if active_downloads or active_futures:
                # Has work → use the minimum interval
                current_poll_interval = POLL_INTERVAL
            else:
                # Idle → gradually increase the interval (up to the max)
                current_poll_interval = min(
                    current_poll_interval * 2, POLL_INTERVAL_MAX)

        except Exception as e:
            logger.error(f"Error in worker loop: {e}", exc_info=True)
            # On error, fall back to the default interval
            current_poll_interval = POLL_INTERVAL

        time.sleep(current_poll_interval)

    # Cleanup
    logger.debug("Shutting down thread pool...")
    executor.shutdown(wait=True)
    logger.debug("Worker shutdown complete")


def start_worker():
    """Start the download worker in the background."""
    import threading

    def run():
        try:
            worker_loop()
        except Exception as e:
            logger.error(f"Worker crashed: {e}", exc_info=True)

    thread = threading.Thread(target=run, daemon=True, name="DownloadWorker")
    thread.start()
    logger.debug("Download worker thread started")
    return thread


if __name__ == "__main__":
    # For standalone testing
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    try:
        worker_loop()
    except KeyboardInterrupt:
        logger.debug("Worker stopped by user")
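
# Embedding sketch (illustrative; the import path depends on where this
# module lives in the package):
#
#   from app import worker   # hypothetical package/module name
#   worker.start_worker()    # runs worker_loop() in a daemon thread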