Fix not including trailing slashes in $PATH_INFO for CGI scripts. (#60)

* Fix not including trailing slashes in $PATH_INFO for CGI scripts.

* Fix type hint
This commit is contained in:
Michael Lazar 2021-05-11 20:17:18 -04:00 committed by GitHub
parent 7fd453e43c
commit e6fa299d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 145 additions and 51 deletions

View File

@ -4,8 +4,10 @@
#### Fixes #### Fixes
- Fix not including trailing slashes in $PATH_INFO for CGI scripts.
- Fix not sending the complete certificate chain for TLS certificates - Fix not sending the complete certificate chain for TLS certificates
that include it. that include it.
- Fix incorrect type signature for the EnvironDict type.
### v0.8.2 (2021-03-21) ### v0.8.2 (2021-03-21)

View File

@ -9,7 +9,7 @@ from urllib.parse import unquote, urlparse
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
EnvironDict = typing.Dict[str, object] EnvironDict = typing.Dict[str, typing.Any]
ResponseType = typing.Union[str, bytes, Deferred] ResponseType = typing.Union[str, bytes, Deferred]
ApplicationResponse = typing.Iterable[ResponseType] ApplicationResponse = typing.Iterable[ResponseType]
WriteStatusCallable = typing.Callable[[int, str], None] WriteStatusCallable = typing.Callable[[int, str], None]

View File

@ -6,8 +6,8 @@ import typing
import urllib.parse import urllib.parse
from twisted.internet import reactor from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from twisted.internet.task import deferLater
from .base import ( from .base import (
EnvironDict, EnvironDict,
@ -114,6 +114,10 @@ class StaticDirectoryApplication(JetforceApplication):
else: else:
request.environ["PATH_INFO"] = f"/{path_info}" request.environ["PATH_INFO"] = f"/{path_info}"
# Add back the trailing slash that was stripped off
if request.path.endswith("/"):
request.environ["PATH_INFO"] += "/"
return self.run_cgi_script(filesystem_path, request.environ) return self.run_cgi_script(filesystem_path, request.environ)
except OSError: except OSError:

View File

@ -1,26 +0,0 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile
#
attrs==19.3.0 # via automat, service-identity, twisted
automat==20.2.0 # via twisted
cffi==1.14.0 # via cryptography
constantly==15.1.0 # via twisted
cryptography==2.9.2 # via pyopenssl, service-identity
hyperlink==19.0.0 # via twisted
idna==2.9 # via Jetforce (setup.py), hyperlink
incremental==17.5.0 # via twisted
pyasn1-modules==0.2.8 # via service-identity
pyasn1==0.4.8 # via pyasn1-modules, service-identity
pycparser==2.20 # via cffi
pyhamcrest==2.0.2 # via twisted
pyopenssl==19.1.0 # via Jetforce (setup.py)
service-identity==18.1.0 # via Jetforce (setup.py)
six==1.14.0 # via automat, cryptography, pyopenssl
twisted==20.3.0 # via Jetforce (setup.py)
zope.interface==5.1.0 # via twisted
# The following packages are considered to be unsafe in a requirements file:
# setuptools

6
tests/data/cgi-bin/debug.py Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env python
import os
import json
print("20 application/json")
print(json.dumps(dict(os.environ)))

View File

@ -1,3 +0,0 @@
#!/bin/bash
echo "20 text/plain"
echo $QUERY_STRING

View File

@ -1,11 +1,14 @@
import json
import os import os
import ssl
import socket import socket
import ssl
import unittest import unittest
from threading import Thread from threading import Thread
from unittest import mock
from twisted.internet import reactor from twisted.internet import reactor
from jetforce import StaticDirectoryApplication, GeminiServer
from jetforce import GeminiServer, StaticDirectoryApplication
ROOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data") ROOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data")
@ -54,7 +57,7 @@ class FunctionalTestCase(unittest.TestCase):
cls.thread.join(timeout=5) cls.thread.join(timeout=5)
@classmethod @classmethod
def request(cls, data): def request(cls, data: str):
""" """
Send bytes to the server using a TCP/IP socket. Send bytes to the server using a TCP/IP socket.
""" """
@ -64,37 +67,145 @@ class FunctionalTestCase(unittest.TestCase):
with socket.create_connection((cls.server.host, cls.server.real_port)) as sock: with socket.create_connection((cls.server.host, cls.server.real_port)) as sock:
with context.wrap_socket(sock) as ssock: with context.wrap_socket(sock) as ssock:
ssock.sendall(data) ssock.sendall(data.encode(errors="surrogateescape"))
fp = ssock.makefile("rb") fp = ssock.makefile("rb")
return fp.read() return fp.read().decode(errors="surrogateescape")
@classmethod
def parse_cgi_resp(cls, response):
return json.loads(response.splitlines()[1])
def test_index(self): def test_index(self):
resp = self.request(b"gemini://localhost\r\n") resp = self.request("gemini://localhost\r\n")
self.assertEqual(resp, b"20 text/gemini\r\nJetforce rules!\n") self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_index_trailing_slash(self):
resp = self.request("gemini://localhost/\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_index_two_slashes(self):
resp = self.request("gemini://localhost//\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_index_slash_dot(self):
resp = self.request("gemini://localhost/.\r\n")
self.assertEqual(resp, "31 gemini://localhost/./\r\n")
def test_index_slash_dot_slash(self):
resp = self.request("gemini://localhost/./\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_index_filename(self):
resp = self.request("gemini://localhost/index.gmi\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_index_filename_escaped(self):
resp = self.request("gemini://localhost/inde%78.gmi\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_invalid_path(self): def test_invalid_path(self):
resp = self.request(b"gemini://localhost/invalid\r\n") resp = self.request("gemini://localhost/invalid\r\n")
self.assertEqual(resp, b"51 Not Found\r\n") self.assertEqual(resp, "51 Not Found\r\n")
def test_invalid_hostname(self): def test_invalid_hostname(self):
resp = self.request(b"gemini://example.com\r\n") resp = self.request("gemini://example.com\r\n")
self.assertEqual(resp, b"53 This server does not allow proxy requests\r\n") self.assertEqual(resp, "53 This server does not allow proxy requests\r\n")
def test_invalid_port(self): def test_invalid_port(self):
resp = self.request(b"gemini://localhost:1111\r\n") resp = self.request("gemini://localhost:1111\r\n")
self.assertEqual(resp, b"53 This server does not allow proxy requests\r\n") self.assertEqual(resp, "53 This server does not allow proxy requests\r\n")
def test_invalid_scheme(self):
resp = self.request("data://localhost\r\n")
self.assertEqual(resp, "53 This server does not allow proxy requests\r\n")
def test_invalid_userinfo(self):
resp = self.request("gemini://nancy@localhost\r\n")
self.assertEqual(resp, "59 Invalid URL\r\n")
def test_missing_scheme(self):
resp = self.request("//localhost\r\n")
self.assertEqual(resp, "59 Invalid URL\r\n")
def test_escape_root(self):
resp = self.request("gemini://localhost/..\r\n")
self.assertEqual(resp, "51 Not Found\r\n")
def test_escape_root_directory(self):
resp = self.request("gemini://localhost/../\r\n")
self.assertEqual(resp, "51 Not Found\r\n")
def test_escape_root_directory2(self):
resp = self.request("gemini://localhost/../.\r\n")
self.assertEqual(resp, "51 Not Found\r\n")
def test_escape_root_filename(self):
resp = self.request("gemini://localhost/../test_jetforce.py\r\n")
self.assertEqual(resp, "51 Not Found\r\n")
def test_directory_redirect(self): def test_directory_redirect(self):
resp = self.request(b"gemini://localhost/cgi-bin\r\n") resp = self.request("gemini://localhost/cgi-bin\r\n")
self.assertEqual(resp, b"31 gemini://localhost/cgi-bin/\r\n") self.assertEqual(resp, "31 gemini://localhost/cgi-bin/\r\n")
def test_directory(self): def test_directory(self):
resp = self.request(b"gemini://localhost/cgi-bin/\r\n") resp = self.request("gemini://localhost/cgi-bin/\r\n")
self.assertEqual(resp.splitlines(keepends=True)[0], b"20 text/gemini\r\n") resp = resp.splitlines(keepends=True)[0]
self.assertEqual(resp, "20 text/gemini\r\n")
def test_cgi_script(self): def test_directory_up(self):
resp = self.request(b"gemini://localhost/cgi-bin/echo.cgi?hello%20world\r\n") resp = self.request("gemini://localhost/cgi-bin/..\r\n")
self.assertEqual(resp, b"20 text/plain\r\nhello%20world\n") self.assertEqual(resp, "31 gemini://localhost/cgi-bin/../\r\n")
def test_directory_up_trailing_slash(self):
resp = self.request("gemini://localhost/cgi-bin/../\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_non_utf8(self):
resp = self.request("gemini://localhost/%AE\r\n")
self.assertEqual(resp, "51 Not Found\r\n")
def test_cgi(self):
resp = self.request("gemini://localhost/cgi-bin/debug.py\r\n")
resp = resp.splitlines(keepends=True)[0]
self.assertEqual(resp, "20 application/json\r\n")
def test_cgi_query(self):
resp = self.request("gemini://localhost/cgi-bin/debug.py?hello%20world\r\n")
data = self.parse_cgi_resp(resp)
self.assertEqual(data["QUERY_STRING"], "hello%20world")
self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py")
self.assertEqual(data["PATH_INFO"], "")
def test_cgi_root_trailing_slash(self):
resp = self.request("gemini://localhost/cgi-bin/debug.py/\r\n")
data = self.parse_cgi_resp(resp)
self.assertEqual(data["QUERY_STRING"], "")
self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py")
self.assertEqual(data["PATH_INFO"], "/")
def test_cgi_path_info(self):
resp = self.request("gemini://localhost/cgi-bin/debug.py/extra/info\r\n")
data = self.parse_cgi_resp(resp)
self.assertEqual(data["QUERY_STRING"], "")
self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py")
self.assertEqual(data["PATH_INFO"], "/extra/info")
def test_cgi_path_info_trailing_slash(self):
resp = self.request("gemini://localhost/cgi-bin/debug.py/extra/info/\r\n")
data = self.parse_cgi_resp(resp)
self.assertEqual(data["QUERY_STRING"], "")
self.assertEqual(data["SCRIPT_NAME"], "/cgi-bin/debug.py")
self.assertEqual(data["PATH_INFO"], "/extra/info/")
def test_hostname_punycode(self):
with mock.patch.object(self.server, "hostname", "xn--caf-dma.localhost"):
resp = self.request("gemini://xn--caf-dma.localhost\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
def test_hostname_unicode(self):
with mock.patch.object(self.server, "hostname", "xn--caf-dma.localhost"):
resp = self.request("gemini://café.localhost\r\n")
self.assertEqual(resp, "20 text/gemini\r\nJetforce rules!\n")
if __name__ == "__main__": if __name__ == "__main__":