Merge pull request #23 from michael-lazar/v0.3.0

V0.3.0
This commit is contained in:
Michael Lazar 2020-05-23 21:46:00 -04:00 committed by GitHub
commit b4fd0919eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1381 additions and 1708 deletions

View File

@ -1,8 +1,10 @@
# Jetforce Changelog
### Unreleased
### v0.3.0 (pre-release)
- Allow a client certificate subject's CN to be blank.
- The ``jetforce-diagnostics`` script has been split off into a separate
repository at [gemini-diagnostics](https://github.com/michael-lazar/gemini-diagnostics).
### v0.2.2 (2012-03-31)

View File

@ -1,19 +1,16 @@
# Jetforce
An experimental TCP server for the new, under development Gemini Protocol.
Learn more about Gemini [here](https://gopher.commons.host/gopher://zaibatsu.circumlunar.space/1/~solderpunk/gemini).
Learn more about Gemini [here](https://portal.mozz.us/).
![Rocket Launch](resources/rocket.jpg)
![Rocket Launch](logo.jpg)
## Features
- A built-in static file server with support for gemini directories and
CGI scripts.
- Lightweight, single-file framework with zero external dependencies.
- Modern python codebase with type hinting and black style formatting.
- Supports concurrent connections using an asynchronous event loop.
- Extendable components that loosely implement the [WSGI](https://en.wikipedia.org/wiki/Web_Server_Gateway_Interface)
server/application pattern.
- A built-in static file server with support for gemini directories and CGI scripts.
- An extendable application framework that loosely mimics the [WSGI](https://en.wikipedia.org/wiki/Web_Server_Gateway_Interface) interface.
- A lean, modern python codebase with type hints and black formatting.
- A solid foundation built on top of the [twisted](https://twistedmatrix.com/trac/) networking engine.
## Installation
@ -25,12 +22,12 @@ The latest release can be installed from [PyPI](https://pypi.org/project/Jetforc
$ pip install jetforce
```
Or, clone the repository and run the script directly:
Or, install from source:
```bash
$ git clone https://github.com/michael-lazar/jetforce
$ cd jetforce
$ python3 jetforce.py
$ python setup.py install
```
## Usage
@ -91,30 +88,28 @@ The gemini specification *requires* that all connections be sent over TLS.
If you do not provide a TLS certificate file using the ``--tls-certfile`` flag,
jetforce will automatically generate a temporary cert for you to use. This is
great for making development easier, but before you expose your server to the
public internet you should configure something more permanent. You can generate
public internet you should setup something more permanent. You can generate
your own self-signed server certificate, or obtain one from a Certificate
Authority like [Let's Encrypt](https://letsencrypt.org).
Here's the OpenSSL command that jetforce uses to generate a self-signed cert:
Here's an example `openssl` command that you can use to generate a self-signed certificate:
```
$ openssl req -newkey rsa:2048 -nodes -keyout {hostname}.key \
-nodes -x509 -out {hostname}.crt -subj "/CN={hostname}"
```
Jetforce also supports verified client TLS certificates. You can specify your
client CA with the ``--tls-cafile`` or ``--tls-capath`` flags. Verified
connections will have the ``REMOTE_USER`` variable added to their environment,
which contains the client certificate's CN attribute. Instructions on how to
generate TLS client certificates are outside of the scope of this readme, but
Jetforce also supports TLS client certificates (both self-signed and CA verified).
Connections made with a client certificate will have additional metadata included
in the request environment. ``REMOTE_USER`` will contain the subject common name,
and ``TLS_CLIENT_HASH`` will contain a fingerprint that can be used for TOFU pinning.
You can specify a CA for client validation with the ``--tls-cafile`` or ``--tls-capath``
flags. Connections validated by the CA will have the ``TLS_CLIENT_VERIFIED`` flag set to
True. Instructions on how to generate CA's are outside of the scope of this readme, but
you can find many helpful tutorials
[online](https://www.makethenmakeinstall.com/2014/05/ssl-client-authentication-step-by-step/).
There are currently no plans to support unverified (transient) client
certificates. This is due to a technical limitation of the python standard
library's ``ssl`` module, which is described in detail
[here](https://portal.mozz.us/gemini/mozz.us/journal/2019-08-21_transient_tls_certs.gmi).
### Static Files
Jetforce will serve static files in the ``/var/gemini/`` directory:

44
examples/counter.py Normal file
View File

@ -0,0 +1,44 @@
"""
An endpoint that streams incrementing numbers forever.
This is an example of how a jetforce application can respond with a generator
function instead of plain text/bytes. The server will iterate over the
generator and write the data to the socket in-between each iteration. This can
be useful if you want to serve a large response, like a binary file, without
loading the entire response into memory at once.
The server will schedule your application code to be run inside of a separate
thread, using twisted's built-in thread pool. So even though the counter
function contains a sleep(), it will not block the server from handling other
requests. Try requesting this endpoint over two connections simultaneously.
> jetforce-client gemini://localhost
> jetforce-client gemini://localhost
"""
import time
from jetforce import GeminiServer, JetforceApplication, Response, Status
def counter():
"""
Generator function that counts to .
"""
x = 0
while True:
time.sleep(1)
x += 1
yield f"{x}\r\n"
app = JetforceApplication()
@app.route()
def index(request):
return Response(Status.SUCCESS, "text/plain", counter())
if __name__ == "__main__":
server = GeminiServer(app)
server.run()

32
examples/echo.py Normal file
View File

@ -0,0 +1,32 @@
"""
A bare-bones server that with echo back the request to the client.
This example demonstrates the simplest proof-of-concept of how you can write
your own application from scratch instead of sub-classing from the provided
JetforceApplication. The server/application interface is almost identical to
WSGI defined in PEP-3333 [1].
Unless you're feeling adventurous, you probably want to stick to the
JetforceApplication instead of going this low-level.
[1] https://www.python.org/dev/peps/pep-3333/#id20
"""
import jetforce
def app(environ, send_status):
"""
Arguments:
environ: A dictionary containing information about the request
send_status: A callback function that takes two parameters: The
response status (int) and the response meta text (str).
Returns: A generator containing the response body.
"""
send_status(10, "text/gemini")
yield f"Received path: {environ['GEMINI_URL']}"
if __name__ == "__main__":
server = jetforce.GeminiServer(app)
server.run()

View File

@ -1,27 +0,0 @@
"""
A simple Gemini server that echos back the request to the client.
"""
import asyncio
import jetforce
def echo(environ, send_status):
url = environ["GEMINI_URL"]
send_status(jetforce.Status.SUCCESS, "text/gemini")
yield f"Received path: {url}".encode()
if __name__ == "__main__":
args = jetforce.command_line_parser().parse_args()
ssl_context = jetforce.make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
server = jetforce.GeminiServer(
host=args.host,
port=args.port,
ssl_context=ssl_context,
hostname=args.hostname,
app=echo,
)
asyncio.run(server.run())

View File

@ -1,60 +1,68 @@
"""
A guestbook application that accepts input from guests and stores messages in
a simple text file.
A simple guestbook application that accepts and displays text messages.
This is an example of how to return a 10 INPUT request to the client and
retrieve their response by parsing the URL query string.
This example stores the guestbook inside of a persistent sqlite database.
Because each request will run inside of a separate thread, we must create a new
connection object inside of the request handler instead of re-using a global
database connection. This thread-safety can be disabled in sqlite3 by using the
check_same_thread=False argument, but then it's up to you to ensure that only
connection request is writing to the database at any given time.
"""
import asyncio
import pathlib
import sqlite3
from datetime import datetime
import jetforce
from jetforce import Response, Status
from jetforce import GeminiServer, JetforceApplication, Response, Status
guestbook = pathlib.Path("guestbook.txt")
DB = "/tmp/guestbook.sqlite"
SCHEMA = """
CREATE TABLE IF NOT EXISTS guestbook (
ip_address TEXT,
created_at timestamp,
message TEXT
)
"""
with sqlite3.connect(DB) as c:
c.execute(SCHEMA)
app = jetforce.JetforceApplication()
app = JetforceApplication()
@app.route("", strict_trailing_slash=False)
def index(request):
data = ["Guestbook", "=>/submit Sign the Guestbook", ""]
lines = ["Guestbook", "=>/submit Sign the Guestbook"]
guestbook.touch(exist_ok=True)
with guestbook.open("r") as fp:
for line in fp:
line = line.strip()
if line.startswith("=>"):
data.append(line[2:])
else:
data.append(line)
with sqlite3.connect(DB, detect_types=sqlite3.PARSE_DECLTYPES) as c:
for row in c.execute("SELECT * FROM guestbook ORDER BY created_at"):
ip_address, created_at, message = row
line = f"{created_at:%Y-%m-%d} - [{ip_address}] {message}"
lines.append("")
lines.append(line)
data.extend(["", "...", ""])
return Response(Status.SUCCESS, "text/gemini", "\n".join(data))
lines.extend(["", "...", ""])
body = "\n".join(lines)
return Response(Status.SUCCESS, "text/gemini", body)
@app.route("/submit")
def submit(request):
if request.query:
message = request.query[:256]
created = datetime.utcnow()
with guestbook.open("a") as fp:
fp.write(f"\n[{created:%Y-%m-%d %I:%M %p}]\n{message}\n")
created = datetime.now()
ip_address = request.environ["REMOTE_HOST"]
with sqlite3.connect(DB) as c:
values = (ip_address, created, message)
c.execute("INSERT INTO guestbook VALUES (?, ?, ?)", values)
return Response(Status.REDIRECT_TEMPORARY, "")
else:
return Response(Status.INPUT, "Enter your message (max 256 characters)")
if __name__ == "__main__":
args = jetforce.command_line_parser().parse_args()
ssl_context = jetforce.make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
server = jetforce.GeminiServer(
host=args.host,
port=args.port,
ssl_context=ssl_context,
hostname=args.hostname,
app=app,
)
asyncio.run(server.run())
server = GeminiServer(app)
server.run()

View File

@ -1,15 +1,22 @@
"""
This is an example of setting up a Gemini server to proxy requests to other
protocols. This application will accept HTTP URLs, download and render them
locally using the `w3m` tool, and render the output to the client as plain text.
A server that proxies HTTP websites over gemini.
This example demonstrates how your application routes aren't just limited to
gemini URLs. The server will accept any HTTP URL, download the page and
render it using the external `w3m` tool, and then render the output to the
client as plain-text.
Most gemini clients won't be able to make this request, because the hostname
in the URL doesn't match the hostname of the server. You can test this out
using jetforce-client like this:
> jetforce-client https://mozz.us --host localhost
"""
import asyncio
import subprocess
import jetforce
from jetforce import Response, Status
from jetforce import GeminiServer, JetforceApplication, Response, Status
app = jetforce.JetforceApplication()
app = JetforceApplication()
@app.route(scheme="https", strict_hostname=False)
@ -26,15 +33,5 @@ def proxy_request(request):
if __name__ == "__main__":
args = jetforce.command_line_parser().parse_args()
ssl_context = jetforce.make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
server = jetforce.GeminiServer(
host=args.host,
port=args.port,
ssl_context=ssl_context,
hostname=args.hostname,
app=app,
)
asyncio.run(server.run())
server = GeminiServer(app)
server.run()

36
examples/vhost.py Normal file
View File

@ -0,0 +1,36 @@
"""
A server that implements virtual hosting for multiple subdomains.
This is a basic example of you how can run multiple apps from the same server
by creating a composite application.
> jetforce-client gemini://apple.localhost --host localhost
> jetforce-client gemini://banana.localhost --host localhost
"""
from jetforce import GeminiServer, JetforceApplication, Response, Status
from jetforce.app.composite import CompositeApplication
apple = JetforceApplication()
@apple.route()
def index(request):
return Response(Status.SUCCESS, "text/plain", "apple!")
banana = JetforceApplication()
@banana.route()
def index(request):
return Response(Status.SUCCESS, "text/plain", "banana!")
composite_app = CompositeApplication(
{"apple.localhost": apple, "banana.localhost": banana}
)
if __name__ == "__main__":
server = GeminiServer(composite_app)
server.run()

View File

@ -1,31 +0,0 @@
"""
This is an example of using virtual hosting to serve URLs for multiple
subdomains from a single jetforce server.
"""
import asyncio
import jetforce
from jetforce import Response, Status
app = jetforce.JetforceApplication()
@app.route(hostname="apple.localhost")
def serve_apple_domain(request):
return Response(Status.SUCCESS, "text/plain", f"apple\n{request.path}")
@app.route(hostname="banana.localhost")
def serve_banana_domain(request):
return Response(Status.SUCCESS, "text/plain", f"banana\n{request.path}")
if __name__ == "__main__":
args = jetforce.command_line_parser().parse_args()
ssl_context = jetforce.make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
server = jetforce.GeminiServer(
host=args.host, port=args.port, ssl_context=ssl_context, app=app
)
asyncio.run(server.run())

View File

@ -1,827 +0,0 @@
#!/usr/bin/env python3
"""
Jetforce, an experimental Gemini server.
Overview
--------
GeminiServer:
An asynchronous TCP server built on top of python's asyncio stream
abstraction. This is a lightweight class that accepts incoming requests,
logs them, and sends them to a configurable request handler to be processed.
GeminiRequestHandler:
The request handler manages the life of a single gemini request. It exposes
a simplified interface to read the request URL and write the gemini response
status line and body to the socket. The request URL and other server
information is stuffed into an ``environ`` dictionary that encapsulates the
request at a low level. This dictionary, along with a callback to write the
response data, and passed to a configurable "application" function or class.
JetforceApplication:
This is a base class for writing jetforce server applications. It doesn't
anything on its own, but it does provide a convenient interface to define
custom server endpoints using route decorators. If you want to utilize
jetforce as a library and write your own server in python, this is the class
that you want to extend. The examples/ directory contains some examples of
how to accomplish this.
StaticDirectoryApplication:
This is a pre-built application that serves files from a static directory.
It provides an "out-of-the-box" gemini server without needing to write any
lines of code. This is what is invoked when you launch jetforce from the
command line.
"""
from __future__ import annotations
import argparse
import asyncio
import codecs
import dataclasses
import mimetypes
import os
import pathlib
import re
import socket
import ssl
import subprocess
import sys
import tempfile
import time
import typing
import urllib.parse
if sys.version_info < (3, 7):
sys.exit("Fatal Error: jetforce requires Python 3.7+")
__version__ = "0.2.2"
__title__ = "Jetforce Gemini Server"
__author__ = "Michael Lazar"
__license__ = "Floodgap Free Software License"
__copyright__ = "(c) 2020 Michael Lazar"
ABOUT = fr"""
You are now riding on...
_________ _____________
______ /______ /___ __/_______________________
___ _ /_ _ \ __/_ /_ _ __ \_ ___/ ___/ _ \
/ /_/ / / __/ /_ _ __/ / /_/ / / / /__ / __/
\____/ \___/\__/ /_/ \____//_/ \___/ \___/
An Experimental Gemini Server, v{__version__}
https://github.com/michael-lazar/jetforce
"""
class Status:
"""
Gemini response status codes.
"""
INPUT = 10
SUCCESS = 20
SUCCESS_END_OF_SESSION = 21
REDIRECT_TEMPORARY = 30
REDIRECT_PERMANENT = 31
TEMPORARY_FAILURE = 40
SERVER_UNAVAILABLE = 41
CGI_ERROR = 42
PROXY_ERROR = 43
SLOW_DOWN = 44
PERMANENT_FAILURE = 50
NOT_FOUND = 51
GONE = 52
PROXY_REQUEST_REFUSED = 53
BAD_REQUEST = 59
CLIENT_CERTIFICATE_REQUIRED = 60
TRANSIENT_CERTIFICATE_REQUESTED = 61
AUTHORISED_CERTIFICATE_REQUIRED = 62
CERTIFICATE_NOT_ACCEPTED = 63
FUTURE_CERTIFICATE_REJECTED = 64
EXPIRED_CERTIFICATE_REJECTED = 65
class Request:
"""
Object that encapsulates information about a single gemini request.
"""
def __init__(self, environ: dict):
self.environ = environ
self.url = environ["GEMINI_URL"]
url_parts = urllib.parse.urlparse(self.url)
if not url_parts.hostname:
raise ValueError("URL must contain a `hostname` part")
if not url_parts.scheme:
# If scheme is missing, infer it to be gemini://
self.scheme = "gemini"
else:
self.scheme = url_parts.scheme
self.hostname = url_parts.hostname
self.port = url_parts.port
self.path = url_parts.path
self.params = url_parts.params
self.query = urllib.parse.unquote(url_parts.query)
self.fragment = url_parts.fragment
@dataclasses.dataclass
class Response:
"""
Object that encapsulates information about a single gemini response.
"""
status: int
meta: str
body: typing.Union[None, bytes, str, typing.Iterator[bytes]] = None
@dataclasses.dataclass
class RoutePattern:
"""
A pattern for matching URLs with a single endpoint or route.
"""
path: str = ".*"
scheme: str = "gemini"
hostname: typing.Optional[str] = None
strict_hostname: bool = True
strict_port: bool = True
strict_trailing_slash: bool = False
def match(self, request: Request) -> typing.Optional[re.Match]:
"""
Check if the given request URL matches this route pattern.
"""
if self.hostname is None:
server_hostname = request.environ["HOSTNAME"]
else:
server_hostname = self.hostname
server_port = int(request.environ["SERVER_PORT"])
if self.strict_hostname and request.hostname != server_hostname:
return
if self.strict_port and request.port is not None:
if request.port != server_port:
return
if self.scheme and self.scheme != request.scheme:
return
if self.strict_trailing_slash:
request_path = request.path
else:
request_path = request.path.rstrip("/")
return re.fullmatch(self.path, request_path)
class JetforceApplication:
"""
Base Jetforce application class with primitive URL routing.
This is a base class for writing jetforce server applications. It doesn't
anything on its own, but it does provide a convenient interface to define
custom server endpoints using route decorators. If you want to utilize
jetforce as a library and write your own server in python, this is the class
that you want to extend. The examples/ directory contains some examples of
how to accomplish this.
"""
def __init__(self):
self.routes: typing.List[
typing.Tuple[RoutePattern, typing.Callable[[Request], Response]]
] = []
def __call__(
self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]:
try:
request = Request(environ)
except Exception:
send_status(Status.BAD_REQUEST, "Unrecognized URL format")
return
for route_pattern, callback in self.routes[::-1]:
if route_pattern.match(request):
break
else:
callback = self.default_callback
response = callback(request)
send_status(response.status, response.meta)
if isinstance(response.body, bytes):
yield response.body
elif isinstance(response.body, str):
yield response.body.encode()
elif response.body:
yield from response.body
def route(
self,
path: str = ".*",
scheme: str = "gemini",
hostname: typing.Optional[str] = None,
strict_hostname: bool = True,
strict_trailing_slash: bool = False,
) -> typing.Callable:
"""
Decorator for binding a function to a route based on the URL path.
app = JetforceApplication()
@app.route('/my-path')
def my_path(request):
return Response(Status.SUCCESS, 'text/plain', 'Hello world!')
"""
route_pattern = RoutePattern(
path, scheme, hostname, strict_hostname, strict_trailing_slash
)
def wrap(func: typing.Callable) -> typing.Callable:
self.routes.append((route_pattern, func))
return func
return wrap
def default_callback(self, request: Request) -> Response:
"""
Set the error response based on the URL type.
"""
return Response(Status.PERMANENT_FAILURE, "Not Found")
class StaticDirectoryApplication(JetforceApplication):
"""
Application for serving static files & CGI over gemini.
This is a pre-built application that serves files from a static directory.
It provides an "out-of-the-box" gemini server without needing to write any
lines of code. This is what is invoked when you launch jetforce from the
command line.
If a directory contains a file with the name "index.gmi", that file will
be returned when the directory path is requested. Otherwise, a directory
listing will be auto-generated.
"""
def __init__(
self,
root_directory: str = "/var/gemini",
index_file: str = "index.gmi",
cgi_directory: str = "cgi-bin",
):
super().__init__()
self.routes.append((RoutePattern(), self.serve_static_file))
self.root = pathlib.Path(root_directory).resolve(strict=True)
self.cgi_directory = cgi_directory.strip("/") + "/"
self.index_file = index_file
self.mimetypes = mimetypes.MimeTypes()
self.mimetypes.add_type("text/gemini", ".gmi")
self.mimetypes.add_type("text/gemini", ".gemini")
def serve_static_file(self, request: Request) -> Response:
"""
Convert a URL into a filesystem path, and attempt to serve the file
or directory that is represented at that path.
"""
url_path = pathlib.Path(request.path.strip("/"))
filename = pathlib.Path(os.path.normpath(str(url_path)))
if filename.is_absolute() or str(filename.name).startswith(".."):
# Guard against breaking out of the directory
return Response(Status.NOT_FOUND, "Not Found")
filesystem_path = self.root / filename
try:
if not os.access(filesystem_path, os.R_OK):
# File not readable
return Response(Status.NOT_FOUND, "Not Found")
except OSError:
# Filename too large, etc.
return Response(Status.NOT_FOUND, "Not Found")
if filesystem_path.is_file():
is_cgi = str(filename).startswith(self.cgi_directory)
is_exe = os.access(filesystem_path, os.X_OK)
if is_cgi and is_exe:
return self.run_cgi_script(filesystem_path, request.environ)
mimetype = self.guess_mimetype(filesystem_path.name)
generator = self.load_file(filesystem_path)
return Response(Status.SUCCESS, mimetype, generator)
elif filesystem_path.is_dir():
if not request.path.endswith("/"):
url_parts = urllib.parse.urlparse(request.url)
url_parts = url_parts._replace(path=request.path + "/")
return Response(Status.REDIRECT_PERMANENT, url_parts.geturl())
index_file = filesystem_path / self.index_file
if index_file.exists():
generator = self.load_file(index_file)
return Response(Status.SUCCESS, "text/gemini", generator)
generator = self.list_directory(url_path, filesystem_path)
return Response(Status.SUCCESS, "text/gemini", generator)
else:
return Response(Status.NOT_FOUND, "Not Found")
def run_cgi_script(self, filesystem_path: pathlib.Path, environ: dict) -> Response:
"""
Execute the given file as a CGI script and return the script's stdout
stream to the client.
"""
script_name = str(filesystem_path)
cgi_env = environ.copy()
cgi_env["GATEWAY_INTERFACE"] = "GCI/1.1"
cgi_env["SCRIPT_NAME"] = script_name
# Decode the stream as unicode so we can parse the status line
# Use surrogateescape to preserve any non-UTF8 byte sequences.
out = subprocess.Popen(
[script_name],
stdout=subprocess.PIPE,
env=cgi_env,
bufsize=1,
universal_newlines=True,
errors="surrogateescape",
)
status_line = out.stdout.readline().strip()
status_parts = status_line.split(maxsplit=1)
if len(status_parts) != 2 or not status_parts[0].isdecimal():
return Response(Status.CGI_ERROR, "Unexpected Error")
status, meta = status_parts
# Re-encode the rest of the body as bytes
body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
return Response(int(status), meta, body)
def load_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]:
"""
Load a file using a generator to allow streaming data to the TCP socket.
"""
with filesystem_path.open("rb") as fp:
data = fp.read(1024)
while data:
yield data
data = fp.read(1024)
def list_directory(
self, url_path: pathlib.Path, filesystem_path: pathlib.Path
) -> typing.Iterator[bytes]:
"""
Auto-generate a text/gemini document based on the contents of the file system.
"""
yield f"Directory: /{url_path}\r\n".encode()
if url_path.parent != url_path:
yield f"=>/{url_path.parent}\t..\r\n".encode()
for file in sorted(filesystem_path.iterdir()):
if file.name.startswith("."):
# Skip hidden directories/files that may contain sensitive info
continue
elif file.is_dir():
yield f"=>/{url_path / file.name}/\t{file.name}/\r\n".encode()
else:
yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
def guess_mimetype(self, filename: str) -> str:
"""
Guess the mimetype of a file based on the file extension.
"""
mime, encoding = self.mimetypes.guess_type(filename)
if encoding:
return f"{mime}; charset={encoding}"
else:
return mime or "text/plain"
def default_callback(self, request: Request) -> Response:
"""
Since the StaticDirectoryApplication only serves gemini URLs, return
a proxy request refused for suspicious URLs.
"""
if request.scheme != "gemini":
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
elif request.hostname != request.environ["HOSTNAME"]:
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
elif request.port and request.port != request.environ["SERVER_PORT"]:
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
else:
return Response(Status.NOT_FOUND, "Not Found")
class GeminiRequestHandler:
"""
Handle a single Gemini Protocol TCP request.
The request handler manages the life of a single gemini request. It exposes
a simplified interface to read the request URL and write the gemini response
status line and body to the socket. The request URL and other server
information is stuffed into an ``environ`` dictionary that encapsulates the
request at a low level. This dictionary, along with a callback to write the
response data, and passed to a configurable "application" function or class.
This design borrows heavily from the standard library's HTTP request
handler (http.server.BaseHTTPRequestHandler). However, I did not make any
attempts to directly emulate the existing conventions, because Gemini is an
inherently simpler protocol than HTTP and much of the boilerplate could be
removed.
"""
TIMESTAMP_FORMAT = "%d/%b/%Y:%H:%M:%S %z"
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
received_timestamp: time.struct_time
remote_addr: str
client_cert: dict
url: str
status: int
meta: str
response_buffer: str
response_size: int
def __init__(self, server: GeminiServer, app: typing.Callable) -> None:
self.server = server
self.app = app
self.response_size = 0
async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""
Main method for the request handler, performs the following:
1. Read the request bytes from the reader stream
2. Parse the request and generate response data
3. Write the response bytes to the writer stream
"""
self.reader = reader
self.writer = writer
self.remote_addr = writer.get_extra_info("peername")[0]
self.client_cert = writer.get_extra_info("peercert")
self.received_timestamp = time.localtime()
try:
await self.parse_header()
except Exception:
# Malformed request, throw it away and exit immediately
self.write_status(Status.BAD_REQUEST, "Malformed request")
return await self.close_connection()
try:
environ = self.build_environ()
app = self.app(environ, self.write_status)
for data in app:
await self.write_body(data)
except Exception:
self.write_status(Status.CGI_ERROR, "An unexpected error occurred")
raise
finally:
await self.close_connection()
def build_environ(self) -> typing.Dict[str, typing.Any]:
"""
Construct a dictionary that will be passed to the application handler.
Variable names conform to the CGI spec defined in RFC 3875.
"""
url_parts = urllib.parse.urlparse(self.url)
environ = {
"GEMINI_URL": self.url,
"HOSTNAME": self.server.hostname,
"PATH_INFO": url_parts.path,
"QUERY_STRING": url_parts.query,
"REMOTE_ADDR": self.remote_addr,
"REMOTE_HOST": self.remote_addr,
"SERVER_NAME": self.server.hostname,
"SERVER_PORT": str(self.server.port),
"SERVER_PROTOCOL": "GEMINI",
"SERVER_SOFTWARE": f"jetforce/{__version__}",
}
if self.client_cert:
subject = dict(x[0] for x in self.client_cert["subject"])
environ.update(
{
"AUTH_TYPE": "CERTIFICATE",
"REMOTE_USER": subject.get("commonName", ""),
"TLS_CLIENT_NOT_BEFORE": self.client_cert["notBefore"],
"TLS_CLIENT_NOT_AFTER": self.client_cert["notAfter"],
"TLS_CLIENT_SERIAL_NUMBER": self.client_cert["serialNumber"],
}
)
return environ
async def parse_header(self) -> None:
"""
Parse the gemini header line.
The request is a single UTF-8 line formatted as: <URL>\r\n
"""
data = await self.reader.readuntil(b"\r\n")
data = data[:-2] # strip the line ending
if len(data) > 1024:
raise ValueError("URL exceeds max length of 1024 bytes")
self.url = data.decode()
def write_status(self, status: int, meta: str) -> None:
"""
Write the gemini status line to an internal buffer.
The status line is a single UTF-8 line formatted as:
<code>\t<meta>\r\n
If the response status is 2, the meta field will contain the mimetype
of the response data sent. If the status is something else, the meta
will contain a descriptive message.
The status is not written immediately, it's added to an internal buffer
that must be flushed. This is done so that the status can be updated as
long as no other data has been written to the stream yet.
"""
self.status = status
self.meta = meta
self.response_buffer = f"{status}\t{meta}\r\n"
async def write_body(self, data: bytes) -> None:
"""
Write bytes to the gemini response body.
"""
await self.flush_status()
self.response_size += len(data)
self.writer.write(data)
await self.writer.drain()
async def flush_status(self) -> None:
"""
Flush the status line from the internal buffer to the socket stream.
"""
if self.response_buffer and not self.response_size:
data = self.response_buffer.encode()
self.response_size += len(data)
self.writer.write(data)
await self.writer.drain()
self.response_buffer = ""
async def close_connection(self) -> None:
"""
Flush any remaining bytes and close the stream.
"""
await self.flush_status()
self.log_request()
await self.writer.drain()
def log_request(self) -> None:
"""
Log a gemini request using a format derived from the Common Log Format.
"""
try:
self.server.log_message(
f"{self.remote_addr} "
f"[{time.strftime(self.TIMESTAMP_FORMAT, self.received_timestamp)}] "
f'"{self.url}" '
f"{self.status} "
f'"{self.meta}" '
f"{self.response_size}"
)
except AttributeError:
# Malformed request or dropped connection
pass
class GeminiServer:
"""
An asynchronous TCP server that uses the asyncio stream abstraction.
This is a lightweight class that accepts incoming requests, logs them, and
sends them to a configurable request handler to be processed.
"""
request_handler_class = GeminiRequestHandler
def __init__(
self,
app: typing.Callable,
host: str = "127.0.0.1",
port: int = 1965,
ssl_context: ssl.SSLContext = None,
hostname: str = "localhost",
) -> None:
self.host = host
self.port = port
self.hostname = hostname
self.app = app
self.ssl_context = ssl_context
async def run(self) -> None:
"""
The main asynchronous server loop.
"""
self.log_message(ABOUT)
server = await asyncio.start_server(
self.accept_connection, self.host, self.port, ssl=self.ssl_context
)
self.log_message(f"Server hostname is {self.hostname}")
for sock in server.sockets:
sock_ip, sock_port, *_ = sock.getsockname()
if sock.family == socket.AF_INET:
self.log_message(f"Listening on {sock_ip}:{sock_port}")
else:
self.log_message(f"Listening on [{sock_ip}]:{sock_port}")
async with server:
await server.serve_forever()
async def accept_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
"""
Hook called by the socket server when a new connection is accepted.
"""
request_handler = self.request_handler_class(self, self.app)
try:
await request_handler.handle(reader, writer)
finally:
writer.close()
def log_message(self, message: str) -> None:
"""
Log a diagnostic server message.
"""
print(message, file=sys.stderr)
def generate_ad_hoc_certificate(hostname: str) -> typing.Tuple[str, str]:
"""
Utility function to generate a self-signed SSL certificate key pair if
one isn't provided. Results may vary depending on your version of OpenSSL.
"""
certfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.crt"
keyfile = pathlib.Path(tempfile.gettempdir()) / f"{hostname}.key"
if not certfile.exists() or not keyfile.exists():
print(f"Writing ad hoc TLS certificate to {certfile}")
subprocess.run(
[
f"openssl req -newkey rsa:2048 -nodes -keyout {keyfile}"
f' -nodes -x509 -out {certfile} -subj "/CN={hostname}"'
],
shell=True,
check=True,
)
return str(certfile), str(keyfile)
def make_ssl_context(
hostname: str = "localhost",
certfile: typing.Optional[str] = None,
keyfile: typing.Optional[str] = None,
cafile: typing.Optional[str] = None,
capath: typing.Optional[str] = None,
) -> ssl.SSLContext:
"""
Generate a sane default SSL context for a Gemini server.
For more information on what these variables mean and what values they can
contain, see the python standard library documentation:
https://docs.python.org/3/library/ssl.html#ssl-contexts
verify_mode: ssl.CERT_OPTIONAL
A client certificate request is sent to the client. The client may
either ignore the request or send a certificate in order perform TLS
client cert authentication. If the client chooses to send a certificate,
it is verified. Any verification error immediately aborts the TLS
handshake.
"""
if certfile is None:
certfile, keyfile = generate_ad_hoc_certificate(hostname)
context = ssl.SSLContext()
context.verify_mode = ssl.CERT_OPTIONAL
context.load_cert_chain(certfile, keyfile)
if not cafile and not capath:
# Load from the system's default client CA directory
context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
else:
# Use a custom CA for validating client certificates
context.load_verify_locations(cafile, capath)
return context
def command_line_parser() -> argparse.ArgumentParser:
"""
Construct the default argument parser when launching the server from
the command line. These are meant to be application-agnostic arguments
that could apply to any subclass of the JetforceApplication.
"""
parser = argparse.ArgumentParser(
prog="jetforce",
description="An Experimental Gemini Protocol Server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-V", "--version", action="version", version="jetforce " + __version__
)
parser.add_argument("--host", help="Server address to bind to", default="127.0.0.1")
parser.add_argument("--port", help="Server port to bind to", type=int, default=1965)
parser.add_argument("--hostname", help="Server hostname", default="localhost")
parser.add_argument(
"--tls-certfile",
dest="certfile",
help="Server TLS certificate file",
metavar="FILE",
)
parser.add_argument(
"--tls-keyfile",
dest="keyfile",
help="Server TLS private key file",
metavar="FILE",
)
parser.add_argument(
"--tls-cafile",
dest="cafile",
help="A CA file to use for validating clients",
metavar="FILE",
)
parser.add_argument(
"--tls-capath",
dest="capath",
help="A directory containing CA files for validating clients",
metavar="DIR",
)
return parser
def run_server() -> None:
"""
Entry point for running the static directory server.
"""
parser = command_line_parser()
parser.add_argument(
"--dir",
help="Root directory on the filesystem to serve",
default="/var/gemini",
metavar="DIR",
)
parser.add_argument(
"--cgi-dir",
help="CGI script directory, relative to the server's root directory",
default="cgi-bin",
metavar="DIR",
)
parser.add_argument(
"--index-file",
help="If a directory contains a file with this name, that file will be "
"served instead of auto-generating an index page",
default="index.gmi",
metavar="FILE",
)
args = parser.parse_args()
app = StaticDirectoryApplication(args.dir, args.index_file, args.cgi_dir)
ssl_context = make_ssl_context(
args.hostname, args.certfile, args.keyfile, args.cafile, args.capath
)
server = GeminiServer(
host=args.host,
port=args.port,
ssl_context=ssl_context,
hostname=args.hostname,
app=app,
)
asyncio.run(server.run())
if __name__ == "__main__":
run_server()

14
jetforce/__init__.py Normal file
View File

@ -0,0 +1,14 @@
"""
isort:skip_file
"""
from .__version__ import __version__
from .app.base import JetforceApplication, Request, Response, RoutePattern, Status
from .app.static import StaticDirectoryApplication
from .app.composite import CompositeApplication
from .protocol import GeminiProtocol
from .server import GeminiServer
__title__ = "Jetforce Gemini Server"
__author__ = "Michael Lazar"
__license__ = "Floodgap Free Software License"
__copyright__ = "(c) 2020 Michael Lazar"

117
jetforce/__main__.py Normal file
View File

@ -0,0 +1,117 @@
"""
Main entry point for running ``jetforce`` from the command line.
This will launch a gemini server running the StaticFileServer application.
"""
# Black does not do a good job of formatting argparse code, IMHO.
# fmt: off
import argparse
import sys
from .__version__ import __version__
from .app.static import StaticDirectoryApplication
from .server import GeminiServer
if sys.version_info < (3, 7):
sys.exit("Fatal Error: jetforce requires Python 3.7+")
# noinspection PyTypeChecker
parser = argparse.ArgumentParser(
prog="jetforce",
description="An Experimental Gemini Protocol Server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-V", "--version",
action="version",
version="jetforce " + __version__
)
group = parser.add_argument_group("server configuration")
group.add_argument(
"--host",
help="Server address to bind to",
default="127.0.0.1"
)
group.add_argument(
"--port",
help="Server port to bind to",
type=int,
default=1965
)
group.add_argument(
"--hostname",
help="Server hostname",
default="localhost"
)
group.add_argument(
"--tls-certfile",
dest="certfile",
help="Server TLS certificate file",
metavar="FILE",
)
group.add_argument(
"--tls-keyfile",
dest="keyfile",
help="Server TLS private key file",
metavar="FILE",
)
group.add_argument(
"--tls-cafile",
dest="cafile",
help="A CA file to use for validating clients",
metavar="FILE",
)
group.add_argument(
"--tls-capath",
dest="capath",
help="A directory containing CA files for validating clients",
metavar="DIR",
)
group = parser.add_argument_group("static file configuration")
group.add_argument(
"--dir",
help="Root directory on the filesystem to serve",
default="/var/gemini",
metavar="DIR",
dest="root_directory",
)
group.add_argument(
"--cgi-dir",
help="CGI script directory, relative to the server's root directory",
default="cgi-bin",
metavar="DIR",
dest="cgi_directory",
)
group.add_argument(
"--index-file",
help="If a directory contains a file with this name, "
"that file will be served instead of auto-generating an index page",
default="index.gmi",
metavar="FILE",
dest="index_file",
)
def main():
args = parser.parse_args()
app = StaticDirectoryApplication(
root_directory=args.root_directory,
index_file=args.index_file,
cgi_directory=args.cgi_directory,
)
server = GeminiServer(
app=app,
host=args.host,
port=args.port,
hostname=args.hostname,
certfile=args.certfile,
keyfile=args.keyfile,
cafile=args.cafile,
capath=args.capath,
)
server.run()
if __name__ == "__main__":
main()

1
jetforce/__version__.py Normal file
View File

@ -0,0 +1 @@
__version__ = "0.3.0"

0
jetforce/app/__init__.py Normal file
View File

202
jetforce/app/base.py Normal file
View File

@ -0,0 +1,202 @@
import argparse
import dataclasses
import re
import typing
import urllib.parse
class Status:
"""
Gemini response status codes.
"""
INPUT = 10
SUCCESS = 20
SUCCESS_END_OF_SESSION = 21
REDIRECT_TEMPORARY = 30
REDIRECT_PERMANENT = 31
TEMPORARY_FAILURE = 40
SERVER_UNAVAILABLE = 41
CGI_ERROR = 42
PROXY_ERROR = 43
SLOW_DOWN = 44
PERMANENT_FAILURE = 50
NOT_FOUND = 51
GONE = 52
PROXY_REQUEST_REFUSED = 53
BAD_REQUEST = 59
CLIENT_CERTIFICATE_REQUIRED = 60
TRANSIENT_CERTIFICATE_REQUESTED = 61
AUTHORISED_CERTIFICATE_REQUIRED = 62
CERTIFICATE_NOT_ACCEPTED = 63
FUTURE_CERTIFICATE_REJECTED = 64
EXPIRED_CERTIFICATE_REJECTED = 65
class Request:
"""
Object that encapsulates information about a single gemini request.
"""
def __init__(self, environ: dict):
self.environ = environ
self.url = environ["GEMINI_URL"]
url_parts = urllib.parse.urlparse(self.url)
if not url_parts.hostname:
raise ValueError("URL must contain a `hostname` part")
if not url_parts.scheme:
# If scheme is missing, infer it to be gemini://
self.scheme = "gemini"
else:
self.scheme = url_parts.scheme
self.hostname = url_parts.hostname
self.port = url_parts.port
self.path = url_parts.path
self.params = url_parts.params
self.query = urllib.parse.unquote(url_parts.query)
self.fragment = url_parts.fragment
@dataclasses.dataclass
class Response:
"""
Object that encapsulates information about a single gemini response.
"""
status: int
meta: str
body: typing.Union[
None, bytes, str, typing.Iterable[typing.Union[bytes, str]]
] = None
@dataclasses.dataclass
class RoutePattern:
"""
A pattern for matching URLs with a single endpoint or route.
"""
path: str = ".*"
scheme: str = "gemini"
hostname: typing.Optional[str] = None
strict_hostname: bool = True
strict_port: bool = True
strict_trailing_slash: bool = False
def match(self, request: Request) -> typing.Optional[re.Match]:
"""
Check if the given request URL matches this route pattern.
"""
if self.hostname is None:
server_hostname = request.environ["HOSTNAME"]
else:
server_hostname = self.hostname
server_port = int(request.environ["SERVER_PORT"])
if self.strict_hostname and request.hostname != server_hostname:
return
if self.strict_port and request.port is not None:
if request.port != server_port:
return
if self.scheme and self.scheme != request.scheme:
return
if self.strict_trailing_slash:
request_path = request.path
else:
request_path = request.path.rstrip("/")
return re.fullmatch(self.path, request_path)
class JetforceApplication:
"""
Base Jetforce application class with primitive URL routing.
This is a base class for writing jetforce server applications. It doesn't
anything on its own, but it does provide a convenient interface to define
custom server endpoints using route decorators. If you want to utilize
jetforce as a library and write your own server in python, this is the class
that you want to extend. The examples/ directory contains some examples of
how to accomplish this.
"""
def __init__(self):
self.routes: typing.List[
typing.Tuple[RoutePattern, typing.Callable[[Request], Response]]
] = []
def __call__(
self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]:
try:
request = Request(environ)
except Exception:
send_status(Status.BAD_REQUEST, "Unrecognized URL format")
return
for route_pattern, callback in self.routes[::-1]:
if route_pattern.match(request):
break
else:
callback = self.default_callback
response = callback(request)
send_status(response.status, response.meta)
if isinstance(response.body, (bytes, str)):
yield response.body
elif response.body:
yield from response.body
def route(
self,
path: str = ".*",
scheme: str = "gemini",
hostname: typing.Optional[str] = None,
strict_hostname: bool = True,
strict_trailing_slash: bool = False,
) -> typing.Callable:
"""
Decorator for binding a function to a route based on the URL path.
app = JetforceApplication()
@app.route('/my-path')
def my_path(request):
return Response(Status.SUCCESS, 'text/plain', 'Hello world!')
"""
route_pattern = RoutePattern(
path, scheme, hostname, strict_hostname, strict_trailing_slash
)
def wrap(func: typing.Callable) -> typing.Callable:
self.routes.append((route_pattern, func))
return func
return wrap
def default_callback(self, request: Request) -> Response:
"""
Set the error response based on the URL type.
"""
return Response(Status.PERMANENT_FAILURE, "Not Found")
@classmethod
def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
"""
Add any application-specific arguments to the GeminiServer parser.
The destination variables for these arguments should match the method
signature for this class's __init__ method.
"""
return

50
jetforce/app/composite.py Normal file
View File

@ -0,0 +1,50 @@
import typing
from .base import Request, Status
class CompositeApplication:
"""
Route requests between multiple applications by looking at the URL hostname.
The primary intention of this class is enable virtual hosting by serving
two or more applications behind a single jetforce server.
"""
def __init__(self, application_map: typing.Dict[typing.Optional[str], typing.Any]):
"""
Initialize the application by providing a mapping of hostname -> app
key pairs. A hostname of `None` is a special key that can be used as
a default if none of the others match.
Example:
app = CompositeApplication(
{
"cats.com": cats_app,
"dogs.com": dogs_app,
None: other_animals_app,
}
)
"""
self.application_map = application_map
def __call__(
self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]:
try:
request = Request(environ)
except Exception:
send_status(Status.BAD_REQUEST, "Unrecognized URL format")
return
if request.hostname in self.application_map:
environ["HOSTNAME"] = request.hostname
app = self.application_map[request.hostname]
yield from app(environ, send_status)
elif None in self.application_map:
app = self.application_map[None]
yield from app(environ, send_status)
else:
send_status(Status.PROXY_REQUEST_REFUSED, "Invalid hostname")

186
jetforce/app/static.py Normal file
View File

@ -0,0 +1,186 @@
import codecs
import mimetypes
import os
import pathlib
import subprocess
import typing
import urllib.parse
from .base import JetforceApplication, Request, Response, RoutePattern, Status
class StaticDirectoryApplication(JetforceApplication):
"""
Application for serving static files & CGI over gemini.
This is a batteries-included application that serves files from a static
directory. It provides a preconfigured gemini server without needing to
write any lines of code. This is what is invoked when you launch jetforce
from the command line.
If a directory contains a file with the name "index.gmi", that file will
be returned when the directory path is requested. Otherwise, a directory
listing will be auto-generated.
"""
def __init__(
self,
root_directory: str = "/var/gemini",
index_file: str = "index.gmi",
cgi_directory: str = "cgi-bin",
):
super().__init__()
self.routes.append((RoutePattern(), self.serve_static_file))
self.root = pathlib.Path(root_directory).resolve(strict=True)
self.cgi_directory = cgi_directory.strip("/") + "/"
self.index_file = index_file
self.mimetypes = mimetypes.MimeTypes()
self.mimetypes.add_type("text/gemini", ".gmi")
self.mimetypes.add_type("text/gemini", ".gemini")
def serve_static_file(self, request: Request) -> Response:
"""
Convert a URL into a filesystem path, and attempt to serve the file
or directory that is represented at that path.
"""
url_path = pathlib.Path(request.path.strip("/"))
filename = pathlib.Path(os.path.normpath(str(url_path)))
if filename.is_absolute() or str(filename.name).startswith(".."):
# Guard against breaking out of the directory
return Response(Status.NOT_FOUND, "Not Found")
filesystem_path = self.root / filename
try:
if not os.access(filesystem_path, os.R_OK):
# File not readable
return Response(Status.NOT_FOUND, "Not Found")
except OSError:
# Filename too large, etc.
return Response(Status.NOT_FOUND, "Not Found")
if filesystem_path.is_file():
is_cgi = str(filename).startswith(self.cgi_directory)
is_exe = os.access(filesystem_path, os.X_OK)
if is_cgi and is_exe:
return self.run_cgi_script(filesystem_path, request.environ)
mimetype = self.guess_mimetype(filesystem_path.name)
generator = self.load_file(filesystem_path)
return Response(Status.SUCCESS, mimetype, generator)
elif filesystem_path.is_dir():
if not request.path.endswith("/"):
url_parts = urllib.parse.urlparse(request.url)
# noinspection PyProtectedMember
url_parts = url_parts._replace(path=request.path + "/")
return Response(Status.REDIRECT_PERMANENT, url_parts.geturl())
index_file = filesystem_path / self.index_file
if index_file.exists():
generator = self.load_file(index_file)
return Response(Status.SUCCESS, "text/gemini", generator)
generator = self.list_directory(url_path, filesystem_path)
return Response(Status.SUCCESS, "text/gemini", generator)
else:
return Response(Status.NOT_FOUND, "Not Found")
def run_cgi_script(self, filesystem_path: pathlib.Path, environ: dict) -> Response:
"""
Execute the given file as a CGI script and return the script's stdout
stream to the client.
"""
script_name = str(filesystem_path)
cgi_env = {k: v for k, v in environ.items() if k.isupper()}
cgi_env["GATEWAY_INTERFACE"] = "GCI/1.1"
cgi_env["SCRIPT_NAME"] = script_name
# Decode the stream as unicode so we can parse the status line
# Use surrogateescape to preserve any non-UTF8 byte sequences.
out = subprocess.Popen(
[script_name],
stdout=subprocess.PIPE,
env=cgi_env,
bufsize=1,
universal_newlines=True,
errors="surrogateescape",
)
status_line = out.stdout.readline().strip()
status_parts = status_line.split(maxsplit=1)
if len(status_parts) != 2 or not status_parts[0].isdecimal():
return Response(Status.CGI_ERROR, "Unexpected Error")
status, meta = status_parts
# Re-encode the rest of the body as bytes
body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
return Response(int(status), meta, body)
def load_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]:
"""
Load a file in chunks to allow streaming to the TCP socket.
"""
with filesystem_path.open("rb") as fp:
data = fp.read(1024)
while data:
yield data
data = fp.read(1024)
def list_directory(
self, url_path: pathlib.Path, filesystem_path: pathlib.Path
) -> typing.Iterator[bytes]:
"""
Auto-generate a text/gemini document based on the contents of the file system.
"""
yield f"Directory: /{url_path}\r\n".encode()
if url_path.parent != url_path:
yield f"=>/{url_path.parent}\t..\r\n".encode()
for file in sorted(filesystem_path.iterdir()):
if file.name.startswith("."):
# Skip hidden directories/files that may contain sensitive info
continue
elif file.is_dir():
yield f"=>/{url_path / file.name}/\t{file.name}/\r\n".encode()
else:
yield f"=>/{url_path / file.name}\t{file.name}\r\n".encode()
def guess_mimetype(self, filename: str) -> str:
"""
Guess the mimetype of a file based on the file extension.
"""
mime, encoding = self.mimetypes.guess_type(filename)
if encoding:
return f"{mime}; charset={encoding}"
else:
return mime or "text/plain"
def default_callback(self, request: Request) -> Response:
"""
Since the StaticDirectoryApplication only serves gemini URLs, return
a proxy request refused for suspicious URLs.
"""
if request.scheme != "gemini":
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
elif request.hostname != request.environ["HOSTNAME"]:
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
elif request.port and request.port != request.environ["SERVER_PORT"]:
return Response(
Status.PROXY_REQUEST_REFUSED,
"This server does not allow proxy requests",
)
else:
return Response(Status.NOT_FOUND, "Not Found")

230
jetforce/protocol.py Normal file
View File

@ -0,0 +1,230 @@
from __future__ import annotations
import time
import traceback
import typing
import urllib.parse
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import ensureDeferred
from twisted.internet.threads import deferToThread
from twisted.protocols.basic import LineOnlyReceiver
from .__version__ import __version__
from .app.base import JetforceApplication, Status
from .tls import inspect_certificate
class GeminiProtocol(LineOnlyReceiver):
"""
Handle a single Gemini Protocol TCP request.
The request handler manages the life of a single gemini request. It exposes
a simplified interface to read the request URL and write the gemini response
status line and body to the socket. The request URL and other server
information is stuffed into an ``environ`` dictionary that encapsulates the
request at a low level. This dictionary, along with a callback to write the
response data, and passed to a configurable "application" function or class.
This design borrows heavily from the standard library's HTTP request
handler (http.server.BaseHTTPRequestHandler). However, I did not make any
attempts to directly emulate the existing conventions, because Gemini is an
inherently simpler protocol than HTTP and much of the boilerplate could be
removed.
"""
TIMESTAMP_FORMAT = "%d/%b/%Y:%H:%M:%S %z"
client_addr: typing.Union[IPv4Address, IPv6Address]
connected_timestamp: time.struct_time
request: bytes
url: str
status: int
meta: str
response_buffer: str
response_size: int
def __init__(self, server: "GeminiServer", app: JetforceApplication):
self.server = server
self.app = app
def connectionMade(self):
"""
This is invoked by twisted after the connection is first established.
"""
self.connected_timestamp = time.localtime()
self.response_size = 0
self.response_buffer = ""
self.client_addr = self.transport.getPeer()
def lineReceived(self, line):
"""
This method is invoked by LineOnlyReceiver for every incoming line.
"""
self.request = line
return ensureDeferred(self._handle_request_noblock())
async def _handle_request_noblock(self):
"""
Handle the gemini request and write the raw response to the socket.
This method is implemented using an async coroutine, which has been
supported by twisted since python 3.5 by wrapping the method in
ensureDeferred(). Twisted + coroutines is a bitch to figure out, but
once it clicks it really does turn out to be an elegant solution.
Any time that we call into the application code, we wrap the call with
deferToThread() which will execute the code in a separate thread using
twisted's thread pool. deferToThread() will return a future object
that we can then `await` to get the result when the thread finishes.
This is important because we don't want application code to block the
twisted event loop from serving other requests at the same time.
In the future, I would like to add the capability for applications to
implement proper coroutines that can call `await` on directly without
needing to wrap them in threads. Conceptually, this shouldn't be too
difficult, but it will require implementing an alternate version of
the JetforceApplication that's async-compatible.
"""
try:
self.parse_header()
except Exception:
# Malformed request, throw it away and exit immediately
self.server.log_message(traceback.format_exc())
self.write_status(Status.BAD_REQUEST, "Malformed request")
self.flush_status()
self.transport.loseConnection()
raise
try:
environ = self.build_environ()
response_generator = await deferToThread(
self.app, environ, self.write_status
)
while True:
try:
data = await deferToThread(response_generator.__next__)
self.write_body(data)
except StopIteration:
break
except Exception:
self.server.log_message(traceback.format_exc())
self.write_status(Status.CGI_ERROR, "An unexpected error occurred")
finally:
self.flush_status()
self.log_request()
self.transport.loseConnection()
def build_environ(self) -> typing.Dict[str, typing.Any]:
"""
Construct a dictionary that will be passed to the application handler.
Variable names (mostly) conform to the CGI spec defined in RFC 3875.
The TLS variable names borrow from the GLV-1.12556 server.
"""
url_parts = urllib.parse.urlparse(self.url)
environ = {
"GEMINI_URL": self.url,
"HOSTNAME": self.server.hostname,
"PATH_INFO": url_parts.path,
"QUERY_STRING": url_parts.query,
"REMOTE_ADDR": self.client_addr.host,
"REMOTE_HOST": self.client_addr.host,
"SERVER_NAME": self.server.hostname,
"SERVER_PORT": str(self.client_addr.port),
"SERVER_PROTOCOL": "GEMINI",
"SERVER_SOFTWARE": f"jetforce/{__version__}",
"client_certificate": None,
}
cert = self.transport.getPeerCertificate()
if cert:
x509_cert = cert.to_cryptography()
cert_data = inspect_certificate(x509_cert)
conn = self.transport.getHandle()
environ.update(
{
"client_certificate": x509_cert,
"AUTH_TYPE": "CERTIFICATE",
"REMOTE_USER": cert_data["common_name"],
"TLS_CLIENT_HASH": cert_data["fingerprint"],
"TLS_CLIENT_NOT_BEFORE": cert_data["not_before"],
"TLS_CLIENT_NOT_AFTER": cert_data["not_after"],
"TLS_CLIENT_SERIAL_NUMBER": cert_data["serial_number"],
# Grab the value that was stashed during the TLS handshake
"TLS_CLIENT_VERIFIED": getattr(conn, "verified", False),
"TLS_CIPHER": conn.get_cipher_name(),
"TLS_VERSION": conn.get_protocol_version_name(),
}
)
return environ
def parse_header(self) -> None:
"""
Parse the gemini header line.
The request is a single UTF-8 line formatted as: <URL>\r\n
"""
if len(self.request) > 1024:
raise ValueError("URL exceeds max length of 1024 bytes")
self.url = self.request.decode()
def write_status(self, status: int, meta: str) -> None:
"""
Write the gemini status line to an internal buffer.
The status line is a single UTF-8 line formatted as:
<code>\t<meta>\r\n
If the response status is 2, the meta field will contain the mimetype
of the response data sent. If the status is something else, the meta
will contain a descriptive message.
The status is not written immediately, it's added to an internal buffer
that must be flushed. This is done so that the status can be updated as
long as no other data has been written to the stream yet.
"""
self.status = status
self.meta = meta
self.response_buffer = f"{status}\t{meta}\r\n"
def write_body(self, data: typing.Union[str, bytes]) -> None:
"""
Write bytes to the gemini response body.
"""
if isinstance(data, str):
data = data.encode()
self.flush_status()
self.response_size += len(data)
self.transport.write(data)
def flush_status(self) -> None:
"""
Flush the status line from the internal buffer to the socket stream.
"""
if self.response_buffer and not self.response_size:
data = self.response_buffer.encode()
self.response_size += len(data)
self.transport.write(data)
self.response_buffer = ""
def log_request(self) -> None:
"""
Log a gemini request using a format derived from the Common Log Format.
"""
try:
message = '{} [{}] "{}" {} {} {}'.format(
self.client_addr.host,
time.strftime(self.TIMESTAMP_FORMAT, self.connected_timestamp),
self.url,
self.status,
self.meta,
self.response_size,
)
except AttributeError:
# The connection ended before we got far enough to log anything
pass
else:
self.server.log_message(message)

127
jetforce/server.py Normal file
View File

@ -0,0 +1,127 @@
from __future__ import annotations
import socket
import sys
import typing
from twisted.internet import reactor
from twisted.internet.base import ReactorBase
from twisted.internet.endpoints import SSL4ServerEndpoint
from twisted.internet.protocol import Factory
from twisted.internet.tcp import Port
from .__version__ import __version__
from .protocol import GeminiProtocol
from .tls import GeminiCertificateOptions, generate_ad_hoc_certificate
if sys.stderr.isatty():
CYAN = "\033[36m\033[1m"
RESET = "\033[0m"
else:
CYAN = ""
RESET = ""
ABOUT = fr"""
{CYAN}You are now riding on...
_________ _____________
______ /______ /___ __/_______________________
___ _ /_ _ \ __/_ /_ _ __ \_ ___/ ___/ _ \
/ /_/ / / __/ /_ _ __/ / /_/ / / / /__ / __/
\____/ \___/\__/ /_/ \____//_/ \___/ \___/{RESET}
An Experimental Gemini Server, v{__version__}
https://github.com/michael-lazar/jetforce
"""
class GeminiServer(Factory):
"""
Wrapper around twisted's TCP server that handles most of the setup and
plumbing for you.
"""
protocol_class = GeminiProtocol
# The TLS twisted interface class is confusingly named SSL4, even though it
# will accept either IPv4 & IPv6 interfaces.
endpoint_class = SSL4ServerEndpoint
def __init__(
self,
app: typing.Callable,
reactor: ReactorBase = reactor,
host: str = "127.0.0.1",
port: int = 1965,
hostname: str = "localhost",
certfile: typing.Optional[str] = None,
keyfile: typing.Optional[str] = None,
cafile: typing.Optional[str] = None,
capath: typing.Optional[str] = None,
):
if certfile is None:
self.log_message("Generating ad-hoc certificate files...")
certfile, keyfile = generate_ad_hoc_certificate(hostname)
self.app = app
self.reactor = reactor
self.host = host
self.port = port
self.hostname = hostname
self.certfile = certfile
self.keyfile = keyfile
self.cafile = cafile
self.capath = capath
def log_message(self, message: str) -> None:
"""
Log a diagnostic server message to stderr.
"""
print(message, file=sys.stderr)
def on_bind_interface(self, port: Port) -> None:
"""
Log when the server binds to an interface.
"""
sock_ip, sock_port, *_ = port.socket.getsockname()
if port.addressFamily == socket.AF_INET:
self.log_message(f"Listening on {sock_ip}:{sock_port}")
else:
self.log_message(f"Listening on [{sock_ip}]:{sock_port}")
def buildProtocol(self, addr) -> GeminiProtocol:
"""
This method is invoked by twisted once for every incoming connection.
It builds the instance of the protocol class, which is what actually
implements the Gemini protocol.
"""
return GeminiProtocol(self, self.app)
def run(self) -> None:
"""
This is the main server loop.
"""
self.log_message(ABOUT)
self.log_message(f"Server hostname is {self.hostname}")
self.log_message(f"TLS Certificate File: {self.certfile}")
self.log_message(f"TLS Private Key File: {self.keyfile}")
certificate_options = GeminiCertificateOptions(
certfile=self.certfile,
keyfile=self.keyfile,
cafile=self.cafile,
capath=self.capath,
)
interfaces = [self.host] if self.host else ["0.0.0.0", "::"]
for interface in interfaces:
endpoint = self.endpoint_class(
reactor=self.reactor,
port=self.port,
sslContextFactory=certificate_options,
interface=interface,
)
endpoint.listen(self).addCallback(self.on_bind_interface)
self.reactor.run()

203
jetforce/tls.py Normal file
View File

@ -0,0 +1,203 @@
import base64
import datetime
import os
import tempfile
import typing
import OpenSSL
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from twisted.internet.ssl import CertificateOptions, TLSVersion
from twisted.python.randbytes import secureRandom
COMMON_NAME = x509.NameOID.COMMON_NAME
def inspect_certificate(cert: x509) -> dict:
"""
Extract useful fields from a x509 client certificate object.
"""
name_attrs = cert.subject.get_attributes_for_oid(COMMON_NAME)
common_name = name_attrs[0].value if name_attrs else ""
fingerprint_bytes = cert.fingerprint(hashes.SHA256())
fingerprint = base64.b64encode(fingerprint_bytes).decode()
not_before = cert.not_valid_before.strftime("%Y-%m-%dT%H:%M:%SZ")
not_after = cert.not_valid_after.strftime("%Y-%m-%dT%H:%M:%SZ")
serial_number = cert.serial_number
data = {
"common_name": common_name,
"fingerprint": fingerprint,
"not_before": not_before,
"not_after": not_after,
"serial_number": serial_number,
}
return data
def generate_ad_hoc_certificate(hostname: str) -> typing.Tuple[str, str]:
"""
Utility function to generate an ad-hoc self-signed SSL certificate.
"""
certfile = os.path.join(tempfile.gettempdir(), f"{hostname}.crt")
keyfile = os.path.join(tempfile.gettempdir(), f"{hostname}.key")
if not os.path.exists(certfile) or not os.path.exists(keyfile):
backend = default_backend()
private_key = rsa.generate_private_key(65537, 2048, backend)
with open(keyfile, "wb") as fp:
# noinspection PyTypeChecker
key_data = private_key.private_bytes(
serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
fp.write(key_data)
common_name = x509.NameAttribute(COMMON_NAME, hostname)
subject_name = x509.Name([common_name])
not_valid_before = datetime.datetime.utcnow()
not_valid_after = not_valid_before + datetime.timedelta(days=365)
certificate = x509.CertificateBuilder(
subject_name=subject_name,
issuer_name=subject_name,
public_key=private_key.public_key(),
serial_number=x509.random_serial_number(),
not_valid_before=not_valid_before,
not_valid_after=not_valid_after,
)
certificate = certificate.sign(private_key, hashes.SHA256(), backend)
with open(certfile, "wb") as fp:
# noinspection PyTypeChecker
cert_data = certificate.public_bytes(serialization.Encoding.PEM)
fp.write(cert_data)
return certfile, keyfile
class GeminiCertificateOptions(CertificateOptions):
"""
CertificateOptions is a factory function that twisted provides to do all of
the confusing PyOpenSSL configuration for you. Unfortunately, their built-in
class doesn't support the verify callback and some other options required
for implementing TOFU pinning, so I had to subclass and add custom behavior.
References:
https://twistedmatrix.com/documents/16.1.1/core/howto/ssl.html
https://github.com/urllib3/urllib3/blob/master/src/urllib3/util/ssl_.py
https://github.com/twisted/twisted/blob/trunk/src/twisted/internet/_sslverify.py
"""
def verify_callback(
self,
conn: OpenSSL.SSL.Connection,
cert: OpenSSL.crypto.X509,
errno: int,
depth: int,
preverify_ok: int,
) -> bool:
"""
Callback used by OpenSSL for client certificate verification.
preverify_ok returns the verification result that OpenSSL has already
obtained, so return this value to cede control to the underlying
library. Returning true will always allow client certificates, even if
they are self-signed.
"""
conn.verified = preverify_ok
return True
def proto_select_callback(
self, conn: OpenSSL.SSL.Connection, protocols: typing.List[bytes]
) -> bytes:
"""
Callback used by OpenSSL for ALPN support.
Return the first matching protocol in our list of acceptable values.
This is not currently being used but I may want to add support later.
"""
for p in self._acceptableProtocols:
if p in protocols:
return p
else:
return b""
def sni_callback(self, conn: OpenSSL.SSL.Connection) -> None:
"""
Callback used by OpenSSL for SNI support.
We can inspect the servername requested by the client using
conn.get_servername(), and attach an appropriate context using
conn.set_context(new_context). This is not currently being used but
I want to add support in the future.
"""
pass
def __init__(
self,
certfile: str,
keyfile: typing.Optional[str] = None,
cafile: typing.Optional[str] = None,
capath: typing.Optional[str] = None,
) -> None:
self.certfile = certfile
self.keyfile = keyfile
self.cafile = cafile
self.capath = capath
super().__init__(
raiseMinimumTo=TLSVersion.TLSv1_2,
requireCertificate=False,
fixBrokenPeers=True,
)
def _makeContext(self) -> OpenSSL.SSL.Context:
"""
Most of this code is copied directly from the parent class method.
"""
ctx = self._contextFactory(self.method)
ctx.set_options(self._options)
ctx.set_mode(self._mode)
ctx.use_certificate_file(self.certfile)
ctx.use_privatekey_file(self.keyfile or self.certfile)
for extraCert in self.extraCertChain:
ctx.add_extra_chain_cert(extraCert)
# Sanity check
ctx.check_privatekey()
if self.cafile or self.capath:
ctx.load_verify_locations(self.cafile, self.capath)
verify_flags = OpenSSL.SSL.VERIFY_PEER
if self.requireCertificate:
verify_flags |= OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
if self.verifyOnce:
verify_flags |= OpenSSL.SSL.VERIFY_CLIENT_ONCE
ctx.set_verify(verify_flags, self.verify_callback)
if self.verifyDepth is not None:
ctx.set_verify_depth(self.verifyDepth)
if self.enableSessions:
session_name = secureRandom(32)
ctx.set_session_id(session_name)
ctx.set_cipher_list(self._cipherString.encode("ascii"))
self._ecChooser.configureECDHCurve(ctx)
if self._acceptableProtocols:
ctx.set_alpn_select_callback(self.proto_select_callback)
ctx.set_alpn_protos(self._acceptableProtocols)
ctx.set_tlsext_servername_callback(self.sni_callback)
return ctx

View File

@ -1,12 +1,11 @@
#!/usr/bin/env python3
"""
A dead-simple gemini client intended to be used for server development and testing.
./jetforce-client gemini://mozz.us
A very basic gemini client to use for testing server configurations.
"""
import argparse
import socket
import ssl
import sys
import urllib.parse
context = ssl.create_default_context()
@ -14,41 +13,45 @@ context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
def fetch(url: str, host: str = None, port: str = None):
def fetch(url, host=None, port=None, use_sni=False):
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme:
parsed_url = urllib.parse.urlparse(f"gemini://{url}")
host = host or parsed_url.hostname
port = port or parsed_url.port or 1965
sni = host if use_sni else None
with socket.create_connection((host, port)) as sock:
with context.wrap_socket(sock) as ssock:
with context.wrap_socket(sock, server_hostname=sni) as ssock:
ssock.sendall((url + "\r\n").encode())
fp = ssock.makefile("rb")
header = fp.readline().decode()
print(header)
body = fp.read().decode()
print(body)
fp = ssock.makefile("rb", buffering=0)
data = fp.read(1024)
while data:
sys.stdout.buffer.write(data)
data = fp.read(1024)
def run_client():
parser = argparse.ArgumentParser(description="A simple gemini client")
parser.add_argument("url")
parser.add_argument("--host", help="Server host")
parser.add_argument("--port", help="Server port")
parser.add_argument("--tls-certfile", help="Client certificate")
parser.add_argument("--tls-keyfile", help="Client private key")
parser.add_argument("--tls-alpn-protocol", help="Protocol for ALPN negotiation")
parser.add_argument(
"--host", help="Optional server to connect to, will default to the URL"
"--tls-enable-sni", action="store_true", help="Specify the hostname using SNI"
)
parser.add_argument(
"--port", help="Optional port to connect to, will default to the URL"
)
parser.add_argument("--certfile", help="Optional client certificate")
parser.add_argument("--keyfile", help="Optional client key")
args = parser.parse_args()
if args.tls_certfile:
context.load_cert_chain(args.tls_certfile, args.tls_keyfile)
if args.certfile:
context.load_cert_chain(args.certfile, args.keyfile)
if args.tls_alpn_protocol:
context.set_alpn_protocols([args.tls_alpn_protocol])
fetch(args.url, args.host, args.port)
fetch(args.url, args.host, args.port, args.tls_enable_sni)
if __name__ == "__main__":

View File

@ -1,721 +0,0 @@
#!/usr/bin/env python3
"""
A diagnostic tool for gemini servers.
This program will barrage your server with a series of requests in
an attempt to uncover unexpected behavior. Not all of these checks
adhere strictly to the gemini specification. Some of them are
general best practices, and some trigger undefined behavior. Results
should be taken with a grain of salt and analyzed on their own merit.
"""
import argparse
import contextlib
import datetime
import ipaddress
import socket
import ssl
import sys
import time
import typing
if sys.version_info < (3, 7):
sys.exit("Fatal Error: script requires Python 3.7+")
socket.setdefaulttimeout(5)
# ANSI color codes
A_BOLD = 1
FG_BLACK = 30
FG_RED = 31
FG_GREEN = 32
FG_YELLOW = 33
FG_BLUE = 34
FG_MAGENTA = 35
FG_CYAN = 36
FG_WHITE = 37
def colorize(text: str, color: int) -> str:
"""
Colorize text using ANSI escape codes.
"""
if sys.stdout.isatty():
return f"\033[{color}m{text}\033[0m"
else:
return text
def log(text: str, style: str = "normal") -> None:
"""
Print formatted text to stdout with optional styling.
"""
if style == "title":
text = colorize(text, A_BOLD)
if style == "warning":
text = colorize(f" {text}", FG_YELLOW)
elif style == "info":
text = colorize(f" {text}", FG_CYAN)
elif style == "success":
text = colorize(f"{text}", FG_GREEN)
elif style == "failure":
text = colorize(f" x {text}", FG_RED)
print(text)
def log_error(err: Exception) -> None:
"""
Helper method for formatting exceptions as error messages.
"""
if isinstance(err, Warning):
log(str(err), style="warning")
else:
log(str(err), style="failure")
class GeminiResponse:
def __init__(self, header):
self.charset: str = "utf-8"
self.header: str = header
self.body: str = ""
self.meta: typing.Optional[str] = None
self.status: typing.Optional[str] = None
self.mime: typing.Optional[str] = None
class BaseCheck:
"""
Abstract base class for implementing server checks.
"""
description: str = ""
def __init__(self, args: argparse.Namespace):
self.args = args
def run(self) -> None:
"""
Run the check and log any unhandled exceptions.
"""
log(f"[{self.__class__.__name__}] {self.__doc__}", style="title")
try:
self.check()
except Exception as e:
log_error(e)
log("")
def check(self) -> None:
raise NotImplemented
@property
def netloc(self):
if self.args.port == 1965:
return self.args.host
else:
return f"{self.args.host}:{self.args.port}"
def resolve_host(self, family: socket.AddressFamily) -> tuple:
"""
Retrieve the IP address and connection information for the host.
"""
host = self.args.host
port = self.args.port
type_ = socket.SOCK_STREAM
proto = socket.IPPROTO_TCP
addr_info = socket.getaddrinfo(host, port, family, type_, proto)
if not addr_info:
raise UserWarning(f"No {family} address found for host")
# Gemini IPv6
return addr_info[0][4]
@contextlib.contextmanager
def connection(
self, context: typing.Optional[ssl.SSLContext] = None
) -> ssl.SSLSocket:
"""
Setup an unverified TLS socket connection with the host.
"""
if context is None:
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection(
(self.args.host, self.args.port), timeout=5
) as sock:
with context.wrap_socket(sock, server_hostname = self.netloc) as ssock:
yield ssock
def make_request(self, url: str) -> GeminiResponse:
"""
Send the request verbatim to the server and parse the response bytes.
"""
log("Requesting URL")
log(repr(url), style="info")
with self.connection() as sock:
sock.sendall(url.encode(errors="surrogateescape"))
fp = sock.makefile("rb")
header = fp.readline().decode()
log("Response header")
log(repr(header), style="info")
response = GeminiResponse(header)
try:
response.status, response.meta = header.strip().split(maxsplit=1)
except ValueError:
return response
if response.status.startswith("2"):
meta_parts = [part.strip() for part in response.meta.split(";")]
response.mime = meta_parts[0]
for part in meta_parts[1:]:
if part.lower().startswith("charset="):
response.charset = part[8:]
response.body = fp.read().decode(response.charset)
return response
def assert_success(self, response: GeminiResponse) -> None:
"""
Helper method to check if a response was successful.
"""
log("Status should return a success code (20 SUCCESS)")
style = "success" if response.status == "20" else "failure"
log(f"Received status of {response.status!r}", style)
def assert_permanent_failure(self, response: GeminiResponse) -> None:
"""
Helper method to assert that a response returned a permanent.
"""
log("Status should return a failure code (5X PERMANENT FAILURE)")
style = "success" if response.status.startswith("5") else "failure"
log(f"Received status of {response.status!r}", style)
def assert_proxy_refused(self, response: GeminiResponse) -> None:
"""
Helper method to assert that a response returned a permanent.
"""
log("Status should return a failure code (53 PROXY REQUEST REFUSED)")
style = "success" if response.status == "53" else "failure"
log(f"Received status of {response.status!r}", style)
def assert_bad_request(self, response: GeminiResponse) -> None:
"""
Helper method to assert that a response returned a permanent.
"""
log("Status should return a failure code (59 BAD REQUEST)")
style = "success" if response.status == "59" else "failure"
log(f"Received status of {response.status!r}", style)
class IPv4Address(BaseCheck):
"""Establish a connection over an IPv4 address"""
def check(self):
log(f"Looking up IPv4 address for {self.args.host!r}")
addr = self.resolve_host(socket.AF_INET)
log(f"{addr[0]!r}", style="success")
log(f"Attempting to connect to {addr[0]}:{addr[1]}")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(addr)
sock.close()
log(f"Successfully established connection", style="success")
class IPv6Address(BaseCheck):
"""Establish a connection over an IPv6 address"""
def check(self) -> None:
log(f"Looking up IPv6 address for {self.args.host!r}")
addr = self.resolve_host(socket.AF_INET6)
if ipaddress.ip_address(addr[0]).ipv4_mapped:
raise UserWarning("Found IPv4-mapped address, skipping check")
log(f"{addr[0]!r}", style="success")
log(f"Attempting to connect to [{addr[0]}]:{addr[1]}")
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.connect(addr)
sock.close()
log(f"Successfully established connection", style="success")
class TLSVersion(BaseCheck):
"""Server must negotiate at least TLS v1.2, ideally TLS v1.3"""
def check(self) -> None:
log(f"Checking client library")
log(f"{ssl.OPENSSL_VERSION!r}", style="info")
log("Determining highest supported TLS version")
with self.connection() as sock:
version = sock.version()
if version in ("SSLv2", "SSLv3", "TLSv1", "TLSv1.1"):
log(f"Negotiated {version}", style="failure")
elif version == "TLSv1.2":
log(f"Negotiated {version}", style="warning")
else:
log(f"Negotiated {version}", style="success")
class TLSClaims(BaseCheck):
"""Certificate claims must be valid"""
def check(self) -> None:
try:
# $ pip install cryptography
import cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID, ExtensionOID
except ImportError:
raise UserWarning("cryptography library not installed, skipping check")
with self.connection() as sock:
# Python refuses to parse a certificate unless the issuer is validated.
# Because many gemini servers use self-signed certs, we need to use
# a third-party library to parse the certs from their binary form.
der_x509 = sock.getpeercert(binary_form=True)
cert = default_backend().load_der_x509_certificate(der_x509)
now = datetime.datetime.utcnow()
log('Checking "Not Valid Before" timestamp')
style = "success" if cert.not_valid_before <= now else "failure"
log(f"{cert.not_valid_before} UTC", style)
log('Checking "Not Valid After" timestamp')
style = "success" if cert.not_valid_after >= now else "failure"
log(f"{cert.not_valid_after} UTC", style)
log("Checking subject claim matches server hostname")
subject = []
for cn in cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME):
subject.append(("commonName", cn.value))
subject_alt_name = []
try:
ext = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
except cryptography.x509.ExtensionNotFound:
pass
else:
for dns in ext.value.get_values_for_type(cryptography.x509.DNSName):
subject_alt_name.append(("DNS", dns))
for ip_address in ext.value.get_values_for_type(
cryptography.x509.IPAddress
):
subject_alt_name.append(("IP Address", ip_address))
cert_dict = {
"subject": (tuple(subject),),
"subjectAltName": tuple(subject_alt_name),
}
log(f"{cert_dict!r}", style="info")
ssl.match_hostname(cert_dict, self.args.host)
log(f"Hostname {self.args.host!r} matches claim", style="success")
class TLSVerified(BaseCheck):
"""Certificate should be self-signed or have a trusted issuer"""
def check(self) -> None:
log("Connecting over verified SSL socket")
context = ssl.create_default_context()
try:
with socket.create_connection((self.args.host, self.args.port)) as sock:
with context.wrap_socket(sock, server_hostname=self.args.host) as ssock:
ssock.sendall(f"gemini://{self.netloc}\r\n".encode())
except Exception as e:
if getattr(e, "verify_code", None) == 18:
log("Self-signed TLS certificate detected", style="warning")
else:
raise
else:
log("Established trusted TLS connection", style="success")
class TLSRequired(BaseCheck):
"""Non-TLS requests should be refused"""
def check(self) -> None:
log("Sending non-TLS request")
try:
with socket.create_connection((self.args.host, self.args.port)) as sock:
sock.sendall(f"gemini://{self.netloc}\r\n".encode())
fp = sock.makefile("rb")
header = fp.readline().decode()
if header:
log(f"Received unexpected response {header!r}", style="failure")
else:
log(f"Connection closed by server", style="success")
except Exception as e:
# A connection error is a valid response
log(f"{e!r}", style="success")
class ConcurrentConnections(BaseCheck):
"""Server should support concurrent connections"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\r\n"
log(f"Attempting to establish two connections")
with self.connection() as sock:
log("Opening socket 1", style="info")
sock.send(url[0].encode())
with self.connection() as sock2:
log("Opening socket 2", style="info")
sock2.sendall(url.encode())
log("Closing socket 2", style="info")
sock.sendall(url[1:].encode())
log("Closing socket 1", style="info")
log(f"Concurrent connections supported", style="success")
class Homepage(BaseCheck):
"""Request the gemini homepage"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\r\n"
response = self.make_request(url)
self.assert_success(response)
log('Mime type should be "text/gemini"')
style = "success" if response.mime == "text/gemini" else "failure"
log(f"{response.mime!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be non-empty")
style = "success" if response.body else "failure"
log(f"{response.body[:50]!r}", style)
log("Body should terminate with a newline")
style = "success" if response.body.endswith("\n") else "failure"
log(f"{response.body[-1:]!r}", style)
log('Body should use "\\r\\n" line endings')
bad_line = None
for line in response.body.splitlines(True):
if not line.endswith("\r\n"):
bad_line = line
break
if bad_line is None:
log("All lines end with '\\r\\n'", style="success")
else:
log(f"Invalid line ending {bad_line!r}", style="failure")
class HomepageRedirect(BaseCheck):
"""A URL with no trailing slash should redirect to the canonical resource"""
def check(self) -> None:
url = f"gemini://{self.netloc}\r\n"
response = self.make_request(url)
log("Status should return code 31 (REDIRECT PERMANENT)")
style = "success" if response.status == "31" else "failure"
log(f"{response.status!r}", style)
log('Meta should redirect to location "gemini://[hostname]/"')
style = "success" if response.meta == f"gemini://{self.netloc}/" else "failure"
log(f"{response.meta!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be empty")
style = "success" if response.body == "" else "failure"
log(f"{response.body[:50]!r}", style)
class PageNotFound(BaseCheck):
"""Request a gemini URL that does not exist"""
def check(self) -> None:
url = f"gemini://{self.netloc}/09pdsakjo73hjn12id78\r\n"
response = self.make_request(url)
log("Status should return code 51 (NOT FOUND)")
style = "success" if response.status == "51" else "failure"
log(f"{response.status!r}", style)
log('Header should end with "\\r\\n"')
style = "success" if response.header.endswith("\r\n") else "failure"
log(f"{response.header[-2:]!r}", style)
log("Body should be empty")
style = "success" if response.body == "" else "failure"
log(f"{response.body[:50]!r}", style)
class RequestMissingCR(BaseCheck):
"""A request without a <CR> should timeout"""
def check(self) -> None:
url = f"gemini://{self.netloc}/\n"
try:
response = self.make_request(url)
except Exception as e:
log("No response should be received")
log(f"{e}", style="success")
else:
log("No response should be received")
log(f"{response.status!r}", style="failure")
class URLIncludePort(BaseCheck):
"""Send the URL with the port explicitly defined"""
def check(self) -> None:
url = f"gemini://{self.args.host}:{self.args.port}/\r\n"
response = self.make_request(url)
self.assert_success(response)
class URLSchemeMissing(BaseCheck):
"""A URL without a scheme should be inferred as gemini"""
def check(self) -> None:
url = f"//{self.netloc}/\r\n"
response = self.make_request(url)
self.assert_success(response)
class URLByIPAddress(BaseCheck):
"""Send the URL using the IPv4 address"""
def check(self) -> None:
addr = self.resolve_host(socket.AF_INET)
url = f"gemini://{addr[0]}:{addr[1]}/\r\n"
response = self.make_request(url)
log("Verify that the status matches your desired behavior")
log(f"{response.status!r}", style="info")
class URLInvalidUTF8Byte(BaseCheck):
"""Send a URL containing a non-UTF8 byte sequence"""
def check(self) -> None:
non_utf8_character = "\udcdc" # Surrogate-escaped byte sequence
url = f"gemini://{self.netloc}/{non_utf8_character}\r\n"
try:
response = self.make_request(url)
except Exception:
response = None
log("Connection should either drop, or return 59 (BAD REQUEST)")
if response is None:
log("Connection closed without response", style="success")
else:
style = "success" if response.status == "59" else "failure"
log(f"{response.status!r}", style)
class URLMaxSize(BaseCheck):
"""Send a 1024 byte URL, the maximum allowed size"""
def check(self) -> None:
# Per the spec, the <CR><LF> are not included in the total size
base_url = f"gemini://{self.netloc}/"
buffer = "0" * (1024 - len(base_url.encode("utf-8")))
url = base_url + buffer + "\r\n"
response = self.make_request(url)
log("Status should return code 51 (NOT FOUND)")
style = "success" if response.status == "51" else "failure"
log(f"{response.status!r}", style)
class URLAboveMaxSize(BaseCheck):
"""Send a 1025 byte URL, above the maximum allowed size"""
def check(self) -> None:
# Per the spec, the <CR><LF> are not included in the total size
base_url = f"gemini://{self.netloc}/"
buffer = "0" * (1025 - len(base_url.encode("utf-8")))
url = base_url + buffer + "\r\n"
try:
response = self.make_request(url)
except Exception:
response = None
log("Connection should either drop, or return 59 (BAD REQUEST)")
if response is None:
log("Connection closed without response", style="success")
else:
style = "success" if response.status == "59" else "failure"
log(f"{response.status!r}", style)
class URLWrongPort(BaseCheck):
"""A URL with an incorrect port number should be rejected"""
def check(self) -> None:
url = f"gemini://{self.args.host}:443/\r\n"
response = self.make_request(url)
self.assert_proxy_refused(response)
class URLWrongHost(BaseCheck):
"""A URL with a foreign hostname should be rejected"""
def check(self) -> None:
url = f"gemini://wikipedia.org/\r\n"
response = self.make_request(url)
self.assert_proxy_refused(response)
class URLSchemeHTTP(BaseCheck):
"""Send a URL with an HTTP scheme"""
def check(self) -> None:
url = f"http://{self.netloc}/\r\n"
response = self.make_request(url)
self.assert_proxy_refused(response)
class URLSchemeHTTPS(BaseCheck):
"""Send a URL with an HTTPS scheme"""
def check(self) -> None:
url = f"https://{self.netloc}/\r\n"
response = self.make_request(url)
self.assert_proxy_refused(response)
class URLSchemeGopher(BaseCheck):
"""Send a URL with a Gopher scheme"""
def check(self) -> None:
url = f"gopher://{self.netloc}/\r\n"
response = self.make_request(url)
self.assert_proxy_refused(response)
class URLEmpty(BaseCheck):
"""Empty URLs should not be accepted by the server"""
def check(self) -> None:
url = f"\r\n"
response = self.make_request(url)
self.assert_bad_request(response)
class URLRelative(BaseCheck):
"""Relative URLs should not be accepted by the server"""
def check(self) -> None:
url = f"/\r\n"
response = self.make_request(url)
self.assert_bad_request(response)
class URLInvalid(BaseCheck):
"""Random text should not be accepted by the server"""
def check(self) -> None:
url = f"Hello Gemini!\r\n"
response = self.make_request(url)
self.assert_bad_request(response)
class URLDotEscape(BaseCheck):
"""A URL should not be able to escape the root using dot notation"""
def check(self) -> None:
url = f"gemini://{self.netloc}/../../\r\n"
response = self.make_request(url)
self.assert_permanent_failure(response)
# TODO: Test sending a transient client certificate
# TODO: Test with client pinned to TLS v1.1
CHECKS = [
IPv4Address,
IPv6Address,
TLSVersion,
TLSClaims,
TLSVerified,
TLSRequired,
ConcurrentConnections,
Homepage,
HomepageRedirect,
PageNotFound,
RequestMissingCR,
URLIncludePort,
URLSchemeMissing,
URLByIPAddress,
URLInvalidUTF8Byte,
URLMaxSize,
URLAboveMaxSize,
URLWrongPort,
URLWrongHost,
URLSchemeHTTP,
URLSchemeHTTPS,
URLSchemeGopher,
URLEmpty,
URLRelative,
URLInvalid,
URLDotEscape,
]
def build_epilog():
epilog = ["list of checks:"]
for check in CHECKS:
epilog.append(colorize(f" [{check.__name__}]", A_BOLD))
epilog.append(f" {check.__doc__}")
return "\n".join(epilog)
parser = argparse.ArgumentParser(
usage="%(prog)s host [port] [--help]",
description=__doc__,
epilog=build_epilog(),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("host", help="server hostname to connect to")
parser.add_argument(
"port",
nargs="?",
type=int,
default=1965,
help="server port to connect to (default: 1965)",
)
parser.add_argument("--checks", help="comma separated list of checks to apply")
parser.add_argument(
"--delay",
type=float,
default=2,
help="seconds to sleep between checks (default: 2)",
)
def run():
args = parser.parse_args()
if args.checks:
check_names = {cls.__name__: cls for cls in CHECKS}
check_list = []
for name in args.checks.split(","):
name = name.strip()
if name not in check_names:
raise ValueError(f"unknown check {name!r}")
check_list.append(check_names[name])
else:
check_list = CHECKS
log(f"Running gemini server diagnostics check against {args.host}:{args.port}")
log("...\n")
for check in check_list:
time.sleep(args.delay)
check(args).run()
log("Done!")
if __name__ == "__main__":
run()

BIN
logo.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

26
requirements.txt Normal file
View File

@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile
#
attrs==19.3.0 # via automat, service-identity, twisted
automat==20.2.0 # via twisted
cffi==1.14.0 # via cryptography
constantly==15.1.0 # via twisted
cryptography==2.9.2 # via pyopenssl, service-identity
hyperlink==19.0.0 # via twisted
idna==2.9 # via Jetforce (setup.py), hyperlink
incremental==17.5.0 # via twisted
pyasn1-modules==0.2.8 # via service-identity
pyasn1==0.4.8 # via pyasn1-modules, service-identity
pycparser==2.20 # via cffi
pyhamcrest==2.0.2 # via twisted
pyopenssl==19.1.0 # via Jetforce (setup.py)
service-identity==18.1.0 # via Jetforce (setup.py)
six==1.14.0 # via automat, cryptography, pyopenssl
twisted==20.3.0 # via Jetforce (setup.py)
zope.interface==5.1.0 # via twisted
# The following packages are considered to be unsafe in a requirements file:
# setuptools

Binary file not shown.

Before

Width:  |  Height:  |  Size: 697 KiB

View File

@ -10,20 +10,27 @@ def long_description():
setuptools.setup(
name="Jetforce",
version="0.2.2",
version="0.3.0b1",
url="https://github.com/michael-lazar/jetforce",
license="Other/Proprietary License",
author="Michael Lazar",
author_email="lazar.michael22@gmail.com",
description="An Experimental Gemini Server",
install_requires=[
"twisted>=20.3.0",
# Requirements below are used by twisted[security]
"service_identity",
"idna",
"pyopenssl",
],
long_description=long_description(),
long_description_content_type="text/markdown",
py_modules=["jetforce", "jetforce_client", "jetforce_diagnostics"],
packages=["jetforce", "jetforce.app"],
py_modules=["jetforce_client"],
entry_points={
"console_scripts": [
"jetforce=jetforce:run_server",
"jetforce=jetforce.__main__:main",
"jetforce-client=jetforce_client:run_client",
"jetforce-diagnostics=jetforce_diagnostics:run",
]
},
python_requires=">=3.7",
@ -31,7 +38,6 @@ setuptools.setup(
classifiers=[
"Environment :: Web Environment",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",