diff --git a/CHANGELOG.md b/CHANGELOG.md index d17e62a..b5e3c2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,20 +2,18 @@ ### v0.8.0 (Unreleased) -#### Spec Changes - - Added support for international domain names using IDN encoding. - -#### New Features - -- Several fixes & improvements to python type hinting coverage. +- Several improvements to internal python type hinting coverage. - Added a ``py.typed`` file to indicate project support for type hints. - -#### Bug Fixes - +- Optimized TCP packets when streaming directory listings. +- Optimized TCP packets when streaming large CGI responses. +- Improved error handling to catch invalid responses from CGI scripts. - Fixed a bug where TLS_CLIENT_AUTHORISED would sometimes be set to ``True``/``False`` instead of ``1``/``0``. - +- Fixed error handling edge case when the client killed the connection + before all data has been sent. A `CancelledError` exception will now + be raised internally instead of a ``ConnectionClosed`` exception. + ### v0.7.0 (2020-12-06) #### Spec Changes diff --git a/jetforce/app/static.py b/jetforce/app/static.py index 5ff3097..81fc680 100644 --- a/jetforce/app/static.py +++ b/jetforce/app/static.py @@ -1,4 +1,3 @@ -import codecs import mimetypes import os import pathlib @@ -6,6 +5,10 @@ import subprocess import typing import urllib.parse +from twisted.internet import reactor +from twisted.internet.task import deferLater +from twisted.internet.defer import Deferred + from .base import ( EnvironDict, JetforceApplication, @@ -34,6 +37,12 @@ class StaticDirectoryApplication(JetforceApplication): # Chunk size for streaming files, taken from the twisted FileSender class CHUNK_SIZE = 2 ** 14 + # Length of time to defer while waiting for more data from a CGI script + CGI_POLLING_PERIOD = 0.05 + + # Maximum size in bytes of the first line of a server response + CGI_MAX_RESPONSE_HEADER_SIZE = 2048 + mimetypes: mimetypes.MimeTypes def __init__( @@ -157,27 +166,54 @@ class StaticDirectoryApplication(JetforceApplication): cgi_env = {k: str(v) for k, v in environ.items() if k.isupper()} cgi_env["GATEWAY_INTERFACE"] = "CGI/1.1" - # Decode the stream as unicode so we can parse the status line - # Use surrogateescape to preserve any non-UTF8 byte sequences. - out = subprocess.Popen( + proc = subprocess.Popen( [str(filesystem_path)], stdout=subprocess.PIPE, env=cgi_env, - bufsize=1, - universal_newlines=True, - errors="surrogateescape", + bufsize=0, ) - status_line = out.stdout.readline().strip() - status_parts = status_line.split(maxsplit=1) + status_line = proc.stdout.readline(self.CGI_MAX_RESPONSE_HEADER_SIZE) + if len(status_line) == self.CGI_MAX_RESPONSE_HEADER_SIZE: + # Too large response header line received from the CGI script. + return Response(Status.CGI_ERROR, "Unexpected Error") + + status_parts = status_line.decode().strip().split(maxsplit=1) if len(status_parts) != 2 or not status_parts[0].isdecimal(): + # Malformed header line received from the CGI script. return Response(Status.CGI_ERROR, "Unexpected Error") status, meta = status_parts + return Response(int(status), meta, self.cgi_body_generator(proc)) - # Re-encode the rest of the body as bytes - body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape") - return Response(int(status), meta, body) + def cgi_body_generator( + self, + proc: subprocess.Popen[bytes], + ) -> typing.Iterator[typing.Union[bytes, Deferred]]: + """ + Non-blocking read from the stdout of the CGI process and pipe it + to the socket transport. + """ + while True: + proc.poll() + + data = proc.stdout.read(self.CHUNK_SIZE) + if len(data) == self.CHUNK_SIZE: + # Send the chunk and yield control of the event loop + yield data + elif proc.returncode is None: + # We didn't get a full chunk's worth of data from the + # subprocess. Send what we have, but add a delay before + # attempting to read again to allow time for more bytes + # to buffer in stdout. + if data: + yield data + yield deferLater(reactor, self.CGI_POLLING_PERIOD) + else: + # Subprocess has finished, send everything that's left. + if data: + yield data + break def load_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]: """ @@ -196,9 +232,9 @@ class StaticDirectoryApplication(JetforceApplication): """ Auto-generate a text/gemini document based on the contents of the file system. """ - yield f"Directory: /{url_path}\r\n".encode() + buffer = f"Directory: /{url_path}]\r\n".encode() if url_path.parent != url_path: - yield f"=>/{url_path.parent}\t..\r\n".encode() + buffer += f"=>/{url_path.parent}\t..\r\n".encode() for file in sorted(filesystem_path.iterdir()): if file.name.startswith("."): @@ -207,9 +243,16 @@ class StaticDirectoryApplication(JetforceApplication): encoded_path = urllib.parse.quote(str(url_path / file.name)) if file.is_dir(): - yield f"=>/{encoded_path}/\t{file.name}/\r\n".encode() + buffer += f"=>/{encoded_path}/\t{file.name}/\r\n".encode() else: - yield f"=>/{encoded_path}\t{file.name}\r\n".encode() + buffer += f"=>/{encoded_path}\t{file.name}\r\n".encode() + + if len(buffer) >= self.CHUNK_SIZE: + data, buffer = buffer[: self.CHUNK_SIZE], buffer[self.CHUNK_SIZE :] + yield data + + if buffer: + yield buffer def guess_mimetype(self, filename: str) -> str: """ diff --git a/jetforce/protocol.py b/jetforce/protocol.py index 99b36a9..afdeea9 100644 --- a/jetforce/protocol.py +++ b/jetforce/protocol.py @@ -6,8 +6,7 @@ import typing import urllib.parse from twisted.internet.address import IPv4Address, IPv6Address -from twisted.internet.defer import Deferred, ensureDeferred -from twisted.internet.error import ConnectionClosed +from twisted.internet.defer import Deferred, ensureDeferred, CancelledError from twisted.internet.protocol import connectionDone from twisted.internet.task import deferLater from twisted.protocols.basic import LineOnlyReceiver @@ -40,6 +39,7 @@ class GeminiProtocol(LineOnlyReceiver): """ TIMESTAMP_FORMAT = "%d/%b/%Y:%H:%M:%S %z" + DEBUG = False client_addr: typing.Union[IPv4Address, IPv6Address] connected_timestamp: time.struct_time @@ -69,8 +69,7 @@ class GeminiProtocol(LineOnlyReceiver): This is invoked by twisted after the connection has been closed. """ if self._currently_deferred: - self._currently_deferred.errback(reason) - self._currently_deferred = None + self._currently_deferred.cancel() def lineReceived(self, line: bytes) -> Deferred: """ @@ -150,7 +149,8 @@ class GeminiProtocol(LineOnlyReceiver): response_generator = await self.track_deferred(response_generator) else: # Yield control of the event loop - await deferLater(self.server.reactor, 0) + deferred = deferLater(self.server.reactor, 0) + await self.track_deferred(deferred) for data in response_generator: if isinstance(data, Deferred): @@ -159,9 +159,9 @@ class GeminiProtocol(LineOnlyReceiver): else: self.write_body(data) # Yield control of the event loop - await deferLater(self.server.reactor, 0) - - except ConnectionClosed: + deferred = deferLater(self.server.reactor, 0) + await self.track_deferred(deferred) + except CancelledError: pass except Exception: self.server.log_message(traceback.format_exc()) @@ -172,6 +172,10 @@ class GeminiProtocol(LineOnlyReceiver): self.finish_connection() async def track_deferred(self, deferred: Deferred) -> typing.Union[str, bytes]: + """ + Keep track of the deferred that we're waiting on so we can send an + error back to it if the connection is abruptly killed. + """ self._currently_deferred = deferred try: return await deferred @@ -252,15 +256,20 @@ class GeminiProtocol(LineOnlyReceiver): self.meta = meta self.response_buffer = f"{status} {meta}\r\n" - def write_body(self, data: typing.Union[str, bytes]) -> None: + def write_body(self, data: typing.Union[str, bytes, None]) -> None: """ Write bytes to the gemini response body. """ + if data is None: + return + if isinstance(data, str): data = data.encode() self.flush_status() self.response_size += len(data) + if self.DEBUG: + print(f"Writing body: {len(data)} bytes") self.transport.write(data) def flush_status(self) -> None: @@ -270,6 +279,8 @@ class GeminiProtocol(LineOnlyReceiver): if self.response_buffer and not self.response_size: data = self.response_buffer.encode() self.response_size += len(data) + if self.DEBUG: + print(f"Writing status: {len(data)} bytes") self.transport.write(data) self.response_buffer = ""