"""
Utility for generating OpenVPN client configuration files.

The module drives an Easy-RSA installation through ``subprocess`` to issue or
revoke client certificates and bundles the resulting key material into a single
self-contained ``.ovpn`` file.
"""

import logging
import os
import subprocess
from typing import Dict, List, Optional, Tuple

# Set up logging
logger = logging.getLogger(__name__)

# Default locations. Both public functions accept overrides so the module can
# be pointed at a different CA (or at a temporary directory in tests).
DEFAULT_EASYRSA_DIR = "/home/arthur/openvpn-ca/"
DEFAULT_OUTPUT_DIR = "generated-clients"


def _is_safe_username(username: str) -> bool:
    """Return True if *username* is safe to use as a file stem and CLI argument."""
    if not username or username.startswith("-"):
        # A leading dash would be parsed by easyrsa as an option.
        return False
    # Path separators / NUL would let the name escape the pki and output dirs.
    return not any(ch in username for ch in ("/", "\\", "\0"))


def _run_easyrsa(
    args: List[str],
    easyrsa_dir: str,
    env: Optional[Dict[str, str]] = None,
) -> "subprocess.CompletedProcess[str]":
    """Run ``./easyrsa --batch <args>`` inside *easyrsa_dir*, raising on failure."""
    return subprocess.run(
        ["./easyrsa", "--batch", *args],
        cwd=easyrsa_dir,
        env=env,
        check=True,
        capture_output=True,
        text=True,
    )


def revoke_client_certificate(
    username: str,
    easyrsa_dir: str = DEFAULT_EASYRSA_DIR,
) -> Tuple[bool, str]:
    """
    Revoke an existing client certificate.

    Runs ``easyrsa revoke`` followed by ``easyrsa gen-crl`` so the certificate
    revocation list reflects the change.

    Args:
        username: The username whose certificate to revoke.
        easyrsa_dir: Directory containing the ``easyrsa`` script and its PKI.

    Returns:
        A tuple containing a boolean indicating success and a message.
    """
    logger.info("Starting certificate revocation for user: %s", username)

    if not _is_safe_username(username):
        logger.error("Rejected unsafe username: %r", username)
        return False, f"Error: invalid username {username!r}."

    if not os.path.exists(easyrsa_dir):
        logger.error("EasyRSA directory does not exist: %s", easyrsa_dir)
        return False, f"Error: EasyRSA directory '{easyrsa_dir}' does not exist."

    try:
        # First revoke the certificate
        logger.info("Executing revoke command: ./easyrsa --batch revoke %s", username)
        _run_easyrsa(["revoke", username], easyrsa_dir)
        logger.info("Certificate revoked successfully")

        # Then generate new CRL (Certificate Revocation List)
        logger.info("Executing CRL generation command: ./easyrsa --batch gen-crl")
        _run_easyrsa(["gen-crl"], easyrsa_dir)
        logger.info("CRL generated successfully")
    except subprocess.CalledProcessError as e:
        logger.error("Revocation failed for user %s: %s", username, e.stderr)
        return False, f"Failed to revoke certificate for user {username}: {e.stderr}"
    except Exception as e:
        # Boundary handler: callers expect a (bool, message) tuple, never a raise.
        logger.exception("Unexpected error during revocation: %s", e)
        return False, f"Unexpected error revoking certificate: {e}"

    return True, f"Successfully revoked certificate for user: {username}"


def _log_process_output(prefix: str, process: "subprocess.CompletedProcess[str]") -> None:
    """Log the captured stdout (info) and stderr (warning) of a finished command."""
    if process.stdout:
        logger.info("%s stdout: %s", prefix, process.stdout.strip())
    if process.stderr:
        logger.warning("%s stderr: %s", prefix, process.stderr.strip())


def _resolve_request_conflict(
    username: str,
    easyrsa_dir: str,
    env: Dict[str, str],
) -> Optional[str]:
    """Remove a stale request file and retry issuance; return an error message or None."""
    logger.warning(
        "Certificate request already exists for user %s - attempting automatic resolution",
        username,
    )
    req_file_path = os.path.join(easyrsa_dir, "pki", "reqs", f"{username}.req")
    if not os.path.exists(req_file_path):
        logger.warning("Request file not found at expected location: %s", req_file_path)
        return (
            f"Certificate conflict detected for user: {username}. "
            "Request file not found at expected location. "
            f"Please manually revoke: cd {easyrsa_dir} && ./easyrsa revoke {username}"
        )

    logger.info("Removing conflicting request file: %s", req_file_path)
    try:
        os.remove(req_file_path)
        logger.info("Conflicting request file removed, retrying certificate generation")
        retry_process = _run_easyrsa(["build-client-full", username], easyrsa_dir, env)
    except (OSError, subprocess.CalledProcessError) as retry_error:
        logger.error("Failed to resolve conflict automatically: %s", retry_error)
        return (
            f"Certificate conflict detected for user: {username}. "
            "Automatic resolution failed. "
            f"Please manually revoke the certificate: cd {easyrsa_dir} && ./easyrsa revoke {username}"
        )

    logger.info("Certificate generation succeeded after conflict resolution")
    _log_process_output("Retry", retry_process)
    return None


def _ensure_client_certificate(
    username: str,
    easyrsa_dir: str,
    client_crt_path: str,
    env: Dict[str, str],
) -> Optional[str]:
    """Issue a client certificate unless one already exists; return an error message or None."""
    if os.path.isfile(client_crt_path):
        logger.info("Certificate already exists for user %s at %s", username, client_crt_path)
        logger.info("Skipping certificate generation - will reuse existing certificate")
        return None

    logger.info("Certificate does not exist for user %s, generating new one", username)
    logger.info(
        "Executing command: ./easyrsa --batch build-client-full %s in directory: %s",
        username,
        easyrsa_dir,
    )
    try:
        process = _run_easyrsa(["build-client-full", username], easyrsa_dir, env)
    except FileNotFoundError as e:
        logger.error("FileNotFoundError: EasyRSA script not found in %s", easyrsa_dir)
        logger.error("Error details: %s", e)
        return f"Error: 'easyrsa' script not found in {easyrsa_dir}. Please check the path."
    except OSError as e:
        # e.g. the script exists but is not executable; keep the tuple contract.
        logger.error("OSError while launching EasyRSA in %s: %s", easyrsa_dir, e)
        return f"Error: could not execute 'easyrsa' in {easyrsa_dir}: {e}"
    except subprocess.CalledProcessError as e:
        logger.error("CalledProcessError: EasyRSA command failed with return code %s", e.returncode)
        logger.error("EasyRSA stdout: %s", e.stdout)
        logger.error("EasyRSA stderr: %s", e.stderr)
        # Check if it's a conflict error (file already exists)
        if e.returncode == 1 and "already exists" in (e.stderr or ""):
            return _resolve_request_conflict(username, easyrsa_dir, env)
        return (
            f"Error generating certificate for user: {username}. "
            f"Return code: {e.returncode}. Stdout: {e.stdout}. Stderr: {e.stderr}"
        )

    logger.info("EasyRSA command completed successfully")
    _log_process_output("EasyRSA", process)
    return None


def _read_text(label: str, path: str) -> str:
    """Read *path* as text, logging the action under the human-readable *label*."""
    logger.info("Reading %s from: %s", label, path)
    with open(path, "r") as f:
        content = f.read()
    logger.info("%s loaded (length: %d chars)", label[0].upper() + label[1:], len(content))
    return content


def _render_ovpn_config(ca: str, cert: str, key: str, ta: str) -> str:
    """Assemble the client profile with all key material embedded inline."""
    lines = [
        "client",
        "dev tun",
        "proto udp",
        "remote 14.241.240.102 1194 # use FTP IP address",
        "resolv-retry infinite",
        "nobind",
        "persist-key",
        "persist-tun",
        "remote-cert-tls server",
        "cipher AES-256-GCM",
        "# push mac address info",
        "push-peer-info",
        "verb 3",
        # OpenVPN only recognises embedded files when they are wrapped in
        # <option>...</option> blocks; bare PEM text is a parse error.
        "<ca>", ca.strip(), "</ca>",
        "<cert>", cert.strip(), "</cert>",
        "<key>", key.strip(), "</key>",
        # NOTE(review): ta.key is embedded as tls-auth because key-direction is
        # set below; confirm the server does not use tls-crypt instead.
        "<tls-auth>", ta.strip(), "</tls-auth>",
        "key-direction 1",
    ]
    return "\n".join(lines) + "\n"


def _write_config(ovpn_config: str, username: str, output_dir: str) -> Tuple[bool, str]:
    """Write the profile to *output_dir*, falling back to the local default directory."""
    output_path = os.path.join(output_dir, f"{username}.ovpn")
    try:
        logger.info("Checking output directory: %s", output_dir)
        if not os.path.isdir(output_dir) or not os.access(output_dir, os.W_OK):
            logger.warning(
                "Output directory %s is not writable, using local directory", output_dir
            )
            local_output_dir = DEFAULT_OUTPUT_DIR
            if not os.path.exists(local_output_dir):
                logger.info("Creating local output directory: %s", local_output_dir)
            os.makedirs(local_output_dir, exist_ok=True)
            local_output_path = os.path.join(local_output_dir, f"{username}.ovpn")
            logger.info("Writing to local path: %s", local_output_path)
            with open(local_output_path, "w") as f:
                f.write(ovpn_config)
            logger.info("Successfully wrote %d chars to %s", len(ovpn_config), local_output_path)
            return True, (
                "Successfully generated client config. Could not write to server path. "
                f"Saved config locally to: {local_output_path}"
            )

        logger.info("Writing to server path: %s", output_path)
        with open(output_path, "w") as f:
            f.write(ovpn_config)
        logger.info("Successfully wrote %d chars to %s", len(ovpn_config), output_path)
        return True, f"Successfully generated client config: {output_path}"
    except OSError as e:
        logger.error("IOError while writing OVPN file: %s", e)
        return False, f"Error writing to file: {e}"


def generate_client_config(
    username: str,
    email: str,
    easyrsa_dir: str = DEFAULT_EASYRSA_DIR,
    output_dir: str = DEFAULT_OUTPUT_DIR,
) -> Tuple[bool, str]:
    """
    Generates a .ovpn file for a given user.

    This function automatically handles certificate conflicts by:
    1. Reusing existing certificates if they exist
    2. Automatically resolving request file conflicts
    3. Providing clear error messages only when automatic resolution fails

    Args:
        username: The username for which to generate the config. Must not be
            empty, start with ``-`` or contain path separators.
        email: The email for which to generate the config; it is also used as
            the private-key passphrase, so it is never written to the log.
        easyrsa_dir: Directory containing the ``easyrsa`` script, ``ta.key``
            and the ``pki`` tree.
        output_dir: Directory the ``<username>.ovpn`` file is written to. If it
            is missing or not writable the file is saved under the local
            ``generated-clients`` directory instead.

    Returns:
        A tuple containing a boolean indicating success and a message.
    """
    # The email doubles as the key passphrase: log the username only.
    logger.info("Starting client configuration generation for user: %s", username)

    if not _is_safe_username(username):
        logger.error("Rejected unsafe username: %r", username)
        return False, f"Error: invalid username {username!r}."

    ca_path = os.path.join(easyrsa_dir, "pki", "ca.crt")
    ta_path = os.path.join(easyrsa_dir, "ta.key")
    client_crt_path = os.path.join(easyrsa_dir, "pki", "issued", f"{username}.crt")
    client_key_path = os.path.join(easyrsa_dir, "pki", "private", f"{username}.key")
    output_path = os.path.join(output_dir, f"{username}.ovpn")

    logger.info("EasyRSA directory: %s", easyrsa_dir)
    logger.info("CA certificate path: %s", ca_path)
    logger.info("TA key path: %s", ta_path)
    logger.info("Client certificate path: %s", client_crt_path)
    logger.info("Client key path: %s", client_key_path)
    logger.info("Output path: %s", output_path)

    # config password into env
    password = email
    env = os.environ.copy()
    env["EASYRSA_PASSOUT"] = f"pass:{password}"
    logger.info("Set EASYRSA_PASSOUT environment variable (length: %d)", len(password))

    # Step 1: Generate the client certificate
    logger.info("Step 1: Starting client certificate generation")
    error = _ensure_client_certificate(username, easyrsa_dir, client_crt_path, env)
    if error is not None:
        return False, error

    # Step 2: Verify that all required files exist
    logger.info("Step 2: Verifying required certificate files exist")
    required_files = [
        ("CA Certificate", ca_path),
        ("TA Key", ta_path),
        ("Client Certificate", client_crt_path),
        ("Client Key", client_key_path),
    ]
    for file_desc, file_path in required_files:
        if not os.path.isfile(file_path):
            logger.error("✗ Missing %s: %s", file_desc, file_path)
            return False, (
                f"Error: Cannot read file '{file_path}' ({file_desc}). "
                "File not found after generation."
            )
        logger.info("✓ Found %s: %s", file_desc, file_path)

    # Step 3: Read the content of the files
    logger.info("Step 3: Reading certificate file contents")
    try:
        ca_content = _read_text("CA certificate", ca_path)
        client_crt_content = _read_text("client certificate", client_crt_path)
        client_key_content = _read_text("client key", client_key_path)
        ta_content = _read_text("TA key", ta_path)
    except OSError as e:
        logger.error("IOError while reading certificate files: %s", e)
        return False, f"Error reading files: {e}"

    # Step 4: Assemble the .ovpn configuration
    logger.info("Step 4: Assembling OVPN configuration")
    ovpn_config = _render_ovpn_config(
        ca_content, client_crt_content, client_key_content, ta_content
    )
    logger.info("OVPN configuration assembled (length: %d chars)", len(ovpn_config))

    # Step 5: Write the configuration to the output file
    logger.info("Step 5: Writing OVPN configuration to file")
    return _write_config(ovpn_config, username, output_dir)