Better path handling

This commit is contained in:
Michael Lazar 2019-08-05 22:21:28 -04:00
parent 3efbd727dd
commit 94e28235b3
1 changed files with 55 additions and 50 deletions

View File

@ -1,20 +1,23 @@
#!/usr/bin/env python3
#!/usr/bin/env python3.7
import argparse
import asyncio
import datetime
import mimetypes
import os
import pathlib
import ssl
import sys
import typing
# Fail early to avoid crashing with an obscure error
if sys.version_info < (3, 7):
sys.exit("Fatal Error: jetforce requires Python 3.7+")
__version__ = "0.0.1"
__title__ = "Jetforce Gemini Server"
__author__ = "Michael Lazar"
__license__ = "GNU General Public License v3.0"
__copyright__ = "(c) 2019 Michael Lazar"
ABOUT = fr"""
You are now riding on...
_________ _____________
@ -27,7 +30,6 @@ An Experimental Gemini Server, v{__version__}
https://github.com/michael-lazar/jetforce
"""
# Gemini response status codes
STATUS_SUCCESS = 2
STATUS_NOT_FOUND = 4
@ -62,56 +64,72 @@ class StaticDirectoryApp:
directory listing will be auto-generated.
"""
root: str = "/var/gemini"
def __init__(self, environ: dict, send_status: typing.Callable) -> None:
def __init__(self, root: str, environ: dict, send_status: typing.Callable) -> None:
self.root = pathlib.Path(root).resolve(strict=True)
self.environ = environ
self.send_status = send_status
self.mimetypes = mimetypes.MimeTypes()
def __iter__(self) -> typing.Iterator[bytes]:
filename = self.environ["PATH"]
filename = filename.lstrip("/")
@classmethod
def serve_directory(cls, root: str) -> typing.Callable:
"""
Return an app that points to the given root directory on the file system.
"""
abs_filename = os.path.abspath(os.path.join(self.root, filename))
if not abs_filename.startswith(self.root):
def build_class(environ: dict, send_status: typing.Callable):
return cls(root, environ, send_status)
return build_class
def __iter__(self) -> typing.Iterator[bytes]:
url_path = pathlib.Path(self.environ["PATH"].strip("/"))
filesystem_path = (self.root / url_path).resolve()
try:
filesystem_path.relative_to(self.root)
except ValueError:
# Guard against breaking out of the directory
self.send_status(STATUS_NOT_FOUND, "Not Found")
return
elif os.path.isfile(abs_filename):
mimetype = self.guess_mimetype(abs_filename)
yield from self.load_file(abs_filename, mimetype)
if filesystem_path.is_file():
mimetype = self.guess_mimetype(filesystem_path.name)
yield from self.load_file(filesystem_path, mimetype)
elif os.path.isdir(abs_filename):
gemini_map_file = os.path.join(abs_filename, ".gemini")
if os.path.exists(gemini_map_file):
yield from self.load_file(gemini_map_file, "text/gemini")
elif filesystem_path.is_dir():
gemini_file = filesystem_path / ".gemini"
if gemini_file.exists():
yield from self.load_file(gemini_file, "text/gemini")
else:
yield from self.list_directory(abs_filename)
yield from self.list_directory(url_path, filesystem_path)
else:
self.send_status(STATUS_NOT_FOUND, "Not Found")
def load_file(self, abs_filename: str, mimetype: str):
def load_file(self, filesystem_path: pathlib.Path, mimetype: str):
self.send_status(STATUS_SUCCESS, mimetype)
with open(abs_filename, "rb") as fp:
with filesystem_path.open("rb") as fp:
data = fp.read(1024)
while data:
yield data
data = fp.read(1024)
def list_directory(self, abs_folder: str):
def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path):
self.send_status(STATUS_SUCCESS, "text/gemini")
for filename in os.listdir(abs_folder):
if filename == ".gemini" or filename.startswith("~"):
yield f"Directory: /{url_path}\r\n".encode()
if url_path.parent != url_path:
yield f"=>/{url_path.parent}\t..\r\n".encode()
for file in sorted(filesystem_path.iterdir()):
if file.name.startswith((".", "~")):
# Skip hidden and temporary files for security reasons
continue
abs_filename = os.path.join(abs_folder, filename)
if os.path.isdir(abs_filename):
# The directory end slash is necessary for relative paths to work
filename += "/"
yield f"=>{filename}\r\n".encode()
elif file.is_dir():
yield f"=>/{url_path / file.name}\t{file.name}/\r\n".encode()
else:
yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
def guess_mimetype(self, filename: str):
mime, encoding = self.mimetypes.guess_type(filename)
@ -176,11 +194,9 @@ class GeminiRequestHandler:
app = self.app(environ, self.write_status)
for data in app:
await self.write_body(data)
except Exception as e:
self.write_status(STATUS_SERVER_ERROR, str(e))
raise
finally:
await self.flush_status()
self.log_request()
@ -269,21 +285,12 @@ class GeminiServer:
request_handler_class = GeminiRequestHandler
def __init__(
self,
host: str,
port: int,
ssl_context: typing.Union[tuple, ssl.SSLContext],
app: typing.Callable,
self, host: str, port: int, ssl_context: ssl.SSLContext, app: typing.Callable
) -> None:
self.host = host
self.port = port
self.app = app
if isinstance(ssl_context, tuple):
self.ssl_context = ssl.SSLContext()
self.ssl_context.load_cert_chain(*ssl_context)
else:
self.ssl_context = ssl_context
self.app = app
async def run(self) -> None:
"""
@ -328,10 +335,7 @@ def run_server():
parser.add_argument("--host", help="server host", default="127.0.0.1")
parser.add_argument("--port", help="server port", type=int, default=1965)
parser.add_argument(
"--dir",
help="local directory to serve files from",
type=str,
default=StaticDirectoryApp.root,
"--dir", help="local directory to serve files from", default="/var/gemini"
)
parser.add_argument(
"--tls-certfile",
@ -347,13 +351,14 @@ def run_server():
)
args = parser.parse_args()
StaticDirectoryApp.root = args.dir
ssl_context = ssl.SSLContext()
ssl_context.load_cert_chain(args.tls_certfile, args.tls_keyfile)
server = GeminiServer(
host=args.host,
port=args.port,
ssl_context=(args.tls_certfile, args.tls_keyfile),
app=StaticDirectoryApp,
ssl_context=ssl_context,
app=StaticDirectoryApp.serve_directory(args.dir),
)
asyncio.run(server.run())