VPN/util/client/generate_client.py

287 lines
12 KiB
Python

"""
Utility for generating OpenVPN client configuration files.
"""
import os
import subprocess
import logging
from typing import Tuple
# Set up logging
logger = logging.getLogger(__name__)
def revoke_client_certificate(username: str) -> Tuple[bool, str]:
"""
Revoke an existing client certificate.
Args:
username: The username whose certificate to revoke
Returns:
A tuple containing a boolean indicating success and a message.
"""
logger.info(f"Starting certificate revocation for user: {username}")
easyrsa_dir = "/home/arthur/openvpn-ca/"
if not os.path.exists(easyrsa_dir):
logger.error(f"EasyRSA directory does not exist: {easyrsa_dir}")
return False, f"Error: EasyRSA directory '{easyrsa_dir}' does not exist."
try:
# First revoke the certificate
revoke_command = ["./easyrsa", "--batch", "revoke", username]
logger.info(f"Executing revoke command: {' '.join(revoke_command)}")
revoke_process = subprocess.run(
revoke_command,
cwd=easyrsa_dir,
capture_output=True,
text=True,
check=True
)
logger.info("Certificate revoked successfully")
# Then generate new CRL (Certificate Revocation List)
crl_command = ["./easyrsa", "--batch", "gen-crl"]
logger.info(f"Executing CRL generation command: {' '.join(crl_command)}")
crl_process = subprocess.run(
crl_command,
cwd=easyrsa_dir,
capture_output=True,
text=True,
check=True
)
logger.info("CRL generated successfully")
return True, f"Successfully revoked certificate for user: {username}"
except subprocess.CalledProcessError as e:
logger.error(f"Revocation failed for user {username}: {e.stderr}")
return False, f"Failed to revoke certificate for user {username}: {e.stderr}"
except Exception as e:
logger.error(f"Unexpected error during revocation: {e}")
return False, f"Unexpected error revoking certificate: {e}"
def generate_client_config(username: str, email: str) -> Tuple[bool, str]:
"""
Generates a .ovpn file for a given user.
This function automatically handles certificate conflicts by:
1. Reusing existing certificates if they exist
2. Automatically resolving request file conflicts
3. Providing clear error messages only when automatic resolution fails
Args:
username: The username for which to generate the config.
email: The email for which to generate the config and used as password
Returns:
A tuple containing a boolean indicating success and a message.
"""
logger.info(f"Starting client configuration generation for user: {username}, email: {email}")
easyrsa_dir = "/home/arthur/openvpn-ca/"
ca_path = "/home/arthur/openvpn-ca/pki/ca.crt"
ta_path = "/home/arthur/openvpn-ca/ta.key"
client_crt_path = f"/home/arthur/openvpn-ca/pki/issued/{username}.crt"
client_key_path = f"/home/arthur/openvpn-ca/pki/private/{username}.key"
output_path = f"generated-clients/{username}.ovpn"
logger.info(f"EasyRSA directory: {easyrsa_dir}")
logger.info(f"CA certificate path: {ca_path}")
logger.info(f"TA key path: {ta_path}")
logger.info(f"Client certificate path: {client_crt_path}")
logger.info(f"Client key path: {client_key_path}")
logger.info(f"Output path: {output_path}")
# config password into env
password = email
env = os.environ.copy()
env["EASYRSA_PASSOUT"] = f"pass:{password}"
logger.info(f"Set EASYRSA_PASSOUT environment variable (length: {len(password)})")
# Step 1: Generate the client certificate
logger.info("Step 1: Starting client certificate generation")
# Check if certificate already exists
if os.path.isfile(client_crt_path):
logger.info(f"Certificate already exists for user {username} at {client_crt_path}")
logger.info("Skipping certificate generation - will reuse existing certificate")
else:
logger.info(f"Certificate does not exist for user {username}, generating new one")
try:
command = ["./easyrsa", "--batch", "build-client-full", username]
logger.info(f"Executing command: {' '.join(command)} in directory: {easyrsa_dir}")
process = subprocess.run(
command,
cwd=easyrsa_dir,
env=env,
check=True,
capture_output=True,
text=True
)
logger.info("EasyRSA command completed successfully")
if process.stdout:
logger.info(f"EasyRSA stdout: {process.stdout.strip()}")
if process.stderr:
logger.warning(f"EasyRSA stderr: {process.stderr.strip()}")
except FileNotFoundError as e:
logger.error(f"FileNotFoundError: EasyRSA script not found in {easyrsa_dir}")
logger.error(f"Error details: {e}")
return False, f"Error: 'easyrsa' script not found in {easyrsa_dir}. Please check the path."
except subprocess.CalledProcessError as e:
logger.error(f"CalledProcessError: EasyRSA command failed with return code {e.returncode}")
logger.error(f"EasyRSA stdout: {e.stdout}")
logger.error(f"EasyRSA stderr: {e.stderr}")
# Check if it's a conflict error (file already exists)
if e.returncode == 1 and "already exists" in e.stderr:
logger.warning(f"Certificate request already exists for user {username} - attempting automatic resolution")
# Try to remove the conflicting request file and retry
req_file_path = f"{easyrsa_dir}/pki/reqs/{username}.req"
if os.path.exists(req_file_path):
logger.info(f"Removing conflicting request file: {req_file_path}")
try:
os.remove(req_file_path)
logger.info("Conflicting request file removed, retrying certificate generation")
# Retry the certificate generation
retry_process = subprocess.run(
["./easyrsa", "--batch", "build-client-full", username],
cwd=easyrsa_dir,
env=env,
check=True,
capture_output=True,
text=True
)
logger.info("Certificate generation succeeded after conflict resolution")
if retry_process.stdout:
logger.info(f"Retry stdout: {retry_process.stdout.strip()}")
if retry_process.stderr:
logger.warning(f"Retry stderr: {retry_process.stderr.strip()}")
except (OSError, subprocess.CalledProcessError) as retry_error:
logger.error(f"Failed to resolve conflict automatically: {retry_error}")
return False, f"Certificate conflict detected for user: {username}. Automatic resolution failed. Please manually revoke the certificate: cd {easyrsa_dir} && ./easyrsa revoke {username}"
else:
logger.warning(f"Request file not found at expected location: {req_file_path}")
return False, f"Certificate conflict detected for user: {username}. Request file not found at expected location. Please manually revoke: cd {easyrsa_dir} && ./easyrsa revoke {username}"
else:
return False, f"Error generating certificate for user: {username}. Return code: {e.returncode}. Stdout: {e.stdout}. Stderr: {e.stderr}"
# Step 2: Verify that all required files exist
logger.info("Step 2: Verifying required certificate files exist")
required_files = [
("CA Certificate", ca_path),
("TA Key", ta_path),
("Client Certificate", client_crt_path),
("Client Key", client_key_path)
]
for file_desc, file_path in required_files:
if os.path.isfile(file_path):
logger.info(f"✓ Found {file_desc}: {file_path}")
else:
logger.error(f"✗ Missing {file_desc}: {file_path}")
return False, f"Error: Cannot read file '{file_path}' ({file_desc}). File not found after generation."
# Step 3: Read the content of the files
logger.info("Step 3: Reading certificate file contents")
try:
logger.info(f"Reading CA certificate from: {ca_path}")
with open(ca_path, 'r') as f:
ca_content = f.read()
logger.info(f"CA certificate loaded (length: {len(ca_content)} chars)")
logger.info(f"Reading client certificate from: {client_crt_path}")
with open(client_crt_path, 'r') as f:
client_crt_content = f.read()
logger.info(f"Client certificate loaded (length: {len(client_crt_content)} chars)")
logger.info(f"Reading client key from: {client_key_path}")
with open(client_key_path, 'r') as f:
client_key_content = f.read()
logger.info(f"Client key loaded (length: {len(client_key_content)} chars)")
logger.info(f"Reading TA key from: {ta_path}")
with open(ta_path, 'r') as f:
ta_content = f.read()
logger.info(f"TA key loaded (length: {len(ta_content)} chars)")
except IOError as e:
logger.error(f"IOError while reading certificate files: {e}")
return False, f"Error reading files: {e}"
# Step 4: Assemble the .ovpn configuration
logger.info("Step 4: Assembling OVPN configuration")
ovpn_config = f"""
client
dev tun
proto udp
remote 14.241.240.102 1194 # use FTP IP address
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
# push mac address info
push-peer-info
verb 3
<ca>
{ca_content}
</ca>
<cert>
{client_crt_content}
</cert>
<key>
{client_key_content}
</key>
<tls-auth>
{ta_content}
</tls-auth>
key-direction 1
"""
logger.info(f"OVPN configuration assembled (length: {len(ovpn_config)} chars)")
# Step 5: Write the configuration to the output file
logger.info("Step 5: Writing OVPN configuration to file")
try:
output_dir = os.path.dirname(output_path)
logger.info(f"Checking output directory: {output_dir}")
if not os.path.isdir(output_dir) or not os.access(output_dir, os.W_OK):
logger.warning(f"Output directory {output_dir} is not writable, using local directory")
local_output_dir = "generated-clients"
if not os.path.exists(local_output_dir):
logger.info(f"Creating local output directory: {local_output_dir}")
os.makedirs(local_output_dir)
local_output_path = os.path.join(local_output_dir, f"{username}.ovpn")
logger.info(f"Writing to local path: {local_output_path}")
with open(local_output_path, 'w') as f:
f.write(ovpn_config)
logger.info(f"Successfully wrote {len(ovpn_config)} chars to {local_output_path}")
return True, f"Successfully generated client config. Could not write to server path. Saved config locally to: {local_output_path}"
logger.info(f"Writing to server path: {output_path}")
with open(output_path, 'w') as f:
f.write(ovpn_config)
logger.info(f"Successfully wrote {len(ovpn_config)} chars to {output_path}")
return True, f"Successfully generated client config: {output_path}"
except IOError as e:
logger.error(f"IOError while writing OVPN file: {e}")
return False, f"Error writing to file: {e}"