ge-tool/backend/services/nas_api/file_operations.py

660 lines
25 KiB
Python
Raw Permalink Normal View History

2025-12-10 06:41:43 +00:00
"""
NAS API File Operations
File listing, downloading, and management.
"""
import os
import json
import shutil
import requests
from typing import List, Dict, Optional, Tuple, Union, TYPE_CHECKING
from urllib.parse import urlencode
from .config import BASE_URL, DESTINATION_PATH, session, logger
from .session import load_sid
from .exceptions import NASConnectionError, NASAPIError
if TYPE_CHECKING:
from ..aria2.download_manager import Aria2DownloadManager
# aria2 integration: when USE_ARIA2 is true (the default), all NAS downloads
# must go through aria2 -- there is deliberately no plain-HTTP fallback.
USE_ARIA2 = os.getenv('USE_ARIA2', 'true').lower() == 'true'

# Lazily-initialized module singleton:
#   None  -> not initialized yet (get_aria2_manager will try to create it)
#   False -> marked unavailable
#   Aria2DownloadManager -> ready to use
# None, False (unavailable), or Aria2DownloadManager
_aria2_manager: Optional[Union[bool, "Aria2DownloadManager"]] = None
def get_aria2_manager() -> "Aria2DownloadManager":
    """Return the shared aria2 manager, creating it on first use.

    Raises:
        RuntimeError: if aria2 is disabled, uninitialized, or its factory
            fails/returns a falsy value -- downloads have no HTTP fallback.
    """
    global _aria2_manager
    if USE_ARIA2 and _aria2_manager is None:
        try:
            from ..aria2.download_manager import get_aria2_manager as _factory
            # Keep the raw factory result (even if falsy) so repeated calls
            # behave consistently with the cached state.
            _aria2_manager = _factory()
            if not _aria2_manager:
                raise RuntimeError("aria2 manager returned None")
            logger.debug(
                "✅ aria2 manager initialized for NAS API downloads")
        except Exception as e:
            logger.error(f"❌ CRITICAL: aria2 not available: {e}")
            raise RuntimeError(
                f"aria2 is required but not available: {e}") from e
    if _aria2_manager is None or _aria2_manager is False:
        raise RuntimeError("aria2 is required but not initialized")
    return _aria2_manager  # type: ignore
def syno_entry_request(sid: str, calls: List[Dict]) -> Dict:
    """
    Execute several FileStation API calls in one SYNO.Entry.Request batch.

    Args:
        sid: Authenticated session ID.
        calls: Per-call dicts (api / method / version / extra params).
    Returns:
        The parsed JSON response from the NAS.
    Raises:
        NASConnectionError: on any network-level failure.
        NASAPIError: on any other unexpected failure.
    """
    try:
        payload = {
            "api": "SYNO.Entry.Request",
            "version": 1,
            "method": "request",
            "_sid": sid,
            # The batched calls travel as a JSON-encoded "compound" field.
            "compound": json.dumps(calls),
        }
        endpoint = f"{BASE_URL}/entry.cgi"
        # verify=False: the NAS typically serves a self-signed certificate.
        response = session.post(
            endpoint, data=payload, verify=False, timeout=30)
        response.raise_for_status()
        body = response.json()
        logger.debug(f"NAS API response: {body}")
        return body
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during NAS API call: {e}")
        raise NASConnectionError(f"Lỗi kết nối NAS API: {e}")
    except Exception as e:
        logger.error(f"Unexpected error during NAS API call: {e}")
        raise NASAPIError(f"Lỗi NAS API: {e}")
def test_session_validity(sid: str) -> bool:
    """
    Check whether *sid* is still accepted by the NAS.

    Issues a lightweight ``list_share`` call; returns True when it
    succeeds, False when the session looks expired or the request fails.
    """
    try:
        response = syno_entry_request(sid, [{
            "api": "SYNO.FileStation.List",
            "method": "list_share",
            "version": 2
        }])
        results = response.get("data", {}).get("result", [])
        # A valid session yields at least one result entry with success=True.
        if results and results[0].get("success"):
            logger.debug("Session ID is valid")
            return True
        logger.warning("Session ID appears to be invalid")
        return False
    except Exception as e:
        logger.warning(f"Session validation failed: {e}")
        return False
def list_folder_contents(sid: str, folder_path: str) -> Tuple[bool, List[Dict], Optional[str]]:
    """
    List files and folders under *folder_path* on the NAS.

    Args:
        sid: Session ID.
        folder_path: Path to list (e.g., "/Comic_TMS_L/DKI/JP").
    Returns:
        Tuple[bool, List[Dict], Optional[str]]:
            - success flag
            - list of file/folder dicts (empty on failure)
            - error message on failure, None on success
    """
    try:
        response = syno_entry_request(sid, [{
            "api": "SYNO.FileStation.List",
            "method": "list",
            "version": 2,
            "folder_path": folder_path,
            "additional": ["real_path", "size", "owner", "time", "perm", "type"]
        }])
        results = response.get("data", {}).get("result", [])
        if not results:
            error_msg = "Invalid API response format"
            logger.error(f"Failed to list folder {folder_path}: {error_msg}")
            return False, [], error_msg
        entry = results[0]
        if entry.get("success"):
            files = entry.get("data", {}).get("files", [])
            logger.debug(
                f"Successfully listed {len(files)} items in {folder_path}")
            return True, files, None
        error_info = entry.get("error", {})
        error_code = error_info.get("code") if isinstance(
            error_info, dict) else None
        # Map known FileStation error codes to user-facing messages.
        code_messages = {
            408: f"Thư mục không tồn tại: {folder_path}",
            400: f"Đường dẫn không hợp lệ: {folder_path}",
            402: f"Không có quyền truy cập: {folder_path}",
        }
        error_msg = code_messages.get(
            error_code, f"Lỗi NAS (code {error_code}): {error_info}")
        logger.error(f"Failed to list folder {folder_path}: {error_msg}")
        return False, [], error_msg
    except Exception as e:
        error_msg = f"Exception during folder listing: {e}"
        logger.error(error_msg)
        return False, [], error_msg
def list_shares(sid: str) -> Tuple[bool, List[str], Optional[str]]:
    """
    List all root shares visible to this session.

    Returns:
        Tuple[bool, List[str], Optional[str]]:
            - success flag
            - share names (empty on failure)
            - error message on failure, None on success
    """
    try:
        response = syno_entry_request(sid, [{
            "api": "SYNO.FileStation.List",
            "method": "list_share",
            "version": 2
        }])
        results = response.get("data", {}).get("result", [])
        if not results:
            error_msg = "Invalid API response format"
            logger.error(f"Failed to list shares: {error_msg}")
            return False, [], error_msg
        entry = results[0]
        if not entry.get("success"):
            error_msg = f"NAS API Error: {entry.get('error', {})}"
            logger.error(f"Failed to list shares: {error_msg}")
            return False, [], error_msg
        share_names = [
            share["name"]
            for share in entry.get("data", {}).get("shares", [])
        ]
        logger.debug(f"Successfully listed {len(share_names)} shares")
        return True, share_names, None
    except Exception as e:
        error_msg = f"Exception during shares listing: {e}"
        logger.error(error_msg)
        return False, [], error_msg
def get_files_for_path(folder_path: str) -> Tuple[str, List[Dict], Optional[str]]:
    """
    High-level listing entry point with automatic session handling.

    Args:
        folder_path: Path to list (e.g., "/Comic_TMS_L/DKI/JP").
    Returns:
        Tuple[str, List[Dict], Optional[str]]:
            - status: "success", "otp_required", or "error"
            - file/folder dicts (empty unless status == "success")
            - status or error message
    """
    try:
        sid = load_sid()
        # Guard: no stored session at all -> caller must log in first.
        if not sid:
            logger.debug("No session found, OTP required")
            return "otp_required", [], "Cần đăng nhập. Vui lòng nhập mã OTP."
        # Guard: stored session is stale -> re-authentication needed.
        if not test_session_validity(sid):
            logger.debug("Session expired, OTP required")
            return "otp_required", [], "Phiên đăng nhập đã hết hạn. Vui lòng nhập mã OTP."
        success, files, error_msg = list_folder_contents(sid, folder_path)
        if success:
            return "success", files, "Đã tải danh sách file thành công"
        # error_msg already carries the detailed reason from list_folder_contents.
        logger.warning(
            f"Failed to list files despite valid session: {error_msg}")
        return "error", [], error_msg
    except Exception as e:
        logger.error(f"Unexpected error in get_files_for_path: {e}")
        return "error", [], f"Lỗi hệ thống: {e}"
def download_single_file_aria2(
    sid: str,
    remote_path: str,
    local_save_path: str,
    is_dir: bool = False,
    progress_callback=None,
    max_speed: Optional[str] = None
) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Download one NAS item through aria2 -- no fallback if aria2 is missing.

    Args:
        sid: Session ID.
        remote_path: Path on the NAS.
        local_save_path: Local file path to save to.
        is_dir: Whether remote_path is a directory.
            NOTE(review): currently unused in the body -- presumably the NAS
            zips directories server-side; confirm before relying on it.
        progress_callback: Optional callback(downloaded_bytes, total_bytes).
        max_speed: Optional bandwidth limit (e.g., '100K').
    Returns:
        Tuple[success, error_message, gid]
    Raises:
        RuntimeError: when aria2 is unavailable or the download fails.
    """
    # Raises RuntimeError immediately if aria2 is not available.
    manager = get_aria2_manager()
    try:
        # The FileStation download endpoint takes everything via query string.
        query = urlencode({
            "api": "SYNO.FileStation.Download",
            "version": 2,
            "method": "download",
            "path": remote_path,
            "mode": "download",
            "_sid": sid
        })
        download_url = f"{BASE_URL}/entry.cgi?{query}"
        logger.debug(
            f"[aria2] Downloading: {remote_path} -> {local_save_path}")
        success, error_msg, gid = manager.download_file(
            url=download_url,
            dest_path=local_save_path,
            progress_callback=progress_callback,
            max_download_limit=max_speed
        )
        if not success:
            # NO FALLBACK -- surface the failure immediately.
            error_msg = error_msg or "aria2 download failed"
            logger.error(f"[aria2] ❌ FAILED: {error_msg}")
            raise RuntimeError(f"aria2 download failed: {error_msg}")
        logger.debug(f"[aria2] ✅ Download success: {local_save_path}")
        return True, None, gid
    except RuntimeError:
        # Propagate aria2 failures unchanged.
        raise
    except Exception as e:
        # Unexpected exception -- still no fallback.
        logger.error(f"[aria2] ❌ ERROR: {e}")
        raise RuntimeError(f"aria2 unexpected error: {e}") from e
def cleanup_duplicates_before_download(dest_path: str, file_name_pattern: str, exact_filename: str, delete_dirs: bool = True) -> None:
    """
    Delete pre-existing items in *dest_path* whose names contain
    *file_name_pattern*, including *exact_filename* itself if present.

    Args:
        dest_path: Destination directory path.
        file_name_pattern: Original file name to search for (e.g., "[식자설정]").
        exact_filename: Exact filename of the file about to be downloaded
            (e.g., "[식자설정].zip").
        delete_dirs: Whether matching directories may be removed
            (True for API mode, False for Sharing mode).
    """
    try:
        if not os.path.exists(dest_path):
            return
        for entry in os.listdir(dest_path):
            if file_name_pattern not in entry:
                continue
            full_path = os.path.join(dest_path, entry)
            try:
                if entry == exact_filename:
                    # Exact target: clear it so the fresh download can land.
                    if os.path.isfile(full_path):
                        os.remove(full_path)
                        logger.debug(f"Deleted existing file: {full_path}")
                    elif os.path.isdir(full_path) and delete_dirs:
                        shutil.rmtree(full_path)
                        logger.debug(
                            f"Deleted existing folder (exact match): {full_path}")
                elif os.path.isfile(full_path):
                    os.remove(full_path)
                    logger.debug(f"Cleaned up duplicate file: {full_path}")
                elif os.path.isdir(full_path):
                    if delete_dirs:
                        shutil.rmtree(full_path)
                        logger.debug(
                            f"Cleaned up duplicate folder: {full_path}")
                    else:
                        logger.debug(
                            f"Skipped deleting duplicate folder (Sharing Mode): {full_path}")
            except Exception as e:
                # Best-effort: a locked/undeletable item must not stop the loop.
                logger.warning(f"Could not delete {full_path}: {e}")
    except Exception as e:
        logger.error(
            f"Error cleaning up duplicates for '{file_name_pattern}': {e}")
def download_files_to_destination(
    files_info: List[Dict],
    ge_id: str,
    lang: str,
    base_destination: Optional[str] = None,
    progress_callback=None
) -> Tuple[str, List[Dict], Optional[str], Optional[str]]:
    """
    Download multiple files from NAS to a destination directory via aria2.

    Args:
        files_info: List of file dicts with keys: name, path, isdir.
        ge_id: GE ID for folder naming (unused if base_destination given).
        lang: Language code (unused if base_destination given).
        base_destination: Full destination path. When None, a "GEID_LANG"
            folder is created under DESTINATION_PATH.
        progress_callback: Optional callable(index, total, info_dict) invoked
            as each file's download progresses.
    Returns:
        Tuple[str, List[Dict], Optional[str], Optional[str]]:
            - status: "success", "partial", or "error"
            - per-file result dicts (file_name, local_path, success,
              error_message, is_directory)
            - overall status message
            - destination path where files were downloaded (None on early error)
    """
    try:
        # Validate session before doing any filesystem work.
        sid = load_sid()
        if not sid:
            return "error", [], "Session không hợp lệ. Vui lòng đăng nhập lại.", None

        # Resolve the destination directory.
        if base_destination is None:
            dest_folder = f"{ge_id}_{lang.upper()}"
            # os.path.join handles a trailing separator on DESTINATION_PATH
            # and is portable (previous code hard-coded "\\").
            dest_path = os.path.join(DESTINATION_PATH, dest_folder)
        else:
            # Provided path already includes GEID_LANG.
            dest_path = base_destination

        os.makedirs(dest_path, exist_ok=True)
        logger.debug(f"Destination created: {dest_path}")

        results = []
        successful_downloads = 0
        total_files = len(files_info)

        # Per-file progress state, shared with the progress callback below.
        files_status = [
            {
                "name": f"{fi.get('name', 'unknown')}{'.zip' if fi.get('isdir', False) else ''}",
                "status": "pending",
                "progress": None,
                "downloaded": 0,
                "total": None,
                "is_folder": fi.get("isdir", False)
            }
            for fi in files_info
        ]

        for idx, file_info in enumerate(files_info):
            file_name = file_info.get("name", "unknown")
            remote_path = file_info.get("path", "")
            is_dir = file_info.get("isdir", False)

            # Directories arrive from the NAS as zip archives.
            local_filename = f"{file_name}.zip" if is_dir else file_name

            # Remove stale copies/duplicates before writing the new file.
            cleanup_duplicates_before_download(
                dest_path, file_name, local_filename, delete_dirs=True)

            local_file_path = os.path.join(dest_path, local_filename)
            # Safety net: if the old file could not be deleted, don't clobber it.
            if os.path.exists(local_file_path):
                logger.warning(
                    f"Could not delete existing file {local_filename}, appending _NEW")
                name, ext = os.path.splitext(local_filename)
                local_filename = f"{name}_NEW{ext}"
                local_file_path = os.path.join(dest_path, local_filename)

            files_status[idx]["status"] = "downloading"

            # Bind loop variables as defaults so the closure stays correct
            # even if the callback fires after the loop has advanced.
            def file_progress_callback(downloaded_bytes, total_bytes,
                                       idx=idx, local_filename=local_filename):
                files_status[idx]["downloaded"] = downloaded_bytes
                files_status[idx]["total"] = total_bytes
                if total_bytes > 0:
                    files_status[idx]["progress"] = round(
                        (downloaded_bytes / total_bytes) * 100, 1)
                else:
                    # Unknown total size: report indeterminate progress.
                    files_status[idx]["progress"] = None
                if progress_callback:
                    progress_callback(idx, total_files, {
                        "current_file": local_filename,
                        "current_file_index": idx + 1,
                        "total_files": total_files,
                        "current_file_progress": files_status[idx].get("progress"),
                        "current_file_downloaded": downloaded_bytes,
                        "current_file_total": total_bytes if total_bytes > 0 else None,
                        "files_status": files_status
                    })

            # Download via aria2 (required; raises RuntimeError if unavailable,
            # which the outer except turns into an "error" result).
            success, error_msg, gid = download_single_file_aria2(
                sid, remote_path, local_file_path, is_dir,
                progress_callback=file_progress_callback
            )

            if success:
                files_status[idx]["status"] = "completed"
                successful_downloads += 1
            else:
                files_status[idx]["status"] = "failed"

            results.append({
                "file_name": file_name,
                "local_path": local_file_path,
                "success": success,
                "error_message": error_msg,
                "is_directory": is_dir
            })

        # Summarize the overall outcome.
        if successful_downloads == total_files:
            status = "success"
            message = f"Đã tải xuống {successful_downloads}/{total_files} file vào {dest_path}"
        elif successful_downloads > 0:
            status = "partial"
            message = f"Đã tải xuống {successful_downloads}/{total_files} file"
        else:
            status = "error"
            message = "Không thể tải xuống file nào"
        return status, results, message, dest_path
    except Exception as e:
        logger.error(f"Error in download_files_to_destination: {e}")
        return "error", [], f"Lỗi: {e}", None
def download_files_as_single_zip(
    files_info: List[Dict],
    dest_path: str,
    sid: Optional[str] = None
) -> Tuple[str, List[Dict], str, Optional[str]]:
    """
    Download multiple files/folders from the NAS as one zip archive.

    Args:
        files_info: List of file info dicts with 'path', 'name', 'isdir' keys.
        dest_path: Local destination directory (network share).
        sid: Session ID for authentication.
    Returns:
        Tuple of (status, results, message, zip_file_path):
            - status: "success" or "error"
            - results: per-file dicts with download status
            - message: human-readable status message
            - zip_file_path: full path to the created zip (None on error)
    Naming:
        - Single item: the zip is named after that item.
        - Multiple items: named after their common parent folder, falling
          back to "download" when no parent can be determined.
    """
    try:
        if not sid:
            return "error", [], "Missing session ID", None
        if not files_info:
            return "error", [], "No files selected", None

        os.makedirs(dest_path, exist_ok=True)

        # Pick the archive's base name.
        if len(files_info) == 1:
            zip_base_name = files_info[0].get("name", "download")
        else:
            first_path = files_info[0].get("path", "")
            path_parts = first_path.strip("/").split("/")
            # Name after the parent folder of the first item when available.
            zip_base_name = path_parts[-2] if len(path_parts) > 1 else "download"

        # Sanitize the filename (keep alphanumerics, space, dash, underscore).
        zip_base_name = "".join(
            c if c.isalnum() or c in (' ', '-', '_') else '_'
            for c in zip_base_name)
        zip_filename = f"{zip_base_name}.zip"
        local_zip_path = os.path.join(dest_path, zip_filename)

        # FileStation expects a JSON array of paths. json.dumps escapes
        # quotes/backslashes correctly; the previous manual '"..."' quoting
        # produced invalid JSON for paths containing '"'. separators keeps
        # the compact '["a","b"]' form the API received before.
        path_param = json.dumps(
            [file_info["path"] for file_info in files_info],
            ensure_ascii=False, separators=(',', ':'))

        logger.debug(
            f"Downloading {len(files_info)} items as single zip: {zip_filename}")
        logger.debug(f"Path parameter: {path_param}")

        # Multi-file download via the FileStation Download API.
        download_url = f"{BASE_URL}/entry.cgi"
        params = {
            "api": "SYNO.FileStation.Download",
            "version": "2",
            "method": "download",
            "path": path_param,
            "mode": "download",
            "_sid": sid
        }
        response = session.get(download_url, params=params,
                               verify=False, timeout=300, stream=True)
        if response.status_code != 200:
            error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
            logger.error(f"Download failed: {error_msg}")
            return "error", [], error_msg, None

        # Stream the archive to disk in chunks.
        with open(local_zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

        # Verify something was actually written.
        if not (os.path.exists(local_zip_path) and os.path.getsize(local_zip_path) > 0):
            logger.error(
                f"Downloaded file is empty or does not exist: {local_zip_path}")
            return "error", [], "File tải xuống bị lỗi (0 bytes)", None

        file_size = os.path.getsize(local_zip_path)
        logger.debug(
            f"Successfully downloaded: {local_zip_path} ({file_size} bytes)")
        # One result entry per requested item; all share the same archive path.
        results = [
            {
                "file_name": file_info.get("name", "unknown"),
                "local_path": local_zip_path,
                "success": True,
                "error_message": None,
                "is_directory": file_info.get("isdir", False)
            }
            for file_info in files_info
        ]
        return "success", results, f"Đã tải xuống {len(files_info)} file vào {local_zip_path}", local_zip_path
    except Exception as e:
        logger.error(
            f"Error in download_files_as_single_zip: {e}", exc_info=True)
        return "error", [], f"Lỗi: {str(e)}", None