ge-tool/backend/services/supabase_service.py

363 lines
13 KiB
Python
Raw Permalink Normal View History

2025-12-10 06:41:43 +00:00
"""
Supabase service wrapper for managing userslist and submissions.
Uses Supabase for userslist storage and can optionally use for submissions.
"""
import os
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv
from supabase import create_client
# Load .env.local
load_dotenv('.env.local')
load_dotenv()
logger = logging.getLogger(__name__)
# Supabase configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
_supabase_client = None
def get_supabase_client():
"""Get or create Supabase client instance."""
global _supabase_client
if _supabase_client is None:
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
raise ValueError(
"SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY not configured")
_supabase_client = create_client(
SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
logger.debug("Supabase client initialized")
return _supabase_client
# ===========================
# Userslist Management
# ===========================
def get_userslist() -> List[str]:
"""Fetch all usernames from Supabase userslist table."""
try:
client = get_supabase_client()
response = client.table("userslist").select(
"username").order("username", desc=False).execute()
usernames = []
for item in response.data:
if isinstance(item, dict) and "username" in item:
usernames.append(item["username"])
return usernames
except Exception as e:
logger.error(f"Error fetching userslist: {e}")
return []
def add_username(username: str) -> Dict[str, Any]:
"""Add a new username to Supabase userslist table."""
username = username.strip()
if not username:
return {"success": False, "message": "Empty username"}
try:
client = get_supabase_client()
# Check if username already exists
existing = client.table("userslist").select(
"id").eq("username", username).execute()
if existing.data:
all_usernames = get_userslist()
return {"success": False, "message": "Already exists", "usernames": all_usernames}
# Insert new username
client.table("userslist").insert({"username": username}).execute()
all_usernames = get_userslist()
return {"success": True, "message": "Added", "usernames": all_usernames}
except Exception as e:
logger.error(f"Error adding username: {e}")
return {"success": False, "message": f"Could not save: {str(e)}"}
def delete_username(username: str) -> Dict[str, Any]:
"""Delete a username from Supabase userslist table."""
username = username.strip()
try:
client = get_supabase_client()
# Check if username exists
existing = client.table("userslist").select(
"id").eq("username", username).execute()
if not existing.data:
all_usernames = get_userslist()
return {"success": False, "message": "Not found", "usernames": all_usernames}
# Delete username
client.table("userslist").delete().eq("username", username).execute()
all_usernames = get_userslist()
return {"success": True, "message": "Deleted", "usernames": all_usernames}
except Exception as e:
logger.error(f"Error deleting username: {e}")
return {"success": False, "message": f"Could not delete: {str(e)}"}
# ===========================
# Submissions Management (via Supabase)
# ===========================
def create_submission_supabase(submission_id: str, usernames: List[str], ge_input: str) -> Dict[str, Any]:
"""Create a new submission in Supabase."""
try:
client = get_supabase_client()
# Get max queue_position from pending submissions
pending = client.table("submissions").select("queue_position").eq(
"status", "pending").order("queue_position", desc=True).limit(1).execute()
next_position = 1
if pending.data and len(pending.data) > 0:
first_item = pending.data[0]
if isinstance(first_item, dict):
max_pos = first_item.get("queue_position")
if isinstance(max_pos, int):
next_position = max_pos + 1
submission_data = {
"submission_id": submission_id,
"status": "pending",
"input": {
"usernames": usernames,
"ge_input": ge_input
},
"results": [],
"queue_position": next_position,
"retry_count": 0
}
response = client.table("submissions").insert(
submission_data).execute()
if response.data:
return {"success": True, "submission": response.data[0]}
return {"success": False, "message": "Failed to create submission"}
except Exception as e:
logger.error(f"Error creating submission: {e}")
return {"success": False, "message": str(e)}
def get_submission_by_id(submission_id: str) -> Optional[Dict[str, Any]]:
"""Get a single submission by its ID."""
try:
client = get_supabase_client()
response = client.table("submissions").select(
"*").eq("id", submission_id).limit(1).execute()
if response.data and len(response.data) > 0:
result = response.data[0]
if isinstance(result, dict):
return result
return None
except Exception as e:
logger.error(f"Error fetching submission by id: {e}")
return None
def create_retry_submission(username: str, ge_id_and_lang: str) -> Optional[Dict[str, Any]]:
"""Create a new submission for retry (simpler version for retrying errors)."""
try:
import uuid
client = get_supabase_client()
# Get max queue_position from pending submissions
pending = client.table("submissions").select("queue_position").eq(
"status", "pending").order("queue_position", desc=True).limit(1).execute()
next_position = 1
if pending.data and len(pending.data) > 0:
first_item = pending.data[0]
if isinstance(first_item, dict):
max_pos = first_item.get("queue_position")
if isinstance(max_pos, int):
next_position = max_pos + 1
# Parse usernames
usernames = [u.strip() for u in username.split(',') if u.strip()]
submission_data = {
"submission_id": str(uuid.uuid4()),
"status": "pending",
"input": {
"usernames": usernames,
"ge_input": ge_id_and_lang
},
"results": [],
"queue_position": next_position,
"retry_count": 0
}
response = client.table("submissions").insert(
submission_data).execute()
if response.data and len(response.data) > 0:
result = response.data[0]
if isinstance(result, dict):
return result
return None
except Exception as e:
logger.error(f"Error creating retry submission: {e}")
return None
def get_submissions_supabase(limit: int = 50, status: Optional[str] = None) -> Any:
"""Fetch submissions from Supabase."""
try:
client = get_supabase_client()
query = client.table("submissions").select("*")
if status:
query = query.eq("status", status)
response = query.order("created_at", desc=True).limit(limit).execute()
return response.data
except Exception as e:
logger.error(f"Error fetching submissions: {e}")
return []
def get_pending_submissions_supabase() -> Any:
"""Get pending submissions from Supabase ordered by created_at (FIFO).
NOTE: Only fetch raw_download submissions. TMS permission submissions are handled by TypeScript backend.
"""
try:
client = get_supabase_client()
response = (
client.table("submissions")
.select("*")
.eq("status", "pending")
.eq("submission_type", "raw_download") # Only raw downloads
.order("created_at", desc=False) # FIFO order
.execute()
)
return response.data
except Exception as e:
logger.error(f"Error fetching pending submissions: {e}")
return []
def get_processing_submissions_supabase() -> Any:
"""Get all submissions currently in 'processing' status.
Used by worker to detect and reset stuck submissions.
NOTE: Only fetch raw_download submissions. TMS permission submissions are handled by TypeScript backend.
"""
try:
client = get_supabase_client()
response = (
client.table("submissions")
.select("*")
.eq("status", "processing")
.eq("submission_type", "raw_download") # Only raw downloads
.execute()
)
return response.data
except Exception as e:
logger.error(f"Error fetching processing submissions: {e}")
return []
def update_submission_supabase(submission_id: str, **kwargs) -> bool:
"""Update a submission in Supabase."""
try:
client = get_supabase_client()
response = client.table("submissions").update(
kwargs).eq("submission_id", submission_id).execute()
return len(response.data) > 0
except Exception as e:
logger.error(f"Error updating submission: {e}")
return False
def delete_submission_supabase(submission_id: str) -> bool:
"""Delete a submission from Supabase."""
try:
client = get_supabase_client()
response = client.table("submissions").delete().eq(
"submission_id", submission_id).execute()
# Supabase returns empty data on successful delete
return True
except Exception as e:
logger.error(f"Error deleting submission: {e}")
return False
# ===========================
# Raw Downloads History Management
# ===========================
def create_raw_download_history(
ge_id: str,
lang: str,
destination_path: str,
files: List[Dict[str, Any]],
status: str = "success",
total_files: int = 0,
successful_downloads: int = 0,
mongodb_path: Optional[str] = None,
mode: str = "api"
) -> Optional[Dict[str, Any]]:
"""Create a new raw download history entry in Supabase."""
try:
client = get_supabase_client()
record = {
"ge_id": ge_id,
"lang": lang.upper(),
"destination_path": destination_path,
"files": files,
"status": status,
"total_files": total_files,
"successful_downloads": successful_downloads,
"mongodb_path": mongodb_path,
"mode": mode
}
response = client.table(
"raw_download_history").insert(record).execute()
if response.data and len(response.data) > 0:
first_item = response.data[0]
if isinstance(first_item, dict):
logger.debug(f"Created raw download history: {ge_id}_{lang}")
return first_item
return None
except Exception as e:
logger.error(
f"Error creating raw download history: {e}", exc_info=True)
return None
def get_raw_download_history(limit: int = 50) -> List[Dict[str, Any]]:
"""Fetch raw download history from Supabase, newest first."""
try:
client = get_supabase_client()
response = client.table("raw_download_history").select(
"*").order("created_at", desc=True).limit(limit).execute()
if response.data and isinstance(response.data, list):
return [item for item in response.data if isinstance(item, dict)]
return []
except Exception as e:
logger.error(f"Error fetching raw download history: {e}")
return []
def delete_raw_download_history(download_id: str) -> bool:
"""Delete a raw download history entry by ID."""
try:
client = get_supabase_client()
response = client.table("raw_download_history").delete().eq(
"id", download_id).execute()
return True
except Exception as e:
logger.error(f"Error deleting raw download history: {e}")
return False
# ===========================
# OLD QUEUE FUNCTIONS REMOVED
# ===========================
# All raw_download_queue related functions have been removed.
# Use downloads_service.py instead for file download management.