From cbbedb5f1d09ad5f25d37609cf41418a272c6ad5 Mon Sep 17 00:00:00 2001 From: Michael Lazar Date: Mon, 6 Jan 2020 23:23:40 -0500 Subject: [PATCH] Add diagnostics check script --- jetforce_diagnostics.py | 676 ++++++++++++++++++++++++++++++++++++++++ setup.py | 3 +- 2 files changed, 678 insertions(+), 1 deletion(-) create mode 100755 jetforce_diagnostics.py diff --git a/jetforce_diagnostics.py b/jetforce_diagnostics.py new file mode 100755 index 0000000..8ffa8a8 --- /dev/null +++ b/jetforce_diagnostics.py @@ -0,0 +1,676 @@ +#!/usr/bin/env python3 +""" +A diagnostic tool for gemini servers. + +This program will barrage your server with a series of requests in +an attempt to uncover unexpected behavior. Not all of these checks +adhere strictly to the gemini specification. Some of them are +general best practices, and some trigger undefined behavior. Results +should be taken with a grain of salt and analyzed on their own merit. +""" +import argparse +import contextlib +import datetime +import socket +import ssl +import sys +import time +import typing + +if sys.version_info < (3, 7): + sys.exit("Fatal Error: script requires Python 3.7+") + +socket.setdefaulttimeout(5) + +# ANSI color codes +A_BOLD = 1 +FG_BLACK = 30 +FG_RED = 31 +FG_GREEN = 32 +FG_YELLOW = 33 +FG_BLUE = 34 +FG_MAGENTA = 35 +FG_CYAN = 36 +FG_WHITE = 37 + + +def colorize(text: str, color: int) -> str: + """ + Colorize text using ANSI escape codes. + """ + if sys.stdout.isatty(): + return f"\033[{color}m{text}\033[0m" + else: + return text + + +def log(text: str, style: str = "normal") -> None: + """ + Print formatted text to stdout with optional styling. + """ + if style == "title": + text = colorize(text, A_BOLD) + if style == "warning": + text = colorize(f" {text}", FG_YELLOW) + elif style == "info": + text = colorize(f" {text}", FG_CYAN) + elif style == "success": + text = colorize(f" ✓ {text}", FG_GREEN) + elif style == "failure": + text = colorize(f" x {text}", FG_RED) + print(text) + + +def log_error(err: Exception) -> None: + """ + Helper method for formatting exceptions as error messages. + """ + if isinstance(err, Warning): + log(str(err), style="warning") + else: + log(str(err), style="failure") + + +class GeminiResponse: + def __init__(self, header): + self.charset: str = "utf-8" + self.header: str = header + self.body: str = "" + self.meta: typing.Optional[str] = None + self.status: typing.Optional[str] = None + self.mime: typing.Optional[str] = None + + +class BaseCheck: + """ + Abstract base class for implementing server checks. + """ + + description: str = "" + + def __init__(self, args: argparse.Namespace): + self.args = args + + def run(self) -> None: + """ + Run the check and log any unhandled exceptions. + """ + log(f"[{self.__class__.__name__}] {self.__doc__}", style="title") + try: + self.check() + except Exception as e: + log_error(e) + log("") + + def check(self) -> None: + raise NotImplemented + + @property + def netloc(self): + if self.args.port == 1965: + return self.args.host + else: + return f"{self.args.host}:{self.args.port}" + + def resolve_host(self, family: socket.AddressFamily) -> tuple: + """ + Retrieve the IP address and connection information for the host. + """ + host = self.args.host + port = self.args.port + type_ = socket.SOCK_STREAM + proto = socket.IPPROTO_TCP + addr_info = socket.getaddrinfo(host, port, family, type_, proto) + if not addr_info: + raise UserWarning("No address found for host") + return addr_info[0][4] + + @contextlib.contextmanager + def connection( + self, context: typing.Optional[ssl.SSLContext] = None + ) -> ssl.SSLSocket: + """ + Setup an unverified TLS socket connection with the host. + """ + if context is None: + context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + with socket.create_connection( + (self.args.host, self.args.port), timeout=5 + ) as sock: + with context.wrap_socket(sock) as ssock: + yield ssock + + def make_request(self, url: str) -> GeminiResponse: + """ + Send the request verbatim to the server and parse the response bytes. + """ + log("Requesting URL") + log(repr(url), style="info") + with self.connection() as sock: + sock.sendall(url.encode(errors="surrogateescape")) + fp = sock.makefile("rb") + header = fp.readline().decode() + + log("Response header") + log(repr(header), style="info") + + response = GeminiResponse(header) + try: + response.status, response.meta = header.strip().split(maxsplit=1) + except ValueError: + return response + + if response.status.startswith("2"): + meta_parts = [part.strip() for part in response.meta.split(";")] + response.mime = meta_parts[0] + for part in meta_parts[1:]: + if part.lower().startswith("charset="): + response.charset = part[8:] + + response.body = fp.read().decode(response.charset) + return response + + +class IPv4Address(BaseCheck): + """Establish a connection over an IPv4 address""" + + def check(self): + log(f"Looking up IPv4 address for {self.args.host!r}") + addr = self.resolve_host(socket.AF_INET) + log(f"{addr[0]!r}", style="success") + log(f"Attempting to connect to {addr[0]}:{addr[1]}") + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect(addr) + sock.close() + log(f"Successfully established connection", style="success") + + +class IPv6Address(BaseCheck): + """Establish a connection over an IPv6 address""" + + def check(self) -> None: + log(f"Looking up IPv6 address for {self.args.host!r}") + addr = self.resolve_host(socket.AF_INET6) + if addr[0].startswith("::ffff:"): + raise UserWarning("Found an IPv4-mapped address, skipping check") + log(f"{addr[0]!r}", style="success") + log(f"Attempting to connect to [{addr[0]}]:{addr[1]}") + with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: + sock.connect(addr) + sock.close() + log(f"Successfully established connection", style="success") + + +class TLSVersion(BaseCheck): + """Server must support TLS v1.2 or higher""" + + def check(self) -> None: + log(f"Checking client library") + log(f"{ssl.OPENSSL_VERSION!r}", style="info") + log("Determining highest supported TLS version") + with self.connection() as sock: + version = sock.version() + if version in ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1"): + log(f"Negotiated {version}", style="failure") + elif version == "TLSv1.2": + log(f"Negotiated {version}", style="warning") + else: + log(f"Negotiated {version}", style="success") + + +class TLSClaims(BaseCheck): + """Certificate claims must be valid""" + + def check(self) -> None: + try: + from cryptography.hazmat.backends import default_backend + from cryptography.x509.oid import NameOID + except ImportError: + raise UserWarning("Could not import cryptography library, skipping") + + with self.connection() as sock: + der_x509 = sock.getpeercert(binary_form=True) + cert = default_backend().load_der_x509_certificate(der_x509) + now = datetime.datetime.utcnow() + + log('Checking "Not Valid Before" timestamp') + style = "success" if cert.not_valid_before <= now else "failure" + log(f"{cert.not_valid_before} UTC", style) + + log('Checking "Not Valid After" timestamp') + style = "success" if cert.not_valid_after >= now else "failure" + log(f"{cert.not_valid_after} UTC", style) + + log("Checking common name matches hostname") + cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + cert_dict = {"subject": ((("commonName", cn),),)} + ssl.match_hostname(cert_dict, self.args.host) + log(repr(cn), style="success") + + +class TLSVerified(BaseCheck): + """Certificate should be self-signed or have a trusted issuer""" + + def check(self) -> None: + log("Connecting over verified SSL socket") + context = ssl.create_default_context() + try: + with socket.create_connection((self.args.host, self.args.port)) as sock: + with context.wrap_socket(sock, server_hostname=self.args.host) as ssock: + ssock.sendall(f"gemini://{self.netloc}\r\n".encode()) + except Exception as e: + if getattr(e, "verify_code", None) == 18: + log("Self-signed TLS certificate detected", style="warning") + else: + raise + else: + log("Established trusted TLS connection", style="success") + + +class TLSRequired(BaseCheck): + """Non-TLS requests should be refused""" + + def check(self) -> None: + log("Sending non-TLS request") + with socket.create_connection((self.args.host, self.args.port)) as sock: + sock.sendall(f"gemini://{self.netloc}\r\n".encode()) + fp = sock.makefile("rb") + header = fp.readline().decode() + if header: + log(f"Received unexpected response {header!r}", style="failure") + else: + log(f"Connection closed by server", style="success") + + +class ConcurrentConnections(BaseCheck): + """Server should support concurrent connections""" + + def check(self) -> None: + url = f"gemini://{self.netloc}/\r\n" + + log(f"Attempting to establish two connections") + with self.connection() as sock: + log("Opening socket 1", style="info") + sock.send(url[0].encode()) + with self.connection() as sock2: + log("Opening socket 2", style="info") + sock2.sendall(url.encode()) + log("Closing socket 2", style="info") + sock.sendall(url[1:].encode()) + log("Closing socket 1", style="info") + + log(f"Concurrent connections supported", style="success") + + +class Homepage(BaseCheck): + """Request the gemini homepage""" + + def check(self) -> None: + url = f"gemini://{self.netloc}/\r\n" + response = self.make_request(url) + + log("Status should be 20 (SUCCESS)") + style = "success" if response.status == "20" else "failure" + log(f"{response.status!r}", style) + + log('Mime type should be "text/gemini"') + style = "success" if response.mime == "text/gemini" else "failure" + log(f"{response.mime!r}", style) + + log('Header should end with "\\r\\n"') + style = "success" if response.header.endswith("\r\n") else "failure" + log(f"{response.header[-2:]!r}", style) + + log("Body should be non-empty") + style = "success" if response.body else "failure" + log(f"{response.body[:50]!r}", style) + + log("Body should terminate with a newline") + style = "success" if response.body.endswith("\n") else "failure" + log(f"{response.body[-1:]!r}", style) + + log('Body should use "\\r\\n" line endings') + bad_line = None + for line in response.body.splitlines(True): + if not line.endswith("\r\n"): + bad_line = line + break + if bad_line is None: + log("All lines end with '\\r\\n'", style="success") + else: + log(f"Invalid line ending {bad_line!r}", style="failure") + + +class HomepageRedirect(BaseCheck): + """A URL with no trailing slash should redirect to the canonical resource""" + + def check(self) -> None: + url = f"gemini://{self.netloc}\r\n" + response = self.make_request(url) + + log("Status should be 31 (REDIRECT PERMANENT)") + style = "success" if response.status == "31" else "failure" + log(f"{response.status!r}", style) + + log('Meta should redirect to location "gemini://[hostname]/"') + style = "success" if response.meta == f"gemini://{self.netloc}/" else "failure" + log(f"{response.meta!r}", style) + + log('Header should end with "\\r\\n"') + style = "success" if response.header.endswith("\r\n") else "failure" + log(f"{response.header[-2:]!r}", style) + + log("Body should be empty") + style = "success" if response.body == "" else "failure" + log(f"{response.body[:50]!r}", style) + + +class PageNotFound(BaseCheck): + """Request a gemini URL that does not exist""" + + def check(self) -> None: + url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n" + response = self.make_request(url) + + log("Status should be 51 (NOT FOUND)") + style = "success" if response.status == "51" else "failure" + log(f"{response.status!r}", style) + + log('Header should end with "\\r\\n"') + style = "success" if response.header.endswith("\r\n") else "failure" + log(f"{response.header[-2:]!r}", style) + + log("Body should be empty") + style = "success" if response.body == "" else "failure" + log(f"{response.body[:50]!r}", style) + + +class RequestMissingCR(BaseCheck): + """A request without a should timeout""" + + def check(self) -> None: + url = f"gemini://{self.netloc}/\n" + try: + response = self.make_request(url) + except Exception as e: + log("No response should be received") + log(f"{e}", style="success") + else: + log("No response should be received") + log(f"{response.status!r}", style="failure") + + +class URLIncludePort(BaseCheck): + """Send the URL with the port explicitly defined""" + + def check(self) -> None: + url = f"gemini://{self.args.host}:{self.args.port}/\r\n" + response = self.make_request(url) + + log("Status should be 20 (SUCCESS)") + style = "success" if response.status == "20" else "failure" + log(f"{response.status!r}", style) + + +class URLByIPAddress(BaseCheck): + """Send the URL using the IPv4 address""" + + def check(self) -> None: + addr = self.resolve_host(socket.AF_INET) + url = f"gemini://{addr[0]}:{addr[1]}/\r\n" + response = self.make_request(url) + + log("The appropriate status depends on desired behavior of the server") + log(f"{response.status!r}", style="success") + + +class URLInvalidUTF8Byte(BaseCheck): + """Send a URL containing a non-UTF8 byte sequence""" + + def check(self) -> None: + non_utf8_character = "\udcdc" # Surrogate-escaped byte sequence + url = f"gemini://{self.netloc}/{non_utf8_character}\r\n" + + try: + response = self.make_request(url) + except Exception: + response = None + + log("Connection should either drop or return a 59 BAD REQUEST") + if response is None: + log("Connection closed without response", style="success") + else: + style = "success" if response.status == "59" else "failure" + log(f"{response.status!r}", style) + + +class URLWrongPort(BaseCheck): + """A URL with an incorrect port number should be rejected""" + + def check(self) -> None: + url = f"gemini://{self.args.host}:443/\r\n" + response = self.make_request(url) + + log("Response should return a 4x or 5x status code") + style = "success" if response.status.startswith(("4", "5")) else "failure" + log(f"{response.status!r}", style) + + +class URLWrongHost(BaseCheck): + """A URL with a foreign hostname should be rejected""" + + def check(self) -> None: + url = f"gemini://wikipedia.org/\r\n" + response = self.make_request(url) + + log("Response should return a 4x or 5x status code") + style = "success" if response.status.startswith(("4", "5")) else "failure" + log(f"{response.status!r}", style) + + +class URLMaxSize(BaseCheck): + """Send a 1024 byte URL, the maximum allowed size""" + + def check(self) -> None: + # Per the spec, the are not included in the total size + base_url = f"gemini://{self.netloc}/" + buffer = "0" * (1024 - len(base_url.encode("utf-8"))) + url = base_url + buffer + "\r\n" + + response = self.make_request(url) + log("Status should be 51 (NOT FOUND)") + style = "success" if response.status == "51" else "failure" + log(f"{response.status!r}", style) + + +class URLAboveMaxSize(BaseCheck): + """Send a 1025 byte URL, above the maximum allowed size""" + + def check(self) -> None: + # Per the spec, the are not included in the total size + base_url = f"gemini://{self.netloc}/" + buffer = "0" * (1025 - len(base_url.encode("utf-8"))) + url = base_url + buffer + "\r\n" + + try: + response = self.make_request(url) + except Exception: + response = None + + log("Connection should either drop or return a 59 BAD REQUEST") + if response is None: + log("Connection closed without response", style="success") + else: + style = "success" if response.status == "59" else "failure" + log(f"{response.status!r}", style) + + +class URLSchemeHTTP(BaseCheck): + """Send a URL with an HTTP scheme""" + + def check(self) -> None: + url = f"http://{self.netloc}/\r\n" + response = self.make_request(url) + + log("Response should return a 4x or 5x status code") + style = "success" if response.status.startswith(("4", "5")) else "failure" + log(f"{response.status!r}", style) + + +class URLSchemeHTTPS(BaseCheck): + """Send a URL with an HTTPS scheme""" + + def check(self) -> None: + url = f"https://{self.netloc}/\r\n" + response = self.make_request(url) + + log("Response should return a 4x or 5x status code") + style = "success" if response.status.startswith(("4", "5")) else "failure" + log(f"{response.status!r}", style) + + +class URLSchemeGopher(BaseCheck): + """Send a URL with a Gopher scheme""" + + def check(self) -> None: + url = f"gopher://{self.netloc}/\r\n" + response = self.make_request(url) + + log("Response should return a 4x or 5x status code") + style = "success" if response.status.startswith(("4", "5")) else "failure" + log(f"{response.status!r}", style) + + +class URLSchemeMissing(BaseCheck): + """A URL without a scheme should be inferred as gemini""" + + def check(self) -> None: + url = f"://{self.netloc}/\r\n" + response = self.make_request(url) + + log("Status should be 20 (SUCCESS)") + style = "success" if response.status == "20" else "failure" + log(f"{response.status!r}", style) + + +class URLEmpty(BaseCheck): + """Empty URLs should not be accepted by the server""" + + def check(self) -> None: + url = f"\r\n" + response = self.make_request(url) + + log("Status should be 5x (PERMANENT FAILURE)") + style = "success" if response.status.startswith("5") else "failure" + log(f"{response.status!r}", style) + + +class URLRelative(BaseCheck): + """Relative URLs should not be accepted by the server""" + + def check(self) -> None: + url = f"/\r\n" + response = self.make_request(url) + + log("Status should be 5x (PERMANENT FAILURE)") + style = "success" if response.status.startswith("5") else "failure" + log(f"{response.status!r}", style) + + +class URLDotEscape(BaseCheck): + """A URL should not be able to escape the root using dot notation""" + + def check(self) -> None: + url = f"gemini://{self.netloc}/../../\r\n" + response = self.make_request(url) + + log("Status should be 5x (PERMANENT FAILURE)") + style = "success" if response.status.startswith("5") else "failure" + log(f"{response.status!r}", style) + + +CHECKS = [ + IPv4Address, + IPv6Address, + TLSVersion, + TLSClaims, + TLSVerified, + TLSRequired, + ConcurrentConnections, + Homepage, + HomepageRedirect, + PageNotFound, + RequestMissingCR, + URLIncludePort, + URLByIPAddress, + URLInvalidUTF8Byte, + URLWrongPort, + URLWrongHost, + URLMaxSize, + URLAboveMaxSize, + URLSchemeHTTP, + URLSchemeHTTPS, + URLSchemeGopher, + URLSchemeMissing, + URLEmpty, + URLRelative, + URLDotEscape, +] + + +def build_epilog(): + epilog = ["list of checks:"] + for check in CHECKS: + epilog.append(colorize(f" [{check.__name__}]", A_BOLD)) + epilog.append(f" {check.__doc__}") + return "\n".join(epilog) + + +parser = argparse.ArgumentParser( + usage="%(prog)s host [port] [--help]", + description=__doc__, + epilog=build_epilog(), + formatter_class=argparse.RawDescriptionHelpFormatter, +) +parser.add_argument("host", help="server hostname to connect to") +parser.add_argument( + "port", + nargs="?", + type=int, + default=1965, + help="server port to connect to (default: 1965)", +) +parser.add_argument("--checks", help="comma separated list of checks to apply") +parser.add_argument( + "--delay", + type=float, + default=2, + help="seconds to sleep between checks (default: 2)", +) + + +def run(): + args = parser.parse_args() + if args.checks: + check_names = {cls.__name__: cls for cls in CHECKS} + check_list = [] + for name in args.checks.split(","): + name = name.strip() + if name not in check_names: + raise ValueError(f"unknown check {name!r}") + check_list.append(check_names[name]) + else: + check_list = CHECKS + + log(f"Running gemini server diagnostics check against {args.host}:{args.port}") + log("...\n") + for check in check_list: + time.sleep(args.delay) + check(args).run() + log("Done!") + + +if __name__ == "__main__": + run() diff --git a/setup.py b/setup.py index 8c18809..07b83f3 100644 --- a/setup.py +++ b/setup.py @@ -18,11 +18,12 @@ setuptools.setup( description="An Experimental Gemini Server", long_description=long_description(), long_description_content_type="text/markdown", - py_modules=["jetforce", "jetforce_client"], + py_modules=["jetforce", "jetforce_client", "jetforce_diagnostics"], entry_points={ "console_scripts": [ "jetforce=jetforce:run_server", "jetforce-client=jetforce_client:run_client", + "jetforce-diagnostics=jetforce_diagnostics:run", ] }, python_requires=">=3.7",