From abea5601d74f80616d6989357d946666e651a9be Mon Sep 17 00:00:00 2001 From: Michael Lazar Date: Sun, 17 May 2020 23:08:16 -0400 Subject: [PATCH] Finally figured out twisted concurrency --- examples/counter.py | 45 ++++++++++++++++++++++++++++++++++++++++++++ jetforce/protocol.py | 29 ++++++++++++++++------------ jetforce_client.py | 11 ++++++----- 3 files changed, 68 insertions(+), 17 deletions(-) create mode 100644 examples/counter.py diff --git a/examples/counter.py b/examples/counter.py new file mode 100644 index 0000000..ec5f7b6 --- /dev/null +++ b/examples/counter.py @@ -0,0 +1,45 @@ +""" +An endpoint that streams incrementing numbers forever. + +This is an example of how a jetforce application can respond with a generator +function instead of plain text/bytes. The server will iterate over the +generator and write the data to the socket in-between each iteration. This can +be useful if you want to serve a large response, like a binary file, without +loading the entire response into memory at once. + +The server will schedule your application code to be run inside of a separate +thread, using twisted's built-in thread pool. So even though the counter +function contains a sleep(), it will not block the server from handling other +requests. Try requesting this endpoint over two connections simultaneously. + +> jetforce-client gemini://localhost +> jetforce-client gemini://localhost +""" + +import time + +from jetforce import GeminiServer, JetforceApplication, Response, Status + + +def counter(): + """ + Generator function that counts to ∞. + """ + x = 0 + while True: + time.sleep(1) + x += 1 + yield f"{x}\r\n" + + +app = JetforceApplication() + + +@app.route() +def index(request): + return Response(Status.SUCCESS, "text/plain", counter()) + + +if __name__ == "__main__": + server = GeminiServer(app) + server.run() diff --git a/jetforce/protocol.py b/jetforce/protocol.py index a083493..0b20233 100644 --- a/jetforce/protocol.py +++ b/jetforce/protocol.py @@ -5,7 +5,8 @@ import typing import urllib.parse from twisted.internet.address import IPv4Address, IPv6Address -from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import ensureDeferred +from twisted.internet.threads import deferToThread from twisted.protocols.basic import LineOnlyReceiver from .__version__ import __version__ @@ -64,10 +65,10 @@ class GeminiProtocol(LineOnlyReceiver): connection without managing any state. """ self.request = line - return self._handle_request_noblock() + return ensureDeferred(self._handle_request_noblock()) + + async def _handle_request_noblock(self): - @inlineCallbacks - def _handle_request_noblock(self): try: self.parse_header() except Exception: @@ -79,9 +80,15 @@ class GeminiProtocol(LineOnlyReceiver): try: environ = self.build_environ() - for data in self.app(environ, self.write_status): - self.write_body(data) - yield # Yield control to the event loop + response_generator = await deferToThread( + self.app, environ, self.write_status + ) + while True: + try: + data = await deferToThread(response_generator.__next__) + self.write_body(data) + except StopIteration: + break except Exception: self.write_status(Status.CGI_ERROR, "An unexpected error occurred") finally: @@ -163,16 +170,14 @@ class GeminiProtocol(LineOnlyReceiver): self.meta = meta self.response_buffer = f"{status}\t{meta}\r\n" - def write_body(self, data: typing.Union[str, bytes, None]) -> None: + def write_body(self, data: typing.Union[str, bytes]) -> None: """ Write bytes to the gemini response body. """ - if data is None: - return - - self.flush_status() if isinstance(data, str): data = data.encode() + + self.flush_status() self.response_size += len(data) self.transport.write(data) diff --git a/jetforce_client.py b/jetforce_client.py index 6d8411c..1b08b56 100755 --- a/jetforce_client.py +++ b/jetforce_client.py @@ -7,6 +7,7 @@ A dead-simple gemini client intended to be used for server development and testi import argparse import socket import ssl +import sys import urllib.parse context = ssl.create_default_context() @@ -25,11 +26,11 @@ def fetch(url: str, host: str = None, port: str = None): with socket.create_connection((host, port)) as sock: with context.wrap_socket(sock) as ssock: ssock.sendall((url + "\r\n").encode()) - fp = ssock.makefile("rb") - header = fp.readline().decode() - print(header) - body = fp.read().decode() - print(body) + fp = ssock.makefile("rb", buffering=0) + data = fp.read(1024) + while data: + sys.stdout.buffer.write(data) + data = fp.read(1024) def run_client():