""" Supabase service wrapper for managing userslist and submissions. Uses Supabase for userslist storage and can optionally use for submissions. """ import os import logging from typing import List, Dict, Any, Optional from datetime import datetime from dotenv import load_dotenv from supabase import create_client # Load .env.local load_dotenv('.env.local') load_dotenv() logger = logging.getLogger(__name__) # Supabase configuration SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") _supabase_client = None def get_supabase_client(): """Get or create Supabase client instance.""" global _supabase_client if _supabase_client is None: if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: raise ValueError( "SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY not configured") _supabase_client = create_client( SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) logger.debug("Supabase client initialized") return _supabase_client # =========================== # Userslist Management # =========================== def get_userslist() -> List[str]: """Fetch all usernames from Supabase userslist table.""" try: client = get_supabase_client() response = client.table("userslist").select( "username").order("username", desc=False).execute() usernames = [] for item in response.data: if isinstance(item, dict) and "username" in item: usernames.append(item["username"]) return usernames except Exception as e: logger.error(f"Error fetching userslist: {e}") return [] def add_username(username: str) -> Dict[str, Any]: """Add a new username to Supabase userslist table.""" username = username.strip() if not username: return {"success": False, "message": "Empty username"} try: client = get_supabase_client() # Check if username already exists existing = client.table("userslist").select( "id").eq("username", username).execute() if existing.data: all_usernames = get_userslist() return {"success": False, "message": "Already exists", "usernames": all_usernames} # 
Insert new username client.table("userslist").insert({"username": username}).execute() all_usernames = get_userslist() return {"success": True, "message": "Added", "usernames": all_usernames} except Exception as e: logger.error(f"Error adding username: {e}") return {"success": False, "message": f"Could not save: {str(e)}"} def delete_username(username: str) -> Dict[str, Any]: """Delete a username from Supabase userslist table.""" username = username.strip() try: client = get_supabase_client() # Check if username exists existing = client.table("userslist").select( "id").eq("username", username).execute() if not existing.data: all_usernames = get_userslist() return {"success": False, "message": "Not found", "usernames": all_usernames} # Delete username client.table("userslist").delete().eq("username", username).execute() all_usernames = get_userslist() return {"success": True, "message": "Deleted", "usernames": all_usernames} except Exception as e: logger.error(f"Error deleting username: {e}") return {"success": False, "message": f"Could not delete: {str(e)}"} # =========================== # Submissions Management (via Supabase) # =========================== def create_submission_supabase(submission_id: str, usernames: List[str], ge_input: str) -> Dict[str, Any]: """Create a new submission in Supabase.""" try: client = get_supabase_client() # Get max queue_position from pending submissions pending = client.table("submissions").select("queue_position").eq( "status", "pending").order("queue_position", desc=True).limit(1).execute() next_position = 1 if pending.data and len(pending.data) > 0: first_item = pending.data[0] if isinstance(first_item, dict): max_pos = first_item.get("queue_position") if isinstance(max_pos, int): next_position = max_pos + 1 submission_data = { "submission_id": submission_id, "status": "pending", "input": { "usernames": usernames, "ge_input": ge_input }, "results": [], "queue_position": next_position, "retry_count": 0 } response = 
client.table("submissions").insert( submission_data).execute() if response.data: return {"success": True, "submission": response.data[0]} return {"success": False, "message": "Failed to create submission"} except Exception as e: logger.error(f"Error creating submission: {e}") return {"success": False, "message": str(e)} def get_submission_by_id(submission_id: str) -> Optional[Dict[str, Any]]: """Get a single submission by its ID.""" try: client = get_supabase_client() response = client.table("submissions").select( "*").eq("id", submission_id).limit(1).execute() if response.data and len(response.data) > 0: result = response.data[0] if isinstance(result, dict): return result return None except Exception as e: logger.error(f"Error fetching submission by id: {e}") return None def create_retry_submission(username: str, ge_id_and_lang: str) -> Optional[Dict[str, Any]]: """Create a new submission for retry (simpler version for retrying errors).""" try: import uuid client = get_supabase_client() # Get max queue_position from pending submissions pending = client.table("submissions").select("queue_position").eq( "status", "pending").order("queue_position", desc=True).limit(1).execute() next_position = 1 if pending.data and len(pending.data) > 0: first_item = pending.data[0] if isinstance(first_item, dict): max_pos = first_item.get("queue_position") if isinstance(max_pos, int): next_position = max_pos + 1 # Parse usernames usernames = [u.strip() for u in username.split(',') if u.strip()] submission_data = { "submission_id": str(uuid.uuid4()), "status": "pending", "input": { "usernames": usernames, "ge_input": ge_id_and_lang }, "results": [], "queue_position": next_position, "retry_count": 0 } response = client.table("submissions").insert( submission_data).execute() if response.data and len(response.data) > 0: result = response.data[0] if isinstance(result, dict): return result return None except Exception as e: logger.error(f"Error creating retry submission: {e}") return None 
def get_submissions_supabase(limit: int = 50,
                             status: Optional[str] = None) -> Any:
    """Fetch submissions from Supabase, newest first.

    Args:
        limit: Maximum number of rows to request.
        status: When truthy, only rows with this "status" are returned.

    Returns:
        ``response.data`` from the query, or an empty list when the query
        fails (the error is logged, not raised).
    """
    try:
        client = get_supabase_client()
        query = client.table("submissions").select("*")

        if status:
            query = query.eq("status", status)

        response = query.order("created_at", desc=True).limit(limit).execute()
        return response.data
    except Exception as e:
        logger.error(f"Error fetching submissions: {e}")
        return []


def get_pending_submissions_supabase() -> Any:
    """Get pending submissions from Supabase ordered by created_at (FIFO).

    NOTE: Only fetch raw_download submissions.
    TMS permission submissions are handled by TypeScript backend.

    Returns:
        ``response.data`` from the query, or an empty list when the query
        fails (the error is logged, not raised).
    """
    try:
        client = get_supabase_client()
        response = (
            client.table("submissions")
            .select("*")
            .eq("status", "pending")
            .eq("submission_type", "raw_download")  # Only raw downloads
            .order("created_at", desc=False)  # FIFO order
            .execute()
        )
        return response.data
    except Exception as e:
        logger.error(f"Error fetching pending submissions: {e}")
        return []


def get_processing_submissions_supabase() -> Any:
    """Get all submissions currently in 'processing' status.

    Used by worker to detect and reset stuck submissions.

    NOTE: Only fetch raw_download submissions.
    TMS permission submissions are handled by TypeScript backend.

    Returns:
        ``response.data`` from the query, or an empty list when the query
        fails (the error is logged, not raised).
    """
    try:
        client = get_supabase_client()
        response = (
            client.table("submissions")
            .select("*")
            .eq("status", "processing")
            .eq("submission_type", "raw_download")  # Only raw downloads
            .execute()
        )
        return response.data
    except Exception as e:
        logger.error(f"Error fetching processing submissions: {e}")
        return []


def update_submission_supabase(submission_id: str, **kwargs) -> bool:
    """Update a submission in Supabase.

    Args:
        submission_id: Value matched against the "submission_id" column.
        **kwargs: Column/value pairs passed as the update payload.

    Returns:
        True when the update response contains at least one row, False
        when it contains none or an exception was caught (logged).
    """
    try:
        client = get_supabase_client()
        response = client.table("submissions").update(
            kwargs).eq("submission_id", submission_id).execute()
        # bool() rather than len(): a missing payload means "nothing
        # updated" and should not be logged as an error.
        return bool(response.data)
    except Exception as e:
        logger.error(f"Error updating submission: {e}")
        return False


def delete_submission_supabase(submission_id: str) -> bool:
    """Delete a submission from Supabase.

    Args:
        submission_id: Value matched against the "submission_id" column.

    Returns:
        True when the delete request completed without raising, False when
        an exception was caught (logged). The response payload is not
        inspected, so True does not prove that a row was removed.
    """
    try:
        client = get_supabase_client()
        client.table("submissions").delete().eq(
            "submission_id", submission_id).execute()
        return True
    except Exception as e:
        logger.error(f"Error deleting submission: {e}")
        return False


# ===========================
# Raw Downloads History Management
# ===========================

def create_raw_download_history(
    ge_id: str,
    lang: str,
    destination_path: str,
    files: List[Dict[str, Any]],
    status: str = "success",
    total_files: int = 0,
    successful_downloads: int = 0,
    mongodb_path: Optional[str] = None,
    mode: str = "api"
) -> Optional[Dict[str, Any]]:
    """Create a new raw download history entry in Supabase.

    All arguments are stored in the column of the same name; ``lang`` is
    upper-cased before it is stored.

    Returns:
        The inserted row as a dict, or None when the insert returned no
        dict row or an exception was caught (logged with traceback).
    """
    try:
        client = get_supabase_client()

        record = {
            "ge_id": ge_id,
            "lang": lang.upper(),
            "destination_path": destination_path,
            "files": files,
            "status": status,
            "total_files": total_files,
            "successful_downloads": successful_downloads,
            "mongodb_path": mongodb_path,
            "mode": mode
        }

        response = client.table(
            "raw_download_history").insert(record).execute()

        if response.data:
            first_item = response.data[0]
            if isinstance(first_item, dict):
                logger.debug(f"Created raw download history: {ge_id}_{lang}")
                return first_item
        return None
    except Exception as e:
        logger.error(
            f"Error creating raw download history: {e}", exc_info=True)
        return None


def get_raw_download_history(limit: int = 50) -> List[Dict[str, Any]]:
    """Fetch raw download history from Supabase, newest first.

    Args:
        limit: Maximum number of rows to request.

    Returns:
        The rows that are dicts, ordered by created_at descending. An
        empty list is returned when there is no data or the query fails
        (the error is logged, not raised).
    """
    try:
        client = get_supabase_client()
        response = client.table("raw_download_history").select(
            "*").order("created_at", desc=True).limit(limit).execute()

        if response.data and isinstance(response.data, list):
            return [item for item in response.data if isinstance(item, dict)]
        return []
    except Exception as e:
        logger.error(f"Error fetching raw download history: {e}")
        return []


def delete_raw_download_history(download_id: str) -> bool:
    """Delete a raw download history entry by ID.

    Args:
        download_id: Value matched against the "id" column.

    Returns:
        True when the delete request completed without raising, False when
        an exception was caught (logged). The response payload is not
        inspected, so True does not prove that a row was removed.
    """
    try:
        client = get_supabase_client()
        client.table("raw_download_history").delete().eq(
            "id", download_id).execute()
        return True
    except Exception as e:
        logger.error(f"Error deleting raw download history: {e}")
        return False


# ===========================
# OLD QUEUE FUNCTIONS REMOVED
# ===========================
# All raw_download_queue related functions have been removed.
# Use downloads_service.py instead for file download management.