In our last post we created a queue system to offload processing of our raw videos from the API. But, in doing so, we made it so the API returns the response well before the video is done processing.
Now, we need a way to communicate back to the client when a video is done processing. We'll achieve this by taking advantage of Postgres' LISTEN/NOTIFY feature.
We're going to create a trigger that watches for updates to the processing_status column on the videos table we created. Then we'll create a websockets connection from client to server, and when we receive a notification from Postgres for the requested video, we'll pass that notification along to the websockets client.
Adding a notify trigger
First, we'll need to create a migration that adds a trigger to the videos table's processing_status column. Whenever that column gets updated, we want Postgres to notify us.
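Here's a sketch of what that migration can look like. The channel name (video_events) and the JSON payload shape are assumptions I'm making here; adapt them to your schema.

```sql
-- Fire a NOTIFY on the 'video_events' channel whenever
-- processing_status changes. Channel name and payload shape are
-- one possible choice; adjust them to your schema.
CREATE OR REPLACE FUNCTION notify_processing_status()
RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify(
        'video_events',
        json_build_object(
            'video_id', NEW.id,
            'processing_status', NEW.processing_status
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER videos_notify_processing_status
AFTER UPDATE OF processing_status ON videos
FOR EACH ROW
EXECUTE FUNCTION notify_processing_status();
```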
If you're using sqlx, you can apply this migration with sqlx migrate run.
Setting up websockets
Next, we'll want to establish a means of communication between our API and the web client. For this tutorial we're using axum and askama, but this should work with any frontend and any framework that supports websockets. The main reason we're using websockets is so we can push events from the server to the client, instead of making the client poll for them.
We'll create a new route and handler for the websockets connection called /events:
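Something like this, assuming our app state is a sqlx PgPool (swap in whatever state your app actually carries):

```rust
use axum::{routing::get, Router};
use sqlx::PgPool;

// Register the websockets route alongside your existing routes.
// `events_handler` is defined in the next step.
fn app(pool: PgPool) -> Router {
    Router::new()
        .route("/events", get(events_handler))
        .with_state(pool)
}
```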
And then we create a handler that upgrades the connection to a websocket and passes the socket to another handler function that can use it.
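A sketch, again assuming the PgPool state from above:

```rust
use axum::{
    extract::{
        ws::{WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
};
use sqlx::PgPool;

async fn events_handler(
    ws: WebSocketUpgrade,
    State(pool): State<PgPool>,
) -> impl IntoResponse {
    // Complete the websocket handshake, then hand the socket off.
    ws.on_upgrade(move |socket| handle_socket(socket, pool))
}

async fn handle_socket(socket: WebSocket, pool: PgPool) {
    // We'll build this out over the next few steps.
}
```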
Alright, admittedly this part is a little confusing. But bear with me 🐻. What we need to do is create two "connections", and then link those connections together. One connection will communicate with Postgres and listen for notifications. When it receives a notification, it'll parse it and pass it to the websockets connection, which will then push it to the client.

We could also skip the ev_ channel and just pass the events directly to websockets, but doing it this way allows us to extend it later for different kinds of events. The ultimate goal is for this to be an event stream carrying multiple kinds of events.
We'll start by defining the senders/receivers: one pair for the websockets connection (by splitting the socket) and another one for Postgres using a broadcast channel.
Note: broadcast channels send every message to every receiver. You'll probably want to modify some of this behavior for production settings.
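Inside handle_socket, that might look like this. The Event struct here is an assumption that mirrors the trigger payload from the migration, and the channel capacity of 100 is arbitrary:

```rust
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;

// Mirrors the JSON payload our trigger sends; adjust the fields
// to match your own payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Event {
    video_id: i32,
    processing_status: String,
}

// Split the socket so sending and receiving can live in separate tasks.
let (mut ws_sender, mut ws_receiver) = socket.split();

// The ev_ channel: Postgres notifications go in one end and come out
// the other, ready to be forwarded to the client.
let (ev_sender, mut ev_receiver) = broadcast::channel::<Event>(100);
```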
Okay, now what we'll do is set up our ev_ receiver/sender to communicate with Postgres. Using tokio, we'll spawn this in a task so it can run concurrently with our websockets sender/receiver. When we receive a notification from Postgres, we'll also want to parse it into our Event struct so we can serialize it for the client.

We're going to take advantage of sqlx's PgListener, which gives us some niceties like automatically reconnecting if the connection dies. If you're using SQL directly, you can use the LISTEN command.
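A sketch of that task, listening on the video_events channel we picked in the migration:

```rust
use sqlx::postgres::PgListener;

// Listen for Postgres notifications and broadcast them as Events.
let mut pg_task = tokio::spawn(async move {
    let mut listener = PgListener::connect_with(&pool)
        .await
        .expect("failed to connect PgListener");
    listener
        .listen("video_events")
        .await
        .expect("failed to LISTEN on video_events");

    while let Ok(notification) = listener.recv().await {
        // The payload is the JSON string our trigger built.
        match serde_json::from_str::<Event>(notification.payload()) {
            Ok(event) => {
                tracing::debug!(?event, "received event from Postgres");
                // A send error just means there are no receivers left.
                let _ = ev_sender.send(event);
            }
            Err(e) => tracing::error!(?e, "failed to parse notification payload"),
        }
    }
});
```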
If you run this as is, you should see the debug statement. Now we just need to set up the ev_receiver to pass the events to the websockets channel.
We'll spawn another task that listens for received events and sends them to our websockets clients. We'll also add the receiver from the websockets client in case we need some client communication later.
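Here's a sketch; the task and variable names carry over from the snippets above:

```rust
use axum::extract::ws::Message;
use futures_util::SinkExt;

// Forward events from the ev_ channel to the client as JSON text frames.
let mut send_task = tokio::spawn(async move {
    while let Ok(event) = ev_receiver.recv().await {
        match serde_json::to_string(&event) {
            Ok(json) => {
                if ws_sender.send(Message::Text(json.into())).await.is_err() {
                    break; // the client disconnected
                }
            }
            Err(e) => tracing::error!(?e, "failed to serialize event"),
        }
    }
});

// Drain messages from the client. We don't act on them yet, but the
// task gives us a place to handle client-to-server messages later.
let mut recv_task = tokio::spawn(async move {
    while let Some(Ok(msg)) = ws_receiver.next().await {
        tracing::debug!(?msg, "received message from client");
    }
});
```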
Lastly, we'll use the tokio::select! macro to exit if any of our connections complete.
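For example:

```rust
// If any task finishes (client hung up, connection died), abort the
// others so nothing is left running.
tokio::select! {
    _ = &mut pg_task => {
        send_task.abort();
        recv_task.abort();
    }
    _ = &mut send_task => {
        pg_task.abort();
        recv_task.abort();
    }
    _ = &mut recv_task => {
        pg_task.abort();
        send_task.abort();
    }
}
```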
And that's all there is to it! We now have an events handler that can stream events from various sources back to our websockets client. Here's the complete function:
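This is the assembled sketch, under the same assumptions as above (PgPool state, the video_events channel, and our assumed Event struct):

```rust
use axum::extract::ws::{Message, WebSocket};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgListener, PgPool};
use tokio::sync::broadcast;

// Mirrors the JSON payload our trigger sends; adjust to your payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Event {
    video_id: i32,
    processing_status: String,
}

async fn handle_socket(socket: WebSocket, pool: PgPool) {
    // One half for pushing events to the client, one for receiving.
    let (mut ws_sender, mut ws_receiver) = socket.split();

    // The ev_ channel linking the Postgres listener to the websocket.
    let (ev_sender, mut ev_receiver) = broadcast::channel::<Event>(100);

    // Task 1: listen for Postgres notifications and broadcast them.
    let mut pg_task = tokio::spawn(async move {
        let mut listener = PgListener::connect_with(&pool)
            .await
            .expect("failed to connect PgListener");
        listener
            .listen("video_events")
            .await
            .expect("failed to LISTEN on video_events");

        while let Ok(notification) = listener.recv().await {
            match serde_json::from_str::<Event>(notification.payload()) {
                Ok(event) => {
                    tracing::debug!(?event, "received event from Postgres");
                    let _ = ev_sender.send(event);
                }
                Err(e) => tracing::error!(?e, "failed to parse notification payload"),
            }
        }
    });

    // Task 2: forward broadcast events to the websockets client.
    let mut send_task = tokio::spawn(async move {
        while let Ok(event) = ev_receiver.recv().await {
            match serde_json::to_string(&event) {
                Ok(json) => {
                    if ws_sender.send(Message::Text(json.into())).await.is_err() {
                        break; // the client disconnected
                    }
                }
                Err(e) => tracing::error!(?e, "failed to serialize event"),
            }
        }
    });

    // Task 3: drain client messages (unused for now).
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            tracing::debug!(?msg, "received message from client");
        }
    });

    // Exit when any task completes, aborting the others.
    tokio::select! {
        _ = &mut pg_task => { send_task.abort(); recv_task.abort(); }
        _ = &mut send_task => { pg_task.abort(); recv_task.abort(); }
        _ = &mut recv_task => { pg_task.abort(); send_task.abort(); }
    }
}
```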
Here's the full repo with all the functionality in this series if you want a complete example.