Add diagnostics check script

This commit is contained in:
Michael Lazar 2020-01-06 23:23:40 -05:00
parent a2c4a1a28a
commit cbbedb5f1d
2 changed files with 678 additions and 1 deletions

676
jetforce_diagnostics.py Executable file
View File

@ -0,0 +1,676 @@
#!/usr/bin/env python3
"""
A diagnostic tool for gemini servers.
This program will barrage your server with a series of requests in
an attempt to uncover unexpected behavior. Not all of these checks
adhere strictly to the gemini specification. Some of them are
general best practices, and some trigger undefined behavior. Results
should be taken with a grain of salt and analyzed on their own merit.
"""
import argparse
import contextlib
import datetime
import socket
import ssl
import sys
import time
import typing
if sys.version_info < (3, 7):
sys.exit("Fatal Error: script requires Python 3.7+")
socket.setdefaulttimeout(5)
# ANSI color codes
A_BOLD = 1
FG_BLACK = 30
FG_RED = 31
FG_GREEN = 32
FG_YELLOW = 33
FG_BLUE = 34
FG_MAGENTA = 35
FG_CYAN = 36
FG_WHITE = 37
def colorize(text: str, color: int) -> str:
"""
Colorize text using ANSI escape codes.
"""
if sys.stdout.isatty():
return f"\033[{color}m{text}\033[0m"
else:
return text
def log(text: str, style: str = "normal") -> None:
"""
Print formatted text to stdout with optional styling.
"""
if style == "title":
text = colorize(text, A_BOLD)
if style == "warning":
text = colorize(f" {text}", FG_YELLOW)
elif style == "info":
text = colorize(f" {text}", FG_CYAN)
elif style == "success":
text = colorize(f"{text}", FG_GREEN)
elif style == "failure":
text = colorize(f" x {text}", FG_RED)
print(text)
def log_error(err: Exception) -> None:
"""
Helper method for formatting exceptions as error messages.
"""
if isinstance(err, Warning):
log(str(err), style="warning")
else:
log(str(err), style="failure")
class GeminiResponse:
def __init__(self, header):
self.charset: str = "utf-8"
self.header: str = header
self.body: str = ""
self.meta: typing.Optional[str] = None
self.status: typing.Optional[str] = None
self.mime: typing.Optional[str] = None
class BaseCheck:
"""
Abstract base class for implementing server checks.
"""
description: str = ""
def __init__(self, args: argparse.Namespace):
self.args = args
def run(self) -> None:
"""
Run the check and log any unhandled exceptions.
"""
log(f"[{self.__class__.__name__}] {self.__doc__}", style="title")
try:
self.check()
except Exception as e:
log_error(e)
log("")
def check(self) -> None:
raise NotImplemented
@property
def netloc(self):
if self.args.port == 1965:
return self.args.host
else:
return f"{self.args.host}:{self.args.port}"
def resolve_host(self, family: socket.AddressFamily) -> tuple:
"""
Retrieve the IP address and connection information for the host.
"""
host = self.args.host
port = self.args.port
type_ = socket.SOCK_STREAM
proto = socket.IPPROTO_TCP
addr_info = socket.getaddrinfo(host, port, family, type_, proto)
if not addr_info:
raise UserWarning("No address found for host")
return addr_info[0][4]
@contextlib.contextmanager
def connection(
self, context: typing.Optional[ssl.SSLContext] = None
) -> ssl.SSLSocket:
"""
Setup an unverified TLS socket connection with the host.
"""
if context is None:
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection(
(self.args.host, self.args.port), timeout=5
) as sock:
with context.wrap_socket(sock) as ssock:
yield ssock
def make_request(self, url: str) -> GeminiResponse:
"""
Send the request verbatim to the server and parse the response bytes.
"""
log("Requesting URL")
log(repr(url), style="info")
with self.connection() as sock:
sock.sendall(url.encode(errors="surrogateescape"))
fp = sock.makefile("rb")
header = fp.readline().decode()
log("Response header")
log(repr(header), style="info")
response = GeminiResponse(header)
try:
response.status, response.meta = header.strip().split(maxsplit=1)
except ValueError:
return response
if response.status.startswith("2"):
meta_parts = [part.strip() for part in response.meta.split(";")]
response.mime = meta_parts[0]
for part in meta_parts[1:]:
if part.lower().startswith("charset="):
response.charset = part[8:]
response.body = fp.read().decode(response.charset)
return response
class IPv4Address(BaseCheck):
"""Establish a connection over an IPv4 address"""
def check(self):
log(f"Looking up IPv4 address for {self.args.host!r}")
addr = self.resolve_host(socket.AF_INET)
log(f"{addr[0]!r}", style="success")
log(f"Attempting to connect to {addr[0]}:{addr[1]}")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(addr)
sock.close()
log(f"Successfully established connection", style="success")
class IPv6Address(BaseCheck):
"""Establish a connection over an IPv6 address"""
def check(self) -> None:
log(f"Looking up IPv6 address for {self.args.host!r}")
addr = self.resolve_host(socket.AF_INET6)
if addr[0].startswith("::ffff:"):
raise UserWarning("Found an IPv4-mapped address, skipping check")
log(f"{addr[0]!r}", style="success")
log(f"Attempting to connect to [{addr[0]}]:{addr[1]}")
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.connect(addr)
sock.close()
log(f"Successfully established connection", style="success")
class TLSVersion(BaseCheck):
"""Server must support TLS v1.2 or higher"""
def check(self) -> None:
log(f"Checking client library")
log(f"{ssl.OPENSSL_VERSION!r}", style="info")
log("Determining highest supported TLS version")
with self.connection() as sock:
version = sock.version()
if version in ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1"):
log(f"Negotiated {version}", style="failure")
elif version == "TLSv1.2":
log(f"Negotiated {version}", style="warning")
else:
log(f"Negotiated {version}", style="success")
class TLSClaims(BaseCheck):
"""Certificate claims must be valid"""
def check(self) -> None:
try:
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
except ImportError:
raise UserWarning("Could not import cryptography library, skipping")
with self.connection() as sock:
der_x509 = sock.getpeercert(binary_form=True)
cert = default_backend().load_der_x509_certificate(der_x509)
now = datetime.datetime.utcnow()
log('Checking "Not Valid Before" timestamp')
style = "success" if cert.not_valid_before <= now else "failure"
log(f"{cert.not_valid_before} UTC", style)
log('Checking "Not Valid After" timestamp')
style = "success" if cert.not_valid_after >= now else "failure"
log(f"{cert.not_valid_after} UTC", style)
log("Checking common name matches hostname")
cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
cert_dict = {"subject": ((("commonName", cn),),)}
ssl.match_hostname(cert_dict, self.args.host)
log(repr(cn), style="success")
class TLSVerified(BaseCheck):
"""Certificate should be self-signed or have a trusted issuer"""
def check(self) -> None:
log("Connecting over verified SSL socket")
context = ssl.create_default_context()
try:
with socket.create_connection((self.args.host, self.args.port)) as sock:
with context.wrap_socket(sock, server_hostname=self.args.host) as ssock:
ssock.sendall(f"gemini://{self.netloc}\r\n".encode())
except Exception as e:
if getattr(e, "verify_code", None) == 18:
log("Self-signed TLS certificate detected", style="warning")
else:
raise
else:
log("Established trusted TLS connection", style="success")
class TLSRequired(BaseCheck):
"""Non-TLS requests should be refused"""
def check(self) -> None:
log("Sending non-TLS request")
with socket.create_connection((self.args.host, self.args.port)) as sock:
sock.sendall(f"gemini://{self.netloc}\r\n".encode())
fp = sock.makefile("rb")
header = fp.readline().decode()
if header:
log(f"Received unexpected response {header!r}", style="failure")
else:
log(f"Connection closed by server", style="success")
class ConcurrentConnections(BaseCheck):
"""Server should support concurrent connections"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\r\n"
log(f"Attempting to establish two connections")
with self.connection() as sock:
log("Opening socket 1", style="info")
sock.send(url[0].encode())
with self.connection() as sock2:
log("Opening socket 2", style="info")
sock2.sendall(url.encode())
log("Closing socket 2", style="info")
sock.sendall(url[1:].encode())
log("Closing socket 1", style="info")
log(f"Concurrent connections supported", style="success")
class Homepage(BaseCheck):
"""Request the gemini homepage"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\r\n"
response = self.make_request(url)
log("Status should be 20 (SUCCESS)")
style = "success" if response.status == "20" else "failure"
log(f"{response.status!r}", style)
log('Mime type should be "text/gemini"')
style = "success" if response.mime == "text/gemini" else "failure"
log(f"{response.mime!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be non-empty")
style = "success" if response.body else "failure"
log(f"{response.body[:50]!r}", style)
log("Body should terminate with a newline")
style = "success" if response.body.endswith("\n") else "failure"
log(f"{response.body[-1:]!r}", style)
log('Body should use "\\r\\n" line endings')
bad_line = None
for line in response.body.splitlines(True):
if not line.endswith("\r\n"):
bad_line = line
break
if bad_line is None:
log("All lines end with '\\r\\n'", style="success")
else:
log(f"Invalid line ending {bad_line!r}", style="failure")
class HomepageRedirect(BaseCheck):
"""A URL with no trailing slash should redirect to the canonical resource"""
def check(self) -> None:
url = f"gemini://{self.netloc}\r\n"
response = self.make_request(url)
log("Status should be 31 (REDIRECT PERMANENT)")
style = "success" if response.status == "31" else "failure"
log(f"{response.status!r}", style)
log('Meta should redirect to location "gemini://[hostname]/"')
style = "success" if response.meta == f"gemini://{self.netloc}/" else "failure"
log(f"{response.meta!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be empty")
style = "success" if response.body == "" else "failure"
log(f"{response.body[:50]!r}", style)
class PageNotFound(BaseCheck):
"""Request a gemini URL that does not exist"""
def check(self) -> None:
url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n"
response = self.make_request(url)
log("Status should be 51 (NOT FOUND)")
style = "success" if response.status == "51" else "failure"
log(f"{response.status!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be empty")
style = "success" if response.body == "" else "failure"
log(f"{response.body[:50]!r}", style)
class RequestMissingCR(BaseCheck):
"""A request without a <CR> should timeout"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\n"
try:
response = self.make_request(url)
except Exception as e:
log("No response should be received")
log(f"{e}", style="success")
else:
log("No response should be received")
log(f"{response.status!r}", style="failure")
class URLIncludePort(BaseCheck):
"""Send the URL with the port explicitly defined"""
def check(self) -> None:
url = f"gemini://{self.args.host}:{self.args.port}/\r\n"
response = self.make_request(url)
log("Status should be 20 (SUCCESS)")
style = "success" if response.status == "20" else "failure"
log(f"{response.status!r}", style)
class URLByIPAddress(BaseCheck):
"""Send the URL using the IPv4 address"""
def check(self) -> None:
addr = self.resolve_host(socket.AF_INET)
url = f"gemini://{addr[0]}:{addr[1]}/\r\n"
response = self.make_request(url)
log("The appropriate status depends on desired behavior of the server")
log(f"{response.status!r}", style="success")
class URLInvalidUTF8Byte(BaseCheck):
"""Send a URL containing a non-UTF8 byte sequence"""
def check(self) -> None:
non_utf8_character = "\udcdc" # Surrogate-escaped byte sequence
url = f"gemini://{self.netloc}/{non_utf8_character}\r\n"
try:
response = self.make_request(url)
except Exception:
response = None
log("Connection should either drop or return a 59 BAD REQUEST")
if response is None:
log("Connection closed without response", style="success")
else:
style = "success" if response.status == "59" else "failure"
log(f"{response.status!r}", style)
class URLWrongPort(BaseCheck):
"""A URL with an incorrect port number should be rejected"""
def check(self) -> None:
url = f"gemini://{self.args.host}:443/\r\n"
response = self.make_request(url)
log("Response should return a 4x or 5x status code")
style = "success" if response.status.startswith(("4", "5")) else "failure"
log(f"{response.status!r}", style)
class URLWrongHost(BaseCheck):
"""A URL with a foreign hostname should be rejected"""
def check(self) -> None:
url = f"gemini://wikipedia.org/\r\n"
response = self.make_request(url)
log("Response should return a 4x or 5x status code")
style = "success" if response.status.startswith(("4", "5")) else "failure"
log(f"{response.status!r}", style)
class URLMaxSize(BaseCheck):
"""Send a 1024 byte URL, the maximum allowed size"""
def check(self) -> None:
# Per the spec, the <CR><LF> are not included in the total size
base_url = f"gemini://{self.netloc}/"
buffer = "0" * (1024 - len(base_url.encode("utf-8")))
url = base_url + buffer + "\r\n"
response = self.make_request(url)
log("Status should be 51 (NOT FOUND)")
style = "success" if response.status == "51" else "failure"
log(f"{response.status!r}", style)
class URLAboveMaxSize(BaseCheck):
"""Send a 1025 byte URL, above the maximum allowed size"""
def check(self) -> None:
# Per the spec, the <CR><LF> are not included in the total size
base_url = f"gemini://{self.netloc}/"
buffer = "0" * (1025 - len(base_url.encode("utf-8")))
url = base_url + buffer + "\r\n"
try:
response = self.make_request(url)
except Exception:
response = None
log("Connection should either drop or return a 59 BAD REQUEST")
if response is None:
log("Connection closed without response", style="success")
else:
style = "success" if response.status == "59" else "failure"
log(f"{response.status!r}", style)
class URLSchemeHTTP(BaseCheck):
"""Send a URL with an HTTP scheme"""
def check(self) -> None:
url = f"http://{self.netloc}/\r\n"
response = self.make_request(url)
log("Response should return a 4x or 5x status code")
style = "success" if response.status.startswith(("4", "5")) else "failure"
log(f"{response.status!r}", style)
class URLSchemeHTTPS(BaseCheck):
"""Send a URL with an HTTPS scheme"""
def check(self) -> None:
url = f"https://{self.netloc}/\r\n"
response = self.make_request(url)
log("Response should return a 4x or 5x status code")
style = "success" if response.status.startswith(("4", "5")) else "failure"
log(f"{response.status!r}", style)
class URLSchemeGopher(BaseCheck):
"""Send a URL with a Gopher scheme"""
def check(self) -> None:
url = f"gopher://{self.netloc}/\r\n"
response = self.make_request(url)
log("Response should return a 4x or 5x status code")
style = "success" if response.status.startswith(("4", "5")) else "failure"
log(f"{response.status!r}", style)
class URLSchemeMissing(BaseCheck):
"""A URL without a scheme should be inferred as gemini"""
def check(self) -> None:
url = f"://{self.netloc}/\r\n"
response = self.make_request(url)
log("Status should be 20 (SUCCESS)")
style = "success" if response.status == "20" else "failure"
log(f"{response.status!r}", style)
class URLEmpty(BaseCheck):
"""Empty URLs should not be accepted by the server"""
def check(self) -> None:
url = f"\r\n"
response = self.make_request(url)
log("Status should be 5x (PERMANENT FAILURE)")
style = "success" if response.status.startswith("5") else "failure"
log(f"{response.status!r}", style)
class URLRelative(BaseCheck):
"""Relative URLs should not be accepted by the server"""
def check(self) -> None:
url = f"/\r\n"
response = self.make_request(url)
log("Status should be 5x (PERMANENT FAILURE)")
style = "success" if response.status.startswith("5") else "failure"
log(f"{response.status!r}", style)
class URLDotEscape(BaseCheck):
"""A URL should not be able to escape the root using dot notation"""
def check(self) -> None:
url = f"gemini://{self.netloc}/../../\r\n"
response = self.make_request(url)
log("Status should be 5x (PERMANENT FAILURE)")
style = "success" if response.status.startswith("5") else "failure"
log(f"{response.status!r}", style)
CHECKS = [
IPv4Address,
IPv6Address,
TLSVersion,
TLSClaims,
TLSVerified,
TLSRequired,
ConcurrentConnections,
Homepage,
HomepageRedirect,
PageNotFound,
RequestMissingCR,
URLIncludePort,
URLByIPAddress,
URLInvalidUTF8Byte,
URLWrongPort,
URLWrongHost,
URLMaxSize,
URLAboveMaxSize,
URLSchemeHTTP,
URLSchemeHTTPS,
URLSchemeGopher,
URLSchemeMissing,
URLEmpty,
URLRelative,
URLDotEscape,
]
def build_epilog():
epilog = ["list of checks:"]
for check in CHECKS:
epilog.append(colorize(f" [{check.__name__}]", A_BOLD))
epilog.append(f" {check.__doc__}")
return "\n".join(epilog)
parser = argparse.ArgumentParser(
usage="%(prog)s host [port] [--help]",
description=__doc__,
epilog=build_epilog(),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("host", help="server hostname to connect to")
parser.add_argument(
"port",
nargs="?",
type=int,
default=1965,
help="server port to connect to (default: 1965)",
)
parser.add_argument("--checks", help="comma separated list of checks to apply")
parser.add_argument(
"--delay",
type=float,
default=2,
help="seconds to sleep between checks (default: 2)",
)
def run():
args = parser.parse_args()
if args.checks:
check_names = {cls.__name__: cls for cls in CHECKS}
check_list = []
for name in args.checks.split(","):
name = name.strip()
if name not in check_names:
raise ValueError(f"unknown check {name!r}")
check_list.append(check_names[name])
else:
check_list = CHECKS
log(f"Running gemini server diagnostics check against {args.host}:{args.port}")
log("...\n")
for check in check_list:
time.sleep(args.delay)
check(args).run()
log("Done!")
if __name__ == "__main__":
run()

View File

@ -18,11 +18,12 @@ setuptools.setup(
description="An Experimental Gemini Server",
long_description=long_description(),
long_description_content_type="text/markdown",
py_modules=["jetforce", "jetforce_client"],
py_modules=["jetforce", "jetforce_client", "jetforce_diagnostics"],
entry_points={
"console_scripts": [
"jetforce=jetforce:run_server",
"jetforce-client=jetforce_client:run_client",
"jetforce-diagnostics=jetforce_diagnostics:run",
]
},
python_requires=">=3.7",