"""
Downloads Service - File-centric download management.

Each download record represents ONE file, not a batch of files. Records that
were created together share a ``batch_id`` so they can be queried, summarised
and deleted as a group.
"""

import logging
import re
from typing import List, Dict, Any, Optional, cast
from datetime import datetime, timezone
from uuid import uuid4

from .supabase_service import get_supabase_client

logger = logging.getLogger(__name__)


# ==================== INTERNAL HELPERS ====================

# Matches the fractional-seconds part of an ISO-8601 timestamp.
_FRACTIONAL_SECONDS = re.compile(r'\.(\d+)')


def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into an aware datetime.

    Returns None when *value* is not a string or cannot be parsed, so that a
    single malformed timestamp never aborts the caller.
    """
    if not isinstance(value, str):
        return None

    text = value.replace('Z', '+00:00')
    # datetime.fromisoformat() before Python 3.11 only accepts exactly 3 or 6
    # fractional digits; normalise whatever precision we got to 6 digits.
    text = _FRACTIONAL_SECONDS.sub(
        lambda match: '.' + match.group(1)[:6].ljust(6, '0'), text, count=1)

    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None

    # This module only writes UTC timestamps, so a naive value is read as UTC.
    # Normalising here also prevents a naive/aware TypeError on subtraction.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _duration_seconds(started_at: Any, completed_at: Any) -> Optional[float]:
    """Return the seconds between two timestamps, or None if either is unusable."""
    start = _parse_timestamp(started_at)
    end = _parse_timestamp(completed_at)
    if start is None or end is None:
        return None
    return (end - start).total_seconds()


# ==================== BATCH CREATION ====================

def create_downloads_batch(
    files: List[Dict[str, Any]],
    ge_id: str,
    lang: str,
    mode: str,
    sharing_id: Optional[str] = None,
    mongodb_path: Optional[str] = None,
    destination_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create a batch of file download records.

    One row is inserted into the ``downloads`` table per entry in *files*;
    all rows share a freshly generated ``batch_id`` and start in status
    ``pending``.

    Args:
        files: List of file info dicts with 'name', 'path', 'isdir'/'is_folder'
            (only 'name' and 'path' are read here; missing keys become '')
        ge_id: GE ID for organizing
        lang: Language code
        mode: 'api' or 'sharing'
        sharing_id: Optional sharing link ID (for sharing mode)
        mongodb_path: MongoDB reference path
            - For API mode: folder_path from titles_data
            - For Sharing mode: linkRaw from titles_data
        destination_path: Optional full destination path for downloads

    Returns:
        {
            "success": bool,
            "batch_id": str,          # None on failure
            "download_ids": List[int],
            "file_count": int,        # number of rows that came back with an id
            "message": str
        }
        Errors are never raised; they are logged and reported via
        ``success=False``.
    """
    try:
        client = get_supabase_client()
        batch_id = str(uuid4())  # Groups all files created by this call

        # Prepare download records (one per file)
        download_records = [
            {
                'batch_id': batch_id,
                'ge_id': ge_id,
                'lang': lang,
                'file_name': file_info.get('name', ''),
                'file_path': file_info.get('path', ''),
                'mode': mode,
                'status': 'pending',
                'sharing_id': sharing_id,
                'mongodb_path': mongodb_path,
                'destination_path': destination_path,
                'retry_count': 0,
                'downloaded_size': 0,
                'progress_percent': 0.0
            }
            for file_info in files
        ]

        # Insert all records in one request
        response = client.table('downloads').insert(download_records).execute()

        # Extract IDs of created downloads
        created_downloads = response.data or []
        download_ids = [
            cast(int, d.get('id'))
            for d in created_downloads
            if isinstance(d, dict) and d.get('id')
        ]

        logger.debug(
            f"Created batch {batch_id}: {len(download_ids)} files for {ge_id} {lang}")

        return {
            "success": True,
            "batch_id": batch_id,
            "download_ids": download_ids,
            "file_count": len(download_ids),
            "message": f"Created {len(download_ids)} download records"
        }

    except Exception as e:
        logger.error(f"Error creating downloads batch: {e}")
        return {
            "success": False,
            "batch_id": None,
            "download_ids": [],
            "file_count": 0,
            "message": f"Lỗi tạo downloads: {str(e)}"
        }


# ==================== QUERY FUNCTIONS ====================

def get_all_downloads(
    status: Optional[str] = None,
    mode: Optional[str] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Get all download records with optional filtering.

    Args:
        status: Filter by status (pending, downloading, completed, failed, cancelled)
        mode: Filter by mode (api, sharing)
        limit: Max number of records to return

    Returns:
        List of download records, newest first. An empty list is returned
        both when nothing matches and when the query fails (the error is
        logged).
    """
    try:
        client = get_supabase_client()

        # Build query
        query = client.table('downloads').select('*')

        # Apply filters (falsy values mean "no filter")
        if status:
            query = query.eq('status', status)
        if mode:
            query = query.eq('mode', mode)

        # Order by created_at descending (newest first)
        query = query.order('created_at', desc=True).limit(limit)

        response = query.execute()
        return cast(List[Dict[str, Any]], response.data or [])

    except Exception as e:
        logger.error(f"Error getting downloads: {e}")
        return []


def get_download_by_id(download_id: int) -> Optional[Dict[str, Any]]:
    """Get a single download record by ID.

    Returns:
        The record dict, or None if no row matches or the query fails.
    """
    try:
        client = get_supabase_client()
        response = client.table('downloads').select(
            '*').eq('id', download_id).execute()

        if response.data and len(response.data) > 0:
            return cast(Dict[str, Any], response.data[0])
        return None

    except Exception as e:
        logger.error(f"Error getting download {download_id}: {e}")
        return None


def get_downloads_by_batch(batch_id: str) -> List[Dict[str, Any]]:
    """Get all downloads in a batch, ordered by file name.

    Returns:
        List of records; empty if the batch has no rows or the query fails.
    """
    try:
        client = get_supabase_client()
        response = client.table('downloads').select(
            '*').eq('batch_id', batch_id).order('file_name').execute()
        return cast(List[Dict[str, Any]], response.data or [])

    except Exception as e:
        logger.error(f"Error getting batch {batch_id}: {e}")
        return []


def get_active_downloads() -> List[Dict[str, Any]]:
    """Get all downloads that are pending or currently downloading.

    Returns:
        List of records, oldest first; empty if none match or the query fails.
    """
    try:
        client = get_supabase_client()
        response = client.table('downloads')\
            .select('*')\
            .in_('status', ['pending', 'downloading'])\
            .order('created_at', desc=False)\
            .execute()
        return cast(List[Dict[str, Any]], response.data or [])

    except Exception as e:
        logger.error(f"Error getting active downloads: {e}")
        return []


# ==================== UPDATE FUNCTIONS ====================

def update_download_status(
    download_id: int,
    status: str,
    error_message: Optional[str] = None,
    progress_percent: Optional[float] = None,
    downloaded_size: Optional[int] = None,
    file_size: Optional[int] = None,
    destination_path: Optional[str] = None,
    aria2_gid: Optional[str] = None
) -> bool:
    """
    Update download status and progress.

    Args:
        download_id: ID of download to update
        status: New status (pending, downloading, completed, failed, cancelled)
        error_message: Optional error message
        progress_percent: Download progress (0-100)
        downloaded_size: Bytes downloaded so far
        file_size: Total file size in bytes
        destination_path: Final destination path
        aria2_gid: Aria2 download GID (for cancellation)

    Optional arguments left as None are not written, so existing column
    values are preserved.

    Returns:
        True if the update returned at least one row, False otherwise
        (including on any error, which is logged).
    """
    try:
        client = get_supabase_client()

        update_data: Dict[str, Any] = {'status': status}

        # Set timestamps based on status
        if status == 'downloading':
            # Only stamp started_at the first time the record starts
            # downloading; later progress updates must not move it.
            download = get_download_by_id(download_id)
            if download and not download.get('started_at'):
                update_data['started_at'] = _utc_now_iso()

        elif status in ['completed', 'failed', 'cancelled']:
            # Terminal status, set completed_at
            update_data['completed_at'] = _utc_now_iso()
            # Clear aria2_gid when terminal (an explicit aria2_gid argument
            # below still takes precedence)
            update_data['aria2_gid'] = None

        # Optional fields
        if error_message is not None:
            update_data['error_message'] = error_message
        if progress_percent is not None:
            update_data['progress_percent'] = progress_percent
        if downloaded_size is not None:
            update_data['downloaded_size'] = downloaded_size
        if file_size is not None:
            update_data['file_size'] = file_size
        if destination_path is not None:
            update_data['destination_path'] = destination_path
        if aria2_gid is not None:
            update_data['aria2_gid'] = aria2_gid

        response = client.table('downloads').update(
            update_data).eq('id', download_id).execute()

        if response.data:
            logger.debug(f"Updated download {download_id} to status {status}")
            return True
        return False

    except Exception as e:
        logger.error(f"Error updating download {download_id}: {e}")
        return False


def cancel_download(download_id: int) -> bool:
    """Cancel a download (set status to cancelled).

    If the record is currently ``downloading`` and has an ``aria2_gid``, the
    aria2 task is cancelled first on a best-effort basis; the database row is
    updated even if that fails.

    Returns:
        True if the record was found and its status was updated.
    """
    try:
        # Get download info first
        download = get_download_by_id(download_id)
        if not download:
            logger.warning(f"Cannot cancel download {download_id}: not found")
            return False

        # If actively downloading with aria2, cancel the aria2 task first
        if download.get('status') == 'downloading' and download.get('aria2_gid'):
            try:
                # Imported lazily, only when an aria2 task has to be cancelled
                from .aria2.download_manager import get_aria2_manager
                manager = get_aria2_manager()
                if manager:
                    gid = download['aria2_gid']
                    logger.debug(f"Cancelling aria2 download GID: {gid}")
                    manager.cancel_download(gid)
            except Exception as e:
                logger.error(f"Failed to cancel aria2 download: {e}")
                # Continue to update DB even if aria2 cancel fails

        # Update database status
        return update_download_status(download_id, 'cancelled')

    except Exception as e:
        logger.error(f"Error cancelling download {download_id}: {e}")
        return False


def retry_download(download_id: int) -> bool:
    """
    Retry a failed download.

    Resets status to pending, clears progress/error/timestamps and
    increments retry_count.

    Returns:
        True if the record exists and was updated, False otherwise.
    """
    try:
        client = get_supabase_client()

        # Get current download
        download = get_download_by_id(download_id)
        if not download:
            return False

        # Increment retry count. The column may be NULL, in which case the
        # key exists with value None, so a .get() default is not enough.
        retry_count = (download.get('retry_count') or 0) + 1

        # Reset to pending
        update_data = {
            'status': 'pending',
            'retry_count': retry_count,
            'error_message': None,
            'progress_percent': 0.0,
            'downloaded_size': 0,
            'started_at': None,
            'completed_at': None
        }

        response = client.table('downloads').update(
            update_data).eq('id', download_id).execute()

        if response.data:
            logger.debug(
                f"Retrying download {download_id} (attempt #{retry_count})")
            return True
        return False

    except Exception as e:
        logger.error(f"Error retrying download {download_id}: {e}")
        return False


# ==================== DELETE FUNCTIONS ====================

def delete_download(download_id: int) -> bool:
    """Delete a download record (usually for completed/failed downloads).

    Returns:
        True if a row was deleted, False if nothing matched or on error.
    """
    try:
        client = get_supabase_client()
        response = client.table('downloads').delete().eq(
            'id', download_id).execute()

        if response.data:
            logger.debug(f"Deleted download {download_id}")
            return True
        return False

    except Exception as e:
        logger.error(f"Error deleting download {download_id}: {e}")
        return False


def delete_batch(batch_id: str) -> bool:
    """Delete all downloads in a batch.

    Returns:
        True if at least one row was deleted, False if nothing matched or on
        error.
    """
    try:
        client = get_supabase_client()
        response = client.table('downloads').delete().eq(
            'batch_id', batch_id).execute()

        if response.data:
            logger.debug(
                f"Deleted batch {batch_id} ({len(response.data)} files)")
            return True
        return False

    except Exception as e:
        logger.error(f"Error deleting batch {batch_id}: {e}")
        return False


# ==================== BATCH OPERATIONS ====================

def get_batch_summary(batch_id: str) -> Optional[Dict[str, Any]]:
    """
    Get summary statistics for a download batch.

    The batch status is derived from the per-file statuses:
        - 'completed'      every file is completed
        - 'failed'         every file is failed
        - 'downloading'    at least one file is pending or downloading
        - 'partial_failed' anything else (a finished mix of completed,
                           failed and/or cancelled files)

    Returns:
        None if the batch has no records or an error occurs, otherwise
        {
            "batch_id": str,
            "ge_id": str,
            "lang": str,
            "mode": str,
            "total_files": int,
            "completed_files": int,
            "failed_files": int,
            "total_size": int,
            "downloaded_size": int,
            "status": str,  # derived from file statuses
            "created_at": str,
            "started_at": str,          # earliest start, or None
            "completed_at": str,        # latest finish, only once every
                                        # file has a completed_at; else None
            "duration_seconds": float   # None if not finished or if the
                                        # timestamps cannot be parsed
        }
    """
    try:
        downloads = get_downloads_by_batch(batch_id)

        if not downloads:
            return None

        # Calculate stats
        first_download = downloads[0]
        total_files = len(downloads)
        completed_files = sum(
            1 for d in downloads if d['status'] == 'completed')
        failed_files = sum(1 for d in downloads if d['status'] == 'failed')

        # `or 0` covers NULL columns, which arrive as None
        total_size = sum(d.get('file_size', 0) or 0 for d in downloads)
        downloaded_size = sum(d.get('downloaded_size', 0)
                              or 0 for d in downloads)

        # Determine batch status
        if all(d['status'] == 'completed' for d in downloads):
            batch_status = 'completed'
        elif all(d['status'] == 'failed' for d in downloads):
            batch_status = 'failed'
        elif any(d['status'] in ['pending', 'downloading'] for d in downloads):
            batch_status = 'downloading'
        else:
            batch_status = 'partial_failed'

        # Timestamps
        created_at = min(d['created_at'] for d in downloads)
        started_times = [d['started_at']
                         for d in downloads if d.get('started_at')]
        completed_times = [d['completed_at']
                           for d in downloads if d.get('completed_at')]

        started_at = min(started_times) if started_times else None
        # The batch only has a completion time once every file has one
        completed_at = max(completed_times) if completed_times and len(
            completed_times) == total_files else None

        # Calculate duration. Parsing is isolated in a helper so that an
        # unparseable timestamp yields duration None instead of discarding
        # the whole summary.
        duration_seconds = None
        if started_at and completed_at:
            duration_seconds = _duration_seconds(started_at, completed_at)

        return {
            "batch_id": batch_id,
            "ge_id": first_download['ge_id'],
            "lang": first_download['lang'],
            "mode": first_download['mode'],
            "total_files": total_files,
            "completed_files": completed_files,
            "failed_files": failed_files,
            "total_size": total_size,
            "downloaded_size": downloaded_size,
            "status": batch_status,
            "created_at": created_at,
            "started_at": started_at,
            "completed_at": completed_at,
            "duration_seconds": duration_seconds
        }

    except Exception as e:
        logger.error(f"Error getting batch summary for {batch_id}: {e}")
        return None