certificate authentication, remove username/password

This commit is contained in:
arthur 2025-09-28 22:47:53 +07:00
parent ebb6fde04d
commit 9765095bab
7 changed files with 65 additions and 58 deletions

View File

@ -30,16 +30,17 @@ The goal of this project is to extend an OpenVPN-based VPN access server with th
## 3. Database Schema
### `employees` table
| Column | Type | Notes |
|-----------------|--------------|----------------------------------------------------|
| id | INT (PK) | Primary key |
| username | VARCHAR(255) | Unique, not null, indexed |
| employee_name | VARCHAR(255) | Employee/VPN name (not null) |
| employee_email | VARCHAR(255) | Unique email, indexed |
| is_active | BOOLEAN | Default TRUE, indexed |
| session_limit | INT | Max allowed session (secs) |
| created_at | DATETIME | Default CURRENT_TIMESTAMP |
| updated_at | DATETIME | Auto-updated on change |
| Column | Type | Notes |
|----------------|--------------|----------------------------------------------------|
| id | INT (PK) | Primary key |
| username | VARCHAR(255) | Unique, not null, indexed |
| password | VARCHAR(255) | not null |
| employee_name | VARCHAR(255) | Employee/VPN name (not null) |
| employee_email | VARCHAR(255) | Unique email, indexed |
| is_active | BOOLEAN | Default TRUE, indexed |
| session_limit | INT | Max allowed session (secs) |
| created_at | DATETIME | Default CURRENT_TIMESTAMP |
| updated_at | DATETIME | Auto-updated on change |
**Constraints**: `UNIQUE (username)`, `UNIQUE (employee_email)`
**Indexes**: `idx_username`, `idx_email`, `idx_active`

View File

@ -2,8 +2,8 @@
"""
Authentication script for VPN Access Server.
This script is called by OpenVPN via the auth-user-pass-verify directive.
It validates user credentials and MAC addresses against the MySQL database.
This script is called by OpenVPN via the client-connect directive.
It validates user certificates (common_name) and MAC addresses against the MySQL database.
Exit codes:
- 0: Authentication successful
@ -22,32 +22,24 @@ from db import db
from utils import setup_logging, get_env_vars, get_client_info, safe_exit
def authenticate_user(username: str, password: str) -> tuple[bool, dict]:
def authenticate_user(common_name: str) -> tuple[bool, dict]:
"""
Authenticate user credentials against the database.
Authenticate user by certificate common name against the database.
Args:
username: VPN username
password: VPN password
common_name: Certificate common name (username)
Returns:
Tuple of (success, user_info)
"""
try:
user = db.get_user_by_username(username)
user = db.get_user_by_username(common_name)
if not user:
return False, {}
# For now, we'll do simple password comparison
# In production, you should hash passwords and compare hashes
# This is a simplified implementation for demonstration
if user.get('password_hash'): # If password hashing is implemented
# You would implement proper password verification here
pass
else:
# Simple comparison for demonstration (NOT secure for production)
# In production, store and compare password hashes
pass
# Check if user is active
if not user.get('is_active', False):
return False, {}
return True, user
@ -117,30 +109,25 @@ def main():
client_info = get_client_info(env_vars)
# Log connection attempt
logger.info(f"Authentication attempt for user: {client_info['username']} "
logger.info(f"Authentication attempt for user: {client_info['common_name']} "
f"from IP: {client_info['untrusted_ip']} "
f"with MAC: {client_info['mac_address']}")
# Validate client information
if not client_info['is_valid']:
missing_fields = []
if not client_info['username']:
missing_fields.append('username')
if not client_info['password']:
missing_fields.append('password')
if not client_info['common_name']:
missing_fields.append('common_name')
if not client_info['mac_address']:
missing_fields.append('MAC address')
safe_exit(1, f"Missing required fields: {', '.join(missing_fields)}", logger)
# Authenticate user
auth_success, user_info = authenticate_user(
client_info['username'],
client_info['password']
)
# Authenticate user by certificate common name
auth_success, user_info = authenticate_user(client_info['common_name'])
if not auth_success:
safe_exit(1, f"Authentication failed for user: {client_info['username']}", logger)
safe_exit(1, f"Authentication failed for user: {client_info['common_name']}", logger)
user_id = user_info['id']
session_limit = user_info.get('session_limit', config.server.default_session_limit)
@ -148,16 +135,16 @@ def main():
# Validate MAC address
if not validate_mac_address(user_id, client_info['mac_address']):
safe_exit(1, f"MAC address not authorized: {client_info['mac_address']} "
f"for user: {client_info['username']}", logger)
f"for user: {client_info['common_name']}", logger)
# Check session limits
if not check_session_limits(user_id, session_limit):
daily_usage = db.get_user_daily_session_time(user_id)
safe_exit(1, f"Daily session limit exceeded for user: {client_info['username']} "
safe_exit(1, f"Daily session limit exceeded for user: {client_info['common_name']} "
f"(used: {daily_usage}s, limit: {session_limit}s)", logger)
# Authentication successful
logger.info(f"Authentication successful for user: {client_info['username']} "
logger.info(f"Authentication successful for user: {client_info['common_name']} "
f"with MAC: {client_info['mac_address']}")
safe_exit(0, "Authentication successful", logger)

View File

@ -9,7 +9,7 @@ import logging
import mysql.connector
from mysql.connector import pooling, Error
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from contextlib import contextmanager
from config import config
@ -114,6 +114,15 @@ class DatabaseManager:
"""
return self.execute_query(query, (user_id,), 'one')
def get_user_password(self, username: str) -> Optional[Dict[str, Any]]:
"""Get user details by username and password."""
query = """
SELECT PASSWORD FROM VPN.EMPLOYEES
WHERE VPN.EMPLOYEES.USERNAME = %s
AND VPN.EMPLOYEES.IS_ACTIVE = TRUE
"""
return self.execute_query(query, (username,), 'one')
# MAC address operations
def get_user_mac_addresses(self, user_id: int) -> List[str]:
"""Get all MAC addresses for a user."""
@ -141,7 +150,7 @@ class DatabaseManager:
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE updated_at = VALUES(updated_at)
"""
now = datetime.utcnow()
now = datetime.now(timezone(timedelta(hours=7)))
try:
self.execute_query(query, (user_id, mac_address, now, now))
return True
@ -156,7 +165,7 @@ class DatabaseManager:
INSERT INTO sessions (employee_id, start_time, created_at, updated_at)
VALUES (%s, %s, %s, %s)
"""
now = datetime.utcnow()
now = datetime.now(timezone(timedelta(hours=7)))
try:
self.execute_query(query, (user_id, now, now, now))
# Get the last inserted ID
@ -178,7 +187,7 @@ class DatabaseManager:
updated_at = %s
WHERE id = %s AND end_time IS NULL
"""
now = datetime.utcnow()
now = datetime.now(timezone(timedelta(hours=7)))
try:
rows_affected = self.execute_query(query, (now, now, now, session_id))
return rows_affected > 0
@ -189,7 +198,7 @@ class DatabaseManager:
def get_user_daily_session_time(self, user_id: int, date: datetime = None) -> int:
"""Get total session time for a user on a specific date (in seconds)."""
if date is None:
date = datetime.utcnow().date()
now = datetime.now(timezone(timedelta(hours=7))).date()
start_of_day = datetime.combine(date, datetime.min.time())
end_of_day = start_of_day + timedelta(days=1)

View File

@ -11,7 +11,7 @@ import sys
import logging
import hashlib
from typing import Optional, Dict, Any, Tuple
from datetime import datetime
from datetime import datetime, timezone
def setup_logging(log_level: str = 'INFO', log_file: Optional[str] = None) -> logging.Logger:
@ -107,10 +107,10 @@ def get_env_vars() -> Dict[str, str]:
# Common OpenVPN environment variables
openvpn_vars = [
'username', 'password', 'common_name', 'trusted_ip', 'trusted_port',
'common_name', 'trusted_ip', 'trusted_port',
'untrusted_ip', 'untrusted_port', 'ifconfig_pool_remote_ip',
'script_type', 'time_ascii', 'time_unix', 'time_duration',
'CLIENT_MAC' # Custom variable set by client
'IV_HWADDR', 'CLIENT_MAC' # MAC address variables
]
for var in openvpn_vars:
@ -220,24 +220,25 @@ def get_client_info(env_vars: Dict[str, str]) -> Dict[str, Any]:
Dictionary with validated client information
"""
client_info = {
'username': env_vars.get('username', '').strip(),
'password': env_vars.get('password', '').strip(),
'common_name': env_vars.get('common_name', '').strip(),
'trusted_ip': env_vars.get('trusted_ip', '').strip(),
'untrusted_ip': env_vars.get('untrusted_ip', '').strip(),
'mac_address': None,
'timestamp': datetime.utcnow()
'timestamp': datetime.now(timezone.utc)
}
# Process MAC address
raw_mac = env_vars.get('CLIENT_MAC', '').strip()
# Process MAC address from IV_HWADDR (requires push-peer-info in client config)
raw_mac = env_vars.get('IV_HWADDR', '').strip()
if not raw_mac:
# Fallback to CLIENT_MAC if IV_HWADDR not available
raw_mac = env_vars.get('CLIENT_MAC', '').strip()
if raw_mac:
client_info['mac_address'] = normalize_mac_address(raw_mac)
# Validate required fields
# Validate required fields for certificate-based authentication
client_info['is_valid'] = bool(
client_info['username'] and
client_info['password'] and
client_info['common_name'] and
client_info['mac_address']
)

View File

@ -136,19 +136,28 @@ journalctl -u openvpn@server -f
## Step 8: Prepare Client Configuration
Information about the required file:
- ca.crt — The certificate authority file, used to verify the server certificate.
- [client_name].crt — The client certificate created (`/home/arthur/openvpn-ca/pki/issued/[client_name].crt`).
- [client_name].key — The client private key file, which should be located at `/home/arthur/openvpn-ca/pki/private/[client_name].key`
- ta.key — The TLS authentication key generated during server setup.
Example client config (`client.ovpn`):
```conf
client
dev tun
proto udp
remote <SERVER_PUBLIC_IP> 1194
remote 113.22.221.198 1194 # use FTP IP address
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
# push mac address info
push-peer-info
verb 3
<ca>