diff --git a/examples/server_echo.py b/examples/server_echo.py new file mode 100644 index 0000000..b2e730a --- /dev/null +++ b/examples/server_echo.py @@ -0,0 +1,27 @@ +""" +A simple Gemini server that echos back the request to the client. +""" +import asyncio +import typing + +import jetforce + + +def echo(environ: dict, send_status: typing.Callable) -> typing.Iterator[bytes]: + url = environ["URL"] + send_status(jetforce.STATUS_SUCCESS, "text/gemini") + yield f"Received path: {url}".encode() + + +if __name__ == "__main__": + parser = jetforce.build_argument_parser() + args = parser.parse_args() + server = jetforce.GeminiServer( + host=args.host, + port=args.port, + certfile=args.certfile, + keyfile=args.keyfile, + hostname=args.hostname, + app=echo, + ) + asyncio.run(server.run()) diff --git a/examples/server_guestbook.py b/examples/server_guestbook.py new file mode 100644 index 0000000..b599d06 --- /dev/null +++ b/examples/server_guestbook.py @@ -0,0 +1,89 @@ +""" +A guestbook application that accepts user messages using the INPUT response type +and stores messages in a simple SQLite database file. +""" +import asyncio +import sqlite3 +import typing +import urllib.parse +from datetime import datetime + +import jetforce + + +class GuestbookApplication(jetforce.BaseApplication): + + db_file = "guestbook.sql" + + def connect_db(self) -> typing.Tuple[sqlite3.Connection, sqlite3.Cursor]: + db = sqlite3.connect(self.db_file, detect_types=sqlite3.PARSE_DECLTYPES) + cursor = db.cursor() + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS guestbook ( + id int PRIMARY KEY, + message text, + created timestamp, + ip_address text); + """ + ) + db.commit() + return db, cursor + + def __call__( + self, environ: dict, send_status: typing.Callable + ) -> typing.Iterator[bytes]: + url = environ["URL"] + url_parts = urllib.parse.urlparse(url) + + error_message = self.block_proxy_requests(url, environ["HOSTNAME"]) + if error_message: + return send_status(jetforce.STATUS_PROXY_REQUEST_REFUSED, error_message) + + if url_parts.path in ("", "/"): + send_status(jetforce.STATUS_SUCCESS, "text/gemini") + yield from self.list_messages() + elif url_parts.path == "/submit": + if url_parts.query: + self.save_message(url_parts.query, environ["REMOTE_ADDR"]) + return send_status(jetforce.STATUS_REDIRECT_TEMPORARY, "/") + else: + return send_status( + jetforce.STATUS_INPUT, "Enter your message (max 256 characters)" + ) + else: + return send_status(jetforce.STATUS_NOT_FOUND, "Invalid address") + + def save_message(self, message: str, ip_address: str) -> None: + message = message[:256] + created = datetime.utcnow() + + db, cursor = self.connect_db() + sql = "INSERT INTO guestbook(message, created, ip_address) VALUES (?, ?, ?)" + cursor.execute(sql, (message, created, ip_address)) + db.commit() + + def list_messages(self) -> typing.Iterator[bytes]: + yield "Guestbook\n=>/submit Leave a Message\n".encode() + + db, cursor = self.connect_db() + cursor.execute("SELECT created, message FROM guestbook ORDER BY created DESC") + for row in cursor.fetchall(): + yield f"\n[{row[0]:%Y-%m-%d %I:%M %p}]\n{row[1]}\n".encode() + + yield "\n...\n".encode() + + +if __name__ == "__main__": + parser = jetforce.build_argument_parser() + args = parser.parse_args() + app = GuestbookApplication() + server = jetforce.GeminiServer( + host=args.host, + port=args.port, + certfile=args.certfile, + keyfile=args.keyfile, + hostname=args.hostname, + app=app, + ) + asyncio.run(server.run()) diff --git a/examples/server_http_proxy.py b/examples/server_http_proxy.py new file mode 100644 index 0000000..e7b2bb9 --- /dev/null +++ b/examples/server_http_proxy.py @@ -0,0 +1,49 @@ +""" +This is an example of setting up a Gemini server to proxy requests to other +protocols. This application will accept HTTP URLs, download and render them +locally using the `w3m` tool, and render the output to the client as plain text. +""" +import asyncio +import subprocess +import typing +import urllib.parse + +import jetforce + + +class HTTPProxyApplication(jetforce.BaseApplication): + + command = [b"w3m", b"-dump"] + + def __call__( + self, environ: dict, send_status: typing.Callable + ) -> typing.Iterator[bytes]: + url = environ["URL"] + url_parts = urllib.parse.urlparse(url) + if url_parts.scheme not in ("http", "https"): + return send_status(jetforce.STATUS_NOT_FOUND, "Invalid Resource") + + try: + command = self.command + [url.encode()] + out = subprocess.run(command, stdout=subprocess.PIPE) + out.check_returncode() + except Exception: + send_status(jetforce.STATUS_CGI_ERROR, "Failed to load URL") + else: + send_status(jetforce.STATUS_SUCCESS, "text/plain") + yield out.stdout + + +if __name__ == "__main__": + parser = jetforce.build_argument_parser() + args = parser.parse_args() + app = HTTPProxyApplication() + server = jetforce.GeminiServer( + host=args.host, + port=args.port, + certfile=args.certfile, + keyfile=args.keyfile, + hostname=args.hostname, + app=app, + ) + asyncio.run(server.run()) diff --git a/jetforce.py b/jetforce.py index b05a75c..9900680 100755 --- a/jetforce.py +++ b/jetforce.py @@ -66,22 +66,37 @@ STATUS_FUTURE_CERTIFICATE_REJECTED = 64 STATUS_EXPIRED_CERTIFICATE_REJECTED = 65 -class EchoApp: +class BaseApplication: """ - A simple application that echos back the requested path. + Base Jetforce application class, analogous to a WSGI app. + + At a minimum you must implement the ``__call__()`` method, which will + perform the following tasks: + + 1. Send the response header by calling ``send_status()``. + 2. Optionally, yield the response body as bytes. """ - def __init__(self, environ: dict, send_status: typing.Callable) -> None: - self.environ = environ - self.send_status = send_status + def __call__( + self, environ: dict, send_status: typing.Callable + ) -> typing.Iterator[bytes]: + raise NotImplemented - def __iter__(self) -> typing.Iterator[bytes]: - self.send_status(STATUS_SUCCESS, "text/plain") - url = self.environ["RAW_URL"] - yield f"Received path: {url}".encode() + @staticmethod + def block_proxy_requests(url: str, hostname: str) -> typing.Optional[str]: + """ + Optional method that may be used to restrict request URLs that do not + match your current host and the gemini:// scheme. This may be used if + you don't want to worry about proxying traffic to other servers. + """ + url_parts = urllib.parse.urlparse(url) + if url_parts.scheme and url_parts.scheme != "gemini": + return 'URL scheme must be "gemini://"' + if url_parts.hostname and url_parts.hostname != hostname: + return f'URL hostname must be "{hostname}"' -class StaticDirectoryApp: +class StaticDirectoryApplication(BaseApplication): """ Serve a static directory over Gemini. @@ -90,50 +105,46 @@ class StaticDirectoryApp: directory listing will be auto-generated. """ - def __init__(self, root: str, environ: dict, send_status: typing.Callable) -> None: - self.root = pathlib.Path(root).resolve(strict=True) - self.environ = environ - self.send_status = send_status + def __init__(self, directory="/var/gemini"): + self.root = pathlib.Path(directory).resolve(strict=True) self.mimetypes = mimetypes.MimeTypes() - @classmethod - def serve_directory(cls, root: str) -> typing.Callable: - """ - Return an app that points to the given root directory on the file system. - """ + def __call__( + self, environ: dict, send_status: typing.Callable + ) -> typing.Iterator[bytes]: + url = environ["URL"] + url_parts = urllib.parse.urlparse(url) + url_path = pathlib.Path(url_parts.path.strip("/")) - def build_class(environ: dict, send_status: typing.Callable): - return cls(root, environ, send_status) - - return build_class - - def __iter__(self) -> typing.Iterator[bytes]: - url_path = pathlib.Path(self.environ["URL"].path.strip("/")) + error_message = self.block_proxy_requests(url, environ["HOSTNAME"]) + if error_message: + return send_status(STATUS_PROXY_REQUEST_REFUSED, error_message) filename = pathlib.Path(os.path.normpath(str(url_path))) if filename.is_absolute() or str(filename.name).startswith(".."): # Guard against breaking out of the directory - self.send_status(STATUS_NOT_FOUND, "Not Found") - return - else: - filesystem_path = self.root / filename + return send_status(STATUS_NOT_FOUND, "Not Found") + + filesystem_path = self.root / filename if filesystem_path.is_file(): mimetype = self.guess_mimetype(filesystem_path.name) - yield from self.load_file(filesystem_path, mimetype) + send_status(STATUS_SUCCESS, mimetype) + yield from self.load_file(filesystem_path) elif filesystem_path.is_dir(): gemini_file = filesystem_path / ".gemini" if gemini_file.exists(): - yield from self.load_file(gemini_file, "text/gemini") + send_status(STATUS_SUCCESS, "text/gemini") + yield from self.load_file(gemini_file) else: + send_status(STATUS_SUCCESS, "text/gemini") yield from self.list_directory(url_path, filesystem_path) else: - self.send_status(STATUS_NOT_FOUND, "Not Found") + return send_status(STATUS_NOT_FOUND, "Not Found") - def load_file(self, filesystem_path: pathlib.Path, mimetype: str): - self.send_status(STATUS_SUCCESS, mimetype) + def load_file(self, filesystem_path: pathlib.Path): with filesystem_path.open("rb") as fp: data = fp.read(1024) while data: @@ -141,8 +152,6 @@ class StaticDirectoryApp: data = fp.read(1024) def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path): - self.send_status(STATUS_SUCCESS, "text/gemini") - yield f"Directory: /{url_path}\r\n".encode() if url_path.parent != url_path: yield f"=>/{url_path.parent}\t..\r\n".encode() @@ -184,7 +193,6 @@ class GeminiRequestHandler: self.writer: typing.Optional[asyncio.StreamWriter] = None self.received_timestamp: typing.Optional[datetime.datetime] = None self.remote_addr: typing.Optional[str] = None - self.raw_url: typing.Optional[str] = None self.url: typing.Optional[urllib.parse.ParseResult] = None self.status: typing.Optional[int] = None self.meta: typing.Optional[str] = None @@ -213,19 +221,6 @@ class GeminiRequestHandler: self.write_status(STATUS_BAD_REQUEST, "Could not understand request line") return await self.close_connection() - # Discard proxy requests, may revisit this in a later version - if self.url.scheme and self.url.scheme != "gemini": - self.write_status( - STATUS_PROXY_REQUEST_REFUSED, 'URL scheme must be "gemini://"' - ) - return await self.close_connection() - elif self.url.hostname and self.url.hostname != self.server.hostname: - self.write_status( - STATUS_PROXY_REQUEST_REFUSED, - f'URL hostname must be "{self.server.hostname}"', - ) - return await self.close_connection() - try: environ = self.build_environ() app = self.app(environ, self.write_status) @@ -246,7 +241,6 @@ class GeminiRequestHandler: "SERVER_PORT": self.server.port, "REMOTE_ADDR": self.remote_addr, "HOSTNAME": self.server.hostname, - "RAW_URL": self.raw_url, "URL": self.url, } @@ -261,12 +255,7 @@ class GeminiRequestHandler: if len(data) > 1024: raise ValueError("URL exceeds max length of 1024 bytes") - self.raw_url = data.decode() - self.url = urllib.parse.urlparse(self.raw_url) - if not self.url.netloc: - # URL does not contain a scheme and was not prefixed with // per RFC 1808 - # TODO: Suggest spec should enforce // when scheme is omitted - self.url = urllib.parse.urlparse(f"//{self.raw_url}") + self.url = data.decode() def write_status(self, status: int, meta: str) -> None: """ @@ -323,7 +312,7 @@ class GeminiRequestHandler: self.server.log_message( f"{self.remote_addr} " f"[{self.received_timestamp:%d/%b/%Y:%H:%M:%S +0000}] " - f'"{self.raw_url}" ' + f'"{self.url}" ' f"{self.status} " f'"{self.meta}" ' f"{self.response_size}" @@ -341,16 +330,25 @@ class GeminiServer: self, host: str, port: int, - ssl_context: ssl.SSLContext, + certfile: typing.Optional[str], + keyfile: typing.Optional[str], hostname: str, app: typing.Callable, ) -> None: + self.host = host self.port = port - self.ssl_context = ssl_context self.hostname = hostname self.app = app + if not certfile: + certfile, keyfile = self.generate_tls_certificate(hostname) + + self.ssl_context = ssl.SSLContext() + self.ssl_context.verify_mode = ssl.CERT_NONE + self.ssl_context.check_hostname = False + self.ssl_context.load_cert_chain(certfile, keyfile) + async def run(self) -> None: """ The main asynchronous server loop. @@ -385,30 +383,31 @@ class GeminiServer: """ print(message, file=sys.stderr) - -def generate_tls_certificate(hostname: str) -> typing.Tuple[str, str]: - """ - Utility function to generate a self-signed SSL certificate key pair if - one isn't provided. Results may vary depending on your version of OpenSSL. - """ - certfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.crt" - keyfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.key" - if not certfile.exists() or not keyfile.exists(): - print(f"Writing ad hoc TLS certificate to {certfile}") - subprocess.run( - [ - f"openssl req -newkey rsa:2048 -nodes -keyout {keyfile}" - f' -nodes -x509 -out {certfile} -subj "/CN={hostname}"' - ], - shell=True, - check=True, - ) - return str(certfile), str(keyfile) + @staticmethod + def generate_tls_certificate(hostname: str) -> typing.Tuple[str, str]: + """ + Utility function to generate a self-signed SSL certificate key pair if + one isn't provided. Results may vary depending on your version of OpenSSL. + """ + certfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.crt" + keyfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.key" + if not certfile.exists() or not keyfile.exists(): + print(f"Writing ad hoc TLS certificate to {certfile}") + subprocess.run( + [ + f"openssl req -newkey rsa:2048 -nodes -keyout {keyfile}" + f' -nodes -x509 -out {certfile} -subj "/CN={hostname}"' + ], + shell=True, + check=True, + ) + return str(certfile), str(keyfile) -def run_server() -> None: +def build_argument_parser() -> argparse.ArgumentParser: """ - Entry point for running the command line directory server. + Construct the default argument parser when launching the server from + the command line. """ parser = argparse.ArgumentParser( prog="jetforce", @@ -418,25 +417,32 @@ def run_server() -> None: ) parser.add_argument("--host", help="Address to bind server to", default="127.0.0.1") parser.add_argument("--port", help="Port to bind server to", type=int, default=1965) - parser.add_argument("--tls-certfile", help="TLS certificate file", metavar="FILE") - parser.add_argument("--tls-keyfile", help="TLS private key file", metavar="FILE") + parser.add_argument( + "--tls-certfile", dest="certfile", help="TLS certificate file", metavar="FILE" + ) + parser.add_argument( + "--tls-keyfile", dest="keyfile", help="TLS private key file", metavar="FILE" + ) parser.add_argument("--hostname", help="Server hostname", default="localhost") + return parser + + +def run_server() -> None: + """ + Entry point for running the command line static directory server. + """ + parser = build_argument_parser() parser.add_argument("--dir", help="local directory to serve", default="/var/gemini") args = parser.parse_args() - certfile, keyfile = args.tls_certfile, args.tls_keyfile - if not certfile: - certfile, keyfile = generate_tls_certificate(args.hostname) - - ssl_context = ssl.SSLContext() - ssl_context.load_cert_chain(certfile, keyfile) - + app = StaticDirectoryApplication(args.dir) server = GeminiServer( host=args.host, port=args.port, - ssl_context=ssl_context, + certfile=args.certfile, + keyfile=args.keyfile, hostname=args.hostname, - app=StaticDirectoryApp.serve_directory(args.dir), + app=app, ) asyncio.run(server.run()) diff --git a/jetforce_client.py b/jetforce_client.py index f187678..a0fda3f 100755 --- a/jetforce_client.py +++ b/jetforce_client.py @@ -41,7 +41,13 @@ def run_client(): parser.add_argument( "--port", help="Optional port to connect to, will default to the URL" ) + parser.add_argument("--certfile", help="Optional client certificate") + parser.add_argument("--keyfile", help="Optional client key") args = parser.parse_args() + + if args.certfile: + context.load_cert_chain(args.certfile, args.keyfile) + fetch(args.url, args.host, args.port)