diff --git a/CHANGELOG.md b/CHANGELOG.md index ef85427..6a4e942 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,9 @@ - If a gemini response returns a twisted.Deferred object, the errback will now be invoked when the TCP connection is closed. - +- Added a new example that demonstrates streaming data to client connections + (examples/chatroom.py). + ### v0.4.0 (2020-06-09) #### Features diff --git a/examples/chatroom.py b/examples/chatroom.py new file mode 100644 index 0000000..2c2e1d2 --- /dev/null +++ b/examples/chatroom.py @@ -0,0 +1,149 @@ +""" +A chatroom that will hold client connections open forever and stream messages +in real-time. + +This example demonstrates how you can setup a request handler to return a +Deferred object instead of plain text/bytes. The deferred will wait for an +event to trigger it (in this case, a new message being posted to the /submit +endpoint), and at that point it will send the data to the client. An error +callback is also added to the deferred object, which will be triggered if the +client closes the connection prematurely. + +This demo requires a gemini client that can stream text to the user without +waiting for the whole request to complete first. The jetforce-client tool can +do this, but most other gemini clients probably won't be able to handle +streaming. +""" +from collections import deque +from datetime import datetime + +from jetforce import GeminiServer, JetforceApplication, Response, Status +from twisted.internet.defer import AlreadyCalledError, Deferred + + +class MessageQueue: + def __init__(self, filename): + self.listeners = [] + + # Keep the most recent 100 messages in memory for efficiency, and + # persist *all* messages to a plain text file. + self.history_log = deque(maxlen=100) + self.filename = filename + self.load_history() + + def load_history(self): + try: + with open(self.filename) as fp: + for line in fp: + self.history_log.append(line) + except OSError: + pass + + def update_history(self, message): + self.history_log.append(message) + with open(self.filename, "a") as fp: + fp.write(message) + + def publish(self, message): + message = f"[{datetime.utcnow():%Y-%m-%dT%H:%M:%SZ}] {message}\n" + self.update_history(message) + + # Stream the message to all open client connections + listeners = self.listeners + self.listeners = [] + for listener in listeners: + try: + listener.callback(message) + except AlreadyCalledError: + # The connection has disconnected, ignore it + pass + + def subscribe(self): + # Register a deferred response that will trigger whenever the next + # message is published to the queue + d = Deferred() + self.listeners.append(d) + return d + + +queue = MessageQueue("/tmp/jetforce_chat.txt") + +app = JetforceApplication() + + +HOMEPAGE = r""" +# Gemini Chat + +``` The coolest chatroom in gemini! + _________________________________ +< The coolest chatroom in gemini! > + --------------------------------- + \ ^__^ + \ (oo)\_______ + (__)\ )\/\ + ||----w | + || || +``` + +A live, unmoderated chat room over gemini:// + +You can set a username by connecting with a client certificate. +Anonymous users will be identified by their IP addresses. + +=> /history +(view the last 100 messages) + +=> /stream +(open a long-running gemini connection that will stream in real-time) + +=> /submit +(submit a message to the room) +""".strip() + + +def get_username(request): + if "REMOTE_USER" in request.environ: + return request.environ["REMOTE_USER"] + else: + return request.environ["REMOTE_ADDR"] + + +@app.route("", strict_trailing_slash=False) +def index(request): + return Response(Status.SUCCESS, "text/gemini", HOMEPAGE) + + +@app.route("/history") +def history(request): + body = "".join(queue.history_log) + return Response(Status.SUCCESS, "text/plain", body) + + +@app.route("/submit") +def submit(request): + if request.query: + message = f"<{get_username(request)}> {request.query}" + queue.publish(message) + return Response(Status.INPUT, "Enter Message:") + + +@app.route("/stream") +def stream(request): + def on_disconnect(failure): + queue.publish(f"*** {get_username(request)} disconnected") + return failure + + def stream_forever(): + yield "Connection established...\n" + while True: + deferred = queue.subscribe() + deferred.addErrback(on_disconnect) + yield deferred + + queue.publish(f"*** {get_username(request)} joined") + return Response(Status.SUCCESS, "text/plain", stream_forever()) + + +if __name__ == "__main__": + server = GeminiServer(app) + server.run()