Drop threading by default and add proper support for async applications

This commit is contained in:
Michael Lazar 2020-05-25 23:40:15 -04:00
parent d16afa8dc7
commit c794384248
5 changed files with 98 additions and 39 deletions

View File

@ -6,37 +6,87 @@ function instead of plain text/bytes. The server will iterate over the
generator and write the data to the socket in-between each iteration. This can generator and write the data to the socket in-between each iteration. This can
be useful if you want to serve a large response, like a binary file, without be useful if you want to serve a large response, like a binary file, without
loading the entire response into memory at once. loading the entire response into memory at once.
The server will schedule your application code to be run inside of a separate
thread, using twisted's built-in thread pool. So even though the counter
function contains a sleep(), it will not block the server from handling other
requests. Try requesting this endpoint over two connections simultaneously.
> jetforce-client gemini://localhost
> jetforce-client gemini://localhost
""" """
import time import time
from jetforce import GeminiServer, JetforceApplication, Response, Status from jetforce import GeminiServer, JetforceApplication, Response, Status
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.threads import deferToThread
def counter(): def blocking_counter():
""" """
Generator function that counts to . This is the simplest implementation of a blocking, synchronous generator.
The calls to time.sleep(1) will run in the main twisted event loop and
block all other requests from processing.
""" """
x = 0 x = 0
while True: while True:
time.sleep(1)
x += 1 x += 1
time.sleep(1)
yield f"{x}\r\n" yield f"{x}\r\n"
def threaded_counter():
"""
This counter uses the twisted ThreadPool to invoke sleep() inside of a
separate thread.
This avoids blocking the twisted event loop during the sleep() call.
It adds an overhead of setting up a thread for each iteration. It also
requires that your code be thread-safe, because more than one thread may
be running simultaneously in order to process separate requests.
"""
def delayed_callback(x):
time.sleep(1)
return f"{x}\r\n"
x = 0
while True:
x += 1
yield deferToThread(delayed_callback, x)
def deferred_counter():
"""
This counter uses twisted's deferLater() to schedule calling the function
after a delay of one second.
This is equivalent to using asyncio.sleep(1). It tells the twisted event
loop to "go do something else, and come back to run this callback after at
least one second has elapsed". The advantage is that it's non-blocking and
you don't need to worry about thread-safety because your callback will
eventually run in the main event loop.
"""
def delayed_callback(x):
return f"{x}\r\n"
x = 0
while True:
x += 1
yield deferLater(reactor, 1, delayed_callback, x)
app = JetforceApplication() app = JetforceApplication()
@app.route() @app.route("/blocking")
def index(request): def blocking(request):
return Response(Status.SUCCESS, "text/plain", counter()) return Response(Status.SUCCESS, "text/plain", blocking_counter())
@app.route("/threaded")
def threaded(request):
return Response(Status.SUCCESS, "text/plain", threaded_counter())
@app.route("/deferred")
def deferred(request):
return Response(Status.SUCCESS, "text/plain", deferred_counter())
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -4,6 +4,10 @@ import re
import typing import typing
import urllib.parse import urllib.parse
from twisted.internet.defer import Deferred
ResponseType = typing.Union[str, bytes, Deferred]
class Status: class Status:
""" """
@ -73,9 +77,7 @@ class Response:
status: int status: int
meta: str meta: str
body: typing.Union[ body: typing.Union[None, ResponseType, typing.Iterable[ResponseType]] = None
None, bytes, str, typing.Iterable[typing.Union[bytes, str]]
] = None
@dataclasses.dataclass @dataclasses.dataclass
@ -137,7 +139,7 @@ class JetforceApplication:
def __call__( def __call__(
self, environ: dict, send_status: typing.Callable self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]: ) -> typing.Iterator[ResponseType]:
try: try:
request = Request(environ) request = Request(environ)
except Exception: except Exception:
@ -156,7 +158,7 @@ class JetforceApplication:
response = callback(request, **callback_kwargs) response = callback(request, **callback_kwargs)
send_status(response.status, response.meta) send_status(response.status, response.meta)
if isinstance(response.body, (bytes, str)): if isinstance(response.body, (bytes, str, Deferred)):
yield response.body yield response.body
elif response.body: elif response.body:
yield from response.body yield from response.body

View File

@ -1,6 +1,6 @@
import typing import typing
from .base import Request, Status from .base import Request, ResponseType, Status
class CompositeApplication: class CompositeApplication:
@ -30,7 +30,7 @@ class CompositeApplication:
def __call__( def __call__(
self, environ: dict, send_status: typing.Callable self, environ: dict, send_status: typing.Callable
) -> typing.Iterator[bytes]: ) -> typing.Iterator[ResponseType]:
try: try:
request = Request(environ) request = Request(environ)
except Exception: except Exception:

View File

@ -6,14 +6,17 @@ import typing
import urllib.parse import urllib.parse
from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import ensureDeferred from twisted.internet.defer import ensureDeferred, maybeDeferred
from twisted.internet.threads import deferToThread from twisted.internet.task import deferLater
from twisted.protocols.basic import LineOnlyReceiver from twisted.protocols.basic import LineOnlyReceiver
from .__version__ import __version__ from .__version__ import __version__
from .app.base import JetforceApplication, Status from .app.base import JetforceApplication, Status
from .tls import inspect_certificate from .tls import inspect_certificate
if typing.TYPE_CHECKING:
from .server import GeminiServer
class GeminiProtocol(LineOnlyReceiver): class GeminiProtocol(LineOnlyReceiver):
""" """
@ -44,7 +47,7 @@ class GeminiProtocol(LineOnlyReceiver):
response_buffer: str response_buffer: str
response_size: int response_size: int
def __init__(self, server: "GeminiServer", app: JetforceApplication): def __init__(self, server: GeminiServer, app: JetforceApplication):
self.server = server self.server = server
self.app = app self.app = app
@ -70,21 +73,20 @@ class GeminiProtocol(LineOnlyReceiver):
This method is implemented using an async coroutine, which has been This method is implemented using an async coroutine, which has been
supported by twisted since python 3.5 by wrapping the method in supported by twisted since python 3.5 by wrapping the method in
ensureDeferred(). Twisted + coroutines is a bitch to figure out, but ensureDeferred().
once it clicks it really does turn out to be an elegant solution.
Any time that we call into the application code, we wrap the call with There are two places that we call into the "application" code:
deferToThread() which will execute the code in a separate thread using
twisted's thread pool. deferToThread() will return a future object
that we can then `await` to get the result when the thread finishes.
This is important because we don't want application code to block the
twisted event loop from serving other requests at the same time.
In the future, I would like to add the capability for applications to 1. The initial invoking of app(environ, write_callback) which will
implement proper coroutines that can call `await` on directly without return an iterable.
needing to wrap them in threads. Conceptually, this shouldn't be too 2. Every time that we call next() on the iterable to retrieve bytes to
difficult, but it will require implementing an alternate version of write to the response body.
the JetforceApplication that's async-compatible.
In both of these places, the app can either return the result directly,
or it can return a "deferred" object, which is twisted's version of an
asyncio future. The server will await on the result of this deferred,
which yields control of the event loop for other requests to be handled
concurrently.
""" """
try: try:
self.parse_header() self.parse_header()
@ -98,13 +100,17 @@ class GeminiProtocol(LineOnlyReceiver):
try: try:
environ = self.build_environ() environ = self.build_environ()
response_generator = await deferToThread( response_generator = await maybeDeferred(
self.app, environ, self.write_status self.app, environ, self.write_status
) )
# Yield control of the event loop
await deferLater(self.server.reactor, 0)
while True: while True:
try: try:
data = await deferToThread(response_generator.__next__) data = await maybeDeferred(response_generator.__next__)
self.write_body(data) self.write_body(data)
# Yield control of the event loop
await deferLater(self.server.reactor, 0)
except StopIteration: except StopIteration:
break break
except Exception: except Exception:

View File

@ -29,6 +29,7 @@ def fetch(url, host=None, port=None, use_sni=False):
data = fp.read(1024) data = fp.read(1024)
while data: while data:
sys.stdout.buffer.write(data) sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
data = fp.read(1024) data = fp.read(1024)