VPN/access/auth.py

160 lines
4.9 KiB
Python
Raw Permalink Normal View History

2025-09-27 16:06:32 +00:00
#!/usr/bin/env python3
"""
Authentication script for VPN Access Server.

This script is called by OpenVPN via the client-connect directive.
It validates user certificates (common_name) and MAC addresses against the MySQL database.

Exit codes:
- 0: Authentication successful
- 1: Authentication failed
- 2: Configuration, database connectivity, or internal error
"""
import sys
import os
# Add the access module to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import config
from db import db
from utils import setup_logging, get_env_vars, get_client_info, safe_exit
def authenticate_user(common_name: str) -> tuple[bool, dict]:
    """
    Look up a certificate common name in the database and check the account is active.

    Failures are reported through the module-level ``logger`` that ``main()``
    installs, so this function is meant to run after logging has been set up.

    Args:
        common_name: Certificate common name, used as the username for the lookup.

    Returns:
        ``(True, record)`` when a record is found and its ``is_active`` entry is
        truthy; ``(False, {})`` when no record is found, the record is not
        active, or the lookup raises.
    """
    try:
        record = db.get_user_by_username(common_name)

        # Only a record that exists AND is flagged active is accepted;
        # a missing 'is_active' key counts as inactive.
        if record and record.get('is_active', False):
            return True, record

        return False, {}
    except Exception as exc:
        # Fail closed: any lookup problem is treated as a rejected login.
        logger.error(f"Database error during authentication: {exc}")
        return False, {}


def validate_mac_address(user_id: int, mac_address: str) -> bool:
    """
    Ask the database whether a MAC address is authorized for a user.

    Args:
        user_id: User ID from database
        mac_address: MAC address to check (callers are expected to pass it
            already normalized)

    Returns:
        The result of ``db.is_mac_authorized``; ``False`` if that call raises.
    """
    try:
        authorized = db.is_mac_authorized(user_id, mac_address)
    except Exception as exc:
        # Fail closed: an unanswerable query is treated as "not authorized".
        logger.error(f"Database error during MAC validation: {exc}")
        return False
    return authorized


def check_session_limits(user_id: int, session_limit: int) -> bool:
    """
    Compare the user's session time for the day against their allowance.

    Args:
        user_id: User ID from database
        session_limit: Maximum session time in seconds

    Returns:
        True while the recorded daily usage is strictly below ``session_limit``;
        False once usage reaches or exceeds it, or if the lookup or the
        comparison raises.
    """
    try:
        seconds_used = db.get_user_daily_session_time(user_id)
        # Kept inside the try so a failing comparison is also handled below.
        within_limit = seconds_used < session_limit
    except Exception as exc:
        # Fail closed: if usage cannot be determined, deny the session.
        logger.error(f"Database error during session limit check: {exc}")
        return False
    return within_limit


def main():
    """
    Run the full client-connect authentication sequence.

    Steps, in order: set up logging, validate configuration, check database
    connectivity, read client details from the OpenVPN environment, then
    verify the user account, the MAC address and the daily session limit.

    Every outcome is reported through ``safe_exit(code, message, logger)``:
    0 on success, 1 when authentication is refused (or interrupted), 2 for
    configuration, database connectivity or unexpected internal errors.
    NOTE(review): ``safe_exit`` is defined in ``utils`` and is assumed to
    terminate the process; the flow below relies on that.
    """
    # The helper functions in this module log through this module-level name.
    global logger

    # Initialize logging
    logger = setup_logging(
        log_level=config.server.log_level,
        log_file=config.server.log_file
    )

    try:
        # Validate configuration
        if not config.validate():
            safe_exit(2, "Configuration validation failed", logger)

        # Check database connectivity
        if not db.health_check():
            safe_exit(2, "Database connection failed", logger)

        # Get environment variables from OpenVPN
        env_vars = get_env_vars()
        client_info = get_client_info(env_vars)

        # Log connection attempt
        logger.info(f"Authentication attempt for user: {client_info['common_name']} "
                    f"from IP: {client_info['untrusted_ip']} "
                    f"with MAC: {client_info['mac_address']}")

        # Validate client information
        if not client_info['is_valid']:
            # (client_info key, label used in the failure message)
            required_fields = (
                ('common_name', 'common_name'),
                ('mac_address', 'MAC address'),
            )
            missing_fields = [
                label for key, label in required_fields if not client_info[key]
            ]
            safe_exit(1, f"Missing required fields: {', '.join(missing_fields)}", logger)

        # Authenticate user by certificate common name
        auth_success, user_info = authenticate_user(client_info['common_name'])
        if not auth_success:
            safe_exit(1, f"Authentication failed for user: {client_info['common_name']}", logger)

        user_id = user_info['id']

        # dict.get() only applies its default when the key is absent. A user
        # record that carries 'session_limit' with a None value would otherwise
        # pass None on to the limit check, which then fails and rejects the
        # user with a misleading "limit exceeded" message. Fall back to the
        # configured default in both cases.
        session_limit = user_info.get('session_limit')
        if session_limit is None:
            session_limit = config.server.default_session_limit

        # Validate MAC address
        if not validate_mac_address(user_id, client_info['mac_address']):
            safe_exit(1, f"MAC address not authorized: {client_info['mac_address']} "
                         f"for user: {client_info['common_name']}", logger)

        # Check session limits
        if not check_session_limits(user_id, session_limit):
            # Usage is queried again only to include it in the failure message.
            daily_usage = db.get_user_daily_session_time(user_id)
            safe_exit(1, f"Daily session limit exceeded for user: {client_info['common_name']} "
                         f"(used: {daily_usage}s, limit: {session_limit}s)", logger)

        # Authentication successful
        logger.info(f"Authentication successful for user: {client_info['common_name']} "
                    f"with MAC: {client_info['mac_address']}")
        safe_exit(0, "Authentication successful", logger)

    except KeyboardInterrupt:
        safe_exit(1, "Authentication interrupted", logger)
    except Exception as e:
        # Anything not handled above is reported as an internal error (code 2).
        logger.error(f"Unexpected error during authentication: {e}")
        safe_exit(2, f"Internal error: {e}", logger)


# Run the authentication flow only when executed as a script (not on import).
if __name__ == "__main__":
    main()