VPN/scripts/init_db.py

211 lines
6.9 KiB
Python
Raw Normal View History

2025-09-27 16:06:32 +00:00
#!/usr/bin/env python3
"""
Database initialization script for VPN Access Server.
Creates the required database schema with tables, indexes, and constraints
as defined in the project requirements.
"""
import sys
import os
# Add the access module to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'access'))
import mysql.connector
from mysql.connector import Error
from config import config
from utils import setup_logging
def _quote_identifier(name):
    """Return ``name`` as a backtick-quoted MySQL identifier.

    Backticks embedded in the name are doubled so the value cannot terminate
    the quoted identifier early.
    """
    return "`" + str(name).replace("`", "``") + "`"


def create_database_schema() -> bool:
    """Create the configured database and the tables it needs.

    Connects to the MySQL server described by ``config.database`` (without
    selecting a database), creates the database if it does not exist, selects
    it, executes the ``CREATE TABLE IF NOT EXISTS`` statements for the
    ``employees``, ``mac_addresses`` and ``sessions`` tables, commits, and
    logs the tables reported by ``SHOW TABLES``.

    Returns:
        bool: True when every statement executed without raising; False when
        a ``mysql.connector.Error`` or any other exception occurred (the
        error is logged, not re-raised).
    """
    # The DDL text is kept flush-left inside the literals so the statements
    # sent to the server are exactly the ones this script has always sent.
    sql_statements = [
        # Create employees table
        """
CREATE TABLE IF NOT EXISTS employees (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(255) NOT NULL UNIQUE,
employee_name VARCHAR(255) NOT NULL,
employee_email VARCHAR(255) NOT NULL UNIQUE,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
session_limit INT NOT NULL DEFAULT 28800,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (employee_email),
INDEX idx_active (is_active)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
""",
        # Create mac_addresses table
        """
CREATE TABLE IF NOT EXISTS mac_addresses (
id INT PRIMARY KEY AUTO_INCREMENT,
employee_id INT NOT NULL,
mac VARCHAR(17) NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE,
UNIQUE KEY unique_employee_mac (employee_id, mac),
INDEX idx_mac (mac)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
""",
        # Create sessions table
        """
CREATE TABLE IF NOT EXISTS sessions (
id INT PRIMARY KEY AUTO_INCREMENT,
employee_id INT NOT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME NULL,
duration INT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE,
INDEX idx_employee_id (employee_id),
INDEX idx_start_time (start_time),
INDEX idx_end_time (end_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""
    ]

    # Sentinels let the cleanup code tell "never opened" apart from "open"
    # without inspecting locals().
    connection = None
    cursor = None
    try:
        # Connect to MySQL server (without specifying database)
        connection = mysql.connector.connect(
            host=config.database.host,
            port=config.database.port,
            user=config.database.username,
            password=config.database.password,
            charset=config.database.charset
        )
        cursor = connection.cursor()

        # Identifiers cannot be bound as query parameters, so the database
        # name is backtick-quoted before being placed in the statement text.
        quoted_database = _quote_identifier(config.database.database)

        # Create database if it doesn't exist
        logger.info(f"Creating database '{config.database.database}' if it doesn't exist...")
        cursor.execute(
            f"CREATE DATABASE IF NOT EXISTS {quoted_database} "
            f"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
        )

        # Use the database
        cursor.execute(f"USE {quoted_database}")

        # Execute each SQL statement
        for i, sql in enumerate(sql_statements, 1):
            logger.info(f"Executing schema statement {i}/{len(sql_statements)}...")
            cursor.execute(sql)
            logger.info(f"Schema statement {i} executed successfully")

        connection.commit()
        logger.info("Database schema created successfully!")

        # List the tables now present in the database
        cursor.execute("SHOW TABLES")
        tables = cursor.fetchall()
        logger.info(f"Created tables: {[table[0] for table in tables]}")
        return True
    except Error as e:
        logger.error(f"Database error: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return False
    finally:
        if connection is not None and connection.is_connected():
            # The cursor may be missing if connection.cursor() itself failed.
            if cursor is not None:
                cursor.close()
            connection.close()
            logger.info("Database connection closed")
def verify_schema() -> bool:
    """Check that the expected tables exist and log constraint/index counts.

    Connects to the configured database, compares ``SHOW TABLES`` against the
    expected table names, then logs the number of foreign-key rows found in
    ``information_schema.KEY_COLUMN_USAGE`` for this schema and the number of
    rows ``SHOW INDEX`` returns for each expected table. Only a missing table
    causes a failed verification; the foreign-key and index counts are
    informational.

    Returns:
        bool: True when all expected tables exist and every query succeeded;
        False when a table is missing or any exception occurred (the error is
        logged, not re-raised).
    """
    # Sentinels let the cleanup code tell "never opened" apart from "open"
    # without inspecting locals().
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(
            host=config.database.host,
            port=config.database.port,
            database=config.database.database,
            user=config.database.username,
            password=config.database.password,
            charset=config.database.charset
        )
        cursor = connection.cursor()

        # Check tables exist
        expected_tables = ['employees', 'mac_addresses', 'sessions']
        cursor.execute("SHOW TABLES")
        existing_tables = [table[0] for table in cursor.fetchall()]
        missing_tables = set(expected_tables) - set(existing_tables)
        if missing_tables:
            logger.error(f"Missing tables: {missing_tables}")
            return False

        # Check foreign key constraints
        cursor.execute("""
SELECT TABLE_NAME, CONSTRAINT_NAME, REFERENCED_TABLE_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE REFERENCED_TABLE_SCHEMA = %s
AND REFERENCED_TABLE_NAME IS NOT NULL
""", (config.database.database,))
        foreign_keys = cursor.fetchall()
        logger.info(f"Foreign key constraints: {len(foreign_keys)}")

        # Check indexes (table names come from the fixed list above, so
        # interpolating them into the statement is safe).
        for table in expected_tables:
            cursor.execute(f"SHOW INDEX FROM {table}")
            indexes = cursor.fetchall()
            logger.info(f"Table '{table}' has {len(indexes)} indexes")

        logger.info("Schema verification completed successfully")
        return True
    except Error as e:
        logger.error(f"Schema verification failed: {e}")
        return False
    except Exception as e:
        # Mirror create_database_schema: report unexpected failures as a
        # failed verification instead of letting them escape as a traceback.
        logger.error(f"Unexpected error: {e}")
        return False
    finally:
        if connection is not None and connection.is_connected():
            # The cursor may be missing if connection.cursor() itself failed.
            if cursor is not None:
                cursor.close()
            connection.close()
def main():
    """Run the database initialization steps in order.

    Installs the module-level ``logger``, then validates the configuration,
    creates the schema and verifies it. The first step that reports failure
    is logged and the process exits with status 1.
    """
    global logger

    # The other functions in this module log through this global.
    logger = setup_logging(log_level='INFO')
    logger.info("Starting database initialization...")

    # Each entry pairs a step with the message logged when it returns falsy.
    pipeline = (
        (config.validate, "Configuration validation failed"),
        (create_database_schema, "Failed to create database schema"),
        (verify_schema, "Schema verification failed"),
    )
    for run_step, failure_message in pipeline:
        if run_step():
            continue
        logger.error(failure_message)
        sys.exit(1)

    logger.info("Database initialization completed successfully!")
# Run the initialization only when executed as a script, not when imported.
if __name__ == "__main__":
    main()