diff --git a/jetforce_diagnostics.py b/jetforce_diagnostics.py index efa831a..31993c4 100755 --- a/jetforce_diagnostics.py +++ b/jetforce_diagnostics.py @@ -174,6 +174,22 @@ class BaseCheck: response.body = fp.read().decode(response.charset) return response + def assert_success(self, response: GeminiResponse) -> None: + """ + Helper method to check if a response was successful. + """ + log("Status should return a success code (20 SUCCESS)") + style = "success" if response.status == "20" else "failure" + log(f"Received status of {response.status!r}", style) + + def assert_permanent_failure(self, response: GeminiResponse) -> None: + """ + Helper method to assert that a response returned a permanent. + """ + log("Status should return a failure code (5X PERMANENT FAILURE)") + style = "success" if response.status.startswith("5") else "failure" + log(f"Received status of {response.status!r}", style) + class IPv4Address(BaseCheck): """Establish a connection over an IPv4 address""" @@ -196,9 +212,7 @@ class IPv6Address(BaseCheck): log(f"Looking up IPv6 address for {self.args.host!r}") addr = self.resolve_host(socket.AF_INET6) if ipaddress.ip_address(addr[0]).ipv4_mapped: - raise UserWarning( - "Found IPv4-mapped address, is your network IPv6 enabled?" - ) + raise UserWarning("Found IPv4-mapped address, skipping check") log(f"{addr[0]!r}", style="success") log(f"Attempting to connect to [{addr[0]}]:{addr[1]}") with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: @@ -208,7 +222,7 @@ class IPv6Address(BaseCheck): class TLSVersion(BaseCheck): - """Server must support TLS v1.2 or higher""" + """Server must negotiate at least TLS v1.2, ideally TLS v1.3""" def check(self) -> None: log(f"Checking client library") @@ -316,7 +330,7 @@ class TLSRequired(BaseCheck): log(f"Connection closed by server", style="success") except Exception as e: # A connection error is a valid response - log(f"{e}", style="success") + log(f"{e!r}", style="success") class ConcurrentConnections(BaseCheck): @@ -346,9 +360,7 @@ class Homepage(BaseCheck): url = f"gemini://{self.netloc}/\r\n" response = self.make_request(url) - log("Status should be 20 (SUCCESS)") - style = "success" if response.status == "20" else "failure" - log(f"{response.status!r}", style) + self.assert_success(response) log('Mime type should be "text/gemini"') style = "success" if response.mime == "text/gemini" else "failure" @@ -385,7 +397,7 @@ class HomepageRedirect(BaseCheck): url = f"gemini://{self.netloc}\r\n" response = self.make_request(url) - log("Status should be 31 (REDIRECT PERMANENT)") + log("Status should return code 31 (REDIRECT PERMANENT)") style = "success" if response.status == "31" else "failure" log(f"{response.status!r}", style) @@ -409,7 +421,7 @@ class PageNotFound(BaseCheck): url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n" response = self.make_request(url) - log("Status should be 51 (NOT FOUND)") + log("Status should return code 51 (NOT FOUND)") style = "success" if response.status == "51" else "failure" log(f"{response.status!r}", style) @@ -443,10 +455,16 @@ class URLIncludePort(BaseCheck): def check(self) -> None: url = f"gemini://{self.args.host}:{self.args.port}/\r\n" response = self.make_request(url) + self.assert_success(response) - log("Status should be 20 (SUCCESS)") - style = "success" if response.status == "20" else "failure" - log(f"{response.status!r}", style) + +class URLSchemeMissing(BaseCheck): + """A URL without a scheme should be inferred as gemini""" + + def check(self) -> None: + url = f"//{self.netloc}/\r\n" + response = self.make_request(url) + self.assert_success(response) class URLByIPAddress(BaseCheck): @@ -457,8 +475,8 @@ class URLByIPAddress(BaseCheck): url = f"gemini://{addr[0]}:{addr[1]}/\r\n" response = self.make_request(url) - log("The appropriate status depends on desired behavior of the server") - log(f"{response.status!r}", style="success") + log("Verify that the status matches your desired behavior") + log(f"{response.status!r}", style="info") class URLInvalidUTF8Byte(BaseCheck): @@ -473,7 +491,7 @@ class URLInvalidUTF8Byte(BaseCheck): except Exception: response = None - log("Connection should either drop or return a 59 BAD REQUEST") + log("Connection should either drop, or return 59 (BAD REQUEST)") if response is None: log("Connection closed without response", style="success") else: @@ -481,30 +499,6 @@ class URLInvalidUTF8Byte(BaseCheck): log(f"{response.status!r}", style) -class URLWrongPort(BaseCheck): - """A URL with an incorrect port number should be rejected""" - - def check(self) -> None: - url = f"gemini://{self.args.host}:443/\r\n" - response = self.make_request(url) - - log("Response should return a 4x or 5x status code") - style = "success" if response.status.startswith(("4", "5")) else "failure" - log(f"{response.status!r}", style) - - -class URLWrongHost(BaseCheck): - """A URL with a foreign hostname should be rejected""" - - def check(self) -> None: - url = f"gemini://wikipedia.org/\r\n" - response = self.make_request(url) - - log("Response should return a 4x or 5x status code") - style = "success" if response.status.startswith(("4", "5")) else "failure" - log(f"{response.status!r}", style) - - class URLMaxSize(BaseCheck): """Send a 1024 byte URL, the maximum allowed size""" @@ -515,7 +509,7 @@ class URLMaxSize(BaseCheck): url = base_url + buffer + "\r\n" response = self.make_request(url) - log("Status should be 51 (NOT FOUND)") + log("Status should return code 51 (NOT FOUND)") style = "success" if response.status == "51" else "failure" log(f"{response.status!r}", style) @@ -534,7 +528,7 @@ class URLAboveMaxSize(BaseCheck): except Exception: response = None - log("Connection should either drop or return a 59 BAD REQUEST") + log("Connection should either drop, or return 59 (BAD REQUEST)") if response is None: log("Connection closed without response", style="success") else: @@ -542,16 +536,31 @@ class URLAboveMaxSize(BaseCheck): log(f"{response.status!r}", style) +class URLWrongPort(BaseCheck): + """A URL with an incorrect port number should be rejected""" + + def check(self) -> None: + url = f"gemini://{self.args.host}:443/\r\n" + response = self.make_request(url) + self.assert_permanent_failure(response) + + +class URLWrongHost(BaseCheck): + """A URL with a foreign hostname should be rejected""" + + def check(self) -> None: + url = f"gemini://wikipedia.org/\r\n" + response = self.make_request(url) + self.assert_permanent_failure(response) + + class URLSchemeHTTP(BaseCheck): """Send a URL with an HTTP scheme""" def check(self) -> None: url = f"http://{self.netloc}/\r\n" response = self.make_request(url) - - log("Response should return a 4x or 5x status code") - style = "success" if response.status.startswith(("4", "5")) else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) class URLSchemeHTTPS(BaseCheck): @@ -560,10 +569,7 @@ class URLSchemeHTTPS(BaseCheck): def check(self) -> None: url = f"https://{self.netloc}/\r\n" response = self.make_request(url) - - log("Response should return a 4x or 5x status code") - style = "success" if response.status.startswith(("4", "5")) else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) class URLSchemeGopher(BaseCheck): @@ -572,22 +578,7 @@ class URLSchemeGopher(BaseCheck): def check(self) -> None: url = f"gopher://{self.netloc}/\r\n" response = self.make_request(url) - - log("Response should return a 4x or 5x status code") - style = "success" if response.status.startswith(("4", "5")) else "failure" - log(f"{response.status!r}", style) - - -class URLSchemeMissing(BaseCheck): - """A URL without a scheme should be inferred as gemini""" - - def check(self) -> None: - url = f"//{self.netloc}/\r\n" - response = self.make_request(url) - - log("Status should be 20 (SUCCESS)") - style = "success" if response.status == "20" else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) class URLEmpty(BaseCheck): @@ -596,10 +587,7 @@ class URLEmpty(BaseCheck): def check(self) -> None: url = f"\r\n" response = self.make_request(url) - - log("Status should be 5x (PERMANENT FAILURE)") - style = "success" if response.status.startswith("5") else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) class URLRelative(BaseCheck): @@ -608,10 +596,7 @@ class URLRelative(BaseCheck): def check(self) -> None: url = f"/\r\n" response = self.make_request(url) - - log("Status should be 5x (PERMANENT FAILURE)") - style = "success" if response.status.startswith("5") else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) class URLDotEscape(BaseCheck): @@ -620,10 +605,7 @@ class URLDotEscape(BaseCheck): def check(self) -> None: url = f"gemini://{self.netloc}/../../\r\n" response = self.make_request(url) - - log("Status should be 5x (PERMANENT FAILURE)") - style = "success" if response.status.startswith("5") else "failure" - log(f"{response.status!r}", style) + self.assert_permanent_failure(response) # TODO: Test sending a transient client certificate @@ -641,16 +623,16 @@ CHECKS = [ PageNotFound, RequestMissingCR, URLIncludePort, + URLSchemeMissing, URLByIPAddress, URLInvalidUTF8Byte, - URLWrongPort, - URLWrongHost, URLMaxSize, URLAboveMaxSize, + URLWrongPort, + URLWrongHost, URLSchemeHTTP, URLSchemeHTTPS, URLSchemeGopher, - URLSchemeMissing, URLEmpty, URLRelative, URLDotEscape,