Logo for Sneaky Crow, LLC (vector art of a crow with a green baseball cap on)

Brain Juice

In our last post we created a queue system to offload processing of our raw videos from the API. But, in doing so, we made it so the API returns the response well before the video is done processing.

Now, we need a way to communicate back to the client when a video is done processing. We'll achieve this by taking advantage of a Postgres' LISTEN directive.

We're going to create a trigger that watches for updates to the processing_status column on the videos table we created. Then, we'll create a websockets connection from client to server, and when we receive a notification from Postgres for the requested video, we pass that notification to the websockets client.

Adding a notify trigger

First, we'll need to create a migration that adds a trigger to the videos tables' processing_status column. Whenever that column gets updated, we want Postgres to notify us.

1
-- First, let's create the function to notify status changes
2
CREATE OR REPLACE FUNCTION notify_upload_status()
3
RETURNS TRIGGER AS $$
4
BEGIN
5
    -- Use the processing_status column directly
6
    PERFORM pg_notify('upload_status', NEW.id || ',' || NEW.processing_status);
7
    RETURN NEW;
8
END;
9
$$ LANGUAGE plpgsql;
10

11
-- Now, let's create the trigger
12
CREATE TRIGGER upload_status_trigger
13
AFTER UPDATE OF processing_status ON videos
14
FOR EACH ROW
15
EXECUTE FUNCTION notify_upload_status();

If you're using sqlx, you can run this with sqlx migrate run

Setting up websockets

Next, we'll want to establish a means of communication between our API and the web client. For this tutorial, we're using axum and askama, but this should work with any frontend and any framework that supports websockets. The reason we're using websockets is mainly so we can push an event from the server to the client, instead of making the client request the event.

We'll create a new route and handler for the websockets connection called /events

1
// ... omitted for brevity
2
.route("/events", get(event_handler))
3
// ... omitted for brevity

And then we create a handler that upgrades the websocket connection and passes it to another handler function that can utilize the socket.

1
pub async fn event_handler(
2
    ws: WebSocketUpgrade,
3
    State(state): State<Arc<AppState>>,
4
) -> impl IntoResponse {
5
    ws.on_upgrade(|socket| handle_socket(socket, state))
6
}

Alright, admittedly this part is a little confusing. But bear with me 🐻. What we need to do is create two "connections", and then link those connections together. One connection will be communicating with Postgres and listening for notifications. When it receives a notification, it'll parse it and pass it to the websockets connection, which will then push it to the client. We could also skip having the ev_ and just pass the events directly to websockets, but doing it this way allows us to extend it later for different kind of events. The ultimate goal is for this to be a event stream of multiple kinds of events.

We'll start by defining the senders/receivers, one for the websockets connection (using the socket) and another one for Postgres using a broadcast channel.

Note: Broadcast sends all messages to every receiver. You'll want to probably modify some of this behavior for production settings

1
/// A websocket handler that passes upload_status events to the client
2
/// NOTE: This currently sends _all_ notifications to the subscribed client
3
/// TODO: Add authorization and filter only events the client cares about
4
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
5
    // Split our websocket sender/receiver for communication with the client
6
    let (mut sender, mut receiver) = socket.split();
7

8
    // Create a new sender/receiver for sending/receiving events from Postgres
9
    let (ev_sender, mut ev_receiver): (broadcast::Sender<Event>, broadcast::Receiver<Event>) =
10
        broadcast::channel(100); // You can probably reduce this, I just pulled it from an example
11

12
    // ... omitted for brevity
13
}

Okay, now what we'll do is set up our ev_ recevier/sender to communicate with Postgres. Using tokio, we'll spawn this in a task so it can run concurrently with our websockets sender/receiver.

When we receive a notification from Postgres we'll also want to parse it into our Event struct so we can serialize it for the client.

We're going to take advantage of sqlx PgListener. This gives us some niceties like auto-reconnecting if the connection dies.

If you're using sql directly, you can use the Listen feature.

1
// ... omitted for brevity
2
// Spawn a task for listening for notifications from Postgres on the upload_status trigger
3
tokio::spawn(async move {
4
    // Start the listener
5
    let mut listener = PgListener::connect_with(&state.db).await.unwrap();
6
    listener.listen("upload_status").await.unwrap();
7
    tracing::debug!("Listening for upload status changes");
8

9
    // When we receive a notification, parse it into an Event and pass it to the sender
10
    while let Ok(notification) = listener.recv().await {
11
        let payload = notification.payload();
12
        let parts: Vec<&str> = payload.split(',').collect();
13
        tracing::debug!(
14
            "Notification received! Payload: {:?}, Parts: {:?}",
15
            payload,
16
            parts
17
        );
18
        if parts.len() == 2 {
19
            let event = Event::VideoProcessingStatus {
20
                video_id: parts[0].to_string(),
21
                status: parts[1].to_string(),
22
            };
23
            let _ = ev_sender.send(event);
24
        }
25
    }
26
});
27
// ... omitted for brevity

If you run this as is you should see the debug statement. Now we just need to setup the ev_receiver to pass the events to the websockets channel.

We'll spawn another task that will listen to received events and send them to our websockets clients. We'll also add the receiver from the websockets client in case we later need some client communications.

1
// ... omitted for brevity
2
// Send events to the client
3
let mut send_task = tokio::spawn(async move {
4
    while let Ok(event) = ev_receiver.recv().await {
5
        tracing::debug!("Received parsed event, sending to client");
6
        let server_message = ServerMessage { event };
7
        if let Ok(json) = serde_json::to_string(&server_message) {
8
            if sender
9
                .send(axum::extract::ws::Message::Text(json))
10
                .await
11
                .is_err()
12
            {
13
                break;
14
            }
15
        }
16
    }
17
});
18
// Parse messages from the client
19
let mut recv_task = tokio::spawn(async move {
20
    while let Some(Ok(_message)) = receiver.next().await {
21
        // Process incoming message from client
22
    }
23
});
24
// ... omitted for brevity

Lastly, we'll use the tokio::select! macro to exit if any of our connections complete.

1
// If any of the tasks exit, abort the other one
2
tokio::select! {
3
    _ = (&mut send_task) => recv_task.abort(),
4
    _ = (&mut recv_task) => send_task.abort(),
5
};

And that's all there is to it! We now have a events handler that can stream events from various sources back to our websockets client. Here's the complete function

1

2
/// The serialized message we send back to the client
3
#[derive(Serialize)]
4
struct ServerMessage {
5
    event: Event,
6
}
7

8
/// A websocket handler that passes upload_status events to the client
9
/// NOTE: This currently sends _all_ notifications to the subscribed client
10
/// TODO: Add authorization and filter only events the client cares about
11
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
12
    // Split our websocket sender/receiver for communication with the client
13
    let (mut sender, mut receiver) = socket.split();
14

15
    // Create a new sender/receiver for sending/receiving events from Postgres
16
    let (ev_sender, mut ev_receiver): (broadcast::Sender<Event>, broadcast::Receiver<Event>) =
17
        broadcast::channel(100);
18

19
    // Spawn a task for listening for notifications from Postgres on the upload_status trigger
20
    tokio::spawn(async move {
21
        // Start the listener
22
        let mut listener = PgListener::connect_with(&state.db).await.unwrap();
23
        listener.listen("upload_status").await.unwrap();
24
        tracing::debug!("Listening for upload status changes");
25

26
        // When we receive a notification, parse it into an Event and pass it to the sender
27
        while let Ok(notification) = listener.recv().await {
28
            let payload = notification.payload();
29
            let parts: Vec<&str> = payload.split(',').collect();
30
            tracing::debug!(
31
                "Notification received! Payload: {:?}, Parts: {:?}",
32
                payload,
33
                parts
34
            );
35
            if parts.len() == 2 {
36
                let event = Event::VideoProcessingStatus {
37
                    video_id: parts[0].to_string(),
38
                    status: parts[1].to_string(),
39
                };
40
                let _ = ev_sender.send(event);
41
            }
42
        }
43
    });
44
    // Send events to the client
45
    let mut send_task = tokio::spawn(async move {
46
        while let Ok(event) = ev_receiver.recv().await {
47
            tracing::debug!("Received parsed event, sending to client");
48
            let server_message = ServerMessage { event };
49
            if let Ok(json) = serde_json::to_string(&server_message) {
50
                if sender
51
                    .send(axum::extract::ws::Message::Text(json))
52
                    .await
53
                    .is_err()
54
                {
55
                    break;
56
                }
57
            }
58
        }
59
    });
60

61
    // Parse messages from the client
62
    let mut recv_task = tokio::spawn(async move {
63
        while let Some(Ok(_message)) = receiver.next().await {
64
            // Process incoming message from client
65
        }
66
    });
67

68
    // If any of the tasks exit, abort the other one
69
    tokio::select! {
70
        _ = (&mut send_task) => recv_task.abort(),
71
        _ = (&mut recv_task) => send_task.abort(),
72
    };
73
}

Here's the full repo with all the functionality in this series if you want a complete example.