"""Background worker to process pending submissions.

Behavior:
- Polls pending submissions from Supabase
  (`backend.services.supabase_service.get_pending_submissions_supabase`) if available.
- For each submission:
  1. Validate all GE/lang TMS data upfront; if any error, stop and report
     error for entire submission.
  2. Mark as `processing`, call `automation.process_project` for each GE.
  3. Determine overall status: if ANY detail has status='error', mark
     submission as 'failed'; else 'completed'.
  4. Save structured results with url, message, status per detail.
  5. Close driver on error; reuse on success.
  6. Reset stuck submissions (processing > timeout) back to pending or failed.

This module can be started as a standalone script during development, or
imported and started from a FastAPI startup event.
"""
import time
import logging
import signal
import sys
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any

from .services import supabase_service
# NOTE: TMS permission automation moved to TypeScript backend (backend-tms/)
# This worker only handles raw_download submissions
from .services import mongodb_service

# Use logger from root (configured in main.py)
log = logging.getLogger(__name__)

# Reduce verbosity from httpx (Supabase client) to avoid spamming INFO logs
# for each request
logging.getLogger('httpx').setLevel(logging.WARNING)

# Signal handler for graceful shutdown
_shutdown_requested = False


def signal_handler(sig, frame):
    """Flag a graceful shutdown; `run_loop` checks the flag between polls."""
    global _shutdown_requested
    log.info(f'Received signal {sig}, initiating graceful shutdown...')
    _shutdown_requested = True
    # NOTE: No need to close Chrome driver anymore - TMS automation moved to TypeScript backend


# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix.

    Uses a timezone-aware datetime (``datetime.utcnow()`` is deprecated as of
    Python 3.12) while preserving the original ``...Z`` output format that
    consumers of the stored results expect.
    """
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')


def reset_processing_to_pending_on_startup():
    """Reset all 'processing' submissions back to 'pending' on server startup.

    This handles the case where the server was shut down while submissions
    were being processed. After the reset, the worker will process them in
    order by created_at (oldest first).
    """
    try:
        processing_subs = supabase_service.get_processing_submissions_supabase()
        if not processing_subs:
            log.info('No stuck processing submissions found on startup')
            return
        log.info(
            f'Found {len(processing_subs)} submissions stuck in processing state. Resetting to pending...')
        for sub in processing_subs:
            submission_id = str(sub.get('submission_id') or sub.get('id'))
            try:
                supabase_service.update_submission_supabase(
                    submission_id,
                    status='pending',
                    error_message=None  # clear any previous error message
                )
                log.info(
                    f'Reset submission {submission_id} from processing to pending')
            except Exception as e:
                # Best-effort: a failed reset for one submission must not
                # prevent resetting the others.
                log.error(f'Error resetting submission {submission_id}: {e}')
    except Exception as e:
        log.error(f'Error in reset_processing_to_pending_on_startup: {e}')


def parse_ge_input_raw(ge_input: str) -> List[Dict[str, Any]]:
    """Split raw ge_input (newline separated) into list of dicts
    {ge_id, langs, tn_mode}.

    This is a minimal parser compatible with the old input format like
    "1000 de" or "696 us". The `langs` returned is a list of tuples
    (lang_code, final_flag) where final_flag is False by default.
    """
    lines = [l.strip() for l in str(ge_input).splitlines() if l.strip()]
    parsed = []
    for line in lines:
        parts = line.split()
        ge_id = parts[0]
        # Lines without a language part get an empty lang code.
        lang_code = parts[1] if len(parts) > 1 else ''
        parsed.append({
            'ge_id': ge_id,
            'langs': [(lang_code, False)],
            'tn_mode': False,
        })
    return parsed


def extract_url_from_message(message: str) -> tuple[str, str]:
    """Extract URL and clean message from automation message format.

    Expected format: "https://... -> message text" or just "message text".

    Returns:
        (url, clean_message); url is '#' when no URL could be found.
    """
    if not message:
        return ('#', '')
    # Try to parse URL -> message format (legacy automation output)
    if ' -> ' in message:
        parts = message.split(' -> ', 1)
        url = parts[0].strip()
        clean_msg = parts[1].strip()
        # verify it looks like a URL
        if url.startswith('http://') or url.startswith('https://'):
            return (url, clean_msg)
    # If no URL found, use '#' as placeholder
    return ('#', message)


def validate_ge_inputs(ge_list: List[Dict[str, Any]]) -> tuple[bool, List[str]]:
    """Validate all GE/lang TMS data upfront.

    Mimics old project behavior: check ALL before processing ANY.

    Returns:
        (is_valid, error_messages)
    """
    errors = []
    for ge in ge_list:
        ge_id = ge['ge_id']
        langs = ge['langs']  # list of (lang_code, final_flag)
        for lang_code, _ in langs:
            if not lang_code:
                continue
            # Lang codes may carry a prefix like "xx_de"; validate against
            # the part after the underscore.
            orig_lang = lang_code.split('_')[1] if '_' in lang_code else lang_code
            try:
                tms_id = mongodb_service.get_tms_data(ge_id, orig_lang)
                log.debug(
                    f"Validated TMS data: GE={ge_id}, lang={orig_lang}, tms_id={tms_id}")
            except Exception as e:
                error_msg = str(e)
                # Normalize the "missing TMS ID" error to include GE/lang.
                if 'TMS ID chưa được bổ sung' in error_msg:
                    error_msg = f"{ge_id} {orig_lang}: TMS ID chưa được bổ sung"
                errors.append(error_msg)
    return (len(errors) == 0, errors)


def _build_error_results(parsed_ge: List[Dict[str, Any]],
                         usernames: List[str],
                         error_message: str) -> List[Dict[str, Any]]:
    """Build per-GE structured results marking every username as 'error'.

    Shared by the validation-failure and wrong-submission-type paths of
    `process_one_submission`, which previously duplicated this construction.
    """
    ge_results = []
    for ge in parsed_ge:
        ge_id = ge['ge_id']
        langs = ge['langs']
        ge_id_and_lang = f"{ge_id} {langs[0][0]}" if langs else ge_id
        details = [{
            'username': username,
            'url': '#',
            'message': error_message,
            'status': 'error',
            'errorDetails': None,
        } for username in usernames]
        ge_results.append({
            'geIdAndLang': ge_id_and_lang,
            'completionTime': _utc_now_iso(),
            'details': details,
        })
    return ge_results


def process_one_submission(sub: Dict[str, Any]):
    """Process a single pending submission dict fetched from Supabase.

    Marks it 'processing', validates all GE inputs, and records a 'failed'
    status with structured per-GE results on any error. Any unexpected
    exception is caught and stored as the submission's error_message.
    """
    submission_id = sub.get('submission_id') or sub.get('id')
    if not submission_id:
        log.error('Submission missing id, skipping')
        return
    submission_id = str(submission_id)
    log.info(f"Processing submission: {submission_id}")
    try:
        # Mark processing
        supabase_service.update_submission_supabase(
            submission_id, status='processing')

        # Input may live under a nested 'input' dict (new shape) or directly
        # on the submission (older shape).
        input_data = sub.get('input') if isinstance(
            sub.get('input'), dict) else None
        if input_data:
            usernames = input_data.get('usernames', []) or []
            ge_input = input_data.get('ge_input', '')
        else:
            # compatibility: older shape
            usernames = sub.get('usernames', []) or []
            ge_input = sub.get('ge_input', '')

        parsed_ge = parse_ge_input_raw(ge_input)

        # ===== STEP 1: Validate all GE data BEFORE processing =====
        is_valid, validation_errors = validate_ge_inputs(parsed_ge)
        if not is_valid:
            error_message = "Không thể tiếp tục do có lỗi với dữ liệu đầu vào:\n" + \
                "\n".join(validation_errors)
            log.error(f"Validation failed: {error_message}")
            # Build error results for all GE entries
            ge_results = _build_error_results(
                parsed_ge, usernames, error_message)
            # Mark as failed due to validation error
            supabase_service.update_submission_supabase(
                submission_id,
                status='failed',
                error_message=error_message,
                results=ge_results)
            log.info(
                f"Submission {submission_id} marked as failed due to validation error")
            return

        # ===== STEP 2: Process submissions =====
        # NOTE: This Python worker only handles raw_download submissions.
        # TMS permission submissions are handled by TypeScript backend (backend-tms/)
        # If we reach here, it means the submission_type filter failed - this is a bug.
        error_message = "ERROR: Python worker only handles raw_download submissions. TMS permission automation has been moved to TypeScript backend (port 4000)."
        log.error(
            f"Submission {submission_id} should not be processed by Python worker - check submission_type filter")
        ge_results = _build_error_results(parsed_ge, usernames, error_message)
        supabase_service.update_submission_supabase(
            submission_id,
            status='failed',
            error_message=error_message,
            results=ge_results)
        log.info(
            f"Submission {submission_id} marked as failed - wrong submission_type")
    except Exception as e:
        err = str(e)
        log.exception(f"Error processing submission {submission_id}: {err}")
        supabase_service.update_submission_supabase(
            submission_id, status='failed', error_message=err)


def run_loop(poll_interval: int = 3):
    """Run the worker loop with adaptive backoff when no pending submissions
    are found.

    Behavior:
    - When there are pending submissions: process them and poll at
      `poll_interval` (fast).
    - When there are no pending submissions: exponentially back off (double
      interval) up to `max_interval`.
    - This reduces the number of requests to Supabase and avoids repeated
      httpx INFO logs.
    - On startup, resets any stuck 'processing' submissions back to 'pending'.
    """
    log.info('Worker started, resetting any stuck submissions...')
    reset_processing_to_pending_on_startup()
    log.info('Polling for pending submissions...')
    current_interval = poll_interval
    max_interval = 30  # seconds
    while not _shutdown_requested:
        try:
            pending = supabase_service.get_pending_submissions_supabase()
            if pending and isinstance(pending, list) and len(pending) > 0:
                # reset to fast polling when work exists
                current_interval = poll_interval
                for sub in pending:
                    if _shutdown_requested:
                        log.info('Shutdown requested, stopping processing')
                        break
                    process_one_submission(sub)
                # short pause before re-checking
                time.sleep(current_interval)
            else:
                # no pending work: backoff to reduce polling frequency
                # (doubling, capped at max_interval)
                current_interval = min(max_interval, current_interval * 2)
                log.debug(
                    f'No pending submissions, backing off to {current_interval}s')
                time.sleep(current_interval)
        except Exception as e:
            log.exception(f'Worker loop error: {e}')
            # on error, wait a bit before retrying
            time.sleep(min(max_interval, current_interval * 2))
    log.info('Worker shutting down gracefully')
    # NOTE: No cleanup needed - TMS automation moved to TypeScript backend


if __name__ == '__main__':
    run_loop()