From 4160aefe94c6cb831974ba34d93761d2266aa783 Mon Sep 17 00:00:00 2001 From: Michael Lazar Date: Thu, 22 Aug 2019 18:53:02 -0400 Subject: [PATCH] More significant refactoring --- examples/server_echo.py | 8 +- examples/server_guestbook.py | 95 +++++-------- examples/server_http_proxy.py | 41 +++--- jetforce.py | 243 +++++++++++++++++++++++----------- 4 files changed, 215 insertions(+), 172 deletions(-) diff --git a/examples/server_echo.py b/examples/server_echo.py index b2e730a..f13ccf3 100644 --- a/examples/server_echo.py +++ b/examples/server_echo.py @@ -2,20 +2,18 @@ A simple Gemini server that echos back the request to the client. """ import asyncio -import typing import jetforce -def echo(environ: dict, send_status: typing.Callable) -> typing.Iterator[bytes]: +def echo(environ, send_status): url = environ["URL"] - send_status(jetforce.STATUS_SUCCESS, "text/gemini") + send_status(jetforce.Status.SUCCESS, "text/gemini") yield f"Received path: {url}".encode() if __name__ == "__main__": - parser = jetforce.build_argument_parser() - args = parser.parse_args() + args = jetforce.command_line_parser().parse_args() server = jetforce.GeminiServer( host=args.host, port=args.port, diff --git a/examples/server_guestbook.py b/examples/server_guestbook.py index b599d06..c55e058 100644 --- a/examples/server_guestbook.py +++ b/examples/server_guestbook.py @@ -1,83 +1,52 @@ """ -A guestbook application that accepts user messages using the INPUT response type -and stores messages in a simple SQLite database file. +A guestbook application that accepts input from guests and stores messages in +a simple text file. """ import asyncio -import sqlite3 -import typing -import urllib.parse +import pathlib from datetime import datetime import jetforce +from jetforce import Response, Status + +guestbook = pathlib.Path("guestbook.txt") -class GuestbookApplication(jetforce.BaseApplication): +app = jetforce.JetforceApplication() - db_file = "guestbook.sql" - def connect_db(self) -> typing.Tuple[sqlite3.Connection, sqlite3.Cursor]: - db = sqlite3.connect(self.db_file, detect_types=sqlite3.PARSE_DECLTYPES) - cursor = db.cursor() - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS guestbook ( - id int PRIMARY KEY, - message text, - created timestamp, - ip_address text); - """ - ) - db.commit() - return db, cursor +@app.route("$") +def index(request): + data = ["Guestbook", "=>/submit Leave a Message", ""] - def __call__( - self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: - url = environ["URL"] - url_parts = urllib.parse.urlparse(url) - - error_message = self.block_proxy_requests(url, environ["HOSTNAME"]) - if error_message: - return send_status(jetforce.STATUS_PROXY_REQUEST_REFUSED, error_message) - - if url_parts.path in ("", "/"): - send_status(jetforce.STATUS_SUCCESS, "text/gemini") - yield from self.list_messages() - elif url_parts.path == "/submit": - if url_parts.query: - self.save_message(url_parts.query, environ["REMOTE_ADDR"]) - return send_status(jetforce.STATUS_REDIRECT_TEMPORARY, "/") + guestbook.touch(exist_ok=True) + with guestbook.open("r") as fp: + for line in fp: + line = line.strip() + if line.startswith("=>"): + # Protect guests from writing messages that contain links + data.append(line[2:]) else: - return send_status( - jetforce.STATUS_INPUT, "Enter your message (max 256 characters)" - ) - else: - return send_status(jetforce.STATUS_NOT_FOUND, "Invalid address") + data.append(line) - def save_message(self, message: str, ip_address: str) -> None: - message = message[:256] + data.extend(["", "...", ""]) + return Response(Status.SUCCESS, "text/gemini", "\n".join(data)) + + +@app.route("/submit") +def submit(request): + if request.query: + message = request.query[:256] created = datetime.utcnow() - - db, cursor = self.connect_db() - sql = "INSERT INTO guestbook(message, created, ip_address) VALUES (?, ?, ?)" - cursor.execute(sql, (message, created, ip_address)) - db.commit() - - def list_messages(self) -> typing.Iterator[bytes]: - yield "Guestbook\n=>/submit Leave a Message\n".encode() - - db, cursor = self.connect_db() - cursor.execute("SELECT created, message FROM guestbook ORDER BY created DESC") - for row in cursor.fetchall(): - yield f"\n[{row[0]:%Y-%m-%d %I:%M %p}]\n{row[1]}\n".encode() - - yield "\n...\n".encode() + with guestbook.open("a") as fp: + fp.write(f"\n[{created:%Y-%m-%d %I:%M %p}]\n{message}\n") + return Response(Status.REDIRECT_TEMPORARY, "") + else: + return Response(Status.INPUT, "Enter your message (max 256 characters)") if __name__ == "__main__": - parser = jetforce.build_argument_parser() - args = parser.parse_args() - app = GuestbookApplication() + args = jetforce.command_line_parser().parse_args() server = jetforce.GeminiServer( host=args.host, port=args.port, diff --git a/examples/server_http_proxy.py b/examples/server_http_proxy.py index e7b2bb9..836919d 100644 --- a/examples/server_http_proxy.py +++ b/examples/server_http_proxy.py @@ -5,39 +5,28 @@ locally using the `w3m` tool, and render the output to the client as plain text. """ import asyncio import subprocess -import typing -import urllib.parse import jetforce +from jetforce import Response, Status + +app = jetforce.JetforceApplication() -class HTTPProxyApplication(jetforce.BaseApplication): - - command = [b"w3m", b"-dump"] - - def __call__( - self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: - url = environ["URL"] - url_parts = urllib.parse.urlparse(url) - if url_parts.scheme not in ("http", "https"): - return send_status(jetforce.STATUS_NOT_FOUND, "Invalid Resource") - - try: - command = self.command + [url.encode()] - out = subprocess.run(command, stdout=subprocess.PIPE) - out.check_returncode() - except Exception: - send_status(jetforce.STATUS_CGI_ERROR, "Failed to load URL") - else: - send_status(jetforce.STATUS_SUCCESS, "text/plain") - yield out.stdout +@app.route(scheme="https", strict_hostname=False) +@app.route(scheme="http", strict_hostname=False) +def proxy_request(request): + command = [b"w3m", b"-dump", request.url.encode()] + try: + out = subprocess.run(command, stdout=subprocess.PIPE) + out.check_returncode() + except Exception: + return Response(Status.CGI_ERROR, "Failed to load URL") + else: + return Response(Status.SUCCESS, "text/plain", out.stdout) if __name__ == "__main__": - parser = jetforce.build_argument_parser() - args = parser.parse_args() - app = HTTPProxyApplication() + args = jetforce.command_line_parser().parse_args() server = jetforce.GeminiServer( host=args.host, port=args.port, diff --git a/jetforce.py b/jetforce.py index 9900680..911ad8b 100755 --- a/jetforce.py +++ b/jetforce.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3.7 import argparse import asyncio +import dataclasses import datetime import mimetypes import os import pathlib +import re import ssl import subprocess import sys @@ -12,7 +14,7 @@ import tempfile import typing import urllib.parse -# Fail early to avoid crashing with an obscure error +# Fail early to avoid crashing with some obscure error if sys.version_info < (3, 7): sys.exit("Fatal Error: jetforce requires Python 3.7+") @@ -39,64 +41,155 @@ If the TLS cert/keyfile is not provided, a self-signed certificate will automatically be generated and saved to your temporary file directory. """ -STATUS_INPUT = 10 -STATUS_SUCCESS = 20 -STATUS_SUCCESS_END_OF_SESSION = 21 - -STATUS_REDIRECT_TEMPORARY = 30 -STATUS_REDIRECT_PERMANENT = 31 -STATUS_TEMPORARY_FAILURE = 40 -STATUS_SERVER_UNAVAILABLE = 41 -STATUS_CGI_ERROR = 42 -STATUS_PROXY_ERROR = 43 -STATUS_SLOW_DOWN = 44 - -STATUS_PERMANENT_FAILURE = 50 -STATUS_NOT_FOUND = 51 -STATUS_GONE = 52 -STATUS_PROXY_REQUEST_REFUSED = 53 -STATUS_BAD_REQUEST = 59 - -STATUS_CLIENT_CERTIFICATE_REQUIRED = 60 -STATUS_TRANSIENT_CERTIFICATE_REQUESTED = 61 -STATUS_AUTHORISED_CERTIFICATE_REQUIRED = 62 -STATUS_CERTIFICATE_NOT_ACCEPTED = 63 -STATUS_FUTURE_CERTIFICATE_REJECTED = 64 -STATUS_EXPIRED_CERTIFICATE_REJECTED = 65 - - -class BaseApplication: +class Status: """ - Base Jetforce application class, analogous to a WSGI app. - - At a minimum you must implement the ``__call__()`` method, which will - perform the following tasks: - - 1. Send the response header by calling ``send_status()``. - 2. Optionally, yield the response body as bytes. + Gemini response status codes. """ - def __call__( - self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: - raise NotImplemented + INPUT = 10 - @staticmethod - def block_proxy_requests(url: str, hostname: str) -> typing.Optional[str]: + SUCCESS = 20 + SUCCESS_END_OF_SESSION = 21 + + REDIRECT_TEMPORARY = 30 + REDIRECT_PERMANENT = 31 + + TEMPORARY_FAILURE = 40 + SERVER_UNAVAILABLE = 41 + CGI_ERROR = 42 + PROXY_ERROR = 43 + SLOW_DOWN = 44 + + PERMANENT_FAILURE = 50 + NOT_FOUND = 51 + GONE = 52 + PROXY_REQUEST_REFUSED = 53 + BAD_REQUEST = 59 + + CLIENT_CERTIFICATE_REQUIRED = 60 + TRANSIENT_CERTIFICATE_REQUESTED = 61 + AUTHORISED_CERTIFICATE_REQUIRED = 62 + CERTIFICATE_NOT_ACCEPTED = 63 + FUTURE_CERTIFICATE_REJECTED = 64 + EXPIRED_CERTIFICATE_REJECTED = 65 + + +class Request: + """ + Object that encapsulates information about a single gemini request. + """ + + def __init__(self, environ: dict): + self.environ = environ + + url = urllib.parse.urlparse(environ["URL"]) + self.scheme = url.scheme + self.hostname = url.hostname + self.port = url.port + self.path = url.path + self.params = url.params + self.query = url.query + self.fragment = url.fragment + self.url = url.geturl() + + +@dataclasses.dataclass +class Response: + """ + Object that encapsulates information about a single gemini response. + """ + + status: int + meta: str + body: typing.Union[None, bytes, str, typing.Iterator[bytes]] = None + + +@dataclasses.dataclass +class RoutePattern: + """ + A pattern for matching URLs with a single endpoint or route. + """ + + path: str = "" + scheme: str = "gemini" + strict_hostname: bool = True + strict_trailing_slash: bool = False + + def match(self, request: Request) -> bool: """ - Optional method that may be used to restrict request URLs that do not - match your current host and the gemini:// scheme. This may be used if - you don't want to worry about proxying traffic to other servers. + Check if the given request URL matches this route pattern. """ - url_parts = urllib.parse.urlparse(url) - if url_parts.scheme and url_parts.scheme != "gemini": - return 'URL scheme must be "gemini://"' - if url_parts.hostname and url_parts.hostname != hostname: - return f'URL hostname must be "{hostname}"' + server_hostname = request.environ["HOSTNAME"] + + if self.strict_hostname and request.hostname != server_hostname: + return False + if self.scheme and self.scheme != request.scheme: + return False + + if self.strict_trailing_slash: + request_path = request.path + else: + request_path = request.path.rstrip("/") + + return bool(re.match(self.path, request_path)) -class StaticDirectoryApplication(BaseApplication): +class JetforceApplication: + """ + Base Jetforce application class with primitive URL routing. + """ + + def __init__(self): + self.routes: typing.List[ + typing.Tuple[RoutePattern, typing.Callable[[Request], Response]] + ] = [] + + def __call__(self, environ: dict, send_status: typing.Callable): + request = Request(environ) + for route_pattern, callback in self.routes[::-1]: + if route_pattern.match(request): + response = callback(request) + send_status(response.status, response.meta) + if response.body: + if isinstance(response.body, bytes): + yield response.body + elif isinstance(response.body, str): + yield response.body.encode() + else: + yield from response.body + break + else: + send_status(Status.NOT_FOUND, "Not Found") + + def route( + self, + path: str = "", + scheme: str = "gemini", + strict_hostname: bool = True, + strict_trailing_slash: bool = False, + ) -> typing.Callable: + """ + Decorator for binding a function to a route based on the URL path. + + app = JetforceApplication() + + @app.route('/my-path') + def my_path(request): + return Response(Status.SUCCESS, 'text/plain', 'Hello world!') + """ + route_pattern = RoutePattern( + path, scheme, strict_hostname, strict_trailing_slash + ) + + def wrap(func: typing.Callable) -> typing.Callable: + self.routes.append((route_pattern, func)) + return func + + return wrap + + +class StaticDirectoryApplication(JetforceApplication): """ Serve a static directory over Gemini. @@ -105,44 +198,38 @@ class StaticDirectoryApplication(BaseApplication): directory listing will be auto-generated. """ - def __init__(self, directory="/var/gemini"): + def __init__(self, directory: str = "/var/gemini"): + super().__init__() + self.routes.append((RoutePattern(), self.serve_static_file)) + self.root = pathlib.Path(directory).resolve(strict=True) self.mimetypes = mimetypes.MimeTypes() + self.mimetypes.add_type("text/gemini", ".gmi") - def __call__( - self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: - url = environ["URL"] - url_parts = urllib.parse.urlparse(url) - url_path = pathlib.Path(url_parts.path.strip("/")) + def serve_static_file(self, request: Request): - error_message = self.block_proxy_requests(url, environ["HOSTNAME"]) - if error_message: - return send_status(STATUS_PROXY_REQUEST_REFUSED, error_message) + url_path = pathlib.Path(request.path.strip("/")) filename = pathlib.Path(os.path.normpath(str(url_path))) if filename.is_absolute() or str(filename.name).startswith(".."): # Guard against breaking out of the directory - return send_status(STATUS_NOT_FOUND, "Not Found") + return Response(Status.NOT_FOUND, "Not Found") filesystem_path = self.root / filename if filesystem_path.is_file(): mimetype = self.guess_mimetype(filesystem_path.name) - send_status(STATUS_SUCCESS, mimetype) - yield from self.load_file(filesystem_path) - + generator = self.load_file(filesystem_path) + return Response(Status.SUCCESS, mimetype, generator) elif filesystem_path.is_dir(): gemini_file = filesystem_path / ".gemini" if gemini_file.exists(): - send_status(STATUS_SUCCESS, "text/gemini") - yield from self.load_file(gemini_file) + generator = self.load_file(gemini_file) else: - send_status(STATUS_SUCCESS, "text/gemini") - yield from self.list_directory(url_path, filesystem_path) - + generator = self.list_directory(url_path, filesystem_path) + return Response(Status.SUCCESS, "text/gemini", generator) else: - return send_status(STATUS_NOT_FOUND, "Not Found") + return Response(Status.NOT_FOUND, "Not Found") def load_file(self, filesystem_path: pathlib.Path): with filesystem_path.open("rb") as fp: @@ -152,6 +239,9 @@ class StaticDirectoryApplication(BaseApplication): data = fp.read(1024) def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path): + """ + Auto-generate a text/gemini document based on the contents of the file system. + """ yield f"Directory: /{url_path}\r\n".encode() if url_path.parent != url_path: yield f"=>/{url_path.parent}\t..\r\n".encode() @@ -169,10 +259,8 @@ class StaticDirectoryApplication(BaseApplication): mime, encoding = self.mimetypes.guess_type(filename) if encoding: return f"{mime}; charset={encoding}" - elif mime: - return mime else: - return "text/plain" + return mime or "text/plain" class GeminiRequestHandler: @@ -218,7 +306,7 @@ class GeminiRequestHandler: await self.parse_header() except Exception: # Malformed request, throw it away and exit immediately - self.write_status(STATUS_BAD_REQUEST, "Could not understand request line") + self.write_status(Status.BAD_REQUEST, "Could not understand request line") return await self.close_connection() try: @@ -227,7 +315,7 @@ class GeminiRequestHandler: for data in app: await self.write_body(data) except Exception as e: - self.write_status(STATUS_CGI_ERROR, str(e)) + self.write_status(Status.CGI_ERROR, str(e)) raise finally: await self.close_connection() @@ -272,7 +360,6 @@ class GeminiRequestHandler: that must be flushed. This is done so that the status can be updated as long as no other data has been written to the stream yet. """ - # TODO: enforce restriction on response meta <= 1024 bytes self.status = status self.meta = meta self.response_buffer = f"{status}\t{meta}\r\n" @@ -404,7 +491,7 @@ class GeminiServer: return str(certfile), str(keyfile) -def build_argument_parser() -> argparse.ArgumentParser: +def command_line_parser() -> argparse.ArgumentParser: """ Construct the default argument parser when launching the server from the command line. @@ -431,7 +518,7 @@ def run_server() -> None: """ Entry point for running the command line static directory server. """ - parser = build_argument_parser() + parser = command_line_parser() parser.add_argument("--dir", help="local directory to serve", default="/var/gemini") args = parser.parse_args()