Finally figured out twisted concurrency

This commit is contained in:
Michael Lazar 2020-05-17 23:08:16 -04:00
parent 2b44182d84
commit abea5601d7
3 changed files with 68 additions and 17 deletions

45
examples/counter.py Normal file
View File

@ -0,0 +1,45 @@
"""
An endpoint that streams incrementing numbers forever.
This is an example of how a jetforce application can respond with a generator
function instead of plain text/bytes. The server will iterate over the
generator and write the data to the socket in-between each iteration. This can
be useful if you want to serve a large response, like a binary file, without
loading the entire response into memory at once.
The server will schedule your application code to be run inside of a separate
thread, using twisted's built-in thread pool. So even though the counter
function contains a sleep(), it will not block the server from handling other
requests. Try requesting this endpoint over two connections simultaneously.
> jetforce-client gemini://localhost
> jetforce-client gemini://localhost
"""
import time
from jetforce import GeminiServer, JetforceApplication, Response, Status
def counter():
"""
Generator function that counts to .
"""
x = 0
while True:
time.sleep(1)
x += 1
yield f"{x}\r\n"
app = JetforceApplication()
@app.route()
def index(request):
return Response(Status.SUCCESS, "text/plain", counter())
if __name__ == "__main__":
server = GeminiServer(app)
server.run()

View File

@ -5,7 +5,8 @@ import typing
import urllib.parse import urllib.parse
from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import inlineCallbacks from twisted.internet.defer import ensureDeferred
from twisted.internet.threads import deferToThread
from twisted.protocols.basic import LineOnlyReceiver from twisted.protocols.basic import LineOnlyReceiver
from .__version__ import __version__ from .__version__ import __version__
@ -64,10 +65,10 @@ class GeminiProtocol(LineOnlyReceiver):
connection without managing any state. connection without managing any state.
""" """
self.request = line self.request = line
return self._handle_request_noblock() return ensureDeferred(self._handle_request_noblock())
async def _handle_request_noblock(self):
@inlineCallbacks
def _handle_request_noblock(self):
try: try:
self.parse_header() self.parse_header()
except Exception: except Exception:
@ -79,9 +80,15 @@ class GeminiProtocol(LineOnlyReceiver):
try: try:
environ = self.build_environ() environ = self.build_environ()
for data in self.app(environ, self.write_status): response_generator = await deferToThread(
self.app, environ, self.write_status
)
while True:
try:
data = await deferToThread(response_generator.__next__)
self.write_body(data) self.write_body(data)
yield # Yield control to the event loop except StopIteration:
break
except Exception: except Exception:
self.write_status(Status.CGI_ERROR, "An unexpected error occurred") self.write_status(Status.CGI_ERROR, "An unexpected error occurred")
finally: finally:
@ -163,16 +170,14 @@ class GeminiProtocol(LineOnlyReceiver):
self.meta = meta self.meta = meta
self.response_buffer = f"{status}\t{meta}\r\n" self.response_buffer = f"{status}\t{meta}\r\n"
def write_body(self, data: typing.Union[str, bytes, None]) -> None: def write_body(self, data: typing.Union[str, bytes]) -> None:
""" """
Write bytes to the gemini response body. Write bytes to the gemini response body.
""" """
if data is None:
return
self.flush_status()
if isinstance(data, str): if isinstance(data, str):
data = data.encode() data = data.encode()
self.flush_status()
self.response_size += len(data) self.response_size += len(data)
self.transport.write(data) self.transport.write(data)

View File

@ -7,6 +7,7 @@ A dead-simple gemini client intended to be used for server development and testi
import argparse import argparse
import socket import socket
import ssl import ssl
import sys
import urllib.parse import urllib.parse
context = ssl.create_default_context() context = ssl.create_default_context()
@ -25,11 +26,11 @@ def fetch(url: str, host: str = None, port: str = None):
with socket.create_connection((host, port)) as sock: with socket.create_connection((host, port)) as sock:
with context.wrap_socket(sock) as ssock: with context.wrap_socket(sock) as ssock:
ssock.sendall((url + "\r\n").encode()) ssock.sendall((url + "\r\n").encode())
fp = ssock.makefile("rb") fp = ssock.makefile("rb", buffering=0)
header = fp.readline().decode() data = fp.read(1024)
print(header) while data:
body = fp.read().decode() sys.stdout.buffer.write(data)
print(body) data = fp.read(1024)
def run_client(): def run_client():