From c794384248eb13d8392882de1d77ba13f1717251 Mon Sep 17 00:00:00 2001 From: Michael Lazar Date: Mon, 25 May 2020 23:40:15 -0400 Subject: [PATCH] Drop threading by default and add proper support for async applications --- examples/counter.py | 78 ++++++++++++++++++++++++++++++++------- jetforce/app/base.py | 12 +++--- jetforce/app/composite.py | 4 +- jetforce/protocol.py | 42 ++++++++++++--------- jetforce_client.py | 1 + 5 files changed, 98 insertions(+), 39 deletions(-) diff --git a/examples/counter.py b/examples/counter.py index fbec028..ce57e48 100644 --- a/examples/counter.py +++ b/examples/counter.py @@ -6,37 +6,87 @@ function instead of plain text/bytes. The server will iterate over the generator and write the data to the socket in-between each iteration. This can be useful if you want to serve a large response, like a binary file, without loading the entire response into memory at once. - -The server will schedule your application code to be run inside of a separate -thread, using twisted's built-in thread pool. So even though the counter -function contains a sleep(), it will not block the server from handling other -requests. Try requesting this endpoint over two connections simultaneously. - -> jetforce-client gemini://localhost -> jetforce-client gemini://localhost """ import time from jetforce import GeminiServer, JetforceApplication, Response, Status +from twisted.internet import reactor +from twisted.internet.task import deferLater +from twisted.internet.threads import deferToThread -def counter(): +def blocking_counter(): """ - Generator function that counts to ∞. + This is the simplest implementation of a blocking, synchronous generator. + + The calls to time.sleep(1) will run in the main twisted event loop and + block all other requests from processing. """ x = 0 while True: - time.sleep(1) x += 1 + time.sleep(1) yield f"{x}\r\n" +def threaded_counter(): + """ + This counter uses the twisted ThreadPool to invoke sleep() inside of a + separate thread. + + This avoids blocking the twisted event loop during the sleep() call. + It adds an overhead of setting up a thread for each iteration. It also + requires that your code be thread-safe, because more than one thread may + be running simultaneously in order to process separate requests. + """ + + def delayed_callback(x): + time.sleep(1) + return f"{x}\r\n" + + x = 0 + while True: + x += 1 + yield deferToThread(delayed_callback, x) + + +def deferred_counter(): + """ + This counter uses twisted's deferLater() to schedule calling the function + after a delay of one second. + + This is equivalent to using asyncio.sleep(1). It tells the twisted event + loop to "go do something else, and come back to run this callback after at + least one second has elapsed". The advantage is that it's non-blocking and + you don't need to worry about thread-safety because your callback will + eventually run in the main event loop. + """ + + def delayed_callback(x): + return f"{x}\r\n" + + x = 0 + while True: + x += 1 + yield deferLater(reactor, 1, delayed_callback, x) + + app = JetforceApplication() -@app.route() -def index(request): - return Response(Status.SUCCESS, "text/plain", counter()) +@app.route("/blocking") +def blocking(request): + return Response(Status.SUCCESS, "text/plain", blocking_counter()) + + +@app.route("/threaded") +def threaded(request): + return Response(Status.SUCCESS, "text/plain", threaded_counter()) + + +@app.route("/deferred") +def deferred(request): + return Response(Status.SUCCESS, "text/plain", deferred_counter()) if __name__ == "__main__": diff --git a/jetforce/app/base.py b/jetforce/app/base.py index 73fc320..74c2e68 100644 --- a/jetforce/app/base.py +++ b/jetforce/app/base.py @@ -4,6 +4,10 @@ import re import typing import urllib.parse +from twisted.internet.defer import Deferred + +ResponseType = typing.Union[str, bytes, Deferred] + class Status: """ @@ -73,9 +77,7 @@ class Response: status: int meta: str - body: typing.Union[ - None, bytes, str, typing.Iterable[typing.Union[bytes, str]] - ] = None + body: typing.Union[None, ResponseType, typing.Iterable[ResponseType]] = None @dataclasses.dataclass @@ -137,7 +139,7 @@ class JetforceApplication: def __call__( self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: + ) -> typing.Iterator[ResponseType]: try: request = Request(environ) except Exception: @@ -156,7 +158,7 @@ class JetforceApplication: response = callback(request, **callback_kwargs) send_status(response.status, response.meta) - if isinstance(response.body, (bytes, str)): + if isinstance(response.body, (bytes, str, Deferred)): yield response.body elif response.body: yield from response.body diff --git a/jetforce/app/composite.py b/jetforce/app/composite.py index 1f52c53..bcea6b2 100644 --- a/jetforce/app/composite.py +++ b/jetforce/app/composite.py @@ -1,6 +1,6 @@ import typing -from .base import Request, Status +from .base import Request, ResponseType, Status class CompositeApplication: @@ -30,7 +30,7 @@ class CompositeApplication: def __call__( self, environ: dict, send_status: typing.Callable - ) -> typing.Iterator[bytes]: + ) -> typing.Iterator[ResponseType]: try: request = Request(environ) except Exception: diff --git a/jetforce/protocol.py b/jetforce/protocol.py index 582c679..3863466 100644 --- a/jetforce/protocol.py +++ b/jetforce/protocol.py @@ -6,14 +6,17 @@ import typing import urllib.parse from twisted.internet.address import IPv4Address, IPv6Address -from twisted.internet.defer import ensureDeferred -from twisted.internet.threads import deferToThread +from twisted.internet.defer import ensureDeferred, maybeDeferred +from twisted.internet.task import deferLater from twisted.protocols.basic import LineOnlyReceiver from .__version__ import __version__ from .app.base import JetforceApplication, Status from .tls import inspect_certificate +if typing.TYPE_CHECKING: + from .server import GeminiServer + class GeminiProtocol(LineOnlyReceiver): """ @@ -44,7 +47,7 @@ class GeminiProtocol(LineOnlyReceiver): response_buffer: str response_size: int - def __init__(self, server: "GeminiServer", app: JetforceApplication): + def __init__(self, server: GeminiServer, app: JetforceApplication): self.server = server self.app = app @@ -70,21 +73,20 @@ class GeminiProtocol(LineOnlyReceiver): This method is implemented using an async coroutine, which has been supported by twisted since python 3.5 by wrapping the method in - ensureDeferred(). Twisted + coroutines is a bitch to figure out, but - once it clicks it really does turn out to be an elegant solution. + ensureDeferred(). - Any time that we call into the application code, we wrap the call with - deferToThread() which will execute the code in a separate thread using - twisted's thread pool. deferToThread() will return a future object - that we can then `await` to get the result when the thread finishes. - This is important because we don't want application code to block the - twisted event loop from serving other requests at the same time. + There are two places that we call into the "application" code: - In the future, I would like to add the capability for applications to - implement proper coroutines that can call `await` on directly without - needing to wrap them in threads. Conceptually, this shouldn't be too - difficult, but it will require implementing an alternate version of - the JetforceApplication that's async-compatible. + 1. The initial invoking of app(environ, write_callback) which will + return an iterable. + 2. Every time that we call next() on the iterable to retrieve bytes to + write to the response body. + + In both of these places, the app can either return the result directly, + or it can return a "deferred" object, which is twisted's version of an + asyncio future. The server will await on the result of this deferred, + which yields control of the event loop for other requests to be handled + concurrently. """ try: self.parse_header() @@ -98,13 +100,17 @@ class GeminiProtocol(LineOnlyReceiver): try: environ = self.build_environ() - response_generator = await deferToThread( + response_generator = await maybeDeferred( self.app, environ, self.write_status ) + # Yield control of the event loop + await deferLater(self.server.reactor, 0) while True: try: - data = await deferToThread(response_generator.__next__) + data = await maybeDeferred(response_generator.__next__) self.write_body(data) + # Yield control of the event loop + await deferLater(self.server.reactor, 0) except StopIteration: break except Exception: diff --git a/jetforce_client.py b/jetforce_client.py index 8d5b977..e673b03 100755 --- a/jetforce_client.py +++ b/jetforce_client.py @@ -29,6 +29,7 @@ def fetch(url, host=None, port=None, use_sni=False): data = fp.read(1024) while data: sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() data = fp.read(1024)