ge-tool/backend/services/nas_api/file_operations.py
2025-12-10 13:41:43 +07:00

660 lines
25 KiB
Python
Executable File

"""
NAS API File Operations
File listing, downloading, and management.
"""
import os
import json
import shutil
import requests
from typing import List, Dict, Optional, Tuple, Union, TYPE_CHECKING
from urllib.parse import urlencode
from .config import BASE_URL, DESTINATION_PATH, session, logger
from .session import load_sid
from .exceptions import NASConnectionError, NASAPIError
if TYPE_CHECKING:
from ..aria2.download_manager import Aria2DownloadManager
# aria2 integration
USE_ARIA2 = os.getenv('USE_ARIA2', 'true').lower() == 'true'
# None, False (unavailable), or Aria2DownloadManager
_aria2_manager: Optional[Union[bool, "Aria2DownloadManager"]] = None
def get_aria2_manager() -> "Aria2DownloadManager":
"""Get or create aria2 manager instance. Raises error if unavailable."""
global _aria2_manager
if _aria2_manager is None and USE_ARIA2:
try:
from ..aria2.download_manager import get_aria2_manager as _get_manager
_aria2_manager = _get_manager()
if _aria2_manager:
logger.debug(
"✅ aria2 manager initialized for NAS API downloads")
else:
raise RuntimeError("aria2 manager returned None")
except Exception as e:
logger.error(f"❌ CRITICAL: aria2 not available: {e}")
raise RuntimeError(
f"aria2 is required but not available: {e}") from e
if _aria2_manager is False or _aria2_manager is None:
raise RuntimeError("aria2 is required but not initialized")
return _aria2_manager # type: ignore
def syno_entry_request(sid: str, calls: List[Dict]) -> Dict:
"""
Make a SYNO.Entry.Request with multiple API calls.
Returns the JSON response.
Raises NASAPIError on API failure.
"""
try:
url = f"{BASE_URL}/entry.cgi"
compound = json.dumps(calls)
params = {
"api": "SYNO.Entry.Request",
"version": 1,
"method": "request",
"_sid": sid,
"compound": compound
}
resp = session.post(url, data=params, verify=False, timeout=30)
resp.raise_for_status()
data = resp.json()
logger.debug(f"NAS API response: {data}")
return data
except requests.exceptions.RequestException as e:
logger.error(f"Network error during NAS API call: {e}")
raise NASConnectionError(f"Lỗi kết nối NAS API: {e}")
except Exception as e:
logger.error(f"Unexpected error during NAS API call: {e}")
raise NASAPIError(f"Lỗi NAS API: {e}")
def test_session_validity(sid: str) -> bool:
"""
Test if the current session ID is still valid by attempting a simple API call.
Returns True if valid, False if expired/invalid.
"""
try:
data = syno_entry_request(sid, [{
"api": "SYNO.FileStation.List",
"method": "list_share",
"version": 2
}])
# Check if the API call was successful
result = data.get("data", {}).get("result", [])
if result and len(result) > 0:
first_result = result[0]
if first_result.get("success"):
logger.debug("Session ID is valid")
return True
logger.warning("Session ID appears to be invalid")
return False
except Exception as e:
logger.warning(f"Session validation failed: {e}")
return False
def list_folder_contents(sid: str, folder_path: str) -> Tuple[bool, List[Dict], Optional[str]]:
"""
List files and folders in the specified path.
Args:
sid: Session ID
folder_path: Path to list (e.g., "/Comic_TMS_L/DKI/JP")
Returns:
Tuple[bool, List[Dict], Optional[str]]:
- success: True if successful, False if failed
- files: List of file/folder dictionaries
- error_message: Error message if failed, None if successful
"""
try:
data = syno_entry_request(sid, [{
"api": "SYNO.FileStation.List",
"method": "list",
"version": 2,
"folder_path": folder_path,
"additional": ["real_path", "size", "owner", "time", "perm", "type"]
}])
# Parse the response
result = data.get("data", {}).get("result", [])
if result and len(result) > 0:
first_result = result[0]
if first_result.get("success"):
files = first_result.get("data", {}).get("files", [])
logger.debug(
f"Successfully listed {len(files)} items in {folder_path}")
return True, files, None
else:
error_info = first_result.get("error", {})
error_code = error_info.get("code") if isinstance(
error_info, dict) else None
# Check for specific error codes
if error_code == 408:
error_msg = f"Thư mục không tồn tại: {folder_path}"
elif error_code == 400:
error_msg = f"Đường dẫn không hợp lệ: {folder_path}"
elif error_code == 402:
error_msg = f"Không có quyền truy cập: {folder_path}"
else:
error_msg = f"Lỗi NAS (code {error_code}): {error_info}"
logger.error(
f"Failed to list folder {folder_path}: {error_msg}")
return False, [], error_msg
else:
error_msg = "Invalid API response format"
logger.error(f"Failed to list folder {folder_path}: {error_msg}")
return False, [], error_msg
except Exception as e:
error_msg = f"Exception during folder listing: {e}"
logger.error(error_msg)
return False, [], error_msg
def list_shares(sid: str) -> Tuple[bool, List[str], Optional[str]]:
"""
List all available root shares.
Returns:
Tuple[bool, List[str], Optional[str]]:
- success: True if successful, False if failed
- shares: List of share names
- error_message: Error message if failed, None if successful
"""
try:
data = syno_entry_request(sid, [{
"api": "SYNO.FileStation.List",
"method": "list_share",
"version": 2
}])
# Parse the response
result = data.get("data", {}).get("result", [])
if result and len(result) > 0:
first_result = result[0]
if first_result.get("success"):
shares_data = first_result.get("data", {}).get("shares", [])
share_names = [share["name"] for share in shares_data]
logger.debug(f"Successfully listed {len(share_names)} shares")
return True, share_names, None
else:
error_info = first_result.get("error", {})
error_msg = f"NAS API Error: {error_info}"
logger.error(f"Failed to list shares: {error_msg}")
return False, [], error_msg
else:
error_msg = "Invalid API response format"
logger.error(f"Failed to list shares: {error_msg}")
return False, [], error_msg
except Exception as e:
error_msg = f"Exception during shares listing: {e}"
logger.error(error_msg)
return False, [], error_msg
def get_files_for_path(folder_path: str) -> Tuple[str, List[Dict], Optional[str]]:
"""
High-level function to get files for a given path.
Handles session management automatically.
Args:
folder_path: Path to list (e.g., "/Comic_TMS_L/DKI/JP")
Returns:
Tuple[str, List[Dict], Optional[str]]:
- status: "success", "otp_required", or "error"
- files: List of file/folder dictionaries (empty if not success)
- message: Status message or error description
"""
try:
# Try to load existing session
sid = load_sid()
if sid:
# Test if session is still valid
if test_session_validity(sid):
# Session is valid, try to list files
success, files, error_msg = list_folder_contents(
sid, folder_path)
if success:
return "success", files, "Đã tải danh sách file thành công"
else:
# error_msg already contains detailed message from list_folder_contents
logger.warning(
f"Failed to list files despite valid session: {error_msg}")
return "error", [], error_msg
else:
# Session is invalid, need new login
logger.debug("Session expired, OTP required")
return "otp_required", [], "Phiên đăng nhập đã hết hạn. Vui lòng nhập mã OTP."
else:
# No session found, need login
logger.debug("No session found, OTP required")
return "otp_required", [], "Cần đăng nhập. Vui lòng nhập mã OTP."
except Exception as e:
logger.error(f"Unexpected error in get_files_for_path: {e}")
return "error", [], f"Lỗi hệ thống: {e}"
def download_single_file_aria2(
sid: str,
remote_path: str,
local_save_path: str,
is_dir: bool = False,
progress_callback=None,
max_speed: Optional[str] = None
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Download via aria2 - NO FALLBACK, throws error if aria2 unavailable
Args:
sid: Session ID
remote_path: Path on NAS
local_save_path: Local file path to save to
is_dir: Whether the remote_path is a directory (will be zipped)
progress_callback: Optional callback(downloaded_bytes, total_bytes)
max_speed: Optional bandwidth limit (e.g., '100K')
Returns:
Tuple[success, error_message, gid]
"""
# Will raise RuntimeError if aria2 not available
manager = get_aria2_manager()
try:
# Build download URL with SID
url = f"{BASE_URL}/entry.cgi"
params = {
"api": "SYNO.FileStation.Download",
"version": 2,
"method": "download",
"path": remote_path,
"mode": "download",
"_sid": sid
}
# Convert to full URL with query string
download_url = f"{url}?{urlencode(params)}"
logger.debug(
f"[aria2] Downloading: {remote_path} -> {local_save_path}")
# Download via aria2
success, error_msg, gid = manager.download_file(
url=download_url,
dest_path=local_save_path,
progress_callback=progress_callback,
max_download_limit=max_speed
)
if success:
logger.debug(f"[aria2] ✅ Download success: {local_save_path}")
return True, None, gid
else:
# NO FALLBACK - Throw error immediately
error_msg = error_msg or "aria2 download failed"
logger.error(f"[aria2] ❌ FAILED: {error_msg}")
raise RuntimeError(f"aria2 download failed: {error_msg}")
except RuntimeError:
# Re-raise RuntimeError (from aria2 failures)
raise
except Exception as e:
# Unexpected exception - NO FALLBACK
logger.error(f"[aria2] ❌ ERROR: {e}")
raise RuntimeError(f"aria2 unexpected error: {e}") from e
def cleanup_duplicates_before_download(dest_path: str, file_name_pattern: str, exact_filename: str, delete_dirs: bool = True) -> None:
"""
Before download, delete all files/folders that contain file_name_pattern in their name.
Also attempts to delete the exact_filename if it exists.
Args:
dest_path: Destination directory path
file_name_pattern: Original file name to search for (e.g., "[식자설정]")
exact_filename: Exact filename of the file to be downloaded (e.g., "[식자설정].zip")
delete_dirs: Whether to delete matching directories (True for API mode, False for Sharing mode)
"""
try:
if not os.path.exists(dest_path):
return
# List all files/folders in destination
for item_name in os.listdir(dest_path):
item_path = os.path.join(dest_path, item_name)
# Check if item name contains the file_name_pattern
if file_name_pattern in item_name:
try:
# If it's the exact file we are about to download
if item_name == exact_filename:
if os.path.isfile(item_path):
os.remove(item_path)
logger.debug(f"Deleted existing file: {item_path}")
elif os.path.isdir(item_path) and delete_dirs:
shutil.rmtree(item_path)
logger.debug(
f"Deleted existing folder (exact match): {item_path}")
continue
# For other duplicates
if os.path.isfile(item_path):
os.remove(item_path)
logger.debug(f"Cleaned up duplicate file: {item_path}")
elif os.path.isdir(item_path):
if delete_dirs:
shutil.rmtree(item_path)
logger.debug(
f"Cleaned up duplicate folder: {item_path}")
else:
logger.debug(
f"Skipped deleting duplicate folder (Sharing Mode): {item_path}")
except Exception as e:
logger.warning(f"Could not delete {item_path}: {e}")
except Exception as e:
logger.error(
f"Error cleaning up duplicates for '{file_name_pattern}': {e}")
def download_files_to_destination(
files_info: List[Dict],
ge_id: str,
lang: str,
base_destination: Optional[str] = None,
progress_callback=None
) -> Tuple[str, List[Dict], Optional[str], Optional[str]]:
"""
Download multiple files from NAS to network destination.
Simplified version - no complex error handling, just download.
Args:
files_info: List of file dicts with keys: name, path, isdir
ge_id: GE ID for folder naming (not used if base_destination provided)
lang: Language code (not used if base_destination provided)
base_destination: Full destination path. If None, will create GEID_LANG folder under DESTINATION_PATH
Returns:
Tuple[str, List[Dict], Optional[str], Optional[str]]:
- status: "success", "partial", or "error"
- results: List of download results with success/error for each file
- message: Overall status message
- destination_path: The actual destination path where files were downloaded
"""
try:
# Validate session
sid = load_sid()
if not sid:
return "error", [], "Session không hợp lệ. Vui lòng đăng nhập lại.", None
# Determine destination path
if base_destination is None:
# Create GEID_LANG folder under default DESTINATION_PATH
lang_upper = lang.upper()
dest_folder = f"{ge_id}_{lang_upper}"
if DESTINATION_PATH.endswith("\\"):
dest_path = f"{DESTINATION_PATH}{dest_folder}"
else:
dest_path = f"{DESTINATION_PATH}\\{dest_folder}"
else:
# Use provided path directly (already includes GEID_LANG)
dest_path = base_destination
# Create destination directory
os.makedirs(dest_path, exist_ok=True)
logger.debug(f"Destination created: {dest_path}")
results = []
successful_downloads = 0
total_files = len(files_info)
# Initialize files_status for progress tracking
files_status = [
{
"name": f"{file_info.get('name', 'unknown')}{'.zip' if file_info.get('isdir', False) else ''}",
"status": "pending",
"progress": None,
"downloaded": 0,
"total": None,
"is_folder": file_info.get("isdir", False)
}
for file_info in files_info
]
for idx, file_info in enumerate(files_info):
file_name = file_info.get("name", "unknown")
remote_path = file_info.get("path", "")
is_dir = file_info.get("isdir", False)
# Add .zip extension for directories
local_filename = file_name
if is_dir:
local_filename += ".zip"
# Cleanup duplicates BEFORE download
cleanup_duplicates_before_download(
dest_path, file_name, local_filename, delete_dirs=True)
# Build file path manually
local_file_path = f"{dest_path}\\{local_filename}"
# Safety check: If file still exists (could not be deleted), append _NEW
if os.path.exists(local_file_path):
logger.warning(
f"Could not delete existing file {local_filename}, appending _NEW")
name, ext = os.path.splitext(local_filename)
local_filename = f"{name}_NEW{ext}"
local_file_path = f"{dest_path}\\{local_filename}"
# Update status to downloading
files_status[idx]["status"] = "downloading"
# Create file-specific progress callback
def file_progress_callback(downloaded_bytes, total_bytes):
files_status[idx]["downloaded"] = downloaded_bytes
files_status[idx]["total"] = total_bytes
if total_bytes > 0:
progress_pct = (downloaded_bytes / total_bytes) * 100
files_status[idx]["progress"] = round(progress_pct, 1)
else:
files_status[idx]["progress"] = None
# Call parent progress callback
if progress_callback:
progress_callback(idx, total_files, {
"current_file": local_filename,
"current_file_index": idx + 1,
"total_files": total_files,
"current_file_progress": files_status[idx].get("progress"),
"current_file_downloaded": downloaded_bytes,
"current_file_total": total_bytes if total_bytes > 0 else None,
"files_status": files_status
})
# Download via aria2 (required)
success, error_msg, gid = download_single_file_aria2(
sid, remote_path, local_file_path, is_dir,
progress_callback=file_progress_callback
)
# Update files_status
if success:
files_status[idx]["status"] = "completed"
successful_downloads += 1
else:
files_status[idx]["status"] = "failed"
result = {
"file_name": file_name,
"local_path": local_file_path,
"success": success,
"error_message": error_msg,
"is_directory": is_dir
}
results.append(result)
# No cleanup after download anymore
# Determine overall status
total_files = len(files_info)
if successful_downloads == total_files:
status = "success"
message = f"Đã tải xuống {successful_downloads}/{total_files} file vào {dest_path}"
elif successful_downloads > 0:
status = "partial"
message = f"Đã tải xuống {successful_downloads}/{total_files} file"
else:
status = "error"
message = "Không thể tải xuống file nào"
return status, results, message, dest_path
except Exception as e:
logger.error(f"Error in download_files_to_destination: {e}")
return "error", [], f"Lỗi: {e}", None
def download_files_as_single_zip(
files_info: List[Dict],
dest_path: str,
sid: Optional[str] = None
) -> Tuple[str, List[Dict], str, Optional[str]]:
"""
Download multiple files/folders as a single zip file.
Args:
files_info: List of file info dicts with 'path', 'name', 'isdir' keys
dest_path: Local destination path (network share)
sid: Session ID for authentication
Returns:
Tuple of (status, results, message, zip_file_path)
- status: "success", "error"
- results: List of file info dicts with download status
- message: Human-readable status message
- zip_file_path: Full path to the created zip file
Logic:
- If only 1 file/folder: zip that single item (don't double-zip folders)
- If multiple files: zip all into a single archive named after parent folder
"""
try:
if not sid:
return "error", [], "Missing session ID", None
if not files_info:
return "error", [], "No files selected", None
# Ensure destination directory exists
os.makedirs(dest_path, exist_ok=True)
# Determine zip file name
if len(files_info) == 1:
# Single file/folder: use its name for zip
single_item = files_info[0]
zip_base_name = single_item.get("name", "download")
else:
# Multiple files: find common parent folder name
# Extract parent path from first file
first_path = files_info[0].get("path", "")
# Remove leading/trailing slashes and get parent
clean_path = first_path.strip("/")
path_parts = clean_path.split("/")
if len(path_parts) > 1:
# Use parent folder name
zip_base_name = path_parts[-2]
else:
# Fallback to generic name
zip_base_name = "download"
# Sanitize filename
zip_base_name = "".join(c if c.isalnum() or c in (
' ', '-', '_') else '_' for c in zip_base_name)
zip_filename = f"{zip_base_name}.zip"
local_zip_path = os.path.join(dest_path, zip_filename)
# Prepare path parameter for FileStation API
# API expects comma-separated paths with proper JSON encoding
path_list = [f'"{file_info["path"]}"' for file_info in files_info]
path_param = f'[{",".join(path_list)}]'
logger.debug(
f"Downloading {len(files_info)} items as single zip: {zip_filename}")
logger.debug(f"Path parameter: {path_param}")
# Download using FileStation Download API with multi-file mode
download_url = f"{BASE_URL}/entry.cgi"
params = {
"api": "SYNO.FileStation.Download",
"version": "2",
"method": "download",
"path": path_param,
"mode": "download",
"_sid": sid
}
response = session.get(download_url, params=params,
verify=False, timeout=300, stream=True)
if response.status_code == 200:
# Save zip file
with open(local_zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Verify file was created
if os.path.exists(local_zip_path) and os.path.getsize(local_zip_path) > 0:
file_size = os.path.getsize(local_zip_path)
logger.debug(
f"Successfully downloaded: {local_zip_path} ({file_size} bytes)")
# Create result entries for each file
results = [
{
"file_name": file_info.get("name", "unknown"),
"local_path": local_zip_path,
"success": True,
"error_message": None,
"is_directory": file_info.get("isdir", False)
}
for file_info in files_info
]
return "success", results, f"Đã tải xuống {len(files_info)} file vào {local_zip_path}", local_zip_path
else:
logger.error(
f"Downloaded file is empty or does not exist: {local_zip_path}")
return "error", [], "File tải xuống bị lỗi (0 bytes)", None
else:
error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
logger.error(f"Download failed: {error_msg}")
return "error", [], error_msg, None
except Exception as e:
logger.error(
f"Error in download_files_as_single_zip: {e}", exc_info=True)
return "error", [], f"Lỗi: {str(e)}", None