diff --git a/jetforce.py b/jetforce.py index 091e89d..ee3c947 100755 --- a/jetforce.py +++ b/jetforce.py @@ -38,6 +38,7 @@ import argparse import asyncio import codecs import dataclasses +import datetime import mimetypes import os import pathlib @@ -51,6 +52,11 @@ import time import typing import urllib.parse +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa + if sys.version_info < (3, 7): sys.exit("Fatal Error: jetforce requires Python 3.7+") @@ -756,22 +762,45 @@ class GeminiServer: def generate_ad_hoc_certificate(hostname: str) -> typing.Tuple[str, str]: """ - Utility function to generate a self-signed SSL certificate key pair if - one isn't provided. Results may vary depending on your version of OpenSSL. + Utility function to generate an ad-hoc self-signed SSL certificate. """ - certfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.crt" - keyfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.key" - if not certfile.exists() or not keyfile.exists(): - print(f"Writing ad hoc TLS certificate to {certfile}") - subprocess.run( - [ - f"openssl req -newkey rsa:2048 -nodes -keyout {keyfile}" - f' -nodes -x509 -out {certfile} -subj "/CN={hostname}"' - ], - shell=True, - check=True, + certfile = os.path.join(tempfile.gettempdir(), f"{hostname}.crt") + keyfile = os.path.join(tempfile.gettempdir(), f"{hostname}.key") + + if not os.path.exists(certfile) or not os.path.exists(keyfile): + backend = default_backend() + + print("Generating private key...", file=sys.stderr) + private_key = rsa.generate_private_key(65537, 2048, default_backend()) + with open(keyfile, "wb") as fp: + # noinspection PyTypeChecker + key_data = private_key.private_bytes( + serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + fp.write(key_data) + + print("Generating certificate...", file=sys.stderr) + common_name = x509.NameAttribute(x509.NameOID.COMMON_NAME, hostname) + subject_name = x509.Name([common_name]) + not_valid_before = datetime.datetime.utcnow() + not_valid_after = not_valid_before + datetime.timedelta(days=365) + certificate = x509.CertificateBuilder( + subject_name=subject_name, + issuer_name=subject_name, + public_key=private_key.public_key(), + serial_number=x509.random_serial_number(), + not_valid_before=not_valid_before, + not_valid_after=not_valid_after, ) - return str(certfile), str(keyfile) + certificate = certificate.sign(private_key, hashes.SHA256(), backend) + with open(certfile, "wb") as fp: + # noinspection PyTypeChecker + cert_data = certificate.public_bytes(serialization.Encoding.PEM) + fp.write(cert_data) + + return certfile, keyfile def make_ssl_context(