First attempt at CGI support.
This commit is contained in:
parent
65c8a6e47c
commit
27c2ee12ae
|
@ -0,0 +1,9 @@
|
|||
#!/usr/bin/env python
|
||||
"""
|
||||
This is a demo CGI script that prints a bunch of debug information about
|
||||
the request as an HTML document. Place it in the cgi-bin/ directory of your
|
||||
server.
|
||||
"""
|
||||
import cgi
|
||||
|
||||
cgi.test()
|
|
@ -7,7 +7,7 @@ import jetforce
|
|||
|
||||
|
||||
def echo(environ, send_status):
|
||||
url = environ["URL"]
|
||||
url = environ["GEMINI_URL"]
|
||||
send_status(jetforce.Status.SUCCESS, "text/gemini")
|
||||
yield f"Received path: {url}".encode()
|
||||
|
||||
|
|
91
jetforce.py
91
jetforce.py
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/env python3.7
|
||||
import argparse
|
||||
import asyncio
|
||||
import codecs
|
||||
import dataclasses
|
||||
import datetime
|
||||
import mimetypes
|
||||
|
@ -83,15 +84,16 @@ class Request:
|
|||
def __init__(self, environ: dict):
|
||||
self.environ = environ
|
||||
|
||||
url = urllib.parse.urlparse(environ["URL"])
|
||||
self.scheme = url.scheme
|
||||
self.hostname = url.hostname
|
||||
self.port = url.port
|
||||
self.path = url.path
|
||||
self.params = url.params
|
||||
self.query = url.query
|
||||
self.fragment = url.fragment
|
||||
self.url = url.geturl()
|
||||
self.url = environ["GEMINI_URL"]
|
||||
|
||||
url_parts = urllib.parse.urlparse(self.url)
|
||||
self.scheme = url_parts.scheme
|
||||
self.hostname = url_parts.hostname
|
||||
self.port = url_parts.port
|
||||
self.path = url_parts.path
|
||||
self.params = url_parts.params
|
||||
self.query = url_parts.query
|
||||
self.fragment = url_parts.fragment
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
|
@ -198,11 +200,18 @@ class StaticDirectoryApplication(JetforceApplication):
|
|||
directory listing will be auto-generated.
|
||||
"""
|
||||
|
||||
def __init__(self, directory: str = "/var/gemini", index_file: str = "index.gmi"):
|
||||
def __init__(
|
||||
self,
|
||||
root_directory: str = "/var/gemini",
|
||||
index_file: str = "index.gmi",
|
||||
cgi_directory: str = "cgi-bin/",
|
||||
):
|
||||
super().__init__()
|
||||
self.routes.append((RoutePattern(), self.serve_static_file))
|
||||
|
||||
self.root = pathlib.Path(directory).resolve(strict=True)
|
||||
self.root = pathlib.Path(root_directory).resolve(strict=True)
|
||||
self.cgi_directory = cgi_directory.strip("/") + "/"
|
||||
|
||||
self.index_file = index_file
|
||||
self.mimetypes = mimetypes.MimeTypes()
|
||||
self.mimetypes.add_type("text/gemini", ".gmi")
|
||||
|
@ -220,6 +229,9 @@ class StaticDirectoryApplication(JetforceApplication):
|
|||
filesystem_path = self.root / filename
|
||||
|
||||
if filesystem_path.is_file():
|
||||
if str(filename).startswith(self.cgi_directory):
|
||||
if os.access(filesystem_path, os.X_OK):
|
||||
return self.run_cgi_script(filesystem_path, request.environ)
|
||||
mimetype = self.guess_mimetype(filesystem_path.name)
|
||||
generator = self.load_file(filesystem_path)
|
||||
return Response(Status.SUCCESS, mimetype, generator)
|
||||
|
@ -233,6 +245,36 @@ class StaticDirectoryApplication(JetforceApplication):
|
|||
else:
|
||||
return Response(Status.NOT_FOUND, "Not Found")
|
||||
|
||||
def run_cgi_script(self, filesystem_path: pathlib.Path, environ: dict):
|
||||
script_name = str(filesystem_path)
|
||||
cgi_env = environ.copy()
|
||||
cgi_env["GATEWAY_INTERFACE"] = "GCI/1.1"
|
||||
cgi_env["SCRIPT_NAME"] = script_name
|
||||
|
||||
out = subprocess.Popen(
|
||||
[script_name],
|
||||
stdout=subprocess.PIPE,
|
||||
env=cgi_env,
|
||||
bufsize=1,
|
||||
universal_newlines=True,
|
||||
errors="surrogateescape",
|
||||
)
|
||||
|
||||
# Try to get the mimetype from the CGI response's content-type.
|
||||
# Discard all other response headers for now.
|
||||
mimetype = "text/plain"
|
||||
for line in out.stdout:
|
||||
header = line.lower().strip()
|
||||
if header.startswith("content-type:"):
|
||||
mimetype = header.split(":", maxsplit=1)[1]
|
||||
elif not header:
|
||||
# A empty line signals the end of the headers
|
||||
break
|
||||
|
||||
# Re-encode the rest of the body as bytes
|
||||
body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
|
||||
return Response(Status.SUCCESS, mimetype, body)
|
||||
|
||||
def load_file(self, filesystem_path: pathlib.Path):
|
||||
with filesystem_path.open("rb") as fp:
|
||||
data = fp.read(1024)
|
||||
|
@ -325,13 +367,23 @@ class GeminiRequestHandler:
|
|||
def build_environ(self) -> typing.Dict[str, typing.Any]:
|
||||
"""
|
||||
Construct a dictionary that will be passed to the application handler.
|
||||
|
||||
Variable names conform to the CGI spec defined in RFC 3875.
|
||||
"""
|
||||
url_parts = urllib.parse.urlparse(self.url)
|
||||
return {
|
||||
"SERVER_HOST": self.server.host,
|
||||
"SERVER_PORT": self.server.port,
|
||||
"REMOTE_ADDR": self.remote_addr,
|
||||
"GEMINI_URL": self.url,
|
||||
"HOSTNAME": self.server.hostname,
|
||||
"URL": self.url,
|
||||
"PATH_INFO": url_parts.path,
|
||||
"QUERY_STRING": url_parts.query,
|
||||
"SERVER_NAME": self.server.hostname,
|
||||
"SERVER_PORT": str(self.server.port),
|
||||
"SERVER_PROTOCOL": "GEMINI",
|
||||
"SERVER_SOFTWARE": f"jetforce/{__version__}",
|
||||
"REMOTE_ADDR": self.remote_addr,
|
||||
"REMOTE_HOST": self.remote_addr,
|
||||
"REMOTE_USER": "",
|
||||
"REQUEST_METHOD": "GET",
|
||||
}
|
||||
|
||||
async def parse_header(self) -> None:
|
||||
|
@ -522,14 +574,19 @@ def run_server() -> None:
|
|||
"""
|
||||
parser = command_line_parser()
|
||||
parser.add_argument(
|
||||
"--dir", help="Path on the filesystem to serve", default="/var/gemini"
|
||||
"--root-dir", help="Root path on the filesystem to serve", default="/var/gemini"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cgi-dir",
|
||||
help="CGI script folder, relative to the root path",
|
||||
default="cgi-bin/",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--index-file", help="The gemini directory index file", default="index.gmi"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
app = StaticDirectoryApplication(args.dir, args.index_file)
|
||||
app = StaticDirectoryApplication(args.root_dir, args.index_file, args.cgi_dir)
|
||||
server = GeminiServer(
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
|
|
Loading…
Reference in New Issue