ge-tool/backend/services/nas_sharing_api/selenium_operations.py
2025-12-10 13:41:43 +07:00

497 lines
17 KiB
Python
Executable File

"""
NAS Sharing API module - FolderSharing API calls.
Extracted from the DSMSeleniumLogin methods in download_link.py.
"""
import os
import sys
import time
import requests
import urllib3
import logging
from typing import Dict, List, Any, Optional, Union, TYPE_CHECKING
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
# Silence urllib3's InsecureRequestWarning: the HTTP calls in this module are
# sent with verify=False, which would otherwise warn on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Module-level logger.
# NOTE(review): with the level fixed at INFO, the logger.debug(...) calls in
# this module are filtered out by this logger - confirm that is intended.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if TYPE_CHECKING:
    # Needed for annotations only; the runtime import happens lazily inside
    # get_aria2_manager().
    from ..aria2.download_manager import Aria2DownloadManager
# aria2 integration: enabled unless the USE_ARIA2 environment variable is set
# to something other than "true" (compared case-insensitively).
USE_ARIA2 = os.getenv('USE_ARIA2', 'true').lower() == 'true'
# Cached manager: None (not created yet), False (unavailable), or an
# Aria2DownloadManager instance.
_aria2_manager: Optional[Union[bool, "Aria2DownloadManager"]] = None
def get_aria2_manager() -> "Aria2DownloadManager":
    """
    Return the aria2 download manager used for sharing downloads.

    The instance is cached in the module-level ``_aria2_manager``. The factory
    in ``..aria2.download_manager`` is only invoked while that cache is still
    ``None`` and ``USE_ARIA2`` is enabled.

    Returns:
        The cached manager instance.

    Raises:
        RuntimeError: If the manager cannot be imported or created (the
            underlying failure is attached as ``__cause__``), or if no
            manager is available because aria2 is disabled or unavailable.
    """
    global _aria2_manager
    if _aria2_manager is None and USE_ARIA2:
        try:
            # Lazy import: resolved only when a manager is first requested.
            from ..aria2.download_manager import get_aria2_manager as _get_manager
            _aria2_manager = _get_manager()
            if not _aria2_manager:
                raise RuntimeError("aria2 manager returned None")
            logger.debug("✅ aria2 manager initialized for Sharing downloads")
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise RuntimeError(
                f"aria2 is required but not available: {e}") from e
    if _aria2_manager is False or _aria2_manager is None:
        raise RuntimeError("aria2 is required but not initialized")
    return _aria2_manager  # type: ignore
def get_file_list(
    driver: webdriver.Chrome,
    sharing_id: str,
    folder_path: str = "/"
) -> List[Dict[str, Any]]:
    """
    List the files and folders of a sharing link via SYNO.FolderSharing.List.

    The Selenium session cookies are copied into a short-lived requests
    session, which sends one ``list`` request (offset 0, limit 1000, sorted
    by name ascending). There is no retry: every failure is raised
    immediately so problems surface early.

    Args:
        driver: Selenium WebDriver holding valid cookies.
        sharing_id: Sharing ID (extracted from the sharing URL).
        folder_path: Folder to list (default: "/").

    Returns:
        A list of dicts, one per item, with the keys:
            - name: File/folder name
            - is_folder: True when the item is a folder
            - size: Formatted size string (empty for folders)
            - size_bytes: Size in bytes (0 for folders)
            - path: Full path
            - additional: Extra metadata returned by the API

    Raises:
        RuntimeError: "SESSION_EXPIRED" for API error 101,
            "API_ERROR_407_RATE_LIMIT: ..." for error 407,
            "API_ERROR_<code>: ..." for other API errors,
            "API_TIMEOUT: ..." / "API_NETWORK_ERROR: ..." for transport
            failures, "API_INVALID_RESPONSE: ..." for a non-JSON body, and
            a plain message when the response is empty.
        KeyError: If a successful response has no ``data.files`` entry.
    """
    # Function-scope imports, matching the file's existing local-import style.
    import inspect
    import json

    # FolderSharing.List API endpoint
    url = "https://disk.lezhin.com:5001/sharing/webapi/entry.cgi"
    params = {
        'api': 'SYNO.FolderSharing.List',
        'method': 'list',
        'version': '2',
        'offset': '0',
        'limit': '1000',
        'sort_by': '"name"',
        'sort_direction': '"ASC"',
        'action': '"enum"',
        'additional': '["size","owner","time","perm","type","mount_point_type"]',
        'filetype': '"all"',
        # json.dumps escapes quotes/backslashes inside the value; with
        # ensure_ascii=False ordinary (including non-ASCII) names are emitted
        # exactly as the previous hand-quoted form did.
        'folder_path': json.dumps(str(folder_path), ensure_ascii=False),
        '_sharing_id': json.dumps(str(sharing_id), ensure_ascii=False)
    }

    # The context manager closes the session's connection pool on every path.
    with requests.Session() as session:
        # Reuse the authenticated browser cookies
        for cookie in driver.get_cookies():
            session.cookies.set(
                cookie['name'], cookie['value'], domain=cookie['domain'])

        # Log with the caller's name for clearer context. context=0 skips
        # reading source lines, and the stack is walked only once.
        stack = inspect.stack(0)
        caller = stack[1].function if len(stack) > 1 else "unknown"
        print(f"\n🔍 [{caller}] Lấy danh sách (FolderSharing API): {folder_path}")

        # NO RETRY - fail immediately so problems are detected early
        try:
            response = session.post(url, data=params, verify=False, timeout=30)
            result = response.json()
        except requests.exceptions.Timeout as e:
            print(f"❌ Timeout khi gọi API: {e}")
            raise RuntimeError(f"API_TIMEOUT: {e}") from e
        except requests.exceptions.RequestException as e:
            print(f"❌ Network error khi gọi API: {e}")
            raise RuntimeError(f"API_NETWORK_ERROR: {e}") from e
        except ValueError as e:
            # Body was not JSON and the installed requests version reports
            # that as a plain ValueError rather than a RequestException.
            print(f"❌ API returned a non-JSON response: {e}")
            raise RuntimeError(f"API_INVALID_RESPONSE: {e}") from e

    if not result:
        raise RuntimeError("API không trả về dữ liệu")

    if not result.get("success"):
        error_detail = result.get('error', {})
        error_code = error_detail.get('code')
        print(f"❌ API lỗi {error_code}: {error_detail}")
        # Error 101: session expired
        if error_code == 101:
            raise RuntimeError("SESSION_EXPIRED")
        # Error 407: rate limit - not retried here (per the original note,
        # the frontend debounces 300ms and the backend rate-limits at 500ms)
        if error_code == 407:
            raise RuntimeError(f"API_ERROR_407_RATE_LIMIT: {error_detail}")
        # Any other API error
        raise RuntimeError(f"API_ERROR_{error_code}: {error_detail}")

    files = result['data']['files']
    print(f"✅ Thành công! Tìm thấy {len(files)} item(s).")

    formatted = []
    for item in files:
        is_folder = item.get('isdir', False)
        extra = item.get('additional')
        # Size is only reported for files, never for folders
        size_bytes = 0
        if not is_folder and extra and extra.get('size'):
            size_bytes = extra['size']
        formatted.append({
            'name': item.get('name', ''),
            'is_folder': is_folder,
            'size': "" if is_folder else _format_size(size_bytes),
            'size_bytes': size_bytes,
            'path': item.get('path', ''),
            'additional': item.get('additional', {})
        })
    return formatted
def encode_path_to_dlink(path: str) -> str:
    """
    Build the ``dlink`` token for a NAS path.

    The token is the lowercase hexadecimal rendering of the path's UTF-8
    bytes.

    Args:
        path: File path, e.g. "/수조(북극여우)/001화_PSD_JPG.zip"

    Returns:
        Hex-encoded path string for the dlink parameter.
    """
    return path.encode('utf-8').hex()
def download_file_direct(
    driver: webdriver.Chrome,
    sharing_id: str,
    remote_path: str,
    is_dir: bool = False,
    save_path: Optional[str] = None,
    progress_callback=None
) -> bool:
    """
    DEPRECATED - do not use.

    The former implementation downloaded through the requests library
    instead of aria2. All downloads must go through aria2 (parallel
    connections): use prepare_download_url() + aria2_manager.download_file().
    The signature is kept only so existing call sites fail loudly.

    Raises:
        NotImplementedError: Always, regardless of the arguments.
    """
    # The old body was removed (see version-control history); nothing may
    # follow this raise, as it would be unreachable.
    raise NotImplementedError(
        "download_file_direct() is deprecated. "
        "Use prepare_download_url() + aria2_manager.download_file() instead. "
        "All downloads MUST use aria2."
    )
def prepare_download_url(
    driver: webdriver.Chrome,
    sharing_id: str,
    remote_path: str,
    file_name: str
) -> tuple[str, str]:
    """
    Build the download URL and cookie header from the Selenium session.

    This function MUST be called with driver_lock held. The returned URL and
    cookie string can then be used for the aria2 download without touching
    the driver again.

    Args:
        driver: Selenium WebDriver with valid cookies
        sharing_id: Sharing ID
        remote_path: File/folder path on the NAS
        file_name: File name appended as the last URL path segment; it is
            percent-encoded here, so pass the raw (unencoded) name

    Returns:
        (download_url, cookie_string) tuple
    """
    # Function-scope import, matching the file's existing local-import style.
    from urllib.parse import quote

    # Cookie header value taken from the Selenium session
    cookie_string = "; ".join(
        f"{c['name']}={c['value']}" for c in driver.get_cookies()
    )
    # The path becomes a hex-encoded dlink for BOTH files and folders
    dlink = encode_path_to_dlink(remote_path)
    # Percent-encode the name: a raw '#', '?' or '%' would otherwise end the
    # path early or be mis-decoded, cutting off the query string below.
    # Names made only of unreserved characters are unchanged.
    url = (
        "https://disk.lezhin.com:5001/fsdownload/webapi/file_download.cgi/"
        + quote(str(file_name), safe='')
    )
    params = {
        'dlink': f'"{dlink}"',
        # Millisecond timestamp sent as the noCache parameter
        'noCache': str(int(time.time() * 1000)),
        '_sharing_id': f'"{sharing_id}"',
        'api': 'SYNO.FolderSharing.Download',
        'version': '2',
        'method': 'download',
        'mode': 'download',
        'stdhtml': 'false'
    }
    download_url = f"{url}?{urlencode(params)}"
    return download_url, cookie_string
def validate_download_link(
    download_url: str,
    cookie_string: str,
    timeout: int = 10
) -> tuple[bool, Optional[str], Optional[int]]:
    """
    Probe a download link with a HEAD request to catch dead links BEFORE
    downloading.

    Addresses the case where files stay pending for a long time, the link
    expires, and the download ends up being a 38-byte HTML error page.

    Args:
        download_url: Download URL built by prepare_download_url()
        cookie_string: Cookie string from prepare_download_url()
        timeout: Timeout for the HEAD request (seconds)

    Returns:
        (is_valid, error_message, content_length) tuple:
            - is_valid: True when the link is OK, False when it is dead
            - error_message: None when OK, otherwise a description
            - content_length: File size in bytes when reported, else None

    Raises:
        RuntimeError: On timeout, network errors, or any unexpected error
            while probing.
    """
    import requests

    # Non-200 statuses with a dedicated message; anything else is "unexpected"
    known_failures = {
        401: "Session expired (401 Unauthorized)",  # session expired
        403: "Permission denied (403 Forbidden)",   # no permission
        404: "File not found (404)",                # file does not exist
    }
    try:
        request_headers = {
            'Cookie': cookie_string,
            'Referer': 'https://disk.lezhin.com:5001/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        logger.debug(
            "[validate_link] Sending HEAD request to validate link...")
        # HEAD is lightweight: only the headers are fetched
        response = requests.head(
            download_url,
            headers=request_headers,
            timeout=timeout,
            verify=False,  # NAS self-signed cert
            allow_redirects=True
        )

        status = response.status_code
        if status != 200:
            if status in known_failures:
                error_msg = known_failures[status]
                logger.warning(f"[validate_link] ❌ {error_msg}")
            else:
                error_msg = f"Unexpected HTTP status: {status}"
                logger.warning(f"[validate_link] ⚠️ {error_msg}")
            return False, error_msg, None

        # Status 200: read the size first, then inspect the content type
        raw_length = response.headers.get('Content-Length')
        size_bytes = int(raw_length) if raw_length else None

        content_type = response.headers.get('Content-Type', '')
        if 'text/html' in content_type.lower():
            # The NAS answered with HTML instead of a file -> link is bad
            error_msg = f"Link trả về HTML (possibly expired or error page). Content-Type: {content_type}"
            logger.warning(f"[validate_link] ❌ {error_msg}")
            return False, error_msg, None

        size_str = f"{size_bytes:,} bytes" if size_bytes else "unknown size"
        logger.debug(f"[validate_link] ✅ Link valid ({size_str})")
        return True, None, size_bytes
    except requests.exceptions.Timeout as exc:
        error_msg = f"Timeout after {timeout}s: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc
    except requests.exceptions.RequestException as exc:
        error_msg = f"Network error: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc
    except Exception as exc:
        error_msg = f"Unexpected error: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc
def get_initial_path(driver: webdriver.Chrome) -> str:
    """
    Read the current folder path from the path bar of the sharing page.

    Workflow:
        1. Wait for the path bar to appear (up to 15s)
        2. If it is missing or has no aria-label, look for the login dialog
        3. If the login dialog is displayed -> raise NEEDS_LOGIN
        4. If neither is present -> raise an error

    Args:
        driver: Selenium WebDriver currently on the sharing page

    Returns:
        "/" followed by the folder name taken from the path bar button's
        aria-label.

    Raises:
        RuntimeError:
            - "NEEDS_LOGIN" when the login dialog is displayed
            - another message when the driver is missing or no path bar
              is found
    """
    if not driver:
        raise RuntimeError("Driver không tồn tại")

    # STEP 1: wait for the folder path bar (up to 15s)
    print("🔍 Đang chờ folder path bar xuất hiện (timeout 15s)...")
    try:
        path_btn = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((
                By.CSS_SELECTOR,
                "li table.x-btn button.x-btn-text[aria-label]"))
        )
        folder_name = path_btn.get_attribute('aria-label')
        if folder_name:
            path = f"/{folder_name}"
            print(f"✅ Phát hiện folder path: {path}")
            return path
        # Empty aria-label -> fall through to the login check
        print("⚠️ Path bar không có aria-label, kiểm tra login dialog...")
    except TimeoutException:
        print("⚠️ Timeout: Không tìm thấy folder path bar sau 15 giây")

    # STEP 2: no usable path bar -> check for the login dialog
    print("🔍 Kiểm tra login dialog...")
    login_required = False
    try:
        login_dialog = driver.find_element(
            By.CSS_SELECTOR, "div#webfm-access-dialog")
        login_required = bool(login_dialog and login_dialog.is_displayed())
    except Exception as e:
        # Best effort: a missing dialog (or any lookup failure) simply means
        # we cannot report NEEDS_LOGIN and fall through to the final error.
        print(f"⚠️ Không tìm thấy login dialog: {e}")

    # Raised outside the try block so it never has to be told apart from
    # lookup failures by inspecting the exception message.
    if login_required:
        print("⚠️ Phát hiện login dialog - cần đăng nhập File Station")
        raise RuntimeError("NEEDS_LOGIN")

    # STEP 3: neither a path bar nor a login dialog -> error
    raise RuntimeError(
        f"Không tìm thấy folder path bar trên trang sharing sau 15 giây. "
        f"URL: {driver.current_url}"
    )
def extract_sharing_id(url: str) -> Optional[str]:
    """
    Extract the sharing ID from a sharing link URL.

    The ID is the text that follows the first "/sharing/" marker, up to the
    next "/", "?" or "#".

    Args:
        url: Sharing link URL
            (e.g., "https://disk.lezhin.com:5001/sharing/ABC123/...")

    Returns:
        The sharing ID, or None when the URL has no "/sharing/" marker, the
        ID segment is empty, or ``url`` is not a string.
    """
    try:
        _, marker, tail = url.partition('/sharing/')
    except (AttributeError, TypeError) as e:
        # Non-string input (e.g. None or bytes): report it and signal "invalid"
        print(f"❌ Lỗi extract_sharing_id: {e}")
        return None
    if not marker:
        return None
    # Cut at the first path, query or fragment delimiter
    for delimiter in ('/', '?', '#'):
        tail = tail.partition(delimiter)[0]
    # An empty segment is not a valid ID; honour the documented None contract
    return tail or None
def _format_size(size_bytes: int) -> str:
"""
Helper: Format bytes to human-readable size
Args:
size_bytes: Size in bytes
Returns:
Formatted string (e.g., "1.5 MB")
"""
if size_bytes == 0:
return ""
elif size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.2f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.2f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"