More significant refactoring
This commit is contained in:
parent
dd76b04d76
commit
4160aefe94
|
@ -2,20 +2,18 @@
|
||||||
A simple Gemini server that echos back the request to the client.
|
A simple Gemini server that echos back the request to the client.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
|
||||||
|
|
||||||
import jetforce
|
import jetforce
|
||||||
|
|
||||||
|
|
||||||
def echo(environ: dict, send_status: typing.Callable) -> typing.Iterator[bytes]:
|
def echo(environ, send_status):
|
||||||
url = environ["URL"]
|
url = environ["URL"]
|
||||||
send_status(jetforce.STATUS_SUCCESS, "text/gemini")
|
send_status(jetforce.Status.SUCCESS, "text/gemini")
|
||||||
yield f"Received path: {url}".encode()
|
yield f"Received path: {url}".encode()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = jetforce.build_argument_parser()
|
args = jetforce.command_line_parser().parse_args()
|
||||||
args = parser.parse_args()
|
|
||||||
server = jetforce.GeminiServer(
|
server = jetforce.GeminiServer(
|
||||||
host=args.host,
|
host=args.host,
|
||||||
port=args.port,
|
port=args.port,
|
||||||
|
|
|
@ -1,83 +1,52 @@
|
||||||
"""
|
"""
|
||||||
A guestbook application that accepts user messages using the INPUT response type
|
A guestbook application that accepts input from guests and stores messages in
|
||||||
and stores messages in a simple SQLite database file.
|
a simple text file.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import sqlite3
|
import pathlib
|
||||||
import typing
|
|
||||||
import urllib.parse
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import jetforce
|
import jetforce
|
||||||
|
from jetforce import Response, Status
|
||||||
|
|
||||||
|
guestbook = pathlib.Path("guestbook.txt")
|
||||||
|
|
||||||
|
|
||||||
class GuestbookApplication(jetforce.BaseApplication):
|
app = jetforce.JetforceApplication()
|
||||||
|
|
||||||
db_file = "guestbook.sql"
|
|
||||||
|
|
||||||
def connect_db(self) -> typing.Tuple[sqlite3.Connection, sqlite3.Cursor]:
|
@app.route("$")
|
||||||
db = sqlite3.connect(self.db_file, detect_types=sqlite3.PARSE_DECLTYPES)
|
def index(request):
|
||||||
cursor = db.cursor()
|
data = ["Guestbook", "=>/submit Leave a Message", ""]
|
||||||
cursor.execute(
|
|
||||||
"""
|
|
||||||
CREATE TABLE IF NOT EXISTS guestbook (
|
|
||||||
id int PRIMARY KEY,
|
|
||||||
message text,
|
|
||||||
created timestamp,
|
|
||||||
ip_address text);
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
db.commit()
|
|
||||||
return db, cursor
|
|
||||||
|
|
||||||
def __call__(
|
guestbook.touch(exist_ok=True)
|
||||||
self, environ: dict, send_status: typing.Callable
|
with guestbook.open("r") as fp:
|
||||||
) -> typing.Iterator[bytes]:
|
for line in fp:
|
||||||
url = environ["URL"]
|
line = line.strip()
|
||||||
url_parts = urllib.parse.urlparse(url)
|
if line.startswith("=>"):
|
||||||
|
# Protect guests from writing messages that contain links
|
||||||
error_message = self.block_proxy_requests(url, environ["HOSTNAME"])
|
data.append(line[2:])
|
||||||
if error_message:
|
|
||||||
return send_status(jetforce.STATUS_PROXY_REQUEST_REFUSED, error_message)
|
|
||||||
|
|
||||||
if url_parts.path in ("", "/"):
|
|
||||||
send_status(jetforce.STATUS_SUCCESS, "text/gemini")
|
|
||||||
yield from self.list_messages()
|
|
||||||
elif url_parts.path == "/submit":
|
|
||||||
if url_parts.query:
|
|
||||||
self.save_message(url_parts.query, environ["REMOTE_ADDR"])
|
|
||||||
return send_status(jetforce.STATUS_REDIRECT_TEMPORARY, "/")
|
|
||||||
else:
|
else:
|
||||||
return send_status(
|
data.append(line)
|
||||||
jetforce.STATUS_INPUT, "Enter your message (max 256 characters)"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return send_status(jetforce.STATUS_NOT_FOUND, "Invalid address")
|
|
||||||
|
|
||||||
def save_message(self, message: str, ip_address: str) -> None:
|
data.extend(["", "...", ""])
|
||||||
message = message[:256]
|
return Response(Status.SUCCESS, "text/gemini", "\n".join(data))
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/submit")
|
||||||
|
def submit(request):
|
||||||
|
if request.query:
|
||||||
|
message = request.query[:256]
|
||||||
created = datetime.utcnow()
|
created = datetime.utcnow()
|
||||||
|
with guestbook.open("a") as fp:
|
||||||
db, cursor = self.connect_db()
|
fp.write(f"\n[{created:%Y-%m-%d %I:%M %p}]\n{message}\n")
|
||||||
sql = "INSERT INTO guestbook(message, created, ip_address) VALUES (?, ?, ?)"
|
return Response(Status.REDIRECT_TEMPORARY, "")
|
||||||
cursor.execute(sql, (message, created, ip_address))
|
else:
|
||||||
db.commit()
|
return Response(Status.INPUT, "Enter your message (max 256 characters)")
|
||||||
|
|
||||||
def list_messages(self) -> typing.Iterator[bytes]:
|
|
||||||
yield "Guestbook\n=>/submit Leave a Message\n".encode()
|
|
||||||
|
|
||||||
db, cursor = self.connect_db()
|
|
||||||
cursor.execute("SELECT created, message FROM guestbook ORDER BY created DESC")
|
|
||||||
for row in cursor.fetchall():
|
|
||||||
yield f"\n[{row[0]:%Y-%m-%d %I:%M %p}]\n{row[1]}\n".encode()
|
|
||||||
|
|
||||||
yield "\n...\n".encode()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = jetforce.build_argument_parser()
|
args = jetforce.command_line_parser().parse_args()
|
||||||
args = parser.parse_args()
|
|
||||||
app = GuestbookApplication()
|
|
||||||
server = jetforce.GeminiServer(
|
server = jetforce.GeminiServer(
|
||||||
host=args.host,
|
host=args.host,
|
||||||
port=args.port,
|
port=args.port,
|
||||||
|
|
|
@ -5,39 +5,28 @@ locally using the `w3m` tool, and render the output to the client as plain text.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import subprocess
|
import subprocess
|
||||||
import typing
|
|
||||||
import urllib.parse
|
|
||||||
|
|
||||||
import jetforce
|
import jetforce
|
||||||
|
from jetforce import Response, Status
|
||||||
|
|
||||||
|
app = jetforce.JetforceApplication()
|
||||||
|
|
||||||
|
|
||||||
class HTTPProxyApplication(jetforce.BaseApplication):
|
@app.route(scheme="https", strict_hostname=False)
|
||||||
|
@app.route(scheme="http", strict_hostname=False)
|
||||||
command = [b"w3m", b"-dump"]
|
def proxy_request(request):
|
||||||
|
command = [b"w3m", b"-dump", request.url.encode()]
|
||||||
def __call__(
|
|
||||||
self, environ: dict, send_status: typing.Callable
|
|
||||||
) -> typing.Iterator[bytes]:
|
|
||||||
url = environ["URL"]
|
|
||||||
url_parts = urllib.parse.urlparse(url)
|
|
||||||
if url_parts.scheme not in ("http", "https"):
|
|
||||||
return send_status(jetforce.STATUS_NOT_FOUND, "Invalid Resource")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
command = self.command + [url.encode()]
|
|
||||||
out = subprocess.run(command, stdout=subprocess.PIPE)
|
out = subprocess.run(command, stdout=subprocess.PIPE)
|
||||||
out.check_returncode()
|
out.check_returncode()
|
||||||
except Exception:
|
except Exception:
|
||||||
send_status(jetforce.STATUS_CGI_ERROR, "Failed to load URL")
|
return Response(Status.CGI_ERROR, "Failed to load URL")
|
||||||
else:
|
else:
|
||||||
send_status(jetforce.STATUS_SUCCESS, "text/plain")
|
return Response(Status.SUCCESS, "text/plain", out.stdout)
|
||||||
yield out.stdout
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = jetforce.build_argument_parser()
|
args = jetforce.command_line_parser().parse_args()
|
||||||
args = parser.parse_args()
|
|
||||||
app = HTTPProxyApplication()
|
|
||||||
server = jetforce.GeminiServer(
|
server = jetforce.GeminiServer(
|
||||||
host=args.host,
|
host=args.host,
|
||||||
port=args.port,
|
port=args.port,
|
||||||
|
|
243
jetforce.py
243
jetforce.py
|
@ -1,10 +1,12 @@
|
||||||
#!/usr/bin/env python3.7
|
#!/usr/bin/env python3.7
|
||||||
import argparse
|
import argparse
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import dataclasses
|
||||||
import datetime
|
import datetime
|
||||||
import mimetypes
|
import mimetypes
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import re
|
||||||
import ssl
|
import ssl
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
@ -12,7 +14,7 @@ import tempfile
|
||||||
import typing
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
|
|
||||||
# Fail early to avoid crashing with an obscure error
|
# Fail early to avoid crashing with some obscure error
|
||||||
if sys.version_info < (3, 7):
|
if sys.version_info < (3, 7):
|
||||||
sys.exit("Fatal Error: jetforce requires Python 3.7+")
|
sys.exit("Fatal Error: jetforce requires Python 3.7+")
|
||||||
|
|
||||||
|
@ -39,64 +41,155 @@ If the TLS cert/keyfile is not provided, a self-signed certificate will
|
||||||
automatically be generated and saved to your temporary file directory.
|
automatically be generated and saved to your temporary file directory.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
STATUS_INPUT = 10
|
|
||||||
|
|
||||||
STATUS_SUCCESS = 20
|
class Status:
|
||||||
STATUS_SUCCESS_END_OF_SESSION = 21
|
|
||||||
|
|
||||||
STATUS_REDIRECT_TEMPORARY = 30
|
|
||||||
STATUS_REDIRECT_PERMANENT = 31
|
|
||||||
STATUS_TEMPORARY_FAILURE = 40
|
|
||||||
STATUS_SERVER_UNAVAILABLE = 41
|
|
||||||
STATUS_CGI_ERROR = 42
|
|
||||||
STATUS_PROXY_ERROR = 43
|
|
||||||
STATUS_SLOW_DOWN = 44
|
|
||||||
|
|
||||||
STATUS_PERMANENT_FAILURE = 50
|
|
||||||
STATUS_NOT_FOUND = 51
|
|
||||||
STATUS_GONE = 52
|
|
||||||
STATUS_PROXY_REQUEST_REFUSED = 53
|
|
||||||
STATUS_BAD_REQUEST = 59
|
|
||||||
|
|
||||||
STATUS_CLIENT_CERTIFICATE_REQUIRED = 60
|
|
||||||
STATUS_TRANSIENT_CERTIFICATE_REQUESTED = 61
|
|
||||||
STATUS_AUTHORISED_CERTIFICATE_REQUIRED = 62
|
|
||||||
STATUS_CERTIFICATE_NOT_ACCEPTED = 63
|
|
||||||
STATUS_FUTURE_CERTIFICATE_REJECTED = 64
|
|
||||||
STATUS_EXPIRED_CERTIFICATE_REJECTED = 65
|
|
||||||
|
|
||||||
|
|
||||||
class BaseApplication:
|
|
||||||
"""
|
"""
|
||||||
Base Jetforce application class, analogous to a WSGI app.
|
Gemini response status codes.
|
||||||
|
|
||||||
At a minimum you must implement the ``__call__()`` method, which will
|
|
||||||
perform the following tasks:
|
|
||||||
|
|
||||||
1. Send the response header by calling ``send_status()``.
|
|
||||||
2. Optionally, yield the response body as bytes.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __call__(
|
INPUT = 10
|
||||||
self, environ: dict, send_status: typing.Callable
|
|
||||||
) -> typing.Iterator[bytes]:
|
|
||||||
raise NotImplemented
|
|
||||||
|
|
||||||
@staticmethod
|
SUCCESS = 20
|
||||||
def block_proxy_requests(url: str, hostname: str) -> typing.Optional[str]:
|
SUCCESS_END_OF_SESSION = 21
|
||||||
|
|
||||||
|
REDIRECT_TEMPORARY = 30
|
||||||
|
REDIRECT_PERMANENT = 31
|
||||||
|
|
||||||
|
TEMPORARY_FAILURE = 40
|
||||||
|
SERVER_UNAVAILABLE = 41
|
||||||
|
CGI_ERROR = 42
|
||||||
|
PROXY_ERROR = 43
|
||||||
|
SLOW_DOWN = 44
|
||||||
|
|
||||||
|
PERMANENT_FAILURE = 50
|
||||||
|
NOT_FOUND = 51
|
||||||
|
GONE = 52
|
||||||
|
PROXY_REQUEST_REFUSED = 53
|
||||||
|
BAD_REQUEST = 59
|
||||||
|
|
||||||
|
CLIENT_CERTIFICATE_REQUIRED = 60
|
||||||
|
TRANSIENT_CERTIFICATE_REQUESTED = 61
|
||||||
|
AUTHORISED_CERTIFICATE_REQUIRED = 62
|
||||||
|
CERTIFICATE_NOT_ACCEPTED = 63
|
||||||
|
FUTURE_CERTIFICATE_REJECTED = 64
|
||||||
|
EXPIRED_CERTIFICATE_REJECTED = 65
|
||||||
|
|
||||||
|
|
||||||
|
class Request:
|
||||||
"""
|
"""
|
||||||
Optional method that may be used to restrict request URLs that do not
|
Object that encapsulates information about a single gemini request.
|
||||||
match your current host and the gemini:// scheme. This may be used if
|
|
||||||
you don't want to worry about proxying traffic to other servers.
|
|
||||||
"""
|
"""
|
||||||
url_parts = urllib.parse.urlparse(url)
|
|
||||||
if url_parts.scheme and url_parts.scheme != "gemini":
|
def __init__(self, environ: dict):
|
||||||
return 'URL scheme must be "gemini://"'
|
self.environ = environ
|
||||||
if url_parts.hostname and url_parts.hostname != hostname:
|
|
||||||
return f'URL hostname must be "{hostname}"'
|
url = urllib.parse.urlparse(environ["URL"])
|
||||||
|
self.scheme = url.scheme
|
||||||
|
self.hostname = url.hostname
|
||||||
|
self.port = url.port
|
||||||
|
self.path = url.path
|
||||||
|
self.params = url.params
|
||||||
|
self.query = url.query
|
||||||
|
self.fragment = url.fragment
|
||||||
|
self.url = url.geturl()
|
||||||
|
|
||||||
|
|
||||||
class StaticDirectoryApplication(BaseApplication):
|
@dataclasses.dataclass
|
||||||
|
class Response:
|
||||||
|
"""
|
||||||
|
Object that encapsulates information about a single gemini response.
|
||||||
|
"""
|
||||||
|
|
||||||
|
status: int
|
||||||
|
meta: str
|
||||||
|
body: typing.Union[None, bytes, str, typing.Iterator[bytes]] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class RoutePattern:
|
||||||
|
"""
|
||||||
|
A pattern for matching URLs with a single endpoint or route.
|
||||||
|
"""
|
||||||
|
|
||||||
|
path: str = ""
|
||||||
|
scheme: str = "gemini"
|
||||||
|
strict_hostname: bool = True
|
||||||
|
strict_trailing_slash: bool = False
|
||||||
|
|
||||||
|
def match(self, request: Request) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the given request URL matches this route pattern.
|
||||||
|
"""
|
||||||
|
server_hostname = request.environ["HOSTNAME"]
|
||||||
|
|
||||||
|
if self.strict_hostname and request.hostname != server_hostname:
|
||||||
|
return False
|
||||||
|
if self.scheme and self.scheme != request.scheme:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if self.strict_trailing_slash:
|
||||||
|
request_path = request.path
|
||||||
|
else:
|
||||||
|
request_path = request.path.rstrip("/")
|
||||||
|
|
||||||
|
return bool(re.match(self.path, request_path))
|
||||||
|
|
||||||
|
|
||||||
|
class JetforceApplication:
|
||||||
|
"""
|
||||||
|
Base Jetforce application class with primitive URL routing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.routes: typing.List[
|
||||||
|
typing.Tuple[RoutePattern, typing.Callable[[Request], Response]]
|
||||||
|
] = []
|
||||||
|
|
||||||
|
def __call__(self, environ: dict, send_status: typing.Callable):
|
||||||
|
request = Request(environ)
|
||||||
|
for route_pattern, callback in self.routes[::-1]:
|
||||||
|
if route_pattern.match(request):
|
||||||
|
response = callback(request)
|
||||||
|
send_status(response.status, response.meta)
|
||||||
|
if response.body:
|
||||||
|
if isinstance(response.body, bytes):
|
||||||
|
yield response.body
|
||||||
|
elif isinstance(response.body, str):
|
||||||
|
yield response.body.encode()
|
||||||
|
else:
|
||||||
|
yield from response.body
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
send_status(Status.NOT_FOUND, "Not Found")
|
||||||
|
|
||||||
|
def route(
|
||||||
|
self,
|
||||||
|
path: str = "",
|
||||||
|
scheme: str = "gemini",
|
||||||
|
strict_hostname: bool = True,
|
||||||
|
strict_trailing_slash: bool = False,
|
||||||
|
) -> typing.Callable:
|
||||||
|
"""
|
||||||
|
Decorator for binding a function to a route based on the URL path.
|
||||||
|
|
||||||
|
app = JetforceApplication()
|
||||||
|
|
||||||
|
@app.route('/my-path')
|
||||||
|
def my_path(request):
|
||||||
|
return Response(Status.SUCCESS, 'text/plain', 'Hello world!')
|
||||||
|
"""
|
||||||
|
route_pattern = RoutePattern(
|
||||||
|
path, scheme, strict_hostname, strict_trailing_slash
|
||||||
|
)
|
||||||
|
|
||||||
|
def wrap(func: typing.Callable) -> typing.Callable:
|
||||||
|
self.routes.append((route_pattern, func))
|
||||||
|
return func
|
||||||
|
|
||||||
|
return wrap
|
||||||
|
|
||||||
|
|
||||||
|
class StaticDirectoryApplication(JetforceApplication):
|
||||||
"""
|
"""
|
||||||
Serve a static directory over Gemini.
|
Serve a static directory over Gemini.
|
||||||
|
|
||||||
|
@ -105,44 +198,38 @@ class StaticDirectoryApplication(BaseApplication):
|
||||||
directory listing will be auto-generated.
|
directory listing will be auto-generated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, directory="/var/gemini"):
|
def __init__(self, directory: str = "/var/gemini"):
|
||||||
|
super().__init__()
|
||||||
|
self.routes.append((RoutePattern(), self.serve_static_file))
|
||||||
|
|
||||||
self.root = pathlib.Path(directory).resolve(strict=True)
|
self.root = pathlib.Path(directory).resolve(strict=True)
|
||||||
self.mimetypes = mimetypes.MimeTypes()
|
self.mimetypes = mimetypes.MimeTypes()
|
||||||
|
self.mimetypes.add_type("text/gemini", ".gmi")
|
||||||
|
|
||||||
def __call__(
|
def serve_static_file(self, request: Request):
|
||||||
self, environ: dict, send_status: typing.Callable
|
|
||||||
) -> typing.Iterator[bytes]:
|
|
||||||
url = environ["URL"]
|
|
||||||
url_parts = urllib.parse.urlparse(url)
|
|
||||||
url_path = pathlib.Path(url_parts.path.strip("/"))
|
|
||||||
|
|
||||||
error_message = self.block_proxy_requests(url, environ["HOSTNAME"])
|
url_path = pathlib.Path(request.path.strip("/"))
|
||||||
if error_message:
|
|
||||||
return send_status(STATUS_PROXY_REQUEST_REFUSED, error_message)
|
|
||||||
|
|
||||||
filename = pathlib.Path(os.path.normpath(str(url_path)))
|
filename = pathlib.Path(os.path.normpath(str(url_path)))
|
||||||
if filename.is_absolute() or str(filename.name).startswith(".."):
|
if filename.is_absolute() or str(filename.name).startswith(".."):
|
||||||
# Guard against breaking out of the directory
|
# Guard against breaking out of the directory
|
||||||
return send_status(STATUS_NOT_FOUND, "Not Found")
|
return Response(Status.NOT_FOUND, "Not Found")
|
||||||
|
|
||||||
filesystem_path = self.root / filename
|
filesystem_path = self.root / filename
|
||||||
|
|
||||||
if filesystem_path.is_file():
|
if filesystem_path.is_file():
|
||||||
mimetype = self.guess_mimetype(filesystem_path.name)
|
mimetype = self.guess_mimetype(filesystem_path.name)
|
||||||
send_status(STATUS_SUCCESS, mimetype)
|
generator = self.load_file(filesystem_path)
|
||||||
yield from self.load_file(filesystem_path)
|
return Response(Status.SUCCESS, mimetype, generator)
|
||||||
|
|
||||||
elif filesystem_path.is_dir():
|
elif filesystem_path.is_dir():
|
||||||
gemini_file = filesystem_path / ".gemini"
|
gemini_file = filesystem_path / ".gemini"
|
||||||
if gemini_file.exists():
|
if gemini_file.exists():
|
||||||
send_status(STATUS_SUCCESS, "text/gemini")
|
generator = self.load_file(gemini_file)
|
||||||
yield from self.load_file(gemini_file)
|
|
||||||
else:
|
else:
|
||||||
send_status(STATUS_SUCCESS, "text/gemini")
|
generator = self.list_directory(url_path, filesystem_path)
|
||||||
yield from self.list_directory(url_path, filesystem_path)
|
return Response(Status.SUCCESS, "text/gemini", generator)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return send_status(STATUS_NOT_FOUND, "Not Found")
|
return Response(Status.NOT_FOUND, "Not Found")
|
||||||
|
|
||||||
def load_file(self, filesystem_path: pathlib.Path):
|
def load_file(self, filesystem_path: pathlib.Path):
|
||||||
with filesystem_path.open("rb") as fp:
|
with filesystem_path.open("rb") as fp:
|
||||||
|
@ -152,6 +239,9 @@ class StaticDirectoryApplication(BaseApplication):
|
||||||
data = fp.read(1024)
|
data = fp.read(1024)
|
||||||
|
|
||||||
def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path):
|
def list_directory(self, url_path: pathlib.Path, filesystem_path: pathlib.Path):
|
||||||
|
"""
|
||||||
|
Auto-generate a text/gemini document based on the contents of the file system.
|
||||||
|
"""
|
||||||
yield f"Directory: /{url_path}\r\n".encode()
|
yield f"Directory: /{url_path}\r\n".encode()
|
||||||
if url_path.parent != url_path:
|
if url_path.parent != url_path:
|
||||||
yield f"=>/{url_path.parent}\t..\r\n".encode()
|
yield f"=>/{url_path.parent}\t..\r\n".encode()
|
||||||
|
@ -169,10 +259,8 @@ class StaticDirectoryApplication(BaseApplication):
|
||||||
mime, encoding = self.mimetypes.guess_type(filename)
|
mime, encoding = self.mimetypes.guess_type(filename)
|
||||||
if encoding:
|
if encoding:
|
||||||
return f"{mime}; charset={encoding}"
|
return f"{mime}; charset={encoding}"
|
||||||
elif mime:
|
|
||||||
return mime
|
|
||||||
else:
|
else:
|
||||||
return "text/plain"
|
return mime or "text/plain"
|
||||||
|
|
||||||
|
|
||||||
class GeminiRequestHandler:
|
class GeminiRequestHandler:
|
||||||
|
@ -218,7 +306,7 @@ class GeminiRequestHandler:
|
||||||
await self.parse_header()
|
await self.parse_header()
|
||||||
except Exception:
|
except Exception:
|
||||||
# Malformed request, throw it away and exit immediately
|
# Malformed request, throw it away and exit immediately
|
||||||
self.write_status(STATUS_BAD_REQUEST, "Could not understand request line")
|
self.write_status(Status.BAD_REQUEST, "Could not understand request line")
|
||||||
return await self.close_connection()
|
return await self.close_connection()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -227,7 +315,7 @@ class GeminiRequestHandler:
|
||||||
for data in app:
|
for data in app:
|
||||||
await self.write_body(data)
|
await self.write_body(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.write_status(STATUS_CGI_ERROR, str(e))
|
self.write_status(Status.CGI_ERROR, str(e))
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
await self.close_connection()
|
await self.close_connection()
|
||||||
|
@ -272,7 +360,6 @@ class GeminiRequestHandler:
|
||||||
that must be flushed. This is done so that the status can be updated as
|
that must be flushed. This is done so that the status can be updated as
|
||||||
long as no other data has been written to the stream yet.
|
long as no other data has been written to the stream yet.
|
||||||
"""
|
"""
|
||||||
# TODO: enforce restriction on response meta <= 1024 bytes
|
|
||||||
self.status = status
|
self.status = status
|
||||||
self.meta = meta
|
self.meta = meta
|
||||||
self.response_buffer = f"{status}\t{meta}\r\n"
|
self.response_buffer = f"{status}\t{meta}\r\n"
|
||||||
|
@ -404,7 +491,7 @@ class GeminiServer:
|
||||||
return str(certfile), str(keyfile)
|
return str(certfile), str(keyfile)
|
||||||
|
|
||||||
|
|
||||||
def build_argument_parser() -> argparse.ArgumentParser:
|
def command_line_parser() -> argparse.ArgumentParser:
|
||||||
"""
|
"""
|
||||||
Construct the default argument parser when launching the server from
|
Construct the default argument parser when launching the server from
|
||||||
the command line.
|
the command line.
|
||||||
|
@ -431,7 +518,7 @@ def run_server() -> None:
|
||||||
"""
|
"""
|
||||||
Entry point for running the command line static directory server.
|
Entry point for running the command line static directory server.
|
||||||
"""
|
"""
|
||||||
parser = build_argument_parser()
|
parser = command_line_parser()
|
||||||
parser.add_argument("--dir", help="local directory to serve", default="/var/gemini")
|
parser.add_argument("--dir", help="local directory to serve", default="/var/gemini")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue