375 lines
15 KiB
Python
375 lines
15 KiB
Python
|
|
"""
NAS Sharing Worker - Thread-based queue processor

REFACTORED: uses the nas_sharing_auth + nas_sharing_api modules
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import time
|
||
|
|
import threading
|
||
|
|
import queue
|
||
|
|
import uuid
|
||
|
|
from typing import Dict, Optional
|
||
|
|
from functools import wraps
|
||
|
|
from selenium import webdriver
|
||
|
|
from selenium.webdriver.chrome.service import Service
|
||
|
|
from selenium.webdriver.chrome.options import Options
|
||
|
|
from selenium.common.exceptions import NoSuchWindowException, WebDriverException
|
||
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
||
|
|
|
||
|
|
from .nas_sharing_api import (
|
||
|
|
SharingSessionManager,
|
||
|
|
perform_login,
|
||
|
|
extract_sharing_id,
|
||
|
|
get_initial_path,
|
||
|
|
get_file_list,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# Lower-case fragments of exception messages that indicate the browser window
# or the connection to chromedriver is gone, whatever the exception type is.
_DRIVER_ERROR_MARKERS = (
    'window already closed',
    'web view not found',
    'max retries exceeded',
    'connection refused',
)


def handle_window_closed(func):
    """Decorator: on a driver failure, discard the driver and retry once.

    Intended for methods of an object that exposes a ``driver`` attribute.
    When the wrapped method raises an exception that looks driver-related
    (its message contains one of ``_DRIVER_ERROR_MARKERS``, or it is a
    ``NoSuchWindowException`` / ``WebDriverException``), the wrapper:

    1. tries to ``quit()`` the current driver (best effort),
    2. sets ``self.driver`` to ``None``,
    3. calls the wrapped method exactly one more time.

    The wrapper does not create a new driver itself; the retried method is
    responsible for doing so. Any other exception, and any exception raised
    by the retry, propagates to the caller.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            error_msg = str(e).lower()
            # Message check first: it also catches generic exceptions (e.g.
            # urllib3 connection errors) raised when chromedriver is gone.
            is_driver_error = (
                any(marker in error_msg for marker in _DRIVER_ERROR_MARKERS)
                or isinstance(e, (NoSuchWindowException, WebDriverException))
            )
            if not is_driver_error:
                raise

            print(f"[SharingWorker] ⚠️ Driver error: {str(e)[:100]}")
            print("[SharingWorker] 🔄 Resetting driver...")
            try:
                if self.driver:
                    self.driver.quit()
            except Exception as quit_error:
                # Best effort: the driver is most likely already dead. Only
                # Exception is caught so Ctrl-C / SystemExit still propagate.
                print(f"[SharingWorker] ⚠️ Ignoring error while quitting driver: {str(quit_error)[:100]}")
            self.driver = None

            # Single retry; a second failure is raised to the caller.
            return func(self, *args, **kwargs)

    return wrapper
|
||
|
|
|
||
|
|
|
||
|
|
class SharingLinkWorker:
    """
    Background worker that processes NAS sharing-link requests from a queue.

    Requests are submitted with submit_request() and picked up one at a time
    by a single daemon thread started via start(). Each request is processed
    with one shared Chrome instance (created lazily, reused while it is still
    responsive) and its outcome is stored in ``results`` for get_result().

    The Chrome profile directory comes from the ``NAS_CHROME_PROFILE_PATH``
    environment variable, so cookies persist between driver instances.
    """

    # Maximum time to wait for an OTP code, in seconds (polled once per second).
    OTP_TIMEOUT_SECONDS = 300
    # Pause after a successful login so Chrome can write cookies to the profile.
    COOKIE_SAVE_DELAY_SECONDS = 5

    def __init__(self):
        self.driver: Optional[webdriver.Chrome] = None
        self.session_manager: Optional[SharingSessionManager] = None
        self.request_queue = queue.Queue()
        # request_id -> result dict ('status', 'message', ...).
        # NOTE(review): entries are never removed in this file, so this grows
        # for the lifetime of the worker.
        self.results: Dict[str, Dict] = {}
        self.is_running = False
        self.worker_thread = None

        # Re-entrant lock guarding driver creation and use.
        self.driver_lock = threading.RLock()

        # OTP hand-off state. Nothing in this class sets otp_code or sets
        # otp_modal_shown to True; presumably the web layer that shows the OTP
        # modal does - confirm against the caller.
        self.otp_pending = False
        self.otp_code: Optional[str] = None
        self.otp_modal_shown = False
        self.otp_submitted = False  # True once a login that needed an OTP succeeded

    def start(self):
        """Start the worker thread; does nothing if it is already running."""
        if self.is_running:
            return
        self.is_running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print("[SharingWorker] Started")

    def stop(self):
        """Signal the worker loop to exit and release the browser.

        The loop notices ``is_running`` within its 1-second queue timeout; this
        method does not wait for the thread to finish.
        """
        self.is_running = False
        driver = self.driver
        if driver:
            try:
                if self._driver_has_windows(driver):
                    driver.quit()
                else:
                    # No windows, or the session no longer answers: kill the
                    # chromedriver process directly instead of leaking it.
                    driver.service.process.terminate()
            except Exception as e:
                print(f"[SharingWorker] Warning during cleanup: {e}")
            finally:
                self.driver = None
        print("[SharingWorker] Stopped")

    @staticmethod
    def _driver_has_windows(driver) -> bool:
        """Return True if the driver reports at least one open window.

        A driver whose session is dead raises on ``window_handles``; that is
        reported as "no windows" so the caller falls back to a force kill.
        """
        try:
            return len(driver.window_handles) > 0
        except Exception:
            return False

    def submit_request(self, url: str) -> str:
        """Queue a sharing link for processing.

        Args:
            url: The sharing link to process.

        Returns:
            The request id to pass to get_result().
        """
        request_id = str(uuid.uuid4())

        # Register the pending entry BEFORE enqueueing. Otherwise the worker
        # thread could finish first and have its final result overwritten by
        # this 'pending' placeholder.
        self.results[request_id] = {
            'status': 'pending',
            'message': 'Đang xử lý sharing link...'
        }

        self.request_queue.put({
            'id': request_id,
            'url': url,
            'timestamp': time.time()
        })

        return request_id

    def get_result(self, request_id: str) -> Optional[Dict]:
        """Return the stored result for ``request_id``, or None if unknown."""
        return self.results.get(request_id)

    def _worker_loop(self):
        """Main worker loop: process queued requests until ``is_running`` is False."""
        import traceback

        print("[SharingWorker] Worker loop started")

        while self.is_running:
            try:
                # Short timeout so a stop() request is noticed promptly.
                request = self.request_queue.get(timeout=1)
            except queue.Empty:
                continue

            request_id = request.get('id')
            try:
                url = request['url']

                print(f"[SharingWorker] Processing: {url}")
                result = self._process_sharing_link(url)

                self.results[request_id] = result
                print(f"[SharingWorker] Completed: {result['status']}")

            except Exception as e:
                print(f"[SharingWorker] Error: {e}")
                traceback.print_exc()

                # Never leave the request stuck in 'pending', but do not
                # clobber a real result that was already stored (e.g. when only
                # the log line above failed).
                stored = self.results.get(request_id)
                if request_id is not None and (stored is None or stored.get('status') == 'pending'):
                    self.results[request_id] = {
                        'status': 'error',
                        'message': str(e)
                    }

    def _ensure_driver_ready(self):
        """Make sure ``self.driver`` is a live Chrome driver, creating one if needed.

        Runs entirely under ``driver_lock`` so that only one thread can create
        or replace the driver at a time. An existing driver is reused when it
        still answers ``current_url``; otherwise it is discarded. Creation is
        attempted twice; before the second attempt, leftover Chrome processes
        are killed on Windows.

        Raises:
            ValueError: If ``NAS_CHROME_PROFILE_PATH`` is not set.
            RuntimeError: If Chrome cannot be started even after the retry.
        """
        with self.driver_lock:
            if self.driver:
                try:
                    # Cheap liveness probe: raises if the browser is gone.
                    _ = self.driver.current_url
                    print("[SharingWorker] ✅ Reusing existing driver")
                    return
                except Exception:
                    print("[SharingWorker] ⚠️ Driver dead, creating new...")
                    try:
                        self.driver.quit()
                    except Exception:
                        # The session is already unusable; nothing to report.
                        pass
                    self.driver = None

            # All driver-creation code stays inside the lock to avoid races.
            print("[SharingWorker] 🚀 Creating new Chrome driver...")
            profile_path = self._resolve_profile_path()
            chrome_options = self._build_chrome_options(profile_path)

            service = Service(ChromeDriverManager().install())

            try:
                self.driver = webdriver.Chrome(service=service, options=chrome_options)
            except Exception as e:
                print(f"[SharingWorker] ❌ Failed to create Chrome driver: {e}")
                print(f"[SharingWorker] Profile path: {profile_path}")
                print(f"[SharingWorker] Chrome options: {chrome_options.arguments}")

                # Best-effort cleanup; the retry happens whether or not the
                # kill step was possible on this platform.
                self._kill_stale_chrome_processes()
                time.sleep(2)

                try:
                    # Retry once.
                    self.driver = webdriver.Chrome(service=service, options=chrome_options)
                except Exception as retry_error:
                    print(f"[SharingWorker] ❌ Retry also failed: {retry_error}")
                    raise RuntimeError(
                        "Cannot create Chrome driver. "
                        "Try: 1) Close all Chrome windows, 2) Delete chrome_profile_nas folder, 3) Restart"
                    ) from e

            # The session manager is bound to the driver, so it is rebuilt
            # every time a new driver is created.
            self.session_manager = SharingSessionManager(self.driver)

            print(f"[SharingWorker] ✅ Driver created, profile: {profile_path}")

    def _resolve_profile_path(self) -> str:
        """Return the Chrome profile directory, creating it if missing.

        ``NAS_CHROME_PROFILE_PATH`` is joined onto the directory two levels
        above the directory containing this file.

        Raises:
            ValueError: If the environment variable is not set.
        """
        profile_path_env = os.getenv("NAS_CHROME_PROFILE_PATH")
        if not profile_path_env:
            raise ValueError("NAS_CHROME_PROFILE_PATH must be set in .env.local")

        current_file = os.path.abspath(__file__)
        backend_dir = os.path.dirname(os.path.dirname(current_file))
        workspace_root = os.path.dirname(backend_dir)
        profile_path = os.path.join(workspace_root, profile_path_env)
        os.makedirs(profile_path, exist_ok=True)
        return profile_path

    @staticmethod
    def _build_chrome_options(profile_path: str):
        """Build the Chrome options used for every driver instance."""
        chrome_options = Options()

        # Persistent profile plus basic options (fix crash issues).
        chrome_options.add_argument(f'user-data-dir={profile_path}')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--ignore-certificate-errors')

        # Additional stability options (prevent crashes).
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_argument('--remote-debugging-port=0')  # Let Chrome choose port

        # Disable extensions to avoid conflicts.
        chrome_options.add_argument('--disable-extensions')

        # Prevent the "Chrome is being controlled by automated test software" banner.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        return chrome_options

    @staticmethod
    def _kill_stale_chrome_processes() -> bool:
        """Force-kill chrome.exe and chromedriver.exe via ``taskkill`` (Windows only).

        WARNING: this kills every process with those image names, not only the
        ones started by this worker.

        Returns:
            True if the kill commands were run, False if skipped or failed.
        """
        if os.name != 'nt':
            # taskkill does not exist outside Windows.
            return False

        import subprocess
        try:
            for image_name in ('chrome.exe', 'chromedriver.exe'):
                subprocess.run(['taskkill', '/F', '/IM', image_name],
                               capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError) as kill_error:
            print(f"[SharingWorker] ⚠️ Could not kill Chrome processes: {kill_error}")
            return False

        print("[SharingWorker] Killed zombie Chrome processes, retrying...")
        return True

    def _reset_otp_state(self, keep_submitted: bool = False):
        """Clear the OTP flags; ``keep_submitted`` leaves ``otp_submitted`` untouched."""
        self.otp_pending = False
        self.otp_modal_shown = False
        if not keep_submitted:
            self.otp_submitted = False

    def _wait_for_otp(self) -> Optional[str]:
        """OTP callback: block until ``otp_code`` is set or the wait times out.

        Marks an OTP as pending, then polls ``otp_code`` once per second for
        up to ``OTP_TIMEOUT_SECONDS``.

        Returns:
            The OTP code (consumed, i.e. ``otp_code`` is reset to None), or
            None on timeout, in which case the OTP flags are cleared.
        """
        if not self.otp_pending:
            self.otp_pending = True
            self.otp_modal_shown = False
            self.otp_submitted = False

        for i in range(self.OTP_TIMEOUT_SECONDS):
            if self.otp_code:
                code = self.otp_code
                # Only the code is consumed here; the pending flags stay set
                # until the login attempt has finished.
                self.otp_code = None
                return code

            time.sleep(1)
            if i % 10 == 0:
                print(f"[SharingWorker] ⏳ Waiting for OTP... ({self.OTP_TIMEOUT_SECONDS - i}s)")

        # Timeout.
        self._reset_otp_state()
        return None

    @handle_window_closed
    def _perform_login(self) -> bool:
        """
        Log in through perform_login() (imported from nas_sharing_api).

        If an OTP is required, perform_login() calls _wait_for_otp(), which
        waits for ``otp_code`` to be set on this object.

        NOTE(review): if @handle_window_closed resets the driver and retries
        this method, the retry raises "Driver not initialized" because nothing
        recreates the driver in between.

        Returns:
            True on success, False if perform_login() reports failure.

        Raises:
            RuntimeError: If the driver or session manager is not initialized.
        """
        if not self.driver or not self.session_manager:
            raise RuntimeError("Driver not initialized")

        try:
            success = perform_login(
                driver=self.driver,
                otp_callback=self._wait_for_otp
            )
        except Exception:
            # Do not leave an OTP marked as pending after a crashed attempt.
            self._reset_otp_state()
            raise

        if success:
            print("[SharingWorker] ✅ Login successful!")

            # IMPORTANT: mark the OTP as submitted BEFORE resetting the flags.
            if self.otp_pending:
                self.otp_submitted = True
                print("[SharingWorker] ✅ OTP đã được xác nhận thành công")

            # Give Chrome time to persist cookies into the profile.
            print(f"[SharingWorker] ⏳ Đợi {self.COOKIE_SAVE_DELAY_SECONDS}s để lưu cookies vào Chrome profile...")
            time.sleep(self.COOKIE_SAVE_DELAY_SECONDS)

            # Reset the OTP flags only after the wait; otp_submitted is kept.
            self._reset_otp_state(keep_submitted=True)

            print("[SharingWorker] ✅ Cookies đã được lưu vào Chrome profile")
            return True

        # Login failed - reset flags.
        self._reset_otp_state()
        return False

    def _clear_browser_cache(self):
        """Clear Chrome's cache via CDP; failures are logged and ignored."""
        try:
            self.driver.execute_cdp_cmd('Network.clearBrowserCache', {})
        except Exception as clear_error:
            print(f"[SharingWorker] ⚠️ Không thể xóa cache: {clear_error}")

    @handle_window_closed
    def _process_sharing_link(self, url: str) -> Dict:
        """
        Process a sharing link: open it and list the files in its root folder.

        Holds ``driver_lock`` for the whole browser interaction, including a
        login (and OTP wait) when the page reports NEEDS_LOGIN.

        NOTE(review): every Exception is caught below and turned into an error
        result, so the @handle_window_closed retry can never trigger for this
        method as written.

        Returns:
            On success: a dict with 'status' == 'success', 'sharing_id',
            'path', 'files', 'total_files' and 'message'.
            On failure: {'status': 'error', 'message': <error text>}.
        """
        from .nas_sharing_api.selenium_operations import extract_sharing_id, get_initial_path, get_file_list

        try:
            sharing_id = extract_sharing_id(url)
            if not sharing_id:
                raise ValueError("Cannot extract sharing_id from URL")

            print(f"[SharingWorker] Sharing ID: {sharing_id}")

            with self.driver_lock:
                self._ensure_driver_ready()
                if self.driver is None:
                    raise RuntimeError("Driver not initialized")

                # Clear the cache to avoid ExtJS conflicts.
                self._clear_browser_cache()

                self.driver.get(url)

                try:
                    initial_path = get_initial_path(self.driver)
                except RuntimeError as e:
                    if "NEEDS_LOGIN" not in str(e):
                        raise

                    print("[SharingWorker] Login required")
                    if not self._perform_login():
                        raise RuntimeError("Login failed")

                    # Clear the cache again after login, before reloading the
                    # sharing link, to avoid ExtJS conflicts.
                    print("[SharingWorker] Xóa cache trước khi truy cập lại sharing link...")
                    self._clear_browser_cache()

                    self.driver.get(url)
                    initial_path = get_initial_path(self.driver)

                print(f"[SharingWorker] 📋 Lấy danh sách ROOT folder: {initial_path}")
                files = get_file_list(self.driver, sharing_id, initial_path)

                return {
                    'status': 'success',
                    'sharing_id': sharing_id,
                    'path': initial_path,
                    'files': files,
                    'total_files': len(files),
                    'message': f'Found {len(files)} files'
                }

        except Exception as e:
            print(f"[SharingWorker] Error: {e}")
            import traceback
            traceback.print_exc()
            return {
                'status': 'error',
                'message': str(e)
            }
|