ge-tool/backend/worker.py
2025-12-10 13:41:43 +07:00

304 lines
12 KiB
Python
Executable File

"""
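Background worker that processes pending submissions.

Behavior:
- Polls pending submissions from Supabase
  (`backend.services.supabase_service.get_pending_submissions_supabase`) if available.
- For each submission:
  1. Validate all GE/lang TMS data upfront; if any entry fails, stop and report the error
     for the entire submission.
  2. Mark as `processing`, call `automation.process_project` for each GE.
  3. Determine overall status: if ANY detail has status='error', mark the submission as
     'failed'; otherwise 'completed'.
  4. Save structured results with url, message, and status per detail.
  5. Close driver on error; reuse on success.
  6. Reset stuck submissions (processing > timeout) back to pending or failed.

NOTE: Parts of the list above describe the legacy flow. According to the comments in this
module, TMS permission automation has moved to the TypeScript backend (backend-tms/), so
this worker no longer calls `automation.process_project` or manages a browser driver: a
submission that passes validation is currently marked 'failed' with an explanatory message.
Stuck-submission recovery here happens once at startup, when every 'processing'
submission is reset to 'pending'.

This module can be started as a standalone script during development, or imported and
started from a FastAPI startup event.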
"""
import time
import logging
import signal
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Any
from .services import supabase_service
# NOTE: TMS permission automation moved to TypeScript backend (backend-tms/)
# This worker only handles raw_download submissions
from .services import mongodb_service
# Module-level logger named after this module. It installs no handlers of its
# own; per the original note, logging is configured on the root logger in main.py.
log = logging.getLogger(__name__)
# Raise the 'httpx' logger to WARNING so that per-request INFO lines (httpx is
# the HTTP client behind Supabase, per the original note) do not flood the
# worker's log while it polls.
logging.getLogger('httpx').setLevel(logging.WARNING)
# Shutdown flag: starts False and is flipped to True by `signal_handler`.
# The worker loop reads it to decide when to stop.
_shutdown_requested = False


def signal_handler(sig, frame):
    """Record a shutdown request so the worker loop can stop on its own.

    Args:
        sig: Number of the signal that was delivered.
        frame: Stack frame that was interrupted (unused).
    """
    global _shutdown_requested
    log.info(f'Received signal {sig}, initiating graceful shutdown...')
    _shutdown_requested = True
    # NOTE: There is no Chrome driver to close any more - TMS automation now
    # lives in the TypeScript backend.


# Install the handler for interrupt (Ctrl-C) and terminate requests.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
def reset_processing_to_pending_on_startup():
    """Reset every submission in the 'processing' state back to 'pending'.

    Meant to be called once when the worker starts, to recover submissions
    that were left in 'processing' because the server stopped while they were
    being handled. After the reset they are picked up again by the normal
    pending poll (the original note says oldest first by created_at; that
    ordering is decided by the Supabase query, which is not visible here).

    The function never raises: a failure to list submissions, or to reset an
    individual one, is logged and the remaining submissions are still
    attempted.
    """
    try:
        processing_subs = supabase_service.get_processing_submissions_supabase()
        if not processing_subs:
            log.info('No stuck processing submissions found on startup')
            return
        log.info(
            f'Found {len(processing_subs)} submissions stuck in processing state. Resetting to pending...')
        for sub in processing_subs:
            raw_id = sub.get('submission_id') or sub.get('id')
            if not raw_id:
                # Without this guard str(None) would produce the bogus id
                # 'None' and an update would be issued for it.
                log.error('Stuck submission missing id, skipping reset')
                continue
            submission_id = str(raw_id)
            try:
                supabase_service.update_submission_supabase(
                    submission_id,
                    status='pending',
                    error_message=None  # clear any previous error message
                )
                log.info(
                    f'Reset submission {submission_id} from processing to pending')
            except Exception as e:
                # One failed reset must not prevent the others from running.
                log.error(f'Error resetting submission {submission_id}: {e}')
    except Exception as e:
        log.error(f'Error in reset_processing_to_pending_on_startup: {e}')
def parse_ge_input_raw(ge_input: str) -> List[Dict[str, Any]]:
    """Split raw, newline-separated GE input into a list of GE descriptors.

    This is a minimal parser compatible with the old input format, where each
    line looks like "1000 de" or "696 us": the first whitespace-separated
    token is the GE id and the optional second token is the language code.
    Any further tokens on a line are ignored, and blank lines are skipped.

    Args:
        ge_input: Raw text, one GE entry per line. ``None`` is treated as
            empty input; other non-string values are converted with ``str``.

    Returns:
        One dict per non-blank line with the keys:
        - 'ge_id': the first token of the line.
        - 'langs': a single-element list ``[(lang_code, False)]`` where the
          second tuple item is the "final" flag (always False here) and
          ``lang_code`` is '' when the line has no second token.
        - 'tn_mode': always False.
    """
    if ge_input is None:
        # str(None) would otherwise be parsed as a GE whose id is 'None'.
        return []

    parsed = []
    for raw_line in str(ge_input).splitlines():
        parts = raw_line.split()
        if not parts:
            continue  # blank or whitespace-only line
        lang_code = parts[1] if len(parts) > 1 else ''
        parsed.append({
            'ge_id': parts[0],
            'langs': [(lang_code, False)],
            'tn_mode': False,
        })
    return parsed
def extract_url_from_message(message: str) -> tuple[str, str]:
    """Split an automation message into its URL and its human-readable text.

    Two input shapes are understood:
    - "https://... -> message text" (legacy automation output)
    - "message text" on its own

    Args:
        message: The raw message; may be empty or None.

    Returns:
        A ``(url, clean_message)`` tuple. ``url`` is '#' whenever no
        http(s) URL precedes the first ' -> ' separator; in that case the
        message is returned unchanged (or as '' when it was empty).
    """
    placeholder = '#'
    if not message:
        return (placeholder, '')

    # Only the first ' -> ' counts as the separator; later ones stay in the text.
    head, arrow, tail = message.partition(' -> ')
    if arrow:
        candidate = head.strip()
        # Accept the split only when the left side really looks like a URL.
        if candidate.startswith(('http://', 'https://')):
            return (candidate, tail.strip())

    return (placeholder, message)
def validate_ge_inputs(ge_list: List[Dict[str, Any]]) -> tuple[bool, List[str]]:
    """Check TMS data for every GE/language pair before anything is processed.

    Mirrors the old project's behavior: ALL entries are checked and every
    failure is collected, rather than stopping at the first one.

    Args:
        ge_list: GE descriptors with a 'ge_id' and a 'langs' list of
            ``(lang_code, final_flag)`` tuples.

    Returns:
        ``(is_valid, error_messages)`` where ``is_valid`` is True only when
        no lookup failed. Entries with an empty language code are skipped.
    """
    missing_tms_marker = 'TMS ID chưa được bổ sung'
    problems = []

    for entry in ge_list:
        ge_id = entry['ge_id']
        lang_pairs = entry['langs']  # list of (lang_code, final_flag)
        for lang_code, _final in lang_pairs:
            if not lang_code:
                continue

            # For codes containing '_', the lookup uses the part after the first '_'.
            if '_' in lang_code:
                orig_lang = lang_code.split('_')[1]
            else:
                orig_lang = lang_code

            try:
                tms_id = mongodb_service.get_tms_data(ge_id, orig_lang)
                log.debug(
                    f"Validated TMS data: GE={ge_id}, lang={orig_lang}, tms_id={tms_id}")
            except Exception as exc:
                detail = str(exc)
                # Rewrite the "TMS ID missing" error so it names the offending pair.
                if missing_tms_marker in detail:
                    detail = f"{ge_id} {orig_lang}: TMS ID chưa được bổ sung"
                problems.append(detail)

    return (not problems, problems)
def _build_error_results(parsed_ge: List[Dict[str, Any]], usernames: List[Any],
                         error_message: str) -> List[Dict[str, Any]]:
    """Build one result entry per GE, each holding an error detail per username."""
    ge_results = []
    for ge in parsed_ge:
        ge_id = ge['ge_id']
        langs = ge['langs']
        ge_id_and_lang = f"{ge_id} {langs[0][0]}" if langs else ge_id
        details = [
            {
                'username': username,
                'url': '#',
                'message': error_message,
                'status': 'error',
                'errorDetails': None
            }
            for username in usernames
        ]
        ge_results.append({
            'geIdAndLang': ge_id_and_lang,
            'completionTime': datetime.utcnow().isoformat() + 'Z',
            'details': details
        })
    return ge_results


def process_one_submission(sub: Dict[str, Any]):
    """Handle a single pending submission and store its outcome in Supabase.

    Flow:
    1. Resolve the submission id ('submission_id' or 'id'); without one the
       submission is logged and skipped.
    2. Mark the submission as 'processing'.
    3. Read 'usernames' and 'ge_input' from ``sub['input']`` when that is a
       non-empty dict, otherwise from the top level of ``sub`` (older shape).
    4. Validate every GE/language pair. On failure the submission is marked
       'failed' with per-GE, per-username error details.
    5. If validation passes, the submission is still marked 'failed': per the
       notes below, this worker only handles raw_download submissions, so
       reaching this point indicates a submission_type filter problem.

    Args:
        sub: Submission record as returned by the Supabase service.

    Raises:
        Any exception raised while recording the 'failed' status inside the
        error handler propagates to the caller; all other errors are logged
        and stored on the submission.
    """
    submission_id = sub.get('submission_id') or sub.get('id')
    if not submission_id:
        log.error('Submission missing id, skipping')
        return
    submission_id = str(submission_id)
    log.info(f"Processing submission: {submission_id}")
    try:
        # Mark processing
        supabase_service.update_submission_supabase(
            submission_id, status='processing')

        # Prefer the nested 'input' payload; fall back to the older flat shape.
        raw_input = sub.get('input')
        source = raw_input if isinstance(raw_input, dict) and raw_input else sub
        usernames = source.get('usernames', []) or []
        ge_input = source.get('ge_input', '')
        parsed_ge = parse_ge_input_raw(ge_input)

        # ===== STEP 1: Validate all GE data BEFORE processing =====
        is_valid, validation_errors = validate_ge_inputs(parsed_ge)
        if not is_valid:
            error_message = "Không thể tiếp tục do có lỗi với dữ liệu đầu vào:\n" + \
                "\n".join(validation_errors)
            log.error(f"Validation failed: {error_message}")
            # Every GE entry carries the same validation error.
            ge_results = _build_error_results(
                parsed_ge, usernames, error_message)
            # Mark as failed due to validation error
            supabase_service.update_submission_supabase(
                submission_id, status='failed', error_message=error_message, results=ge_results)
            log.info(
                f"Submission {submission_id} marked as failed due to validation error")
            return

        # ===== STEP 2: Process submissions =====
        # NOTE: This Python worker only handles raw_download submissions.
        # TMS permission submissions are handled by TypeScript backend (backend-tms/)
        # If we reach here, it means the submission_type filter failed - this is a bug.
        error_message = "ERROR: Python worker only handles raw_download submissions. TMS permission automation has been moved to TypeScript backend (port 4000)."
        log.error(
            f"Submission {submission_id} should not be processed by Python worker - check submission_type filter")
        ge_results = _build_error_results(parsed_ge, usernames, error_message)
        supabase_service.update_submission_supabase(
            submission_id, status='failed', error_message=error_message, results=ge_results)
        log.info(
            f"Submission {submission_id} marked as failed - wrong submission_type")
    except Exception as e:
        err = str(e)
        log.exception(f"Error processing submission {submission_id}: {err}")
        supabase_service.update_submission_supabase(
            submission_id, status='failed', error_message=err)
def _sleep_unless_shutdown(seconds: float, step: float = 0.5) -> None:
    """Sleep for up to `seconds`, returning early once shutdown is requested."""
    deadline = time.monotonic() + seconds
    while not _shutdown_requested:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Sleep in short slices: time.sleep() resumes after a signal handler
        # returns, so one long sleep would delay shutdown by the full interval.
        time.sleep(min(step, remaining))


def run_loop(poll_interval: int = 3, max_interval: int = 30):
    """Run the worker loop with adaptive backoff when no work is pending.

    Behavior:
    - On startup, resets any stuck 'processing' submissions back to 'pending'.
    - When there are pending submissions: process them in the order returned
      and poll again after `poll_interval` seconds (fast).
    - When there are none: double the wait on every empty poll, capped at
      `max_interval`, to reduce the number of requests to Supabase.
    - On an unexpected error: log it and wait before retrying.
    - The loop ends once a shutdown has been requested via the signal
      handler; waits are cut short so the request is honored promptly.

    Args:
        poll_interval: Seconds between polls while work is available.
        max_interval: Upper bound, in seconds, for the backoff wait.
    """
    log.info('Worker started, resetting any stuck submissions...')
    reset_processing_to_pending_on_startup()
    log.info('Polling for pending submissions...')
    current_interval = poll_interval
    while not _shutdown_requested:
        try:
            pending = supabase_service.get_pending_submissions_supabase()
            if pending and isinstance(pending, list):
                # reset to fast polling when work exists
                current_interval = poll_interval
                for sub in pending:
                    if _shutdown_requested:
                        log.info('Shutdown requested, stopping processing')
                        break
                    process_one_submission(sub)
            else:
                # no pending work: backoff to reduce polling frequency
                current_interval = min(max_interval, current_interval * 2)
                log.debug(
                    f'No pending submissions, backing off to {current_interval}s')
            # pause before re-checking (short after work, longer when idle)
            _sleep_unless_shutdown(current_interval)
        except Exception as e:
            log.exception(f'Worker loop error: {e}')
            # on error, wait a bit before retrying
            _sleep_unless_shutdown(min(max_interval, current_interval * 2))
    log.info('Worker shutting down gracefully')
    # NOTE: No cleanup needed - TMS automation moved to TypeScript backend
# Development entry point: start the polling loop with its default settings.
# NOTE(review): this module uses relative imports (`from .services import ...`),
# so it has to be launched as a module of its package (presumably
# `python -m backend.worker`) rather than by file path - confirm the package name.
if __name__ == '__main__':
    run_loop()