diff --git a/CHANGELOG.md b/CHANGELOG.md index b2f6bc4..e395f6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,10 @@ #### Fixes +- Fix not including trailing slashes in $PATH_INFO for CGI scripts. - Fix not sending the complete certificate chain for TLS certificates that include it. +- Fix incorrect type signature for the EnvironDict type. ### v0.8.2 (2021-03-21) diff --git a/jetforce/app/base.py b/jetforce/app/base.py index b147bb4..b42e598 100644 --- a/jetforce/app/base.py +++ b/jetforce/app/base.py @@ -9,7 +9,7 @@ from urllib.parse import unquote, urlparse from twisted.internet.defer import Deferred -EnvironDict = typing.Dict[str, object] +EnvironDict = typing.Dict[str, typing.Any] ResponseType = typing.Union[str, bytes, Deferred] ApplicationResponse = typing.Iterable[ResponseType] WriteStatusCallable = typing.Callable[[int, str], None] diff --git a/jetforce/app/static.py b/jetforce/app/static.py index 75805e0..b2a4e91 100644 --- a/jetforce/app/static.py +++ b/jetforce/app/static.py @@ -6,8 +6,8 @@ import typing import urllib.parse from twisted.internet import reactor -from twisted.internet.task import deferLater from twisted.internet.defer import Deferred +from twisted.internet.task import deferLater from .base import ( EnvironDict, @@ -114,6 +114,10 @@ class StaticDirectoryApplication(JetforceApplication): else: request.environ["PATH_INFO"] = f"/{path_info}" + # Add back the trailing slash that was stripped off + if request.path.endswith("/"): + request.environ["PATH_INFO"] += "/" + return self.run_cgi_script(filesystem_path, request.environ) except OSError: diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 1706f7e..0000000 --- a/requirements.txt +++ /dev/null @@ -1,26 +0,0 @@ -# -# This file is autogenerated by pip-compile -# To update, run: -# -# pip-compile -# -attrs==19.3.0 # via automat, service-identity, twisted -automat==20.2.0 # via twisted -cffi==1.14.0 # via cryptography -constantly==15.1.0 # via twisted -cryptography==2.9.2 # via pyopenssl, service-identity -hyperlink==19.0.0 # via twisted -idna==2.9 # via Jetforce (setup.py), hyperlink -incremental==17.5.0 # via twisted -pyasn1-modules==0.2.8 # via service-identity -pyasn1==0.4.8 # via pyasn1-modules, service-identity -pycparser==2.20 # via cffi -pyhamcrest==2.0.2 # via twisted -pyopenssl==19.1.0 # via Jetforce (setup.py) -service-identity==18.1.0 # via Jetforce (setup.py) -six==1.14.0 # via automat, cryptography, pyopenssl -twisted==20.3.0 # via Jetforce (setup.py) -zope.interface==5.1.0 # via twisted - -# The following packages are considered to be unsafe in a requirements file: -# setuptools diff --git a/tests/data/cgi-bin/debug.py b/tests/data/cgi-bin/debug.py new file mode 100755 index 0000000..9eb8b4b --- /dev/null +++ b/tests/data/cgi-bin/debug.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +import os +import json + +print("20 application/json") +print(json.dumps(dict(os.environ))) diff --git a/tests/data/cgi-bin/echo.cgi b/tests/data/cgi-bin/echo.cgi deleted file mode 100755 index 9987d51..0000000 --- a/tests/data/cgi-bin/echo.cgi +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -echo "20 text/plain" -echo $QUERY_STRING \ No newline at end of file diff --git a/tests/test_jetforce.py b/tests/test_jetforce.py index 6e9bede..ec32b67 100644 --- a/tests/test_jetforce.py +++ b/tests/test_jetforce.py @@ -1,11 +1,14 @@ +import json import os -import ssl import socket +import ssl import unittest from threading import Thread +from unittest import mock from twisted.internet import reactor -from jetforce import StaticDirectoryApplication, GeminiServer + +from jetforce import GeminiServer, StaticDirectoryApplication ROOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data") @@ -54,7 +57,7 @@ class FunctionalTestCase(unittest.TestCase): cls.thread.join(timeout=5) @classmethod - def request(cls, data): + def request(cls, data: str): """ Send bytes to the server using a TCP/IP socket. """ @@ -64,37 +67,145 @@ class FunctionalTestCase(unittest.TestCase): with socket.create_connection((cls.server.host, cls.server.real_port)) as sock: with context.wrap_socket(sock) as ssock: - ssock.sendall(data) + ssock.sendall(data.encode(errors="surrogateescape")) fp = ssock.makefile("rb") - return fp.read() + return fp.read().decode(errors="surrogateescape") + + @classmethod + def parse_cgi_resp(cls, response): + return json.loads(response.splitlines()[1]) def test_index(self): - resp = self.request(b"gemini://localhost\r\n") - self.assertEqual(resp, b"20 text/gemini\r\nJetforce rules!\n") + resp = self.request("gemini://localhost\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_index_trailing_slash(self): + resp = self.request("gemini://localhost/\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_index_two_slashes(self): + resp = self.request("gemini://localhost//\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_index_slash_dot(self): + resp = self.request("gemini://localhost/.\r\n") + self.assertEqual(resp, "31 gemini://localhost/./\r\n") + + def test_index_slash_dot_slash(self): + resp = self.request("gemini://localhost/./\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_index_filename(self): + resp = self.request("gemini://localhost/index.gmi\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_index_filename_escaped(self): + resp = self.request("gemini://localhost/inde%78.gmi\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") def test_invalid_path(self): - resp = self.request(b"gemini://localhost/invalid\r\n") - self.assertEqual(resp, b"51 Not Found\r\n") + resp = self.request("gemini://localhost/invalid\r\n") + self.assertEqual(resp, "51 Not Found\r\n") def test_invalid_hostname(self): - resp = self.request(b"gemini://example.com\r\n") - self.assertEqual(resp, b"53 This server does not allow proxy requests\r\n") + resp = self.request("gemini://example.com\r\n") + self.assertEqual(resp, "53 This server does not allow proxy requests\r\n") def test_invalid_port(self): - resp = self.request(b"gemini://localhost:1111\r\n") - self.assertEqual(resp, b"53 This server does not allow proxy requests\r\n") + resp = self.request("gemini://localhost:1111\r\n") + self.assertEqual(resp, "53 This server does not allow proxy requests\r\n") + + def test_invalid_scheme(self): + resp = self.request("data://localhost\r\n") + self.assertEqual(resp, "53 This server does not allow proxy requests\r\n") + + def test_invalid_userinfo(self): + resp = self.request("gemini://nancy@localhost\r\n") + self.assertEqual(resp, "59 Invalid URL\r\n") + + def test_missing_scheme(self): + resp = self.request("//localhost\r\n") + self.assertEqual(resp, "59 Invalid URL\r\n") + + def test_escape_root(self): + resp = self.request("gemini://localhost/..\r\n") + self.assertEqual(resp, "51 Not Found\r\n") + + def test_escape_root_directory(self): + resp = self.request("gemini://localhost/../\r\n") + self.assertEqual(resp, "51 Not Found\r\n") + + def test_escape_root_directory2(self): + resp = self.request("gemini://localhost/../.\r\n") + self.assertEqual(resp, "51 Not Found\r\n") + + def test_escape_root_filename(self): + resp = self.request("gemini://localhost/../test_jetforce.py\r\n") + self.assertEqual(resp, "51 Not Found\r\n") def test_directory_redirect(self): - resp = self.request(b"gemini://localhost/cgi-bin\r\n") - self.assertEqual(resp, b"31 gemini://localhost/cgi-bin/\r\n") + resp = self.request("gemini://localhost/cgi-bin\r\n") + self.assertEqual(resp, "31 gemini://localhost/cgi-bin/\r\n") def test_directory(self): - resp = self.request(b"gemini://localhost/cgi-bin/\r\n") - self.assertEqual(resp.splitlines(keepends=True)[0], b"20 text/gemini\r\n") + resp = self.request("gemini://localhost/cgi-bin/\r\n") + resp = resp.splitlines(keepends=True)[0] + self.assertEqual(resp, "20 text/gemini\r\n") - def test_cgi_script(self): - resp = self.request(b"gemini://localhost/cgi-bin/echo.cgi?hello%20world\r\n") - self.assertEqual(resp, b"20 text/plain\r\nhello%20world\n") + def test_directory_up(self): + resp = self.request("gemini://localhost/cgi-bin/..\r\n") + self.assertEqual(resp, "31 gemini://localhost/cgi-bin/../\r\n") + + def test_directory_up_trailing_slash(self): + resp = self.request("gemini://localhost/cgi-bin/../\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_non_utf8(self): + resp = self.request("gemini://localhost/%AE\r\n") + self.assertEqual(resp, "51 Not Found\r\n") + + def test_cgi(self): + resp = self.request("gemini://localhost/cgi-bin/debug.py\r\n") + resp = resp.splitlines(keepends=True)[0] + self.assertEqual(resp, "20 application/json\r\n") + + def test_cgi_query(self): + resp = self.request("gemini://localhost/cgi-bin/debug.py?hello%20world\r\n") + data = self.parse_cgi_resp(resp) + self.assertEqual(data["QUERY_STRING"], "hello%20world") + self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py") + self.assertEqual(data["PATH_INFO"], "") + + def test_cgi_root_trailing_slash(self): + resp = self.request("gemini://localhost/cgi-bin/debug.py/\r\n") + data = self.parse_cgi_resp(resp) + self.assertEqual(data["QUERY_STRING"], "") + self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py") + self.assertEqual(data["PATH_INFO"], "/") + + def test_cgi_path_info(self): + resp = self.request("gemini://localhost/cgi-bin/debug.py/extra/info\r\n") + data = self.parse_cgi_resp(resp) + self.assertEqual(data["QUERY_STRING"], "") + self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py") + self.assertEqual(data["PATH_INFO"], "/extra/info") + + def test_cgi_path_info_trailing_slash(self): + resp = self.request("gemini://localhost/cgi-bin/debug.py/extra/info/\r\n") + data = self.parse_cgi_resp(resp) + self.assertEqual(data["QUERY_STRING"], "") + self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py") + self.assertEqual(data["PATH_INFO"], "/extra/info/") + + def test_hostname_punycode(self): + with mock.patch.object(self.server, "hostname", "xn--caf-dma.localhost"): + resp = self.request("gemini://xn--caf-dma.localhost\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") + + def test_hostname_unicode(self): + with mock.patch.object(self.server, "hostname", "xn--caf-dma.localhost"): + resp = self.request("gemini://café.localhost\r\n") + self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n") if __name__ == "__main__":