# ge-tool/backend/worker_downloads.py
"""
File-centric download worker.
Processes individual file downloads from the downloads table.
Architecture:
- Each download record = 1 file
- Worker picks up pending files and downloads them one by one
- Progress tracked per-file, not per-batch
- Uses aria2c for fast multi-connection downloads
"""
import time
import logging
import signal
import re
from collections import defaultdict
from typing import Optional, Dict, Any, List
import os
from concurrent.futures import ThreadPoolExecutor
from .services import downloads_service
from .services import nas_service
from .services import mongodb_service
from .common import get_download_filename
logger = logging.getLogger(__name__)
# Worker configuration - loaded from environment variables
POLL_INTERVAL = int(os.getenv('WORKER_POLL_INTERVAL', '3'))  # seconds (min poll interval)
POLL_INTERVAL_MAX = int(os.getenv('WORKER_POLL_INTERVAL_MAX', '30'))  # seconds (max when idle)
# Max sharing-mode files downloaded at full speed concurrently (API mode is uncapped)
MAX_CONCURRENT_DOWNLOADS = int(os.getenv('WORKER_MAX_CONCURRENT_DOWNLOADS', '5'))
# Bandwidth limit for the background (throttled) sharing queue
BACKGROUND_DOWNLOAD_MAX_SPEED = os.getenv('BACKGROUND_DOWNLOAD_MAX_SPEED', '100K')
_shutdown_requested = False
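# NOTE (assumption): the max_speed strings ('100K' etc.) are presumed to be passed
# through to aria2c's --max-download-limit option, which accepts K/M suffixes;
# nas_service owns the actual aria2 invocation.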
def natural_sort_key(text: str) -> List:
"""
Generate key for natural sorting (e.g., file1, file2, file10, not file1, file10, file2).
Args:
text: String to generate sort key for
Returns:
List of mixed strings and integers for natural sorting
"""
def convert(part):
return int(part) if part.isdigit() else part.lower()
return [convert(c) for c in re.split('([0-9]+)', text)]
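# Example: sorted(['file10', 'file2', 'file1'], key=natural_sort_key)
#          -> ['file1', 'file2', 'file10']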
def signal_handler(sig, frame):
"""Handle graceful shutdown on SIGINT/SIGTERM."""
global _shutdown_requested
logger.debug(f"Received signal {sig}, initiating graceful shutdown...")
_shutdown_requested = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
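# Note: signal.signal() may only be called from the main thread; this module-level
# registration assumes the module is first imported there (the worker loop itself
# can then run in a daemon thread via start_worker).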
def process_file_download(download: Dict[str, Any], sid: str, max_speed: Optional[str] = None) -> bool:
"""
Process a single file download.
Args:
download: Download record from database
sid: NAS session ID for authentication
max_speed: Optional bandwidth limit (e.g., '100K') for throttling
Returns:
True if successful, False if failed
"""
download_id = download["id"]
file_name = download["file_name"]
file_path = download["file_path"]
ge_id = download["ge_id"]
lang = download["lang"]
mode = download["mode"]
# IMPORTANT: Uppercase lang code for folder naming (1000_DE, not 1000_de)
lang_upper = lang.upper()
logger.debug(
f"[Download {download_id}] Processing: {file_name} (mode: {mode}, GE={ge_id}, lang={lang_upper})")
try:
# Update to downloading status
downloads_service.update_download_status(
download_id=download_id,
status='downloading',
progress_percent=0.0
)
# Determine source and destination paths
if mode == 'api':
# API mode: Download from NAS FileStation
source_path = file_path # Relative path in NAS
base_dest = nas_service.DESTINATION_PATH
# Get final filename (adds .zip for folders automatically)
dest_filename = get_download_filename(file_path, file_name)
# Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
dest_path = os.path.join(
base_dest, f"{ge_id}_{lang_upper}", dest_filename)
            # Get MongoDB path for this GE/lang (note: currently unused downstream)
            mongodb_path = mongodb_service.get_path_from_tms_data(ge_id, lang)
# Download using NAS service
success, final_path, error_msg = _download_api_file(
download_id=download_id,
source_path=source_path,
dest_path=dest_path,
file_name=file_name,
sid=sid,
max_speed=max_speed
)
elif mode == 'sharing':
# Sharing mode: Download from sharing link
sharing_id = download.get('sharing_id')
if not sharing_id:
raise Exception("Missing sharing_id for sharing mode download")
base_dest = nas_service.DESTINATION_PATH
# Get final filename (adds .zip for folders automatically)
dest_filename = get_download_filename(file_path, file_name)
# Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
dest_path = os.path.join(
base_dest, f"{ge_id}_{lang_upper}", dest_filename)
# Download using sharing service
success, final_path, error_msg = _download_sharing_file(
download_id=download_id,
sharing_id=sharing_id,
file_path=file_path,
dest_path=dest_path,
file_name=file_name,
max_speed=max_speed
)
            mongodb_path = None  # sharing mode has no MongoDB path lookup
else:
raise Exception(f"Unknown download mode: {mode}")
# Update final status
if success:
downloads_service.update_download_status(
download_id=download_id,
status='completed',
progress_percent=100.0,
destination_path=final_path
)
logger.debug(f"[Download {download_id}] ✅ Completed: {file_name}")
return True
else:
downloads_service.update_download_status(
download_id=download_id,
status='failed',
error_message=error_msg
)
logger.error(f"[Download {download_id}] ❌ Failed: {error_msg}")
return False
except Exception as e:
error_msg = str(e)
logger.error(
f"[Download {download_id}] Exception: {error_msg}", exc_info=True)
downloads_service.update_download_status(
download_id=download_id,
status='failed',
error_message=error_msg
)
return False
def _download_api_file(
download_id: int,
source_path: str,
dest_path: str,
file_name: str,
sid: str,
max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""
Download a single file via NAS FileStation API.
Args:
max_speed: Optional bandwidth limit (e.g., '100K')
Returns:
(success, final_path, error_message)
"""
try:
# Progress callback
def progress_callback(downloaded: int, total: int):
# Always update downloaded_size, even if total is unknown (folders)
downloads_service.update_download_status(
download_id=download_id,
status='downloading',
progress_percent=round(
(downloaded / total) * 100, 2) if total > 0 else None,
downloaded_size=downloaded,
file_size=total if total > 0 else None
)
# Create destination directory
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
# Download file using NAS API with aria2
success, error_msg, gid = nas_service.download_single_file_aria2(
sid=sid,
remote_path=source_path,
local_save_path=dest_path,
is_dir=False,
progress_callback=progress_callback,
max_speed=max_speed
)
# Save GID to database for cancellation support
if gid:
downloads_service.update_download_status(
download_id=download_id,
status='downloading',
aria2_gid=gid
)
if success:
return True, dest_path, None
else:
return False, None, error_msg or "Download failed"
except Exception as e:
return False, None, str(e)
def _download_sharing_file(
download_id: int,
sharing_id: str,
file_path: str,
dest_path: str,
file_name: str,
max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""
Download a single file from sharing link.
Args:
max_speed: Optional bandwidth limit (e.g., '100K')
Returns:
(success, final_path, error_message)
"""
    try:
        # Imported lazily, presumably to avoid a circular import at module load
        from .services import nas_sharing_service
# Progress callback
def progress_callback(downloaded: int, total: int):
# Always update downloaded_size, even if total is unknown (folders)
downloads_service.update_download_status(
download_id=download_id,
status='downloading',
progress_percent=round(
(downloaded / total) * 100, 2) if total > 0 else None,
downloaded_size=downloaded,
file_size=total if total > 0 else None
)
# Create destination directory
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        # Get sharing worker (used here only as an availability check; the actual
        # download goes through the module-level nas_sharing_service.download_file)
        worker = nas_sharing_service.get_sharing_worker()
        if not worker:
            return False, None, "Sharing worker not available"
        # Heuristic: a path without a file extension is treated as a folder
        # (note: extensionless regular files would be misclassified)
        is_folder = not bool(os.path.splitext(file_path)[1])
# Download file from sharing link using aria2
result = nas_sharing_service.download_file(
sharing_id=sharing_id,
file_path=file_path,
save_path=dest_path,
is_folder=is_folder,
progress_callback=progress_callback,
max_speed=max_speed
)
# ✅ Save GID to database for cancellation support (same as API mode)
gid = result.get('aria2_gid')
if gid:
downloads_service.update_download_status(
download_id=download_id,
status='downloading',
aria2_gid=gid
)
if result['status'] == 'success':
logger.debug(
f"[Sharing Download {download_id}] ✅ Downloaded: {file_name}")
return True, result['save_path'], None
else:
logger.error(
f"[Sharing Download {download_id}] ❌ Failed: {result['message']}")
return False, None, result['message']
except Exception as e:
logger.error(
f"[Sharing Download {download_id}] Exception: {str(e)}", exc_info=True)
return False, None, str(e)
def recover_orphaned_downloads():
"""
Recovery logic: Find downloads stuck in 'downloading' state.
This happens when:
- Server crashed while downloading
- Worker was killed
- Aria2 task completed but DB wasn't updated
For each orphaned download:
- Check if aria2 GID is still active
- If not active -> mark as failed
"""
logger.debug("🔍 Checking for orphaned downloads...")
try:
# Get all downloads stuck in 'downloading' state
orphaned = downloads_service.get_all_downloads(
status='downloading', limit=1000)
if not orphaned:
logger.debug("No orphaned downloads found")
return
logger.debug(f"Found {len(orphaned)} downloads in 'downloading' state")
# Try to get aria2 manager
try:
from .services.aria2.download_manager import get_aria2_manager
manager = get_aria2_manager()
except Exception as e:
logger.warning(f"Could not get aria2 manager: {e}")
manager = None
recovered_count = 0
failed_count = 0
for download in orphaned:
gid = download.get('aria2_gid')
download_id = download['id']
file_name = download.get('file_name', 'unknown')
# Check if aria2 task is still active
is_active = False
if gid and manager:
try:
                    status = manager.get_status(gid)
                    # aria2-style statuses: active/waiting/paused mean in flight;
                    # complete/error/removed (or a missing GID) mean the task is gone
                    is_active = status.get('status') in [
                        'active', 'waiting', 'paused']
except Exception as e:
logger.debug(f"Could not get status for GID {gid}: {e}")
if is_active:
# Aria2 is still downloading - let worker handle it
logger.debug(
f"♻️ Download {download_id} ({file_name}) is still active in aria2")
recovered_count += 1
else:
# Aria2 task doesn't exist - mark as failed
logger.warning(
f"❌ Download {download_id} ({file_name}) has no active aria2 task")
downloads_service.update_download_status(
download_id=download_id,
status='failed',
error_message='Download was interrupted (server crash or restart)'
)
failed_count += 1
logger.debug(
f"✅ Recovery complete: {recovered_count} recovered, {failed_count} marked as failed")
except Exception as e:
logger.error(
f"Error during orphaned downloads recovery: {e}", exc_info=True)
def worker_loop():
"""
Main worker loop.
Continuously polls for pending downloads and processes them.
LOGIC:
    - API mode: download starts immediately, with no cap on concurrency
    - Sharing mode:
      * Main queue (first MAX_CONCURRENT_DOWNLOADS files): full speed, gets slots first
      * Background queue (the rest): throttled, but ALWAYS downloading in parallel to keep the sharing link alive
"""
logger.debug("🚀 File Download Worker started")
logger.debug(f" - Poll interval: {POLL_INTERVAL}s")
logger.debug(
f" - Max concurrent (Sharing only): {MAX_CONCURRENT_DOWNLOADS}")
logger.debug(
f" - Background speed (Sharing only): {BACKGROUND_DOWNLOAD_MAX_SPEED}")
# Recovery: Check for orphaned downloads from previous crashes
recover_orphaned_downloads()
# Get NAS session - use saved SID
sid = nas_service.load_sid()
if not sid:
logger.error("❌ No NAS session found. Please login via OTP first.")
return
logger.debug(f"✅ Loaded NAS session (SID: {sid[:20]}...)")
    # Create thread pool - a high ceiling (50) rather than a strict limit:
    # API mode wants wide parallelism; sharing mode self-limits via its queue logic
    executor = ThreadPoolExecutor(max_workers=50, thread_name_prefix="DL-")
active_futures = {} # Map future -> download_id
# Adaptive polling: start with min interval, increase when idle
current_poll_interval = POLL_INTERVAL
while not _shutdown_requested:
try:
            # Get active downloads (pending or downloading)
            active_downloads = downloads_service.get_active_downloads()
            # IDs already submitted to the pool; skipped below so a lagging
            # status update cannot get the same file submitted twice
            in_flight_ids = set(active_futures.values())
            # Separate API and Sharing downloads
            api_downloads = [d for d in active_downloads if d['mode'] == 'api']
            sharing_downloads = [
                d for d in active_downloads if d['mode'] == 'sharing']
            # ========== API MODE: IMMEDIATE DOWNLOAD (NO QUEUE) ==========
            api_pending = [d for d in api_downloads
                           if d['status'] == 'pending' and d['id'] not in in_flight_ids]
for download in api_pending:
try:
future = executor.submit(
process_file_download,
download,
sid,
max_speed=None # API mode: Always full speed
)
active_futures[future] = download['id']
logger.debug(
f"🚀 [API] Started download {download['id']}: {download['file_name']}")
except Exception as e:
logger.error(
f"Error submitting API download {download['id']}: {e}")
# ========== SHARING MODE: QUEUE-BASED WITH THROTTLING ==========
# Get pending sharing downloads
            sharing_pending = [d for d in sharing_downloads
                               if d['status'] == 'pending' and d['id'] not in in_flight_ids]
if sharing_pending:
                # Group by batch, then natural-sort file names within each batch
batches = defaultdict(list)
for d in sharing_pending:
batch_id = d.get('batch_id', 'default')
batches[batch_id].append(d)
# Sort each batch naturally and flatten
sorted_pending = []
for batch_files in batches.values():
sorted_batch = sorted(
batch_files, key=lambda x: natural_sort_key(x['file_name']))
sorted_pending.extend(sorted_batch)
# Split into main queue (first N) and background queue (rest)
main_queue = sorted_pending[:MAX_CONCURRENT_DOWNLOADS]
background_queue = sorted_pending[MAX_CONCURRENT_DOWNLOADS:]
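                # e.g. with MAX_CONCURRENT_DOWNLOADS=5 and 12 sorted pending files,
                # files 1-5 form the main queue and files 6-12 the background queue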
# Count currently downloading sharing files (for slot calculation)
sharing_downloading_count = len(
[d for d in sharing_downloads if d['status'] == 'downloading'])
available_slots = MAX_CONCURRENT_DOWNLOADS - sharing_downloading_count
# Submit main queue files (full speed, use available slots)
if available_slots > 0:
for download in main_queue[:available_slots]:
try:
future = executor.submit(
process_file_download,
download,
sid,
max_speed=None # Full speed
)
active_futures[future] = download['id']
logger.debug(
f"🚀 [SHARING-MAIN] Started download {download['id']}: {download['file_name']}")
except Exception as e:
logger.error(
f"Error submitting sharing download {download['id']}: {e}")
                # Submit background queue files (throttled, ALWAYS submitted to keep links alive)
                # Background files do NOT consume slots - they download in parallel at low speed
                for download in background_queue:
                    # All background entries are pending by construction; guard kept defensively
                    if download['status'] == 'pending':
try:
future = executor.submit(
process_file_download,
download,
sid,
max_speed=BACKGROUND_DOWNLOAD_MAX_SPEED # Throttled
)
active_futures[future] = download['id']
logger.debug(
f"🐢 [SHARING-BACKGROUND] Started download {download['id']}: {download['file_name']} (limited to {BACKGROUND_DOWNLOAD_MAX_SPEED})")
except Exception as e:
logger.error(
f"Error submitting background download {download['id']}: {e}")
# Check for completed futures (non-blocking)
done_futures = [f for f in active_futures if f.done()]
for future in done_futures:
download_id = active_futures.pop(future)
try:
success = future.result()
if success:
logger.debug(
f"✅ Thread completed download {download_id}")
else:
logger.warning(
f"⚠️ Thread failed download {download_id}")
except Exception as e:
logger.error(
f"❌ Thread exception for download {download_id}: {e}")
# Log status
if active_downloads:
api_pending_count = len(
[d for d in api_downloads if d['status'] == 'pending'])
api_downloading_count = len(
[d for d in api_downloads if d['status'] == 'downloading'])
sharing_pending_count = len(
[d for d in sharing_downloads if d['status'] == 'pending'])
sharing_downloading_count = len(
[d for d in sharing_downloads if d['status'] == 'downloading'])
logger.debug(
f"📊 API: {len(api_downloads)} (Downloading: {api_downloading_count}, Pending: {api_pending_count}) | "
f"Sharing: {len(sharing_downloads)} (Downloading: {sharing_downloading_count}, Pending: {sharing_pending_count}) | "
f"Threads: {len(active_futures)}"
)
# Adaptive polling: fast when active, slow when idle
if active_downloads or active_futures:
# Has work → use minimum interval
current_poll_interval = POLL_INTERVAL
else:
# Idle → gradually increase interval (up to max)
current_poll_interval = min(
current_poll_interval * 2, POLL_INTERVAL_MAX)
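                # With defaults this backs off 3s -> 6s -> 12s -> 24s -> 30s, then holds at the cap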
except Exception as e:
logger.error(f"Error in worker loop: {e}", exc_info=True)
# On error, use default interval
current_poll_interval = POLL_INTERVAL
time.sleep(current_poll_interval)
# Cleanup
logger.debug("Shutting down thread pool...")
executor.shutdown(wait=True)
logger.debug("Worker shutdown complete")
def start_worker():
"""Start the download worker in the background."""
import threading
def run():
try:
worker_loop()
except Exception as e:
logger.error(f"Worker crashed: {e}", exc_info=True)
thread = threading.Thread(target=run, daemon=True, name="DownloadWorker")
thread.start()
logger.debug("Download worker thread started")
return thread
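# Example wiring (hypothetical; the real app entry point may differ):
#   from backend.worker_downloads import start_worker
#   worker_thread = start_worker()  # daemon thread; exits with the main process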
if __name__ == "__main__":
# For standalone testing
    logging.basicConfig(
        level=logging.DEBUG,  # this module logs mostly at DEBUG level
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
try:
worker_loop()
except KeyboardInterrupt:
logger.debug("Worker stopped by user")