Compare commits

..

5 Commits

Author SHA1 Message Date
waldek 00349fabfe basic markdown and reverse proxy implementation 2023-03-06 09:17:59 +11:00
Michael Lazar 2c122cabb5 Merge conflict 2022-07-08 16:50:34 -04:00
Michael Lazar ae0c8300c5
Strict trailing slash (#65)
* Update pre-commit versions

* Fix for strict_trailing_slash

* Upping version
2022-07-08 16:49:36 -04:00
Michael Lazar 37add0c605
Strict trailing slash (#64)
* Update pre-commit versions

* Fix for strict_trailing_slash
2022-07-06 17:29:42 -04:00
Michael Lazar 93e0a3ddc7 Fixing build 2022-02-19 14:51:59 -05:00
11 changed files with 267 additions and 24 deletions

View File

@ -24,10 +24,9 @@ jobs:
run: |
pip install .
pip install mypy pytest
mypy --install-types --non-interactive
- name: Check types
run: |
mypy --ignore-missing-imports jetforce/
mypy --ignore-missing-imports --install-types --non-interactive jetforce/
- name: Run tests
run: |
pytest -v tests/

View File

@ -3,10 +3,10 @@ default_language_version:
default_stages: [commit, push]
repos:
- repo: https://github.com/ambv/black
rev: stable
rev: 22.6.0
hooks:
- id: black
- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.1.4
rev: v5.10.1
hooks:
- id: isort

View File

@ -2,6 +2,14 @@
### Unreleased
### v0.9.1 (2022-07-08)
### Fixes
- Fix ``strict_trailing_slash`` argument being applied as
`strict_port` when specified on a route decorator.
- Add support for python 3.10.
### v0.9.0 (2021-05-12)
#### Fixes

View File

@ -11,7 +11,8 @@ import sys
from .__version__ import __version__
from .app.base import RateLimiter
from .app.static import StaticDirectoryApplication
from .server import GeminiServer
from .app.static_md import StaticMarkdownDirectoryApplication
from .server import GeminiServer, ReverseProxyGeminiServer
if sys.version_info < (3, 7):
sys.exit("Fatal Error: jetforce requires Python 3.7+")
@ -104,28 +105,54 @@ group.add_argument(
default=None,
dest="rate_limit",
)
group.add_argument(
"--reverse-proxy",
help="Enable reverse proxy mode where TLS is handled by a reverse proxy such as Nginx",
action='store_true',
default=False,
dest="reverse_proxy",
)
group.add_argument(
"--markdown",
help="Enable markdown to gemini mode.",
action='store_true',
default=False,
dest="markdown",
)
def main() -> None:
args = parser.parse_args()
rate_limiter = RateLimiter(args.rate_limit) if args.rate_limit else None
app = StaticDirectoryApplication(
if args.markdown:
app_class = StaticMarkdownDirectoryApplication
else:
app_class = StaticDirectoryApplication
app = app_class(
root_directory=args.root_directory,
index_file=args.index_file,
cgi_directory=args.cgi_directory,
default_lang=args.default_lang,
rate_limiter=rate_limiter,
)
server = GeminiServer(
app=app,
host=args.host,
port=args.port,
hostname=args.hostname,
certfile=args.certfile,
keyfile=args.keyfile,
cafile=args.cafile,
capath=args.capath,
)
if args.reverse_proxy:
server = ReverseProxyGeminiServer(
app=app,
host=args.host,
port=args.port,
hostname=args.hostname,
)
else:
server = GeminiServer(
app=app,
host=args.host,
port=args.port,
hostname=args.hostname,
certfile=args.certfile,
keyfile=args.keyfile,
cafile=args.cafile,
capath=args.capath,
)
server.run()

View File

@ -1 +1 @@
__version__ = "0.9.0"
__version__ = "0.9.1"

View File

@ -296,6 +296,7 @@ class JetforceApplication:
scheme: str = "gemini",
hostname: typing.Optional[str] = None,
strict_hostname: bool = True,
strict_port: bool = True,
strict_trailing_slash: bool = False,
) -> typing.Callable[[RouteHandler], RouteHandler]:
"""
@ -308,7 +309,12 @@ class JetforceApplication:
return Response(Status.SUCCESS, 'text/plain', 'Hello world!')
"""
route_pattern = RoutePattern(
path, scheme, hostname, strict_hostname, strict_trailing_slash
path=path,
scheme=scheme,
hostname=hostname,
strict_hostname=strict_hostname,
strict_port=strict_port,
strict_trailing_slash=strict_trailing_slash,
)
def wrap(func: RouteHandler) -> RouteHandler:

154
jetforce/app/static_md.py Normal file
View File

@ -0,0 +1,154 @@
import typing
import os
import pathlib
import md2gemini
from .static import StaticDirectoryApplication
from .base import (
Request,
Response,
Status,
)
class StaticMarkdownDirectoryApplication(StaticDirectoryApplication):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def serve_static_file(self, request: Request) -> Response:
"""
Convert a URL into a filesystem path, and attempt to serve the file
or directory that is represented at that path.
"""
url_path = pathlib.Path(request.path.strip("/"))
filename = pathlib.Path(os.path.normpath(str(url_path)))
if filename.is_absolute() or str(filename).startswith(".."):
# Guard against breaking out of the directory
return Response(Status.NOT_FOUND, "Not Found")
if str(filename).startswith(self.cgi_directory):
# CGI needs special treatment to account for extra-path
# PATH_INFO component (RFC 3875 section 4.1.5)
# Identify the shortest path that is not a directory
for i in range(2, len(filename.parts) + 1):
# Split the path into SCRIPT_NAME and PATH_INFO
script_name = pathlib.Path(*filename.parts[:i])
path_info = pathlib.Path(*filename.parts[i:])
filesystem_path = self.root / script_name
try:
if not filesystem_path.is_file():
continue
elif not os.access(filesystem_path, os.R_OK):
continue
elif not os.access(filesystem_path, os.X_OK):
continue
else:
if str(script_name) == ".":
request.environ["SCRIPT_NAME"] = ""
else:
request.environ["SCRIPT_NAME"] = f"/{script_name}"
if str(path_info) == ".":
request.environ["PATH_INFO"] = ""
else:
request.environ["PATH_INFO"] = f"/{path_info}"
# Add back the trailing slash that was stripped off
if request.path.endswith("/"):
request.environ["PATH_INFO"] += "/"
return self.run_cgi_script(filesystem_path, request.environ)
except OSError:
# Filename too large, etc.
return Response(Status.NOT_FOUND, "Not Found")
filesystem_path = self.root / filename
try:
if not os.access(filesystem_path, os.R_OK):
# File not readable
return Response(Status.NOT_FOUND, "Not Found")
except OSError:
# Filename too large, etc.
return Response(Status.NOT_FOUND, "Not Found")
if filesystem_path.is_file():
return self.generate_response(filesystem_path)
elif filesystem_path.is_dir():
if request.path and not request.path.endswith("/"):
url_parts = urllib.parse.urlparse(request.url)
# noinspection PyProtectedMember
url_parts = url_parts._replace(path=request.path + "/")
return Response(Status.REDIRECT_PERMANENT, url_parts.geturl())
index_file = filesystem_path / self.index_file
if index_file.exists():
return self.generate_response(index_file)
mimetype = self.add_extra_parameters("text/gemini")
generator = self.list_directory(url_path, filesystem_path)
return Response(Status.SUCCESS, mimetype, generator)
else:
return Response(Status.NOT_FOUND, "Not Found")
def generate_response(self, filesystem_path: pathlib.Path) -> typing.Iterator:
mimetype = self.guess_mimetype(filesystem_path.name)
if mimetype == "text/markdown":
generator = self._load_md_file(filesystem_path)
mimetype = self.add_extra_parameters("text/gemini")
elif mimetype == "text/gemini":
generator = self.load_file(filesystem_path)
mimetype = self.add_extra_parameters("text/gemini")
else:
generator = self.load_file(filesystem_path)
mimetype = self.add_extra_parameters(mimetype)
return Response(Status.SUCCESS, mimetype, generator)
def _load_md_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]:
with filesystem_path.open("r") as fp:
gemini = md2gemini.md2gemini(fp.read(), links="paragraph")
gemini = gemini.encode()
chunks = int(len(gemini) / self.CHUNK_SIZE) + 1
for i in range(chunks):
start = self.CHUNK_SIZE * (i)
end = self.CHUNK_SIZE * (i + 1)
yield gemini[start:end]
def list_directory(
self, url_path: pathlib.Path, filesystem_path: pathlib.Path
) -> typing.Iterator[bytes]:
"""
Auto-generate a text/gemini document based on the contents of the file system.
"""
buffer = f"Directory: /{url_path}\r\n".encode()
if url_path.parent != url_path:
buffer += f"=>/{url_path.parent}\t..\r\n".encode()
for file in sorted(filesystem_path.iterdir()):
if file.name.startswith("."):
# Skip hidden directories/files that may contain sensitive info
continue
encoded_path = urllib.parse.quote(str(url_path / file.name))
if file.is_dir():
buffer += f"=>/{encoded_path}/\t{file.name}/\r\n".encode()
else:
if file.stem.isdigit():
tz = pytz.timezone("Europe/Brussels")
label = datetime.datetime.fromtimestamp(int(file.stem), tz=tz)
else:
label = f"{file.name}"
buffer += f"=>/{encoded_path}\t{label}\r\n".encode()
if len(buffer) >= self.CHUNK_SIZE:
data, buffer = buffer[: self.CHUNK_SIZE], buffer[self.CHUNK_SIZE :]
yield data
if buffer:
yield buffer

View File

@ -302,3 +302,30 @@ class GeminiProtocol(LineOnlyReceiver):
pass
else:
self.server.log_access(message)
class ReverseProxyGeminiProtocol(GeminiProtocol):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def build_environ(self) -> EnvironDict:
"""
Construct a dictionary that will be passed to the application handler.
Variable names (mostly) conform to the CGI spec defined in RFC 3875.
The TLS variable names borrow from the GLV-1.12556 server.
"""
url_parts = urllib.parse.urlparse(self.url)
environ = {
"GEMINI_URL": self.url,
"HOSTNAME": self.server.hostname,
"QUERY_STRING": url_parts.query,
"REMOTE_ADDR": self.client_addr.host,
"REMOTE_HOST": self.client_addr.host,
"SERVER_NAME": self.server.hostname,
"SERVER_PORT": self.server.port,
"SERVER_PROTOCOL": "GEMINI",
"SERVER_SOFTWARE": f"jetforce/{__version__}",
"client_certificate": None,
}
return environ

View File

@ -6,13 +6,13 @@ import typing
from twisted.internet import reactor as _reactor
from twisted.internet.base import ReactorBase
from twisted.internet.endpoints import SSL4ServerEndpoint
from twisted.internet.endpoints import SSL4ServerEndpoint, TCP4ServerEndpoint
from twisted.internet.protocol import Factory
from twisted.internet.tcp import Port
from .__version__ import __version__
from .app.base import ApplicationCallable
from .protocol import GeminiProtocol
from .protocol import GeminiProtocol, ReverseProxyGeminiProtocol
from .tls import GeminiCertificateOptions, generate_ad_hoc_certificate
if sys.stderr.isatty():
@ -136,3 +136,24 @@ class GeminiServer(Factory):
self.log_message(f"TLS Private Key File: {self.keyfile}")
self.initialize()
self.reactor.run()
class ReverseProxyGeminiServer(GeminiServer):
protocol_class = ReverseProxyGeminiProtocol
endpoint_class = TCP4ServerEndpoint
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, certfile=False)
def initialize(self) -> None:
"""
Install the server into the twisted reactor.
"""
interfaces = [self.host] if self.host else ["0.0.0.0", "::"]
for interface in interfaces:
endpoint = self.endpoint_class(
reactor=self.reactor,
port=self.port,
interface=interface,
)
endpoint.listen(self).addCallback(self.on_bind_interface)

View File

@ -4,7 +4,8 @@ import os
import tempfile
import typing
import OpenSSL
import OpenSSL.crypto
import OpenSSL.SSL
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
@ -120,9 +121,9 @@ class GeminiCertificateOptions(CertificateOptions):
mark the certificate as not trusted.
"""
if not hasattr(conn, "authorised"):
conn.authorised = preverify_ok
conn.authorised = preverify_ok # type: ignore
else:
conn.authorised *= preverify_ok
conn.authorised *= preverify_ok # type: ignore
return True

View File

@ -10,7 +10,7 @@ def long_description() -> str:
setuptools.setup(
name="Jetforce",
version="0.9.0",
version="0.9.1",
url="https://github.com/michael-lazar/jetforce",
license="Other/Proprietary License",
author="Michael Lazar",