diff --git a/jetforce.py b/jetforce.py index fdb7b71..f541c28 100644 --- a/jetforce.py +++ b/jetforce.py @@ -1,20 +1,23 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3.7 import argparse import asyncio import datetime import mimetypes -import os +import pathlib import ssl import sys import typing +# Fail early to avoid crashing with an obscure error +if sys.version_info < (3, 7): + sys.exit("Fatal Error: jetforce requires Python 3.7+") + __version__ = "0.0.1" __title__ = "Jetforce Gemini Server" __author__ = "Michael Lazar" __license__ = "GNU General Public License v3.0" __copyright__ = "(c) 2019 Michael Lazar" - ABOUT = fr""" You are now riding on... _________ _____________ @@ -27,7 +30,6 @@ An Experimental Gemini Server, v{__version__} https://github.com/michael-lazar/jetforce """ - # Gemini response status codes STATUS_SUCCESS = 2 STATUS_NOT_FOUND = 4 @@ -62,56 +64,72 @@ class StaticDirectoryApp: directory listing will be auto-generated. """ - root: str = "/var/gemini" - - def __init__(self, environ: dict, send_status: typing.Callable) -> None: + def __init__(self, root: str, environ: dict, send_status: typing.Callable) -> None: + self.root = pathlib.Path(root).resolve(strict=True) self.environ = environ self.send_status = send_status self.mimetypes = mimetypes.MimeTypes() - def __iter__(self) -> typing.Iterator[bytes]: - filename = self.environ["PATH"] - filename = filename.lstrip("/") + @classmethod + def serve_directory(cls, root: str) -> typing.Callable: + """ + Return an app that points to the given root directory on the file system. + """ - abs_filename = os.path.abspath(os.path.join(self.root, filename)) - if not abs_filename.startswith(self.root): + def build_class(environ: dict, send_status: typing.Callable): + return cls(root, environ, send_status) + + return build_class + + def __iter__(self) -> typing.Iterator[bytes]: + url_path = pathlib.Path(self.environ["PATH"].strip("/")) + filesystem_path = (self.root / url_path).resolve() + + try: + filesystem_path.relative_to(self.root) + except ValueError: # Guard against breaking out of the directory self.send_status(STATUS_NOT_FOUND, "Not Found") + return - elif os.path.isfile(abs_filename): - mimetype = self.guess_mimetype(abs_filename) - yield from self.load_file(abs_filename, mimetype) + if filesystem_path.is_file(): + mimetype = self.guess_mimetype(filesystem_path.name) + yield from self.load_file(filesystem_path, mimetype) - elif os.path.isdir(abs_filename): - gemini_map_file = os.path.join(abs_filename, ".gemini") - if os.path.exists(gemini_map_file): - yield from self.load_file(gemini_map_file, "text/gemini") + elif filesystem_path.is_dir(): + gemini_file = filesystem_path / ".gemini" + if gemini_file.exists(): + yield from self.load_file(gemini_file, "text/gemini") else: - yield from self.list_directory(abs_filename) + yield from self.list_directory(url_path, filesystem_path) else: self.send_status(STATUS_NOT_FOUND, "Not Found") - def load_file(self, abs_filename: str, mimetype: str): + def load_file(self, filesystem_path: pathlib.Path, mimetype: str): self.send_status(STATUS_SUCCESS, mimetype) - with open(abs_filename, "rb") as fp: + with filesystem_path.open("rb") as fp: data = fp.read(1024) while data: yield data data = fp.read(1024) - def list_directory(self, abs_folder: str): + def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path): self.send_status(STATUS_SUCCESS, "text/gemini") - for filename in os.listdir(abs_folder): - if filename == ".gemini" or filename.startswith("~"): + yield f"Directory: /{url_path}\r\n".encode() + if url_path.parent != url_path: + yield f"=>/{url_path.parent}\t..\r\n".encode() + + for file in sorted(filesystem_path.iterdir()): + if file.name.startswith((".", "~")): + # Skip hidden and temporary files for security reasons continue - abs_filename = os.path.join(abs_folder, filename) - if os.path.isdir(abs_filename): - # The directory end slash is necessary for relative paths to work - filename += "/" - yield f"=>{filename}\r\n".encode() + elif file.is_dir(): + yield f"=>/{url_path / file.name}\t{file.name}/\r\n".encode() + else: + yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode() def guess_mimetype(self, filename: str): mime, encoding = self.mimetypes.guess_type(filename) @@ -176,11 +194,9 @@ class GeminiRequestHandler: app = self.app(environ, self.write_status) for data in app: await self.write_body(data) - except Exception as e: self.write_status(STATUS_SERVER_ERROR, str(e)) raise - finally: await self.flush_status() self.log_request() @@ -269,21 +285,12 @@ class GeminiServer: request_handler_class = GeminiRequestHandler def __init__( - self, - host: str, - port: int, - ssl_context: typing.Union[tuple, ssl.SSLContext], - app: typing.Callable, + self, host: str, port: int, ssl_context: ssl.SSLContext, app: typing.Callable ) -> None: - self.host = host self.port = port + self.ssl_context = ssl_context self.app = app - if isinstance(ssl_context, tuple): - self.ssl_context = ssl.SSLContext() - self.ssl_context.load_cert_chain(*ssl_context) - else: - self.ssl_context = ssl_context async def run(self) -> None: """ @@ -328,10 +335,7 @@ def run_server(): parser.add_argument("--host", help="server host", default="127.0.0.1") parser.add_argument("--port", help="server port", type=int, default=1965) parser.add_argument( - "--dir", - help="local directory to serve files from", - type=str, - default=StaticDirectoryApp.root, + "--dir", help="local directory to serve files from", default="/var/gemini" ) parser.add_argument( "--tls-certfile", @@ -347,13 +351,14 @@ def run_server(): ) args = parser.parse_args() - StaticDirectoryApp.root = args.dir + ssl_context = ssl.SSLContext() + ssl_context.load_cert_chain(args.tls_certfile, args.tls_keyfile) server = GeminiServer( host=args.host, port=args.port, - ssl_context=(args.tls_certfile, args.tls_keyfile), - app=StaticDirectoryApp, + ssl_context=ssl_context, + app=StaticDirectoryApp.serve_directory(args.dir), ) asyncio.run(server.run())