#!/usr/bin/env python3 """ A diagnostic tool for gemini servers. This program will barrage your server with a series of requests in an attempt to uncover unexpected behavior. Not all of these checks adhere strictly to the gemini specification. Some of them are general best practices, and some trigger undefined behavior. Results should be taken with a grain of salt and analyzed on their own merit. """ import argparse import contextlib import datetime import ipaddress import socket import ssl import sys import time import typing if sys.version_info < (3, 7): sys.exit("Fatal Error: script requires Python 3.7+") socket.setdefaulttimeout(5) # ANSI color codes A_BOLD = 1 FG_BLACK = 30 FG_RED = 31 FG_GREEN = 32 FG_YELLOW = 33 FG_BLUE = 34 FG_MAGENTA = 35 FG_CYAN = 36 FG_WHITE = 37 def colorize(text: str, color: int) -> str: """ Colorize text using ANSI escape codes. """ if sys.stdout.isatty(): return f"\033[{color}m{text}\033[0m" else: return text def log(text: str, style: str = "normal") -> None: """ Print formatted text to stdout with optional styling. """ if style == "title": text = colorize(text, A_BOLD) if style == "warning": text = colorize(f" {text}", FG_YELLOW) elif style == "info": text = colorize(f" {text}", FG_CYAN) elif style == "success": text = colorize(f" ✓ {text}", FG_GREEN) elif style == "failure": text = colorize(f" x {text}", FG_RED) print(text) def log_error(err: Exception) -> None: """ Helper method for formatting exceptions as error messages. """ if isinstance(err, Warning): log(str(err), style="warning") else: log(str(err), style="failure") class GeminiResponse: def __init__(self, header): self.charset: str = "utf-8" self.header: str = header self.body: str = "" self.meta: typing.Optional[str] = None self.status: typing.Optional[str] = None self.mime: typing.Optional[str] = None class BaseCheck: """ Abstract base class for implementing server checks. """ description: str = "" def __init__(self, args: argparse.Namespace): self.args = args def run(self) -> None: """ Run the check and log any unhandled exceptions. """ log(f"[{self.__class__.__name__}] {self.__doc__}", style="title") try: self.check() except Exception as e: log_error(e) log("") def check(self) -> None: raise NotImplemented @property def netloc(self): if self.args.port == 1965: return self.args.host else: return f"{self.args.host}:{self.args.port}" def resolve_host(self, family: socket.AddressFamily) -> tuple: """ Retrieve the IP address and connection information for the host. """ host = self.args.host port = self.args.port type_ = socket.SOCK_STREAM proto = socket.IPPROTO_TCP addr_info = socket.getaddrinfo(host, port, family, type_, proto) if not addr_info: raise UserWarning(f"No {family} address found for host") # Gemini IPv6 return addr_info[0][4] @contextlib.contextmanager def connection( self, context: typing.Optional[ssl.SSLContext] = None ) -> ssl.SSLSocket: """ Setup an unverified TLS socket connection with the host. """ if context is None: context = ssl.SSLContext(ssl.PROTOCOL_TLS) context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection( (self.args.host, self.args.port), timeout=5 ) as sock: with context.wrap_socket(sock) as ssock: yield ssock def make_request(self, url: str) -> GeminiResponse: """ Send the request verbatim to the server and parse the response bytes. """ log("Requesting URL") log(repr(url), style="info") with self.connection() as sock: sock.sendall(url.encode(errors="surrogateescape")) fp = sock.makefile("rb") header = fp.readline().decode() log("Response header") log(repr(header), style="info") response = GeminiResponse(header) try: response.status, response.meta = header.strip().split(maxsplit=1) except ValueError: return response if response.status.startswith("2"): meta_parts = [part.strip() for part in response.meta.split(";")] response.mime = meta_parts[0] for part in meta_parts[1:]: if part.lower().startswith("charset="): response.charset = part[8:] response.body = fp.read().decode(response.charset) return response class IPv4Address(BaseCheck): """Establish a connection over an IPv4 address""" def check(self): log(f"Looking up IPv4 address for {self.args.host!r}") addr = self.resolve_host(socket.AF_INET) log(f"{addr[0]!r}", style="success") log(f"Attempting to connect to {addr[0]}:{addr[1]}") with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect(addr) sock.close() log(f"Successfully established connection", style="success") class IPv6Address(BaseCheck): """Establish a connection over an IPv6 address""" def check(self) -> None: log(f"Looking up IPv6 address for {self.args.host!r}") addr = self.resolve_host(socket.AF_INET6) if ipaddress.ip_address(addr[0]).ipv4_mapped: raise UserWarning( "Found IPv4-mapped address, is your network IPv6 enabled?" ) log(f"{addr[0]!r}", style="success") log(f"Attempting to connect to [{addr[0]}]:{addr[1]}") with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: sock.connect(addr) sock.close() log(f"Successfully established connection", style="success") class TLSVersion(BaseCheck): """Server must support TLS v1.2 or higher""" def check(self) -> None: log(f"Checking client library") log(f"{ssl.OPENSSL_VERSION!r}", style="info") log("Determining highest supported TLS version") with self.connection() as sock: version = sock.version() if version in ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1"): log(f"Negotiated {version}", style="failure") elif version == "TLSv1.2": log(f"Negotiated {version}", style="warning") else: log(f"Negotiated {version}", style="success") class TLSClaims(BaseCheck): """Certificate claims must be valid""" def check(self) -> None: try: # $ pip install cryptography import cryptography from cryptography.hazmat.backends import default_backend from cryptography.x509.oid import NameOID, ExtensionOID except ImportError: raise UserWarning("cryptography library not installed, skipping check") with self.connection() as sock: # Python refuses to parse a certificate unless the issuer is validated. # Because many gemini servers use self-signed certs, we need to use # a third-party library to parse the certs from their binary form. der_x509 = sock.getpeercert(binary_form=True) cert = default_backend().load_der_x509_certificate(der_x509) now = datetime.datetime.utcnow() log('Checking "Not Valid Before" timestamp') style = "success" if cert.not_valid_before <= now else "failure" log(f"{cert.not_valid_before} UTC", style) log('Checking "Not Valid After" timestamp') style = "success" if cert.not_valid_after >= now else "failure" log(f"{cert.not_valid_after} UTC", style) log("Checking subject claim matches server hostname") subject = [] for cn in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME): subject.append(("commonName", cn.value)) subject_alt_name = [] try: ext = cert.extensions.get_extension_for_oid( ExtensionOID.SUBJECT_ALTERNATIVE_NAME ) except cryptography.x509.ExtensionNotFound: pass else: for dns in ext.value.get_values_for_type(cryptography.x509.DNSName): subject_alt_name.append(("DNS", dns)) for ip_address in ext.value.get_values_for_type( cryptography.x509.IPAddress ): subject_alt_name.append(("IP Address", ip_address)) cert_dict = { "subject": (tuple(subject),), "subjectAltName": tuple(subject_alt_name), } log(f"{cert_dict!r}", style="info") ssl.match_hostname(cert_dict, self.args.host) log(f"Hostname {self.args.host!r} matches claim", style="success") class TLSVerified(BaseCheck): """Certificate should be self-signed or have a trusted issuer""" def check(self) -> None: log("Connecting over verified SSL socket") context = ssl.create_default_context() try: with socket.create_connection((self.args.host, self.args.port)) as sock: with context.wrap_socket(sock, server_hostname=self.args.host) as ssock: ssock.sendall(f"gemini://{self.netloc}\r\n".encode()) except Exception as e: if getattr(e, "verify_code", None) == 18: log("Self-signed TLS certificate detected", style="warning") else: raise else: log("Established trusted TLS connection", style="success") class TLSRequired(BaseCheck): """Non-TLS requests should be refused""" def check(self) -> None: log("Sending non-TLS request") try: with socket.create_connection((self.args.host, self.args.port)) as sock: sock.sendall(f"gemini://{self.netloc}\r\n".encode()) fp = sock.makefile("rb") header = fp.readline().decode() if header: log(f"Received unexpected response {header!r}", style="failure") else: log(f"Connection closed by server", style="success") except Exception as e: # A connection error is a valid response log(f"{e}", style="success") class ConcurrentConnections(BaseCheck): """Server should support concurrent connections""" def check(self) -> None: url = f"gemini://{self.netloc}/\r\n" log(f"Attempting to establish two connections") with self.connection() as sock: log("Opening socket 1", style="info") sock.send(url[0].encode()) with self.connection() as sock2: log("Opening socket 2", style="info") sock2.sendall(url.encode()) log("Closing socket 2", style="info") sock.sendall(url[1:].encode()) log("Closing socket 1", style="info") log(f"Concurrent connections supported", style="success") class Homepage(BaseCheck): """Request the gemini homepage""" def check(self) -> None: url = f"gemini://{self.netloc}/\r\n" response = self.make_request(url) log("Status should be 20 (SUCCESS)") style = "success" if response.status == "20" else "failure" log(f"{response.status!r}", style) log('Mime type should be "text/gemini"') style = "success" if response.mime == "text/gemini" else "failure" log(f"{response.mime!r}", style) log('Header should end with "\\r\\n"') style = "success" if response.header.endswith("\r\n") else "failure" log(f"{response.header[-2:]!r}", style) log("Body should be non-empty") style = "success" if response.body else "failure" log(f"{response.body[:50]!r}", style) log("Body should terminate with a newline") style = "success" if response.body.endswith("\n") else "failure" log(f"{response.body[-1:]!r}", style) log('Body should use "\\r\\n" line endings') bad_line = None for line in response.body.splitlines(True): if not line.endswith("\r\n"): bad_line = line break if bad_line is None: log("All lines end with '\\r\\n'", style="success") else: log(f"Invalid line ending {bad_line!r}", style="failure") class HomepageRedirect(BaseCheck): """A URL with no trailing slash should redirect to the canonical resource""" def check(self) -> None: url = f"gemini://{self.netloc}\r\n" response = self.make_request(url) log("Status should be 31 (REDIRECT PERMANENT)") style = "success" if response.status == "31" else "failure" log(f"{response.status!r}", style) log('Meta should redirect to location "gemini://[hostname]/"') style = "success" if response.meta == f"gemini://{self.netloc}/" else "failure" log(f"{response.meta!r}", style) log('Header should end with "\\r\\n"') style = "success" if response.header.endswith("\r\n") else "failure" log(f"{response.header[-2:]!r}", style) log("Body should be empty") style = "success" if response.body == "" else "failure" log(f"{response.body[:50]!r}", style) class PageNotFound(BaseCheck): """Request a gemini URL that does not exist""" def check(self) -> None: url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n" response = self.make_request(url) log("Status should be 51 (NOT FOUND)") style = "success" if response.status == "51" else "failure" log(f"{response.status!r}", style) log('Header should end with "\\r\\n"') style = "success" if response.header.endswith("\r\n") else "failure" log(f"{response.header[-2:]!r}", style) log("Body should be empty") style = "success" if response.body == "" else "failure" log(f"{response.body[:50]!r}", style) class RequestMissingCR(BaseCheck): """A request without a should timeout""" def check(self) -> None: url = f"gemini://{self.netloc}/\n" try: response = self.make_request(url) except Exception as e: log("No response should be received") log(f"{e}", style="success") else: log("No response should be received") log(f"{response.status!r}", style="failure") class URLIncludePort(BaseCheck): """Send the URL with the port explicitly defined""" def check(self) -> None: url = f"gemini://{self.args.host}:{self.args.port}/\r\n" response = self.make_request(url) log("Status should be 20 (SUCCESS)") style = "success" if response.status == "20" else "failure" log(f"{response.status!r}", style) class URLByIPAddress(BaseCheck): """Send the URL using the IPv4 address""" def check(self) -> None: addr = self.resolve_host(socket.AF_INET) url = f"gemini://{addr[0]}:{addr[1]}/\r\n" response = self.make_request(url) log("The appropriate status depends on desired behavior of the server") log(f"{response.status!r}", style="success") class URLInvalidUTF8Byte(BaseCheck): """Send a URL containing a non-UTF8 byte sequence""" def check(self) -> None: non_utf8_character = "\udcdc" # Surrogate-escaped byte sequence url = f"gemini://{self.netloc}/{non_utf8_character}\r\n" try: response = self.make_request(url) except Exception: response = None log("Connection should either drop or return a 59 BAD REQUEST") if response is None: log("Connection closed without response", style="success") else: style = "success" if response.status == "59" else "failure" log(f"{response.status!r}", style) class URLWrongPort(BaseCheck): """A URL with an incorrect port number should be rejected""" def check(self) -> None: url = f"gemini://{self.args.host}:443/\r\n" response = self.make_request(url) log("Response should return a 4x or 5x status code") style = "success" if response.status.startswith(("4", "5")) else "failure" log(f"{response.status!r}", style) class URLWrongHost(BaseCheck): """A URL with a foreign hostname should be rejected""" def check(self) -> None: url = f"gemini://wikipedia.org/\r\n" response = self.make_request(url) log("Response should return a 4x or 5x status code") style = "success" if response.status.startswith(("4", "5")) else "failure" log(f"{response.status!r}", style) class URLMaxSize(BaseCheck): """Send a 1024 byte URL, the maximum allowed size""" def check(self) -> None: # Per the spec, the are not included in the total size base_url = f"gemini://{self.netloc}/" buffer = "0" * (1024 - len(base_url.encode("utf-8"))) url = base_url + buffer + "\r\n" response = self.make_request(url) log("Status should be 51 (NOT FOUND)") style = "success" if response.status == "51" else "failure" log(f"{response.status!r}", style) class URLAboveMaxSize(BaseCheck): """Send a 1025 byte URL, above the maximum allowed size""" def check(self) -> None: # Per the spec, the are not included in the total size base_url = f"gemini://{self.netloc}/" buffer = "0" * (1025 - len(base_url.encode("utf-8"))) url = base_url + buffer + "\r\n" try: response = self.make_request(url) except Exception: response = None log("Connection should either drop or return a 59 BAD REQUEST") if response is None: log("Connection closed without response", style="success") else: style = "success" if response.status == "59" else "failure" log(f"{response.status!r}", style) class URLSchemeHTTP(BaseCheck): """Send a URL with an HTTP scheme""" def check(self) -> None: url = f"http://{self.netloc}/\r\n" response = self.make_request(url) log("Response should return a 4x or 5x status code") style = "success" if response.status.startswith(("4", "5")) else "failure" log(f"{response.status!r}", style) class URLSchemeHTTPS(BaseCheck): """Send a URL with an HTTPS scheme""" def check(self) -> None: url = f"https://{self.netloc}/\r\n" response = self.make_request(url) log("Response should return a 4x or 5x status code") style = "success" if response.status.startswith(("4", "5")) else "failure" log(f"{response.status!r}", style) class URLSchemeGopher(BaseCheck): """Send a URL with a Gopher scheme""" def check(self) -> None: url = f"gopher://{self.netloc}/\r\n" response = self.make_request(url) log("Response should return a 4x or 5x status code") style = "success" if response.status.startswith(("4", "5")) else "failure" log(f"{response.status!r}", style) class URLSchemeMissing(BaseCheck): """A URL without a scheme should be inferred as gemini""" def check(self) -> None: url = f"//{self.netloc}/\r\n" response = self.make_request(url) log("Status should be 20 (SUCCESS)") style = "success" if response.status == "20" else "failure" log(f"{response.status!r}", style) class URLEmpty(BaseCheck): """Empty URLs should not be accepted by the server""" def check(self) -> None: url = f"\r\n" response = self.make_request(url) log("Status should be 5x (PERMANENT FAILURE)") style = "success" if response.status.startswith("5") else "failure" log(f"{response.status!r}", style) class URLRelative(BaseCheck): """Relative URLs should not be accepted by the server""" def check(self) -> None: url = f"/\r\n" response = self.make_request(url) log("Status should be 5x (PERMANENT FAILURE)") style = "success" if response.status.startswith("5") else "failure" log(f"{response.status!r}", style) class URLDotEscape(BaseCheck): """A URL should not be able to escape the root using dot notation""" def check(self) -> None: url = f"gemini://{self.netloc}/../../\r\n" response = self.make_request(url) log("Status should be 5x (PERMANENT FAILURE)") style = "success" if response.status.startswith("5") else "failure" log(f"{response.status!r}", style) # TODO: Test sending a transient client certificate # TODO: Test with client pinned to TLS v1.1 CHECKS = [ IPv4Address, IPv6Address, TLSVersion, TLSClaims, TLSVerified, TLSRequired, ConcurrentConnections, Homepage, HomepageRedirect, PageNotFound, RequestMissingCR, URLIncludePort, URLByIPAddress, URLInvalidUTF8Byte, URLWrongPort, URLWrongHost, URLMaxSize, URLAboveMaxSize, URLSchemeHTTP, URLSchemeHTTPS, URLSchemeGopher, URLSchemeMissing, URLEmpty, URLRelative, URLDotEscape, ] def build_epilog(): epilog = ["list of checks:"] for check in CHECKS: epilog.append(colorize(f" [{check.__name__}]", A_BOLD)) epilog.append(f" {check.__doc__}") return "\n".join(epilog) parser = argparse.ArgumentParser( usage="%(prog)s host [port] [--help]", description=__doc__, epilog=build_epilog(), formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("host", help="server hostname to connect to") parser.add_argument( "port", nargs="?", type=int, default=1965, help="server port to connect to (default: 1965)", ) parser.add_argument("--checks", help="comma separated list of checks to apply") parser.add_argument( "--delay", type=float, default=2, help="seconds to sleep between checks (default: 2)", ) def run(): args = parser.parse_args() if args.checks: check_names = {cls.__name__: cls for cls in CHECKS} check_list = [] for name in args.checks.split(","): name = name.strip() if name not in check_names: raise ValueError(f"unknown check {name!r}") check_list.append(check_names[name]) else: check_list = CHECKS log(f"Running gemini server diagnostics check against {args.host}:{args.port}") log("...\n") for check in check_list: time.sleep(args.delay) check(args).run() log("Done!") if __name__ == "__main__": run()