"""
|
|
File-centric download worker.
|
|
Processes individual file downloads from the downloads table.
|
|
|
|
Architecture:
|
|
- Each download record = 1 file
|
|
- Worker picks up pending files and downloads them one by one
|
|
- Progress tracked per-file, not per-batch
|
|
- Uses aria2c for fast multi-connection downloads
|
|
"""
|
|
|
|
import time
import logging
import signal
import re
import os
from typing import Optional, Dict, Any, List
from concurrent.futures import ThreadPoolExecutor

from .services import downloads_service
from .services import nas_service
from .services import mongodb_service
from .common import get_download_filename

logger = logging.getLogger(__name__)

# Worker configuration - loaded from environment variables
POLL_INTERVAL = int(os.getenv('WORKER_POLL_INTERVAL', '3'))  # seconds (min interval)
POLL_INTERVAL_MAX = int(os.getenv('WORKER_POLL_INTERVAL_MAX', '30'))  # seconds (max when idle)
# Process up to N files concurrently (sharing-mode main queue)
MAX_CONCURRENT_DOWNLOADS = int(
    os.getenv('WORKER_MAX_CONCURRENT_DOWNLOADS', '5'))
# Bandwidth limit for the background sharing queue (aria2c format, e.g. '100K')
BACKGROUND_DOWNLOAD_MAX_SPEED = os.getenv(
    'BACKGROUND_DOWNLOAD_MAX_SPEED', '100K')

_shutdown_requested = False
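
# Example environment configuration (illustrative values; only the variable
# names are taken from the lookups above):
#   WORKER_POLL_INTERVAL=3
#   WORKER_POLL_INTERVAL_MAX=30
#   WORKER_MAX_CONCURRENT_DOWNLOADS=5
#   BACKGROUND_DOWNLOAD_MAX_SPEED=100K   # aria2c-style limit, e.g. '100K' or '1M'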


def natural_sort_key(text: str) -> List:
    """
    Generate a key for natural sorting (file1, file2, file10 instead of
    file1, file10, file2).

    Args:
        text: String to generate the sort key for

    Returns:
        List of interleaved strings and integers suitable as a sort key
    """
    def convert(part):
        return int(part) if part.isdigit() else part.lower()

    return [convert(c) for c in re.split('([0-9]+)', text)]
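
# Illustrative example (not in the original source): sorting with this key,
#   sorted(["file10", "file2", "file1"], key=natural_sort_key)
# returns ["file1", "file2", "file10"], because "file10" splits into
# ["file", 10, ""] and the numeric parts compare as integers.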


def signal_handler(sig, frame):
    """Handle graceful shutdown on SIGINT/SIGTERM."""
    global _shutdown_requested
    logger.debug(f"Received signal {sig}, initiating graceful shutdown...")
    _shutdown_requested = True


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
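
# Descriptive note (not in the original source): the handler only flips
# _shutdown_requested; worker_loop() exits at the top of its next iteration
# and executor.shutdown(wait=True) then lets in-flight downloads finish, so
# e.g. `kill -TERM <pid>` drains cleanly rather than aborting transfers.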


def process_file_download(download: Dict[str, Any], sid: str, max_speed: Optional[str] = None) -> bool:
    """
    Process a single file download.

    Args:
        download: Download record from the database
        sid: NAS session ID for authentication
        max_speed: Optional bandwidth limit (e.g., '100K') for throttling

    Returns:
        True if successful, False if failed
    """
    download_id = download["id"]
    file_name = download["file_name"]
    file_path = download["file_path"]
    ge_id = download["ge_id"]
    lang = download["lang"]
    mode = download["mode"]

    # IMPORTANT: Uppercase the language code for folder naming (1000_DE, not 1000_de)
    lang_upper = lang.upper()

    logger.debug(
        f"[Download {download_id}] Processing: {file_name} (mode: {mode}, GE={ge_id}, lang={lang_upper})")

    try:
        # Mark as downloading
        downloads_service.update_download_status(
            download_id=download_id,
            status='downloading',
            progress_percent=0.0
        )

        # Determine source and destination paths
        if mode == 'api':
            # API mode: Download from NAS FileStation
            source_path = file_path  # Relative path on the NAS
            base_dest = nas_service.DESTINATION_PATH

            # Get the final filename (adds .zip for folders automatically)
            dest_filename = get_download_filename(file_path, file_name)

            # Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
            dest_path = os.path.join(
                base_dest, f"{ge_id}_{lang_upper}", dest_filename)

            # Get the MongoDB path
            # NOTE: currently informational; mongodb_path is not used further
            # in this function.
            mongodb_path = mongodb_service.get_path_from_tms_data(ge_id, lang)

            # Download using the NAS service
            success, final_path, error_msg = _download_api_file(
                download_id=download_id,
                source_path=source_path,
                dest_path=dest_path,
                file_name=file_name,
                sid=sid,
                max_speed=max_speed
            )

        elif mode == 'sharing':
            # Sharing mode: Download from a sharing link
            sharing_id = download.get('sharing_id')
            if not sharing_id:
                raise Exception("Missing sharing_id for sharing mode download")

            base_dest = nas_service.DESTINATION_PATH

            # Get the final filename (adds .zip for folders automatically)
            dest_filename = get_download_filename(file_path, file_name)

            # Use format: base/GE_LANG/filename (e.g., raw/1000_DE/file.zip)
            dest_path = os.path.join(
                base_dest, f"{ge_id}_{lang_upper}", dest_filename)

            # Download using the sharing service
            success, final_path, error_msg = _download_sharing_file(
                download_id=download_id,
                sharing_id=sharing_id,
                file_path=file_path,
                dest_path=dest_path,
                file_name=file_name,
                max_speed=max_speed
            )

            mongodb_path = None

        else:
            raise Exception(f"Unknown download mode: {mode}")

        # Update the final status
        if success:
            downloads_service.update_download_status(
                download_id=download_id,
                status='completed',
                progress_percent=100.0,
                destination_path=final_path
            )
            logger.debug(f"[Download {download_id}] ✅ Completed: {file_name}")
            return True
        else:
            downloads_service.update_download_status(
                download_id=download_id,
                status='failed',
                error_message=error_msg
            )
            logger.error(f"[Download {download_id}] ❌ Failed: {error_msg}")
            return False

    except Exception as e:
        error_msg = str(e)
        logger.error(
            f"[Download {download_id}] Exception: {error_msg}", exc_info=True)

        downloads_service.update_download_status(
            download_id=download_id,
            status='failed',
            error_message=error_msg
        )
        return False
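
# Minimal sketch of the record shape this function consumes (field names come
# from the lookups above; the values are hypothetical):
#   {"id": 42, "file_name": "ep_01.mp4", "file_path": "/shared/ep_01.mp4",
#    "ge_id": "1000", "lang": "de", "mode": "api", "sharing_id": None}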


def _download_api_file(
    download_id: int,
    source_path: str,
    dest_path: str,
    file_name: str,
    sid: str,
    max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
    """
    Download a single file via the NAS FileStation API.

    Args:
        max_speed: Optional bandwidth limit (e.g., '100K')

    Returns:
        (success, final_path, error_message)
    """
    try:
        # Progress callback
        def progress_callback(downloaded: int, total: int):
            # Always update downloaded_size, even if total is unknown (folders)
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                progress_percent=round(
                    (downloaded / total) * 100, 2) if total > 0 else None,
                downloaded_size=downloaded,
                file_size=total if total > 0 else None
            )

        # Create the destination directory
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)

        # Download the file via the NAS API with aria2
        success, error_msg, gid = nas_service.download_single_file_aria2(
            sid=sid,
            remote_path=source_path,
            local_save_path=dest_path,
            is_dir=False,
            progress_callback=progress_callback,
            max_speed=max_speed
        )

        # Save the GID to the database for cancellation support
        if gid:
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                aria2_gid=gid
            )

        if success:
            return True, dest_path, None
        else:
            return False, None, error_msg or "Download failed"

    except Exception as e:
        return False, None, str(e)
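
# Progress math used by the callback above (worked example, hypothetical
# numbers): downloaded=1_572_864 of total=10_485_760 gives
# round(1_572_864 / 10_485_760 * 100, 2) == 15.0 percent; when the total is
# unknown (total <= 0, e.g. folders), progress_percent stays None.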


def _download_sharing_file(
    download_id: int,
    sharing_id: str,
    file_path: str,
    dest_path: str,
    file_name: str,
    max_speed: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
    """
    Download a single file from a sharing link.

    Args:
        max_speed: Optional bandwidth limit (e.g., '100K')

    Returns:
        (success, final_path, error_message)
    """
    try:
        from .services import nas_sharing_service

        # Progress callback
        def progress_callback(downloaded: int, total: int):
            # Always update downloaded_size, even if total is unknown (folders)
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                progress_percent=round(
                    (downloaded / total) * 100, 2) if total > 0 else None,
                downloaded_size=downloaded,
                file_size=total if total > 0 else None
            )

        # Create the destination directory
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)

        # Get the sharing worker
        worker = nas_sharing_service.get_sharing_worker()
        if not worker:
            return False, None, "Sharing worker not available"

        # Determine whether this is a folder (no file extension).
        # NOTE: this heuristic also treats extensionless regular files as folders.
        is_folder = not bool(os.path.splitext(file_path)[1])

        # Download the file from the sharing link using aria2
        result = nas_sharing_service.download_file(
            sharing_id=sharing_id,
            file_path=file_path,
            save_path=dest_path,
            is_folder=is_folder,
            progress_callback=progress_callback,
            max_speed=max_speed
        )

        # Save the GID to the database for cancellation support (same as API mode)
        gid = result.get('aria2_gid')
        if gid:
            downloads_service.update_download_status(
                download_id=download_id,
                status='downloading',
                aria2_gid=gid
            )

        if result['status'] == 'success':
            logger.debug(
                f"[Sharing Download {download_id}] ✅ Downloaded: {file_name}")
            return True, result['save_path'], None
        else:
            logger.error(
                f"[Sharing Download {download_id}] ❌ Failed: {result['message']}")
            return False, None, result['message']

    except Exception as e:
        logger.error(
            f"[Sharing Download {download_id}] Exception: {str(e)}", exc_info=True)
        return False, None, str(e)
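
# Result contract assumed from the accesses above: nas_sharing_service.download_file()
# returns a dict with at least the keys 'status' ('success' on success),
# 'save_path', 'message', and optionally 'aria2_gid'.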


def recover_orphaned_downloads():
    """
    Recovery logic: find downloads stuck in the 'downloading' state.

    This happens when:
    - The server crashed while downloading
    - The worker was killed
    - An aria2 task completed but the DB wasn't updated

    For each orphaned download:
    - Check whether its aria2 GID is still active
    - If not active -> mark it as failed
    """
    logger.debug("🔍 Checking for orphaned downloads...")

    try:
        # Get all downloads stuck in the 'downloading' state
        orphaned = downloads_service.get_all_downloads(
            status='downloading', limit=1000)

        if not orphaned:
            logger.debug("No orphaned downloads found")
            return

        logger.debug(f"Found {len(orphaned)} downloads in 'downloading' state")

        # Try to get the aria2 manager
        try:
            from .services.aria2.download_manager import get_aria2_manager
            manager = get_aria2_manager()
        except Exception as e:
            logger.warning(f"Could not get aria2 manager: {e}")
            manager = None

        recovered_count = 0
        failed_count = 0

        for download in orphaned:
            gid = download.get('aria2_gid')
            download_id = download['id']
            file_name = download.get('file_name', 'unknown')

            # Check whether the aria2 task is still active
            is_active = False
            if gid and manager:
                try:
                    status = manager.get_status(gid)
                    is_active = status.get('status') in [
                        'active', 'waiting', 'paused']
                except Exception as e:
                    logger.debug(f"Could not get status for GID {gid}: {e}")

            if is_active:
                # aria2 is still downloading - let the worker handle it
                logger.debug(
                    f"♻️ Download {download_id} ({file_name}) is still active in aria2")
                recovered_count += 1
            else:
                # The aria2 task no longer exists - mark the download as failed
                logger.warning(
                    f"❌ Download {download_id} ({file_name}) has no active aria2 task")
                downloads_service.update_download_status(
                    download_id=download_id,
                    status='failed',
                    error_message='Download was interrupted (server crash or restart)'
                )
                failed_count += 1

        logger.debug(
            f"✅ Recovery complete: {recovered_count} recovered, {failed_count} marked as failed")

    except Exception as e:
        logger.error(
            f"Error during orphaned downloads recovery: {e}", exc_info=True)
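
# Descriptive note: a GID counts as alive only in the 'active', 'waiting', or
# 'paused' states checked above; any other aria2 status (e.g. 'complete',
# 'error', 'removed') or a missing GID causes the record to be marked failed.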


def worker_loop():
    """
    Main worker loop.
    Continuously polls for pending downloads and processes them.

    LOGIC:
    - API mode: download immediately, with no limit on the number of files
    - Sharing mode:
      * Main queue (first MAX_CONCURRENT_DOWNLOADS): full speed, gets the slots
      * Background queue (the rest): throttled, ALWAYS downloading in parallel
        to keep the sharing links alive
    """
    logger.debug("🚀 File Download Worker started")
    logger.debug(f" - Poll interval: {POLL_INTERVAL}s")
    logger.debug(
        f" - Max concurrent (Sharing only): {MAX_CONCURRENT_DOWNLOADS}")
    logger.debug(
        f" - Background speed (Sharing only): {BACKGROUND_DOWNLOAD_MAX_SPEED}")

    # Recovery: check for orphaned downloads from previous crashes
    recover_orphaned_downloads()

    # Get the NAS session - use the saved SID
    sid = nas_service.load_sid()
    if not sid:
        logger.error("❌ No NAS session found. Please login via OTP first.")
        return

    logger.debug(f"✅ Loaded NAS session (SID: {sid[:20]}...)")

    # Create the thread pool - effectively unlimited (API mode needs parallelism);
    # sharing mode self-limits via the queue logic below
    executor = ThreadPoolExecutor(max_workers=50, thread_name_prefix="DL-")
    active_futures = {}  # Map future -> download_id

    # Adaptive polling: start at the minimum interval, increase when idle
    current_poll_interval = POLL_INTERVAL

    while not _shutdown_requested:
        try:
            # Get active downloads (pending or downloading)
            active_downloads = downloads_service.get_active_downloads()

            # Separate API and Sharing downloads
            api_downloads = [d for d in active_downloads if d['mode'] == 'api']
            sharing_downloads = [
                d for d in active_downloads if d['mode'] == 'sharing']

            # ========== API MODE: IMMEDIATE DOWNLOAD (NO QUEUE) ==========
            api_pending = [
                d for d in api_downloads if d['status'] == 'pending']
            for download in api_pending:
                try:
                    future = executor.submit(
                        process_file_download,
                        download,
                        sid,
                        max_speed=None  # API mode: always full speed
                    )
                    active_futures[future] = download['id']
                    logger.debug(
                        f"🚀 [API] Started download {download['id']}: {download['file_name']}")
                except Exception as e:
                    logger.error(
                        f"Error submitting API download {download['id']}: {e}")

            # ========== SHARING MODE: QUEUE-BASED WITH THROTTLING ==========
            # Get pending sharing downloads
            sharing_pending = [
                d for d in sharing_downloads if d['status'] == 'pending']

            if sharing_pending:
                # Natural sort by file name within each batch
                from collections import defaultdict
                batches = defaultdict(list)
                for d in sharing_pending:
                    batch_id = d.get('batch_id', 'default')
                    batches[batch_id].append(d)

                # Sort each batch naturally and flatten
                sorted_pending = []
                for batch_files in batches.values():
                    sorted_batch = sorted(
                        batch_files, key=lambda x: natural_sort_key(x['file_name']))
                    sorted_pending.extend(sorted_batch)

                # Split into the main queue (first N) and the background queue (rest)
                main_queue = sorted_pending[:MAX_CONCURRENT_DOWNLOADS]
                background_queue = sorted_pending[MAX_CONCURRENT_DOWNLOADS:]

                # Count currently downloading sharing files (for slot calculation)
                sharing_downloading_count = len(
                    [d for d in sharing_downloads if d['status'] == 'downloading'])
                available_slots = MAX_CONCURRENT_DOWNLOADS - sharing_downloading_count

                # Submit main-queue files (full speed, use available slots)
                if available_slots > 0:
                    for download in main_queue[:available_slots]:
                        try:
                            future = executor.submit(
                                process_file_download,
                                download,
                                sid,
                                max_speed=None  # Full speed
                            )
                            active_futures[future] = download['id']
                            logger.debug(
                                f"🚀 [SHARING-MAIN] Started download {download['id']}: {download['file_name']}")
                        except Exception as e:
                            logger.error(
                                f"Error submitting sharing download {download['id']}: {e}")

                # Submit background-queue files (throttled, ALWAYS, to keep links alive).
                # Background files do NOT need slots - they download in parallel at low speed
                for download in background_queue:
                    # Check if already downloading
                    if download['status'] == 'pending':
                        try:
                            future = executor.submit(
                                process_file_download,
                                download,
                                sid,
                                max_speed=BACKGROUND_DOWNLOAD_MAX_SPEED  # Throttled
                            )
                            active_futures[future] = download['id']
                            logger.debug(
                                f"🐢 [SHARING-BACKGROUND] Started download {download['id']}: {download['file_name']} (limited to {BACKGROUND_DOWNLOAD_MAX_SPEED})")
                        except Exception as e:
                            logger.error(
                                f"Error submitting background download {download['id']}: {e}")

            # Check for completed futures (non-blocking)
            done_futures = [f for f in active_futures if f.done()]
            for future in done_futures:
                download_id = active_futures.pop(future)
                try:
                    success = future.result()
                    if success:
                        logger.debug(
                            f"✅ Thread completed download {download_id}")
                    else:
                        logger.warning(
                            f"⚠️ Thread failed download {download_id}")
                except Exception as e:
                    logger.error(
                        f"❌ Thread exception for download {download_id}: {e}")

            # Log status
            if active_downloads:
                api_pending_count = len(
                    [d for d in api_downloads if d['status'] == 'pending'])
                api_downloading_count = len(
                    [d for d in api_downloads if d['status'] == 'downloading'])
                sharing_pending_count = len(
                    [d for d in sharing_downloads if d['status'] == 'pending'])
                sharing_downloading_count = len(
                    [d for d in sharing_downloads if d['status'] == 'downloading'])

                logger.debug(
                    f"📊 API: {len(api_downloads)} (Downloading: {api_downloading_count}, Pending: {api_pending_count}) | "
                    f"Sharing: {len(sharing_downloads)} (Downloading: {sharing_downloading_count}, Pending: {sharing_pending_count}) | "
                    f"Threads: {len(active_futures)}"
                )

            # Adaptive polling: fast when active, slow when idle
            if active_downloads or active_futures:
                # Has work -> use the minimum interval
                current_poll_interval = POLL_INTERVAL
            else:
                # Idle -> gradually increase the interval (up to the max)
                current_poll_interval = min(
                    current_poll_interval * 2, POLL_INTERVAL_MAX)

        except Exception as e:
            logger.error(f"Error in worker loop: {e}", exc_info=True)
            # On error, fall back to the default interval
            current_poll_interval = POLL_INTERVAL

        time.sleep(current_poll_interval)

    # Cleanup
    logger.debug("Shutting down thread pool...")
    executor.shutdown(wait=True)
    logger.debug("Worker shutdown complete")
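
# Worked example of the scheduling above (hypothetical numbers): with
# MAX_CONCURRENT_DOWNLOADS=5 and 8 pending sharing files, files 1-5 form the
# main queue at full speed and files 6-8 run throttled in the background; with
# the default intervals, the idle poll backs off 3 -> 6 -> 12 -> 24 -> 30 seconds.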


def start_worker():
    """Start the download worker in the background."""
    import threading

    def run():
        try:
            worker_loop()
        except Exception as e:
            logger.error(f"Worker crashed: {e}", exc_info=True)

    thread = threading.Thread(target=run, daemon=True, name="DownloadWorker")
    thread.start()
    logger.debug("Download worker thread started")

    return thread
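
# Typical startup usage (illustrative; the import path is hypothetical):
#   from app.workers import file_download_worker
#   worker_thread = file_download_worker.start_worker()
# The thread is a daemon, so it will not keep the interpreter alive on its own.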


if __name__ == "__main__":
    # For standalone testing
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    try:
        worker_loop()
    except KeyboardInterrupt:
        logger.debug("Worker stopped by user")