#!/usr/bin/env python3
"""
Authentication script for VPN Access Server.

This script is called by OpenVPN via the client-connect directive.
It validates user certificates (common_name) and MAC addresses
against the MySQL database.

Exit codes:
- 0: Authentication successful
- 1: Authentication failed
- 2: Configuration error
"""

import logging
import os
import sys

# Add the access module to the Python path. This must run before the local
# imports below so that `config`, `db` and `utils` resolve next to this file.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import config
from db import db
from utils import setup_logging, get_env_vars, get_client_info, safe_exit

# Fallback logger so the helpers below can report errors even when they are
# called before main() has installed the configured logger (import, tests).
# main() rebinds this name to the logger returned by setup_logging().
logger = logging.getLogger(__name__)


def authenticate_user(common_name: str) -> tuple[bool, dict]:
    """
    Authenticate user by certificate common name against the database.

    Any exception raised by the lookup is logged and reported as a failed
    authentication, so this function fails closed.

    Args:
        common_name: Certificate common name (username)

    Returns:
        Tuple of (success, user_info). user_info is the record returned by
        db.get_user_by_username on success and an empty dict otherwise.
    """
    try:
        user = db.get_user_by_username(common_name)
        if not user:
            return False, {}

        # A missing 'is_active' key is treated the same as an inactive user.
        if not user.get('is_active', False):
            return False, {}

        return True, user
    except Exception as e:
        logger.error(f"Database error during authentication: {e}")
        return False, {}


def validate_mac_address(user_id: int, mac_address: str) -> bool:
    """
    Validate MAC address against user's authorized MACs.

    Args:
        user_id: User ID from database
        mac_address: Normalized MAC address

    Returns:
        True if db.is_mac_authorized accepts the MAC, False otherwise.
        A database error is logged and treated as "not authorized".
    """
    try:
        return db.is_mac_authorized(user_id, mac_address)
    except Exception as e:
        logger.error(f"Database error during MAC validation: {e}")
        return False


def check_session_limits(user_id: int, session_limit: int) -> bool:
    """
    Check if user has exceeded daily session limits.

    Args:
        user_id: User ID from database
        session_limit: Maximum session time in seconds

    Returns:
        True if today's usage is strictly below session_limit, False if the
        limit is reached or exceeded. Any error during the check is logged
        and also yields False (fail closed).
    """
    try:
        daily_usage = db.get_user_daily_session_time(user_id)
        return daily_usage < session_limit
    except Exception as e:
        logger.error(f"Database error during session limit check: {e}")
        return False


def main():
    """
    Main authentication function.

    Runs the checks in order: configuration, database health, presence of
    the required client fields, user lookup, MAC authorization and daily
    session limit. Every outcome is reported through safe_exit() using the
    exit codes listed in the module docstring.
    """
    global logger

    # Initialize logging
    logger = setup_logging(
        log_level=config.server.log_level,
        log_file=config.server.log_file
    )

    try:
        # NOTE(review): the flow below assumes safe_exit() terminates the
        # process (e.g. via sys.exit) and never returns -- confirm in utils.

        # Validate configuration
        if not config.validate():
            safe_exit(2, "Configuration validation failed", logger)

        # Check database connectivity
        if not db.health_check():
            safe_exit(2, "Database connection failed", logger)

        # Get environment variables from OpenVPN
        env_vars = get_env_vars()
        client_info = get_client_info(env_vars)

        # Log connection attempt
        logger.info(f"Authentication attempt for user: {client_info['common_name']} "
                    f"from IP: {client_info['untrusted_ip']} "
                    f"with MAC: {client_info['mac_address']}")

        # Validate client information
        if not client_info['is_valid']:
            missing_fields = []
            if not client_info['common_name']:
                missing_fields.append('common_name')
            if not client_info['mac_address']:
                missing_fields.append('MAC address')

            safe_exit(1, f"Missing required fields: {', '.join(missing_fields)}", logger)

        # Authenticate user by certificate common name
        auth_success, user_info = authenticate_user(client_info['common_name'])
        if not auth_success:
            safe_exit(1, f"Authentication failed for user: {client_info['common_name']}", logger)

        user_id = user_info['id']

        # Fall back to the configured default when the per-user limit is
        # absent OR present but None. A plain .get(key, default) keeps an
        # explicit None, which would make the later `<` comparison raise
        # and deny the user with a misleading "limit exceeded" message.
        # A limit of 0 is a real value and is kept as-is.
        session_limit = user_info.get('session_limit')
        if session_limit is None:
            session_limit = config.server.default_session_limit

        # Validate MAC address
        if not validate_mac_address(user_id, client_info['mac_address']):
            safe_exit(1, f"MAC address not authorized: {client_info['mac_address']} "
                         f"for user: {client_info['common_name']}", logger)

        # Check session limits
        if not check_session_limits(user_id, session_limit):
            # Queried again only to include the usage figure in the message.
            daily_usage = db.get_user_daily_session_time(user_id)
            safe_exit(1, f"Daily session limit exceeded for user: {client_info['common_name']} "
                         f"(used: {daily_usage}s, limit: {session_limit}s)", logger)

        # Authentication successful
        logger.info(f"Authentication successful for user: {client_info['common_name']} "
                    f"with MAC: {client_info['mac_address']}")

        safe_exit(0, "Authentication successful", logger)

    except KeyboardInterrupt:
        safe_exit(1, "Authentication interrupted", logger)
    except Exception as e:
        logger.error(f"Unexpected error during authentication: {e}")
        safe_exit(2, f"Internal error: {e}", logger)


if __name__ == "__main__":
    main()