From 7b086f893dd7849aa39c494dbd19fd56486fe028 Mon Sep 17 00:00:00 2001 From: arthur Date: Mon, 20 Oct 2025 13:35:58 +0700 Subject: [PATCH] add automatcially handle error when conflicting --- main.py | 3 + util/client/generate_client.py | 153 +++++++++++++++++++++++++++------ 2 files changed, 131 insertions(+), 25 deletions(-) diff --git a/main.py b/main.py index 48c3be5..57c5c66 100644 --- a/main.py +++ b/main.py @@ -44,6 +44,9 @@ def generate_client(request: ClientRequest): 2. Assemble the `.ovpn` file with the new certificate/key and the server's CA and TA keys. 3. Save the file to the server's client configuration directory. 4. Send the .ovpn file and user guide via email to the provided email address. + + The system automatically handles certificate conflicts by reusing existing certificates + or resolving conflicts transparently. """ logger.info(f"Client generation request for user: {request.username}, email: {request.email}") diff --git a/util/client/generate_client.py b/util/client/generate_client.py index 5f7b8ee..2e19b79 100644 --- a/util/client/generate_client.py +++ b/util/client/generate_client.py @@ -10,10 +10,70 @@ from typing import Tuple # Set up logging logger = logging.getLogger(__name__) +def revoke_client_certificate(username: str) -> Tuple[bool, str]: + """ + Revoke an existing client certificate. + + Args: + username: The username whose certificate to revoke + + Returns: + A tuple containing a boolean indicating success and a message. + """ + logger.info(f"Starting certificate revocation for user: {username}") + + easyrsa_dir = "/home/arthur/openvpn-ca/" + + if not os.path.exists(easyrsa_dir): + logger.error(f"EasyRSA directory does not exist: {easyrsa_dir}") + return False, f"Error: EasyRSA directory '{easyrsa_dir}' does not exist." + + try: + # First revoke the certificate + revoke_command = ["./easyrsa", "--batch", "revoke", username] + logger.info(f"Executing revoke command: {' '.join(revoke_command)}") + + revoke_process = subprocess.run( + revoke_command, + cwd=easyrsa_dir, + capture_output=True, + text=True, + check=True + ) + + logger.info("Certificate revoked successfully") + + # Then generate new CRL (Certificate Revocation List) + crl_command = ["./easyrsa", "--batch", "gen-crl"] + logger.info(f"Executing CRL generation command: {' '.join(crl_command)}") + + crl_process = subprocess.run( + crl_command, + cwd=easyrsa_dir, + capture_output=True, + text=True, + check=True + ) + + logger.info("CRL generated successfully") + return True, f"Successfully revoked certificate for user: {username}" + + except subprocess.CalledProcessError as e: + logger.error(f"Revocation failed for user {username}: {e.stderr}") + return False, f"Failed to revoke certificate for user {username}: {e.stderr}" + except Exception as e: + logger.error(f"Unexpected error during revocation: {e}") + return False, f"Unexpected error revoking certificate: {e}" + def generate_client_config(username: str, email: str) -> Tuple[bool, str]: """ Generates a .ovpn file for a given user. + This function automatically handles certificate conflicts by: + 1. Reusing existing certificates if they exist + 2. Automatically resolving request file conflicts + 3. Providing clear error messages only when automatic resolution fails + Args: username: The username for which to generate the config. email: The email for which to generate the config and used as password @@ -46,34 +106,77 @@ def generate_client_config(username: str, email: str) -> Tuple[bool, str]: # Step 1: Generate the client certificate logger.info("Step 1: Starting client certificate generation") - try: - command = ["./easyrsa", "--batch", "build-client-full", username] - logger.info(f"Executing command: {' '.join(command)} in directory: {easyrsa_dir}") - process = subprocess.run( - command, - cwd=easyrsa_dir, - env=env, - check=True, - capture_output=True, - text=True - ) + # Check if certificate already exists + if os.path.isfile(client_crt_path): + logger.info(f"Certificate already exists for user {username} at {client_crt_path}") + logger.info("Skipping certificate generation - will reuse existing certificate") + else: + logger.info(f"Certificate does not exist for user {username}, generating new one") + try: + command = ["./easyrsa", "--batch", "build-client-full", username] + logger.info(f"Executing command: {' '.join(command)} in directory: {easyrsa_dir}") - logger.info("EasyRSA command completed successfully") - if process.stdout: - logger.info(f"EasyRSA stdout: {process.stdout.strip()}") - if process.stderr: - logger.warning(f"EasyRSA stderr: {process.stderr.strip()}") + process = subprocess.run( + command, + cwd=easyrsa_dir, + env=env, + check=True, + capture_output=True, + text=True + ) - except FileNotFoundError as e: - logger.error(f"FileNotFoundError: EasyRSA script not found in {easyrsa_dir}") - logger.error(f"Error details: {e}") - return False, f"Error: 'easyrsa' script not found in {easyrsa_dir}. Please check the path." - except subprocess.CalledProcessError as e: - logger.error(f"CalledProcessError: EasyRSA command failed with return code {e.returncode}") - logger.error(f"EasyRSA stdout: {e.stdout}") - logger.error(f"EasyRSA stderr: {e.stderr}") - return False, f"Error generating certificate for user: {username}. Return code: {e.returncode}. Stdout: {e.stdout}. Stderr: {e.stderr}" + logger.info("EasyRSA command completed successfully") + if process.stdout: + logger.info(f"EasyRSA stdout: {process.stdout.strip()}") + if process.stderr: + logger.warning(f"EasyRSA stderr: {process.stderr.strip()}") + + except FileNotFoundError as e: + logger.error(f"FileNotFoundError: EasyRSA script not found in {easyrsa_dir}") + logger.error(f"Error details: {e}") + return False, f"Error: 'easyrsa' script not found in {easyrsa_dir}. Please check the path." + except subprocess.CalledProcessError as e: + logger.error(f"CalledProcessError: EasyRSA command failed with return code {e.returncode}") + logger.error(f"EasyRSA stdout: {e.stdout}") + logger.error(f"EasyRSA stderr: {e.stderr}") + + # Check if it's a conflict error (file already exists) + if e.returncode == 1 and "already exists" in e.stderr: + logger.warning(f"Certificate request already exists for user {username} - attempting automatic resolution") + + # Try to remove the conflicting request file and retry + req_file_path = f"{easyrsa_dir}/pki/reqs/{username}.req" + if os.path.exists(req_file_path): + logger.info(f"Removing conflicting request file: {req_file_path}") + try: + os.remove(req_file_path) + logger.info("Conflicting request file removed, retrying certificate generation") + + # Retry the certificate generation + retry_process = subprocess.run( + ["./easyrsa", "--batch", "build-client-full", username], + cwd=easyrsa_dir, + env=env, + check=True, + capture_output=True, + text=True + ) + + logger.info("Certificate generation succeeded after conflict resolution") + if retry_process.stdout: + logger.info(f"Retry stdout: {retry_process.stdout.strip()}") + if retry_process.stderr: + logger.warning(f"Retry stderr: {retry_process.stderr.strip()}") + + except (OSError, subprocess.CalledProcessError) as retry_error: + logger.error(f"Failed to resolve conflict automatically: {retry_error}") + return False, f"Certificate conflict detected for user: {username}. Automatic resolution failed. Please manually revoke the certificate: cd {easyrsa_dir} && ./easyrsa revoke {username}" + else: + logger.warning(f"Request file not found at expected location: {req_file_path}") + return False, f"Certificate conflict detected for user: {username}. Request file not found at expected location. Please manually revoke: cd {easyrsa_dir} && ./easyrsa revoke {username}" + else: + return False, f"Error generating certificate for user: {username}. Return code: {e.returncode}. Stdout: {e.stdout}. Stderr: {e.stderr}" # Step 2: Verify that all required files exist logger.info("Step 2: Verifying required certificate files exist")