jetforce/jetforce.py

688 lines
22 KiB
Python
Raw Normal View History

2019-08-06 04:21:28 +02:00
#!/usr/bin/env python3.7
from __future__ import annotations
2019-08-04 19:52:54 +02:00
import argparse
import asyncio
2019-08-27 05:41:10 +02:00
import codecs
2019-08-23 00:53:02 +02:00
import dataclasses
2019-08-04 19:52:54 +02:00
import datetime
2019-08-05 03:42:27 +02:00
import mimetypes
import os
2019-08-06 04:21:28 +02:00
import pathlib
2019-08-23 00:53:02 +02:00
import re
2019-08-04 19:52:54 +02:00
import ssl
import subprocess
2019-08-04 19:52:54 +02:00
import sys
import tempfile
2019-08-06 00:47:59 +02:00
import typing
import urllib.parse
2019-08-04 19:52:54 +02:00
2019-08-23 00:53:02 +02:00
# Fail early to avoid crashing with some obscure error
2019-08-06 04:21:28 +02:00
if sys.version_info < (3, 7):
sys.exit("Fatal Error: jetforce requires Python 3.7+")
2019-08-23 01:34:04 +02:00
__version__ = "0.0.6"
2019-08-04 19:52:54 +02:00
__title__ = "Jetforce Gemini Server"
__author__ = "Michael Lazar"
__license__ = "GNU General Public License v3.0"
__copyright__ = "(c) 2019 Michael Lazar"
ABOUT = fr"""
2019-08-05 03:42:27 +02:00
You are now riding on...
2019-08-04 19:52:54 +02:00
_________ _____________
______ /______ /___ __/_______________________
___ _ /_ _ \ __/_ /_ _ __ \_ ___/ ___/ _ \
/ /_/ / / __/ /_ _ __/ / /_/ / / / /__ / __/
\____/ \___/\__/ /_/ \____//_/ \___/ \___/
2019-08-05 03:42:27 +02:00
An Experimental Gemini Server, v{__version__}
2019-08-04 19:52:54 +02:00
https://github.com/michael-lazar/jetforce
"""
2019-08-23 00:53:02 +02:00
class Status:
"""
Gemini response status codes.
"""
INPUT = 10
SUCCESS = 20
SUCCESS_END_OF_SESSION = 21
REDIRECT_TEMPORARY = 30
REDIRECT_PERMANENT = 31
2019-08-23 00:53:02 +02:00
TEMPORARY_FAILURE = 40
SERVER_UNAVAILABLE = 41
CGI_ERROR = 42
PROXY_ERROR = 43
SLOW_DOWN = 44
2019-08-23 00:53:02 +02:00
PERMANENT_FAILURE = 50
NOT_FOUND = 51
GONE = 52
PROXY_REQUEST_REFUSED = 53
BAD_REQUEST = 59
2019-08-23 00:53:02 +02:00
CLIENT_CERTIFICATE_REQUIRED = 60
TRANSIENT_CERTIFICATE_REQUESTED = 61
AUTHORISED_CERTIFICATE_REQUIRED = 62
CERTIFICATE_NOT_ACCEPTED = 63
FUTURE_CERTIFICATE_REJECTED = 64
EXPIRED_CERTIFICATE_REJECTED = 65
2019-08-04 19:52:54 +02:00
2019-08-23 00:53:02 +02:00
class Request:
2019-08-04 19:52:54 +02:00
"""
2019-08-23 00:53:02 +02:00
Object that encapsulates information about a single gemini request.
"""
def __init__(self, environ: dict):
self.environ = environ
2019-08-27 05:41:10 +02:00
self.url = environ["GEMINI_URL"]
url_parts = urllib.parse.urlparse(self.url)
self.scheme = url_parts.scheme
self.hostname = url_parts.hostname
self.port = url_parts.port
self.path = url_parts.path
self.params = url_parts.params
self.query = urllib.parse.unquote(url_parts.query)
2019-08-27 05:41:10 +02:00
self.fragment = url_parts.fragment
2019-08-23 00:53:02 +02:00
@dataclasses.dataclass
class Response:
"""
Object that encapsulates information about a single gemini response.
2019-08-04 19:52:54 +02:00
"""
2019-08-23 00:53:02 +02:00
status: int
meta: str
body: typing.Union[None, bytes, str, typing.Iterator[bytes]] = None
2019-08-04 19:52:54 +02:00
2019-08-23 00:53:02 +02:00
@dataclasses.dataclass
class RoutePattern:
"""
A pattern for matching URLs with a single endpoint or route.
"""
path: str = ""
scheme: str = "gemini"
strict_hostname: bool = True
strict_trailing_slash: bool = False
def match(self, request: Request) -> bool:
"""
2019-08-23 00:53:02 +02:00
Check if the given request URL matches this route pattern.
"""
2019-08-23 00:53:02 +02:00
server_hostname = request.environ["HOSTNAME"]
2019-08-04 19:52:54 +02:00
2019-08-23 00:53:02 +02:00
if self.strict_hostname and request.hostname != server_hostname:
return False
if self.scheme and self.scheme != request.scheme:
return False
2019-08-04 19:52:54 +02:00
2019-08-23 00:53:02 +02:00
if self.strict_trailing_slash:
request_path = request.path
else:
request_path = request.path.rstrip("/")
return bool(re.match(self.path, request_path))
class JetforceApplication:
"""
Base Jetforce application class with primitive URL routing.
"""
def __init__(self):
self.routes: typing.List[
typing.Tuple[RoutePattern, typing.Callable[[Request], Response]]
] = []
2019-08-28 05:44:07 +02:00
def __call__(
self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]:
2019-08-23 00:53:02 +02:00
request = Request(environ)
for route_pattern, callback in self.routes[::-1]:
if route_pattern.match(request):
response = callback(request)
send_status(response.status, response.meta)
if response.body:
if isinstance(response.body, bytes):
yield response.body
elif isinstance(response.body, str):
yield response.body.encode()
else:
yield from response.body
break
else:
send_status(Status.NOT_FOUND, "Not Found")
def route(
self,
path: str = "",
scheme: str = "gemini",
strict_hostname: bool = True,
strict_trailing_slash: bool = False,
) -> typing.Callable:
"""
Decorator for binding a function to a route based on the URL path.
app = JetforceApplication()
@app.route('/my-path')
def my_path(request):
return Response(Status.SUCCESS, 'text/plain', 'Hello world!')
"""
route_pattern = RoutePattern(
path, scheme, strict_hostname, strict_trailing_slash
)
def wrap(func: typing.Callable) -> typing.Callable:
self.routes.append((route_pattern, func))
return func
return wrap
class StaticDirectoryApplication(JetforceApplication):
2019-08-05 03:42:27 +02:00
"""
Serve a static directory over Gemini.
If a directory contains a hidden file with the name ".gemini", that file
will be returned when the directory path is requested. Otherwise, a
directory listing will be auto-generated.
"""
2019-08-27 05:41:10 +02:00
def __init__(
self,
root_directory: str = "/var/gemini",
index_file: str = "index.gmi",
cgi_directory: str = "cgi-bin",
2019-08-27 05:41:10 +02:00
):
2019-08-23 00:53:02 +02:00
super().__init__()
self.routes.append((RoutePattern(), self.serve_static_file))
2019-08-27 05:41:10 +02:00
self.root = pathlib.Path(root_directory).resolve(strict=True)
self.cgi_directory = cgi_directory.strip("/") + "/"
2019-08-23 15:45:24 +02:00
self.index_file = index_file
2019-08-05 03:42:27 +02:00
self.mimetypes = mimetypes.MimeTypes()
2019-08-23 00:53:02 +02:00
self.mimetypes.add_type("text/gemini", ".gmi")
2019-08-23 15:45:24 +02:00
self.mimetypes.add_type("text/gemini", ".gemini")
2019-08-05 03:42:27 +02:00
2019-08-28 05:44:07 +02:00
def serve_static_file(self, request: Request) -> Response:
2019-08-27 15:47:53 +02:00
"""
Convert a URL into a filesystem path, and attempt to serve the file
or directory that is represented at that path.
"""
2019-08-23 00:53:02 +02:00
url_path = pathlib.Path(request.path.strip("/"))
2019-08-05 03:42:27 +02:00
filename = pathlib.Path(os.path.normpath(str(url_path)))
2019-08-07 05:47:11 +02:00
if filename.is_absolute() or str(filename.name).startswith(".."):
2019-08-05 03:42:27 +02:00
# Guard against breaking out of the directory
2019-08-23 00:53:02 +02:00
return Response(Status.NOT_FOUND, "Not Found")
filesystem_path = self.root / filename
2019-08-05 03:42:27 +02:00
2019-08-06 04:21:28 +02:00
if filesystem_path.is_file():
2019-08-27 15:47:53 +02:00
is_cgi = str(filename).startswith(self.cgi_directory)
is_exe = os.access(filesystem_path, os.X_OK)
if is_cgi and is_exe:
return self.run_cgi_script(filesystem_path, request.environ)
else:
mimetype = self.guess_mimetype(filesystem_path.name)
generator = self.load_file(filesystem_path)
return Response(Status.SUCCESS, mimetype, generator)
2019-08-06 04:21:28 +02:00
elif filesystem_path.is_dir():
2019-08-23 15:45:24 +02:00
index_file = filesystem_path / self.index_file
if index_file.exists():
generator = self.load_file(index_file)
2019-08-27 15:47:53 +02:00
return Response(Status.SUCCESS, "text/gemini", generator)
2019-08-05 03:42:27 +02:00
else:
2019-08-23 00:53:02 +02:00
generator = self.list_directory(url_path, filesystem_path)
2019-08-27 15:47:53 +02:00
return Response(Status.SUCCESS, "text/gemini", generator)
2019-08-05 03:42:27 +02:00
else:
2019-08-23 00:53:02 +02:00
return Response(Status.NOT_FOUND, "Not Found")
2019-08-05 03:42:27 +02:00
2019-08-28 05:44:07 +02:00
def run_cgi_script(self, filesystem_path: pathlib.Path, environ: dict) -> Response:
"""
Execute the given file as a CGI script and return the script's stdout
stream to the client.
"""
2019-08-27 05:41:10 +02:00
script_name = str(filesystem_path)
cgi_env = environ.copy()
cgi_env["GATEWAY_INTERFACE"] = "GCI/1.1"
cgi_env["SCRIPT_NAME"] = script_name
# Decode the stream as unicode so we can parse the status line
# Use surrogateescape to preserve any non-UTF8 byte sequences.
2019-08-27 05:41:10 +02:00
out = subprocess.Popen(
[script_name],
stdout=subprocess.PIPE,
env=cgi_env,
bufsize=1,
universal_newlines=True,
errors="surrogateescape",
)
2019-08-27 15:47:53 +02:00
status_line = out.stdout.readline().strip()
status_parts = status_line.split(maxsplit=1)
if len(status_parts) != 2 or not status_parts[0].isdecimal():
2019-08-27 16:38:38 +02:00
return Response(Status.CGI_ERROR, "Unexpected Error")
2019-08-27 15:47:53 +02:00
status, meta = status_parts
2019-08-27 05:41:10 +02:00
# Re-encode the rest of the body as bytes
body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
2019-08-27 15:47:53 +02:00
return Response(int(status), meta, body)
2019-08-27 05:41:10 +02:00
2019-08-28 05:44:07 +02:00
def load_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]:
2019-08-27 15:47:53 +02:00
"""
Load a file using a generator to allow streaming data to the TCP socket.
"""
2019-08-06 04:21:28 +02:00
with filesystem_path.open("rb") as fp:
2019-08-05 03:42:27 +02:00
data = fp.read(1024)
while data:
yield data
data = fp.read(1024)
2019-08-28 05:44:07 +02:00
def list_directory(
self, url_path: pathlib.Path, filesystem_path: pathlib.Path
) -> typing.Iterator[bytes]:
2019-08-23 00:53:02 +02:00
"""
Auto-generate a text/gemini document based on the contents of the file system.
"""
2019-08-06 04:21:28 +02:00
yield f"Directory: /{url_path}\r\n".encode()
if url_path.parent != url_path:
yield f"=>/{url_path.parent}\t..\r\n".encode()
for file in sorted(filesystem_path.iterdir()):
if file.name.startswith((".", "~")):
# Skip hidden and temporary files for security reasons
2019-08-05 03:42:27 +02:00
continue
2019-08-06 04:21:28 +02:00
elif file.is_dir():
yield f"=>/{url_path / file.name}\t{file.name}/\r\n".encode()
else:
yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
2019-08-05 03:42:27 +02:00
2019-08-28 05:44:07 +02:00
def guess_mimetype(self, filename: str) -> str:
2019-08-27 15:47:53 +02:00
"""
Guess the mimetype of a file based on the file extension.
"""
2019-08-05 03:42:27 +02:00
mime, encoding = self.mimetypes.guess_type(filename)
if encoding:
return f"{mime}; charset={encoding}"
else:
2019-08-23 00:53:02 +02:00
return mime or "text/plain"
2019-08-05 03:42:27 +02:00
2019-08-04 19:52:54 +02:00
class GeminiRequestHandler:
"""
Handle a single Gemini Protocol TCP request.
This design borrows heavily from the standard library's HTTP request
handler (http.server.BaseHTTPRequestHandler). However, I did not make any
attempts to directly emulate the existing conventions, because Gemini is an
inherently simpler protocol than HTTP and much of the boilerplate could be
removed or slimmed-down.
"""
def __init__(self, server: GeminiServer, app: typing.Callable) -> None:
2019-08-04 19:52:54 +02:00
self.server = server
self.app = app
2019-08-06 00:47:59 +02:00
self.reader: typing.Optional[asyncio.StreamReader] = None
self.writer: typing.Optional[asyncio.StreamWriter] = None
self.received_timestamp: typing.Optional[datetime.datetime] = None
2019-08-06 15:55:26 +02:00
self.remote_addr: typing.Optional[str] = None
2019-08-29 05:52:39 +02:00
self.client_cert: typing.Optional[dict] = None
self.url: typing.Optional[urllib.parse.ParseResult] = None
2019-08-06 00:47:59 +02:00
self.status: typing.Optional[int] = None
self.meta: typing.Optional[str] = None
2019-08-06 00:47:59 +02:00
self.response_buffer: typing.Optional[str] = None
2019-08-04 19:52:54 +02:00
self.response_size: int = 0
async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""
Main method for the request handler, performs the following:
1. Read the request bytes from the reader stream
2. Parse the request and generate response data
3. Write the response bytes to the writer stream
"""
self.reader = reader
self.writer = writer
2019-08-06 15:55:26 +02:00
self.remote_addr = writer.get_extra_info("peername")[0]
2019-08-29 05:52:39 +02:00
self.client_cert = writer.get_extra_info("peercert")
2019-08-04 19:52:54 +02:00
self.received_timestamp = datetime.datetime.utcnow()
try:
await self.parse_header()
2019-08-04 19:52:54 +02:00
except Exception:
# Malformed request, throw it away and exit immediately
2019-08-29 05:52:39 +02:00
self.write_status(Status.BAD_REQUEST, "Malformed request")
return await self.close_connection()
2019-08-04 19:52:54 +02:00
try:
environ = self.build_environ()
app = self.app(environ, self.write_status)
for data in app:
2019-08-05 03:42:27 +02:00
await self.write_body(data)
2019-08-04 19:52:54 +02:00
except Exception as e:
2019-08-23 00:53:02 +02:00
self.write_status(Status.CGI_ERROR, str(e))
2019-08-04 19:52:54 +02:00
raise
finally:
await self.close_connection()
2019-08-04 19:52:54 +02:00
2019-08-06 00:47:59 +02:00
def build_environ(self) -> typing.Dict[str, typing.Any]:
2019-08-04 19:52:54 +02:00
"""
Construct a dictionary that will be passed to the application handler.
2019-08-27 05:41:10 +02:00
Variable names conform to the CGI spec defined in RFC 3875.
2019-08-04 19:52:54 +02:00
"""
2019-08-27 05:41:10 +02:00
url_parts = urllib.parse.urlparse(self.url)
2019-08-29 05:52:39 +02:00
environ = {
2019-08-27 05:41:10 +02:00
"GEMINI_URL": self.url,
"HOSTNAME": self.server.hostname,
2019-08-27 05:41:10 +02:00
"PATH_INFO": url_parts.path,
"QUERY_STRING": url_parts.query,
2019-08-29 05:52:39 +02:00
"REMOTE_ADDR": self.remote_addr,
"REMOTE_HOST": self.remote_addr,
2019-08-27 05:41:10 +02:00
"SERVER_NAME": self.server.hostname,
"SERVER_PORT": str(self.server.port),
"SERVER_PROTOCOL": "GEMINI",
"SERVER_SOFTWARE": f"jetforce/{__version__}",
2019-08-04 19:52:54 +02:00
}
2019-08-29 05:52:39 +02:00
if self.client_cert:
subject = dict(x[0] for x in self.client_cert["subject"])
environ.update(
{"AUTH_TYPE": "CERTIFICATE", "REMOTE_USER": subject["commonName"]}
)
return environ
async def parse_header(self) -> None:
2019-08-04 19:52:54 +02:00
"""
Parse the gemini header line.
2019-08-04 19:52:54 +02:00
The request is a single UTF-8 line formatted as: <URL>\r\n
2019-08-04 19:52:54 +02:00
"""
data = await self.reader.readuntil(b"\r\n")
data = data[:-2] # strip the line ending
if len(data) > 1024:
raise ValueError("URL exceeds max length of 1024 bytes")
self.url = data.decode()
def write_status(self, status: int, meta: str) -> None:
2019-08-04 19:52:54 +02:00
"""
Write the gemini status line to an internal buffer.
The status line is a single UTF-8 line formatted as:
<code>\t<meta>\r\n
2019-08-04 19:52:54 +02:00
If the response status is 2, the meta field will contain the mimetype
of the response data sent. If the status is something else, the meta
2019-08-04 19:52:54 +02:00
will contain a descriptive message.
The status is not written immediately, it's added to an internal buffer
that must be flushed. This is done so that the status can be updated as
long as no other data has been written to the stream yet.
"""
self.status = status
self.meta = meta
self.response_buffer = f"{status}\t{meta}\r\n"
2019-08-04 19:52:54 +02:00
2019-08-05 03:42:27 +02:00
async def write_body(self, data: bytes) -> None:
2019-08-04 19:52:54 +02:00
"""
Write bytes to the gemini response body.
"""
2019-08-05 03:42:27 +02:00
await self.flush_status()
2019-08-04 19:52:54 +02:00
self.response_size += len(data)
self.writer.write(data)
2019-08-05 03:42:27 +02:00
await self.writer.drain()
2019-08-04 19:52:54 +02:00
2019-08-05 03:42:27 +02:00
async def flush_status(self) -> None:
2019-08-04 19:52:54 +02:00
"""
Flush the status line from the internal buffer to the socket stream.
"""
if self.response_buffer and not self.response_size:
data = self.response_buffer.encode()
self.response_size += len(data)
self.writer.write(data)
2019-08-05 03:42:27 +02:00
await self.writer.drain()
2019-08-04 19:52:54 +02:00
self.response_buffer = None
async def close_connection(self) -> None:
"""
Flush any remaining bytes and close the stream.
"""
await self.flush_status()
self.log_request()
await self.writer.drain()
2019-08-04 19:52:54 +02:00
def log_request(self) -> None:
"""
Log a gemini request using a format derived from the Common Log Format.
"""
self.server.log_message(
2019-08-06 15:55:26 +02:00
f"{self.remote_addr} "
2019-08-04 19:52:54 +02:00
f"[{self.received_timestamp:%d/%b/%Y:%H:%M:%S +0000}] "
f'"{self.url}" '
2019-08-04 19:52:54 +02:00
f"{self.status} "
f'"{self.meta}" '
2019-08-04 19:52:54 +02:00
f"{self.response_size}"
)
class GeminiServer:
"""
An asynchronous TCP server that understands the Gemini Protocol.
"""
request_handler_class = GeminiRequestHandler
def __init__(
self,
app: typing.Callable,
host: str = "127.0.0.1",
port: int = 1965,
ssl_context: ssl.SSLContext = None,
hostname: str = "localhost",
2019-08-04 19:52:54 +02:00
) -> None:
2019-08-04 19:52:54 +02:00
self.host = host
self.port = port
self.hostname = hostname
2019-08-04 19:52:54 +02:00
self.app = app
self.ssl_context = ssl_context
2019-08-04 19:52:54 +02:00
async def run(self) -> None:
"""
The main asynchronous server loop.
"""
self.log_message(ABOUT)
server = await asyncio.start_server(
self.accept_connection, self.host, self.port, ssl=self.ssl_context
)
socket_info = server.sockets[0].getsockname()
self.log_message(f"Server hostname is {self.hostname}")
2019-08-05 03:42:27 +02:00
self.log_message(f"Listening on {socket_info[0]}:{socket_info[1]}")
2019-08-04 19:52:54 +02:00
async with server:
await server.serve_forever()
async def accept_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
2019-08-28 05:44:07 +02:00
) -> None:
2019-08-04 19:52:54 +02:00
"""
Hook called by the socket server when a new connection is accepted.
"""
request_handler = self.request_handler_class(self, self.app)
try:
await request_handler.handle(reader, writer)
finally:
writer.close()
2019-08-28 05:44:07 +02:00
def log_message(self, message: str) -> None:
2019-08-04 19:52:54 +02:00
"""
Log a diagnostic server message.
"""
print(message, file=sys.stderr)
def generate_ad_hoc_certificate(hostname: str) -> typing.Tuple[str, str]:
"""
Utility function to generate a self-signed SSL certificate key pair if
one isn't provided. Results may vary depending on your version of OpenSSL.
"""
certfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.crt"
keyfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.key"
if not certfile.exists() or not keyfile.exists():
print(f"Writing ad hoc TLS certificate to {certfile}")
subprocess.run(
[
f"openssl req -newkey rsa:2048 -nodes -keyout {keyfile}"
f' -nodes -x509 -out {certfile} -subj "/CN={hostname}"'
],
shell=True,
check=True,
)
return str(certfile), str(keyfile)
def make_ssl_context(
hostname: str = "localhost",
certfile: typing.Optional[str] = None,
keyfile: typing.Optional[str] = None,
cafile: typing.Optional[str] = None,
capath: typing.Optional[str] = None,
) -> ssl.SSLContext:
"""
Generate a sane default SSL context for a Gemini server.
For more information on what these variables mean and what values they can
contain, see the python standard library documentation:
https://docs.python.org/3/library/ssl.html#ssl-contexts
verify_mode: ssl.CERT_OPTIONAL
A client certificate request is sent to the client. The client may
either ignore the request or send a certificate in order perform TLS
client cert authentication. If the client chooses to send a certificate,
it is verified. Any verification error immediately aborts the TLS
handshake.
"""
if certfile is None:
certfile, keyfile = generate_ad_hoc_certificate(hostname)
context = ssl.SSLContext()
context.verify_mode = ssl.CERT_OPTIONAL
context.load_cert_chain(certfile, keyfile)
if not cafile and not capath:
# Load from the system's default client CA directory
context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
else:
# Use a custom CA for validating client certificates
context.load_verify_locations(cafile, capath)
return context
2019-08-23 00:53:02 +02:00
def command_line_parser() -> argparse.ArgumentParser:
2019-08-06 15:55:26 +02:00
"""
Construct the default argument parser when launching the server from
the command line. These are meant to be application-agnostic arguments
that could apply to any subclass of the JetforceApplication.
2019-08-06 15:55:26 +02:00
"""
2019-08-04 20:11:22 +02:00
parser = argparse.ArgumentParser(
prog="jetforce",
description="An Experimental Gemini Protocol Server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("--host", help="Server address to bind to", default="127.0.0.1")
parser.add_argument("--port", help="Server port to bind to", type=int, default=1965)
parser.add_argument("--hostname", help="Server hostname", default="localhost")
parser.add_argument(
"--tls-certfile",
dest="certfile",
help="Server TLS certificate file",
metavar="FILE",
)
parser.add_argument(
"--tls-keyfile",
dest="keyfile",
help="Server TLS private key file",
metavar="FILE",
)
parser.add_argument(
"--tls-cafile",
dest="cafile",
help="A CA file to use for validating clients",
metavar="FILE",
)
parser.add_argument(
"--tls-capath",
dest="capath",
help="A directory containing CA files for validating clients",
metavar="DIR",
)
return parser
2019-08-04 19:52:54 +02:00
def run_server() -> None:
"""
2019-08-28 05:45:10 +02:00
Entry point for running the static directory server.
"""
2019-08-23 00:53:02 +02:00
parser = command_line_parser()
2019-08-23 15:45:24 +02:00
parser.add_argument(
"--dir",
help="Root directory on the filesystem to serve",
default="/var/gemini",
metavar="DIR",
2019-08-27 05:41:10 +02:00
)
parser.add_argument(
"--cgi-dir",
help="CGI script directory, relative to the server's root directory",
default="cgi-bin",
metavar="DIR",
2019-08-23 15:45:24 +02:00
)
parser.add_argument(
"--index-file",
help="If a directory contains a file with this name, that file will be "
"served instead of auto-generating an index page",
default="index.gmi",
metavar="FILE",
2019-08-23 15:45:24 +02:00
)
args = parser.parse_args()
2019-08-05 03:42:27 +02:00
app = StaticDirectoryApplication(args.dir, args.index_file, args.cgi_dir)
ssl_context = make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
2019-08-04 19:52:54 +02:00
server = GeminiServer(
host=args.host,
port=args.port,
ssl_context=ssl_context,
hostname=args.hostname,
app=app,
2019-08-04 19:52:54 +02:00
)
asyncio.run(server.run())
2019-08-04 20:11:22 +02:00
if __name__ == "__main__":
run_server()