script cleanup
This commit is contained in:
parent
e43587493e
commit
25dbeb4c64
|
@ -174,6 +174,22 @@ class BaseCheck:
|
||||||
response.body = fp.read().decode(response.charset)
|
response.body = fp.read().decode(response.charset)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def assert_success(self, response: GeminiResponse) -> None:
|
||||||
|
"""
|
||||||
|
Helper method to check if a response was successful.
|
||||||
|
"""
|
||||||
|
log("Status should return a success code (20 SUCCESS)")
|
||||||
|
style = "success" if response.status == "20" else "failure"
|
||||||
|
log(f"Received status of {response.status!r}", style)
|
||||||
|
|
||||||
|
def assert_permanent_failure(self, response: GeminiResponse) -> None:
|
||||||
|
"""
|
||||||
|
Helper method to assert that a response returned a permanent.
|
||||||
|
"""
|
||||||
|
log("Status should return a failure code (5X PERMANENT FAILURE)")
|
||||||
|
style = "success" if response.status.startswith("5") else "failure"
|
||||||
|
log(f"Received status of {response.status!r}", style)
|
||||||
|
|
||||||
|
|
||||||
class IPv4Address(BaseCheck):
|
class IPv4Address(BaseCheck):
|
||||||
"""Establish a connection over an IPv4 address"""
|
"""Establish a connection over an IPv4 address"""
|
||||||
|
@ -196,9 +212,7 @@ class IPv6Address(BaseCheck):
|
||||||
log(f"Looking up IPv6 address for {self.args.host!r}")
|
log(f"Looking up IPv6 address for {self.args.host!r}")
|
||||||
addr = self.resolve_host(socket.AF_INET6)
|
addr = self.resolve_host(socket.AF_INET6)
|
||||||
if ipaddress.ip_address(addr[0]).ipv4_mapped:
|
if ipaddress.ip_address(addr[0]).ipv4_mapped:
|
||||||
raise UserWarning(
|
raise UserWarning("Found IPv4-mapped address, skipping check")
|
||||||
"Found IPv4-mapped address, is your network IPv6 enabled?"
|
|
||||||
)
|
|
||||||
log(f"{addr[0]!r}", style="success")
|
log(f"{addr[0]!r}", style="success")
|
||||||
log(f"Attempting to connect to [{addr[0]}]:{addr[1]}")
|
log(f"Attempting to connect to [{addr[0]}]:{addr[1]}")
|
||||||
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
|
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
|
||||||
|
@ -208,7 +222,7 @@ class IPv6Address(BaseCheck):
|
||||||
|
|
||||||
|
|
||||||
class TLSVersion(BaseCheck):
|
class TLSVersion(BaseCheck):
|
||||||
"""Server must support TLS v1.2 or higher"""
|
"""Server must negotiate at least TLS v1.2, ideally TLS v1.3"""
|
||||||
|
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
log(f"Checking client library")
|
log(f"Checking client library")
|
||||||
|
@ -316,7 +330,7 @@ class TLSRequired(BaseCheck):
|
||||||
log(f"Connection closed by server", style="success")
|
log(f"Connection closed by server", style="success")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# A connection error is a valid response
|
# A connection error is a valid response
|
||||||
log(f"{e}", style="success")
|
log(f"{e!r}", style="success")
|
||||||
|
|
||||||
|
|
||||||
class ConcurrentConnections(BaseCheck):
|
class ConcurrentConnections(BaseCheck):
|
||||||
|
@ -346,9 +360,7 @@ class Homepage(BaseCheck):
|
||||||
url = f"gemini://{self.netloc}/\r\n"
|
url = f"gemini://{self.netloc}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
|
||||||
log("Status should be 20 (SUCCESS)")
|
self.assert_success(response)
|
||||||
style = "success" if response.status == "20" else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
log('Mime type should be "text/gemini"')
|
log('Mime type should be "text/gemini"')
|
||||||
style = "success" if response.mime == "text/gemini" else "failure"
|
style = "success" if response.mime == "text/gemini" else "failure"
|
||||||
|
@ -385,7 +397,7 @@ class HomepageRedirect(BaseCheck):
|
||||||
url = f"gemini://{self.netloc}\r\n"
|
url = f"gemini://{self.netloc}\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
|
||||||
log("Status should be 31 (REDIRECT PERMANENT)")
|
log("Status should return code 31 (REDIRECT PERMANENT)")
|
||||||
style = "success" if response.status == "31" else "failure"
|
style = "success" if response.status == "31" else "failure"
|
||||||
log(f"{response.status!r}", style)
|
log(f"{response.status!r}", style)
|
||||||
|
|
||||||
|
@ -409,7 +421,7 @@ class PageNotFound(BaseCheck):
|
||||||
url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n"
|
url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
|
||||||
log("Status should be 51 (NOT FOUND)")
|
log("Status should return code 51 (NOT FOUND)")
|
||||||
style = "success" if response.status == "51" else "failure"
|
style = "success" if response.status == "51" else "failure"
|
||||||
log(f"{response.status!r}", style)
|
log(f"{response.status!r}", style)
|
||||||
|
|
||||||
|
@ -443,10 +455,16 @@ class URLIncludePort(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"gemini://{self.args.host}:{self.args.port}/\r\n"
|
url = f"gemini://{self.args.host}:{self.args.port}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_success(response)
|
||||||
|
|
||||||
log("Status should be 20 (SUCCESS)")
|
|
||||||
style = "success" if response.status == "20" else "failure"
|
class URLSchemeMissing(BaseCheck):
|
||||||
log(f"{response.status!r}", style)
|
"""A URL without a scheme should be inferred as gemini"""
|
||||||
|
|
||||||
|
def check(self) -> None:
|
||||||
|
url = f"//{self.netloc}/\r\n"
|
||||||
|
response = self.make_request(url)
|
||||||
|
self.assert_success(response)
|
||||||
|
|
||||||
|
|
||||||
class URLByIPAddress(BaseCheck):
|
class URLByIPAddress(BaseCheck):
|
||||||
|
@ -457,8 +475,8 @@ class URLByIPAddress(BaseCheck):
|
||||||
url = f"gemini://{addr[0]}:{addr[1]}/\r\n"
|
url = f"gemini://{addr[0]}:{addr[1]}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
|
||||||
log("The appropriate status depends on desired behavior of the server")
|
log("Verify that the status matches your desired behavior")
|
||||||
log(f"{response.status!r}", style="success")
|
log(f"{response.status!r}", style="info")
|
||||||
|
|
||||||
|
|
||||||
class URLInvalidUTF8Byte(BaseCheck):
|
class URLInvalidUTF8Byte(BaseCheck):
|
||||||
|
@ -473,7 +491,7 @@ class URLInvalidUTF8Byte(BaseCheck):
|
||||||
except Exception:
|
except Exception:
|
||||||
response = None
|
response = None
|
||||||
|
|
||||||
log("Connection should either drop or return a 59 BAD REQUEST")
|
log("Connection should either drop, or return 59 (BAD REQUEST)")
|
||||||
if response is None:
|
if response is None:
|
||||||
log("Connection closed without response", style="success")
|
log("Connection closed without response", style="success")
|
||||||
else:
|
else:
|
||||||
|
@ -481,30 +499,6 @@ class URLInvalidUTF8Byte(BaseCheck):
|
||||||
log(f"{response.status!r}", style)
|
log(f"{response.status!r}", style)
|
||||||
|
|
||||||
|
|
||||||
class URLWrongPort(BaseCheck):
|
|
||||||
"""A URL with an incorrect port number should be rejected"""
|
|
||||||
|
|
||||||
def check(self) -> None:
|
|
||||||
url = f"gemini://{self.args.host}:443/\r\n"
|
|
||||||
response = self.make_request(url)
|
|
||||||
|
|
||||||
log("Response should return a 4x or 5x status code")
|
|
||||||
style = "success" if response.status.startswith(("4", "5")) else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLWrongHost(BaseCheck):
|
|
||||||
"""A URL with a foreign hostname should be rejected"""
|
|
||||||
|
|
||||||
def check(self) -> None:
|
|
||||||
url = f"gemini://wikipedia.org/\r\n"
|
|
||||||
response = self.make_request(url)
|
|
||||||
|
|
||||||
log("Response should return a 4x or 5x status code")
|
|
||||||
style = "success" if response.status.startswith(("4", "5")) else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLMaxSize(BaseCheck):
|
class URLMaxSize(BaseCheck):
|
||||||
"""Send a 1024 byte URL, the maximum allowed size"""
|
"""Send a 1024 byte URL, the maximum allowed size"""
|
||||||
|
|
||||||
|
@ -515,7 +509,7 @@ class URLMaxSize(BaseCheck):
|
||||||
url = base_url + buffer + "\r\n"
|
url = base_url + buffer + "\r\n"
|
||||||
|
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
log("Status should be 51 (NOT FOUND)")
|
log("Status should return code 51 (NOT FOUND)")
|
||||||
style = "success" if response.status == "51" else "failure"
|
style = "success" if response.status == "51" else "failure"
|
||||||
log(f"{response.status!r}", style)
|
log(f"{response.status!r}", style)
|
||||||
|
|
||||||
|
@ -534,7 +528,7 @@ class URLAboveMaxSize(BaseCheck):
|
||||||
except Exception:
|
except Exception:
|
||||||
response = None
|
response = None
|
||||||
|
|
||||||
log("Connection should either drop or return a 59 BAD REQUEST")
|
log("Connection should either drop, or return 59 (BAD REQUEST)")
|
||||||
if response is None:
|
if response is None:
|
||||||
log("Connection closed without response", style="success")
|
log("Connection closed without response", style="success")
|
||||||
else:
|
else:
|
||||||
|
@ -542,16 +536,31 @@ class URLAboveMaxSize(BaseCheck):
|
||||||
log(f"{response.status!r}", style)
|
log(f"{response.status!r}", style)
|
||||||
|
|
||||||
|
|
||||||
|
class URLWrongPort(BaseCheck):
|
||||||
|
"""A URL with an incorrect port number should be rejected"""
|
||||||
|
|
||||||
|
def check(self) -> None:
|
||||||
|
url = f"gemini://{self.args.host}:443/\r\n"
|
||||||
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
|
|
||||||
|
|
||||||
|
class URLWrongHost(BaseCheck):
|
||||||
|
"""A URL with a foreign hostname should be rejected"""
|
||||||
|
|
||||||
|
def check(self) -> None:
|
||||||
|
url = f"gemini://wikipedia.org/\r\n"
|
||||||
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
|
|
||||||
|
|
||||||
class URLSchemeHTTP(BaseCheck):
|
class URLSchemeHTTP(BaseCheck):
|
||||||
"""Send a URL with an HTTP scheme"""
|
"""Send a URL with an HTTP scheme"""
|
||||||
|
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"http://{self.netloc}/\r\n"
|
url = f"http://{self.netloc}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Response should return a 4x or 5x status code")
|
|
||||||
style = "success" if response.status.startswith(("4", "5")) else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLSchemeHTTPS(BaseCheck):
|
class URLSchemeHTTPS(BaseCheck):
|
||||||
|
@ -560,10 +569,7 @@ class URLSchemeHTTPS(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"https://{self.netloc}/\r\n"
|
url = f"https://{self.netloc}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Response should return a 4x or 5x status code")
|
|
||||||
style = "success" if response.status.startswith(("4", "5")) else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLSchemeGopher(BaseCheck):
|
class URLSchemeGopher(BaseCheck):
|
||||||
|
@ -572,22 +578,7 @@ class URLSchemeGopher(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"gopher://{self.netloc}/\r\n"
|
url = f"gopher://{self.netloc}/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Response should return a 4x or 5x status code")
|
|
||||||
style = "success" if response.status.startswith(("4", "5")) else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLSchemeMissing(BaseCheck):
|
|
||||||
"""A URL without a scheme should be inferred as gemini"""
|
|
||||||
|
|
||||||
def check(self) -> None:
|
|
||||||
url = f"//{self.netloc}/\r\n"
|
|
||||||
response = self.make_request(url)
|
|
||||||
|
|
||||||
log("Status should be 20 (SUCCESS)")
|
|
||||||
style = "success" if response.status == "20" else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLEmpty(BaseCheck):
|
class URLEmpty(BaseCheck):
|
||||||
|
@ -596,10 +587,7 @@ class URLEmpty(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"\r\n"
|
url = f"\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Status should be 5x (PERMANENT FAILURE)")
|
|
||||||
style = "success" if response.status.startswith("5") else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLRelative(BaseCheck):
|
class URLRelative(BaseCheck):
|
||||||
|
@ -608,10 +596,7 @@ class URLRelative(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"/\r\n"
|
url = f"/\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Status should be 5x (PERMANENT FAILURE)")
|
|
||||||
style = "success" if response.status.startswith("5") else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
class URLDotEscape(BaseCheck):
|
class URLDotEscape(BaseCheck):
|
||||||
|
@ -620,10 +605,7 @@ class URLDotEscape(BaseCheck):
|
||||||
def check(self) -> None:
|
def check(self) -> None:
|
||||||
url = f"gemini://{self.netloc}/../../\r\n"
|
url = f"gemini://{self.netloc}/../../\r\n"
|
||||||
response = self.make_request(url)
|
response = self.make_request(url)
|
||||||
|
self.assert_permanent_failure(response)
|
||||||
log("Status should be 5x (PERMANENT FAILURE)")
|
|
||||||
style = "success" if response.status.startswith("5") else "failure"
|
|
||||||
log(f"{response.status!r}", style)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: Test sending a transient client certificate
|
# TODO: Test sending a transient client certificate
|
||||||
|
@ -641,16 +623,16 @@ CHECKS = [
|
||||||
PageNotFound,
|
PageNotFound,
|
||||||
RequestMissingCR,
|
RequestMissingCR,
|
||||||
URLIncludePort,
|
URLIncludePort,
|
||||||
|
URLSchemeMissing,
|
||||||
URLByIPAddress,
|
URLByIPAddress,
|
||||||
URLInvalidUTF8Byte,
|
URLInvalidUTF8Byte,
|
||||||
URLWrongPort,
|
|
||||||
URLWrongHost,
|
|
||||||
URLMaxSize,
|
URLMaxSize,
|
||||||
URLAboveMaxSize,
|
URLAboveMaxSize,
|
||||||
|
URLWrongPort,
|
||||||
|
URLWrongHost,
|
||||||
URLSchemeHTTP,
|
URLSchemeHTTP,
|
||||||
URLSchemeHTTPS,
|
URLSchemeHTTPS,
|
||||||
URLSchemeGopher,
|
URLSchemeGopher,
|
||||||
URLSchemeMissing,
|
|
||||||
URLEmpty,
|
URLEmpty,
|
||||||
URLRelative,
|
URLRelative,
|
||||||
URLDotEscape,
|
URLDotEscape,
|
||||||
|
|
Loading…
Reference in New Issue