497 lines
17 KiB
Python
497 lines
17 KiB
Python
|
|
"""
|
|||
|
|
NAS Sharing API Module - FolderSharing API calls
|
|||
|
|
|
|||
|
|
EXTRACTED từ download_link.py DSMSeleniumLogin methods
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import requests
|
|||
|
|
import urllib3
|
|||
|
|
import logging
|
|||
|
|
from typing import Dict, List, Any, Optional, Union, TYPE_CHECKING
|
|||
|
|
from urllib.parse import urlencode
|
|||
|
|
from selenium import webdriver
|
|||
|
|
from selenium.webdriver.common.by import By
|
|||
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|||
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|||
|
|
from selenium.common.exceptions import TimeoutException
|
|||
|
|
|
|||
|
|
# Disable SSL warnings: silences urllib3's InsecureRequestWarning, which would
# otherwise be emitted for the verify=False requests made in this module.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Setup logger: module-level logger pinned to INFO, so the logger.debug()
# calls in this module are filtered out unless the level is lowered.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

if TYPE_CHECKING:
    # Imported for type annotations only; this branch is skipped at runtime.
    from ..aria2.download_manager import Aria2DownloadManager

# aria2 integration: enabled unless the USE_ARIA2 environment variable is set
# to something other than "true" (case-insensitive). Defaults to enabled.
USE_ARIA2 = os.getenv('USE_ARIA2', 'true').lower() == 'true'
# Cached manager used by get_aria2_manager():
# None, False (unavailable), or Aria2DownloadManager
_aria2_manager: Optional[Union[bool, "Aria2DownloadManager"]] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_aria2_manager() -> "Aria2DownloadManager":
    """
    Return the shared aria2 download manager, creating it on first use.

    The instance is cached in the module-level ``_aria2_manager``. Creation is
    only attempted while that cache is ``None`` and ``USE_ARIA2`` is true.

    Returns:
        The cached manager object.

    Raises:
        RuntimeError: If the manager cannot be imported or created (the
            underlying exception is attached as ``__cause__``), or if no
            manager is available (for example when ``USE_ARIA2`` is false).
    """
    global _aria2_manager

    if _aria2_manager is None and USE_ARIA2:
        try:
            # Imported inside the function, at first use, under a private
            # alias so it does not shadow this function's own name.
            from ..aria2.download_manager import get_aria2_manager as _get_manager

            _aria2_manager = _get_manager()
            if not _aria2_manager:
                raise RuntimeError("aria2 manager returned None")
            logger.debug("✅ aria2 manager initialized for Sharing downloads")
        except Exception as e:
            # Chain explicitly so the real import/startup failure is kept as
            # the cause instead of only being flattened into the message.
            raise RuntimeError(
                f"aria2 is required but not available: {e}") from e

    if _aria2_manager is False or _aria2_manager is None:
        raise RuntimeError("aria2 is required but not initialized")

    return _aria2_manager  # type: ignore
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_file_list(
    driver: webdriver.Chrome,
    sharing_id: str,
    folder_path: str = "/"
) -> List[Dict[str, Any]]:
    """
    List the files/folders of a sharing link via the FolderSharing.List API.

    The cookies of the Selenium session are copied into a temporary
    ``requests`` session, which is closed before the function returns.
    A single request is sent; nothing is retried, so failures surface
    immediately.

    Args:
        driver: Selenium WebDriver holding valid cookies.
        sharing_id: Sharing ID (extracted from the sharing URL).
        folder_path: Folder to list (default: "/").

    Returns:
        A list of dicts, one per entry, with the keys:
            - name: File/folder name
            - is_folder: True when the entry is a folder
            - size: Formatted size string ("" for folders)
            - size_bytes: Size in bytes (0 for folders / unknown)
            - path: Full path
            - additional: Extra metadata returned by the API

    Raises:
        RuntimeError:
            - "SESSION_EXPIRED" for API error 101
            - "API_ERROR_407_RATE_LIMIT: ..." for API error 407
            - "API_ERROR_<code>: ..." for any other API error
            - "API_TIMEOUT: ..." / "API_NETWORK_ERROR: ..." for transport errors
            - "API_INVALID_RESPONSE: ..." when the body is not JSON or does
              not contain ``data.files``
    """
    # Local imports keep this function self-contained.
    import inspect
    import json

    # FolderSharing.List API endpoint
    url = "https://disk.lezhin.com:5001/sharing/webapi/entry.cgi"
    params = {
        'api': 'SYNO.FolderSharing.List',
        'method': 'list',
        'version': '2',
        'offset': '0',
        # NOTE(review): a single page of 1000 entries is requested; larger
        # folders would presumably be truncated - confirm against the API.
        'limit': '1000',
        'sort_by': '"name"',
        'sort_direction': '"ASC"',
        'action': '"enum"',
        'additional': '["size","owner","time","perm","type","mount_point_type"]',
        'filetype': '"all"',
        # json.dumps escapes quotes/backslashes inside the value; with
        # ensure_ascii=False ordinary (including non-ASCII) paths produce
        # exactly the same text as plain '"<value>"' wrapping.
        'folder_path': json.dumps(str(folder_path), ensure_ascii=False),
        '_sharing_id': json.dumps(str(sharing_id), ensure_ascii=False),
    }

    # Name of the calling function, for clearer log context. Reading f_back
    # avoids building the full inspect.stack() just to look at one frame.
    frame = inspect.currentframe()
    caller_frame = frame.f_back if frame is not None else None
    caller = caller_frame.f_code.co_name if caller_frame is not None else "unknown"
    del frame, caller_frame  # do not keep frame references alive
    print(f"\n🔍 [{caller}] Lấy danh sách (FolderSharing API): {folder_path}")

    # NO RETRY - raise immediately so problems are detected early
    try:
        # Session carrying the Selenium cookies; closed by the context manager.
        with requests.Session() as session:
            for cookie in driver.get_cookies():
                session.cookies.set(
                    cookie['name'], cookie['value'], domain=cookie.get('domain'))
            # verify=False: the endpoint is called without certificate checks.
            response = session.post(url, data=params, verify=False, timeout=30)

        result = response.json()

        # An empty or non-object body carries no usable data
        if not result or not isinstance(result, dict):
            raise RuntimeError("API không trả về dữ liệu")

        if not result.get("success"):
            error_detail = result.get('error') or {}
            error_code = error_detail.get('code')
            print(f"❌ API lỗi {error_code}: {error_detail}")

            # Error 101: session expired
            if error_code == 101:
                raise RuntimeError("SESSION_EXPIRED")

            # Error 407: rate limit - NOT retried, raised immediately
            if error_code == 407:
                raise RuntimeError(f"API_ERROR_407_RATE_LIMIT: {error_detail}")

            # Any other API error
            raise RuntimeError(f"API_ERROR_{error_code}: {error_detail}")

        files = (result.get('data') or {}).get('files')
        if files is None:
            raise RuntimeError("API_INVALID_RESPONSE: missing data.files")
        print(f"✅ Thành công! Tìm thấy {len(files)} item(s).")

        formatted = []
        for f in files:
            is_folder = f.get('isdir', False)

            # Size is only taken for files; folders stay at 0
            size_bytes = 0
            if not is_folder and f.get('additional') and f['additional'].get('size'):
                size_bytes = f['additional']['size']

            formatted.append({
                'name': f.get('name', ''),
                'is_folder': is_folder,
                # Empty size string for folders
                'size': "" if is_folder else _format_size(size_bytes),
                'size_bytes': size_bytes,
                'path': f.get('path', ''),
                'additional': f.get('additional', {})
            })

        return formatted

    except RuntimeError:
        # SESSION_EXPIRED / API_ERROR_xxx: propagate unchanged
        raise

    except requests.exceptions.Timeout as e:
        print(f"❌ Timeout khi gọi API: {e}")
        raise RuntimeError(f"API_TIMEOUT: {e}") from e

    except requests.exceptions.RequestException as e:
        print(f"❌ Network error khi gọi API: {e}")
        raise RuntimeError(f"API_NETWORK_ERROR: {e}") from e

    except ValueError as e:
        # Listed after RequestException on purpose: JSON errors that are
        # RequestException subclasses keep their previous mapping, while a
        # plain ValueError from response.json() no longer escapes raw.
        print(f"❌ API trả về dữ liệu không hợp lệ: {e}")
        raise RuntimeError(f"API_INVALID_RESPONSE: {e}") from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
def encode_path_to_dlink(path: str) -> str:
    """
    Turn a path into its dlink token.

    The token is the lowercase hexadecimal rendering of the path's UTF-8
    bytes.

    Args:
        path: File path, e.g., "/수조(북극여우)/001화_PSD_JPG.zip"

    Returns:
        Hex string suitable for the ``dlink`` request parameter.
    """
    # str -> UTF-8 bytes -> hex digits, in one expression
    return path.encode('utf-8').hex()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def download_file_direct(
    driver: webdriver.Chrome,
    sharing_id: str,
    remote_path: str,
    is_dir: bool = False,
    save_path: Optional[str] = None,
    progress_callback=None
) -> bool:
    """
    ⚠️ DEPRECATED - DO NOT USE

    This function used the requests library instead of aria2.
    All downloads MUST use aria2 for parallel connections.

    Use prepare_download_url() + aria2_manager.download_file() instead.

    The signature is kept only so existing call sites fail loudly; every
    argument is ignored and the call always raises. The old implementation
    was removed - see git history if needed.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        "download_file_direct() is deprecated. "
        "Use prepare_download_url() + aria2_manager.download_file() instead. "
        "All downloads MUST use aria2."
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def prepare_download_url(
    driver: webdriver.Chrome,
    sharing_id: str,
    remote_path: str,
    file_name: str
) -> tuple[str, str]:
    """
    Extract cookies and build the download URL from a Selenium driver.

    This function MUST be called with driver_lock held.
    The returned URL and cookie string can then be used for an aria2
    download WITHOUT touching the driver again.

    Args:
        driver: Selenium WebDriver with valid cookies
        sharing_id: Sharing ID
        remote_path: File/folder path in NAS
        file_name: Filename used as the last URL path segment

    Returns:
        (download_url, cookie_string) tuple
    """
    # Function-scope import: `quote` is not part of the module's import header.
    from urllib.parse import quote

    # Cookie header value: "name=value; name=value; ..."
    cookie_string = "; ".join(
        f"{c['name']}={c['value']}" for c in driver.get_cookies()
    )

    # Convert path to dlink (hex-encoded) for BOTH files and folders
    dlink = encode_path_to_dlink(remote_path)

    # Percent-encode the file name so characters such as '#', '?', '/', '%'
    # or spaces cannot terminate the path early or corrupt the query string.
    # Names made only of unreserved ASCII characters are left unchanged.
    safe_name = quote(file_name, safe="")

    # Build GET request URL (same for files and folders)
    url = f"https://disk.lezhin.com:5001/fsdownload/webapi/file_download.cgi/{safe_name}"
    params = {
        'dlink': f'"{dlink}"',
        # Millisecond timestamp sent as noCache; differs on every call
        'noCache': str(int(time.time() * 1000)),
        '_sharing_id': f'"{sharing_id}"',
        'api': 'SYNO.FolderSharing.Download',
        'version': '2',
        'method': 'download',
        'mode': 'download',
        'stdhtml': 'false'
    }

    download_url = f"{url}?{urlencode(params)}"

    return download_url, cookie_string
|
|||
|
|
|
|||
|
|
|
|||
|
|
def validate_download_link(
    download_url: str,
    cookie_string: str,
    timeout: int = 10
) -> tuple[bool, Optional[str], Optional[int]]:
    """
    Probe a download link with a HEAD request BEFORE downloading it.

    Addresses the case where files stay pending for a long time, the link
    expires, and the download yields a tiny HTML error page instead of the
    file.

    Args:
        download_url: Download URL built by prepare_download_url()
        cookie_string: Cookie string from prepare_download_url()
        timeout: Timeout for the HEAD request (seconds)

    Returns:
        (is_valid, error_message, content_length) tuple:
            - is_valid: True when the link is OK, False when it is dead
            - error_message: None when OK, otherwise the failure reason
            - content_length: File size in bytes when reported, else None

    Raises:
        RuntimeError: On timeout, network failure, or any unexpected error
    """
    import requests

    # HTTP statuses that map to a fixed failure reason
    known_failures = {
        401: "Session expired (401 Unauthorized)",
        403: "Permission denied (403 Forbidden)",
        404: "File not found (404)",
    }

    try:
        request_headers = {
            'Cookie': cookie_string,
            'Referer': 'https://disk.lezhin.com:5001/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

        logger.debug(
            f"[validate_link] Sending HEAD request to validate link...")

        # HEAD is lightweight: only headers are fetched
        response = requests.head(
            download_url,
            headers=request_headers,
            timeout=timeout,
            verify=False,  # NAS self-signed cert
            allow_redirects=True
        )
        status = response.status_code

        # Guard clause: everything except 200 is a failure
        if status != 200:
            if status in known_failures:
                error_msg = known_failures[status]
                logger.warning(f"[validate_link] ❌ {error_msg}")
            else:
                error_msg = f"Unexpected HTTP status: {status}"
                logger.warning(f"[validate_link] ⚠️ {error_msg}")
            return False, error_msg, None

        raw_length = response.headers.get('Content-Length')
        size_bytes = int(raw_length) if raw_length else None

        # An HTML body in place of the file means the link is not usable
        content_type = response.headers.get('Content-Type', '')
        if 'text/html' in content_type.lower():
            error_msg = f"Link trả về HTML (possibly expired or error page). Content-Type: {content_type}"
            logger.warning(f"[validate_link] ❌ {error_msg}")
            return False, error_msg, None

        # Link is valid
        if size_bytes:
            size_str = f"{size_bytes:,} bytes"
        else:
            size_str = "unknown size"
        logger.debug(f"[validate_link] ✅ Link valid ({size_str})")
        return True, None, size_bytes

    except requests.exceptions.Timeout as exc:
        error_msg = f"Timeout after {timeout}s: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc

    except requests.exceptions.RequestException as exc:
        error_msg = f"Network error: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc

    except Exception as exc:
        error_msg = f"Unexpected error: {exc}"
        logger.error(f"[validate_link] ❌ {error_msg}")
        raise RuntimeError(error_msg) from exc
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_initial_path(driver: webdriver.Chrome, timeout: int = 15) -> str:
    """
    Read the current folder path from the path bar of the sharing page.

    Workflow:
        1. Wait for the path bar to appear (up to ``timeout`` seconds)
        2. If it is missing or has no aria-label, look for the login dialog
        3. If the login dialog is displayed, raise NEEDS_LOGIN
        4. If neither is found, raise an error

    Args:
        driver: Selenium WebDriver currently on the sharing page
        timeout: Seconds to wait for the path bar (default: 15)

    Returns:
        "/" followed by the aria-label of the path bar button

    Raises:
        RuntimeError:
            - "NEEDS_LOGIN" when the login dialog is displayed
            - A descriptive error when the driver is missing or the path
              bar cannot be found
    """
    if not driver:
        raise RuntimeError("Driver không tồn tại")

    # STEP 1: wait for the folder path bar
    print(f"🔍 Đang chờ folder path bar xuất hiện (timeout {timeout}s)...")

    try:
        path_btn = WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((
                By.CSS_SELECTOR,
                "li table.x-btn button.x-btn-text[aria-label]"))
        )

        folder_name = path_btn.get_attribute('aria-label')
        if folder_name:
            path = f"/{folder_name}"
            print(f"✅ Phát hiện folder path: {path}")
            return path

        # Empty aria-label: fall through to the login-dialog check
        print("⚠️ Path bar không có aria-label, kiểm tra login dialog...")

    except TimeoutException:
        print(f"⚠️ Timeout: Không tìm thấy folder path bar sau {timeout} giây")

    # STEP 2: no usable path bar, so check for the login dialog.
    # The result is recorded in a flag and NEEDS_LOGIN is raised outside the
    # try block, so the blanket handler below never has to recognise our own
    # exception by inspecting its message text.
    print("🔍 Kiểm tra login dialog...")
    login_required = False
    try:
        login_dialog = driver.find_element(
            By.CSS_SELECTOR, "div#webfm-access-dialog")
        login_required = bool(login_dialog and login_dialog.is_displayed())
    except Exception as e:
        # Best effort: a missing dialog (or any lookup error) is only logged
        print(f"⚠️ Không tìm thấy login dialog: {e}")

    if login_required:
        print("⚠️ Phát hiện login dialog - cần đăng nhập File Station")
        raise RuntimeError("NEEDS_LOGIN")

    # STEP 3: neither path bar nor login dialog, so fail with the current URL
    raise RuntimeError(
        f"Không tìm thấy folder path bar trên trang sharing sau {timeout} giây. "
        f"URL: {driver.current_url}"
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_sharing_id(url: str) -> Optional[str]:
    """
    Extract the sharing ID from a sharing link URL.

    The ID is the text that follows the first "/sharing/" up to the next
    "/", "?" or "#".

    Args:
        url: Sharing link URL (e.g., "https://disk.lezhin.com:5001/sharing/ABC123/...")

    Returns:
        Sharing ID string, or None when the URL has no "/sharing/" segment,
        the ID is empty, or ``url`` is not a string.
    """
    try:
        _, marker, remainder = url.partition('/sharing/')
    except (AttributeError, TypeError) as e:
        # Non-string input (None, bytes, ...) is reported, not raised
        print(f"❌ Lỗi extract_sharing_id: {e}")
        return None

    if not marker:
        return None

    # Cut at the first path, query or fragment delimiter
    for delimiter in ('/', '?', '#'):
        remainder = remainder.split(delimiter, 1)[0]

    # An empty ID (e.g. ".../sharing/") is an invalid link, so return None
    return remainder or None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _format_size(size_bytes: int) -> str:
|
|||
|
|
"""
|
|||
|
|
Helper: Format bytes to human-readable size
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
size_bytes: Size in bytes
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Formatted string (e.g., "1.5 MB")
|
|||
|
|
"""
|
|||
|
|
if size_bytes == 0:
|
|||
|
|
return ""
|
|||
|
|
elif size_bytes < 1024:
|
|||
|
|
return f"{size_bytes} B"
|
|||
|
|
elif size_bytes < 1024 * 1024:
|
|||
|
|
return f"{size_bytes / 1024:.2f} KB"
|
|||
|
|
elif size_bytes < 1024 * 1024 * 1024:
|
|||
|
|
return f"{size_bytes / (1024 * 1024):.2f} MB"
|
|||
|
|
else:
|
|||
|
|
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
|