diff --git a/jetforce_diagnostics.py b/jetforce_diagnostics.py index 8ffa8a8..efa831a 100755 --- a/jetforce_diagnostics.py +++ b/jetforce_diagnostics.py @@ -11,6 +11,7 @@ should be taken with a grain of salt and analyzed on their own merit. import argparse import contextlib import datetime +import ipaddress import socket import ssl import sys @@ -122,7 +123,8 @@ class BaseCheck: proto = socket.IPPROTO_TCP addr_info = socket.getaddrinfo(host, port, family, type_, proto) if not addr_info: - raise UserWarning("No address found for host") + raise UserWarning(f"No {family} address found for host") + # Gemini IPv6 return addr_info[0][4] @contextlib.contextmanager @@ -193,8 +195,10 @@ class IPv6Address(BaseCheck): def check(self) -> None: log(f"Looking up IPv6 address for {self.args.host!r}") addr = self.resolve_host(socket.AF_INET6) - if addr[0].startswith("::ffff:"): - raise UserWarning("Found an IPv4-mapped address, skipping check") + if ipaddress.ip_address(addr[0]).ipv4_mapped: + raise UserWarning( + "Found IPv4-mapped address, is your network IPv6 enabled?" + ) log(f"{addr[0]!r}", style="success") log(f"Attempting to connect to [{addr[0]}]:{addr[1]}") with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: @@ -225,12 +229,17 @@ class TLSClaims(BaseCheck): def check(self) -> None: try: + # $ pip install cryptography + import cryptography from cryptography.hazmat.backends import default_backend - from cryptography.x509.oid import NameOID + from cryptography.x509.oid import NameOID, ExtensionOID except ImportError: - raise UserWarning("Could not import cryptography library, skipping") + raise UserWarning("cryptography library not installed, skipping check") with self.connection() as sock: + # Python refuses to parse a certificate unless the issuer is validated. + # Because many gemini servers use self-signed certs, we need to use + # a third-party library to parse the certs from their binary form. der_x509 = sock.getpeercert(binary_form=True) cert = default_backend().load_der_x509_certificate(der_x509) now = datetime.datetime.utcnow() @@ -243,11 +252,33 @@ class TLSClaims(BaseCheck): style = "success" if cert.not_valid_after >= now else "failure" log(f"{cert.not_valid_after} UTC", style) - log("Checking common name matches hostname") - cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value - cert_dict = {"subject": ((("commonName", cn),),)} + log("Checking subject claim matches server hostname") + subject = [] + for cn in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME): + subject.append(("commonName", cn.value)) + + subject_alt_name = [] + try: + ext = cert.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_ALTERNATIVE_NAME + ) + except cryptography.x509.ExtensionNotFound: + pass + else: + for dns in ext.value.get_values_for_type(cryptography.x509.DNSName): + subject_alt_name.append(("DNS", dns)) + for ip_address in ext.value.get_values_for_type( + cryptography.x509.IPAddress + ): + subject_alt_name.append(("IP Address", ip_address)) + + cert_dict = { + "subject": (tuple(subject),), + "subjectAltName": tuple(subject_alt_name), + } + log(f"{cert_dict!r}", style="info") ssl.match_hostname(cert_dict, self.args.host) - log(repr(cn), style="success") + log(f"Hostname {self.args.host!r} matches claim", style="success") class TLSVerified(BaseCheck): @@ -274,14 +305,18 @@ class TLSRequired(BaseCheck): def check(self) -> None: log("Sending non-TLS request") - with socket.create_connection((self.args.host, self.args.port)) as sock: - sock.sendall(f"gemini://{self.netloc}\r\n".encode()) - fp = sock.makefile("rb") - header = fp.readline().decode() - if header: - log(f"Received unexpected response {header!r}", style="failure") - else: - log(f"Connection closed by server", style="success") + try: + with socket.create_connection((self.args.host, self.args.port)) as sock: + sock.sendall(f"gemini://{self.netloc}\r\n".encode()) + fp = sock.makefile("rb") + header = fp.readline().decode() + if header: + log(f"Received unexpected response {header!r}", style="failure") + else: + log(f"Connection closed by server", style="success") + except Exception as e: + # A connection error is a valid response + log(f"{e}", style="success") class ConcurrentConnections(BaseCheck): @@ -547,7 +582,7 @@ class URLSchemeMissing(BaseCheck): """A URL without a scheme should be inferred as gemini""" def check(self) -> None: - url = f"://{self.netloc}/\r\n" + url = f"//{self.netloc}/\r\n" response = self.make_request(url) log("Status should be 20 (SUCCESS)") @@ -591,6 +626,8 @@ class URLDotEscape(BaseCheck): log(f"{response.status!r}", style) +# TODO: Test sending a transient client certificate +# TODO: Test with client pinned to TLS v1.1 CHECKS = [ IPv4Address, IPv6Address,