In our previous post we created a function that can process a video into a stream, and some routes for serving those static files. The problem with doing this within the API is it can take up a lot of resources. Ideally, we want to offload this processing.
So, what we're going to do in this article is create a queue system that can run our jobs.
We'll create a basic queue with an initial job type of `ProcessRawVideo`, representing the job of processing a raw video into our stream. This queue system will use Postgres to manage jobs, and will take advantage of `SKIP LOCKED` when querying for jobs to add some concurrency.
This is largely inspired by Sylvain Kerkour's post on creating a job queue system in Rust, but slightly modified for our purposes. We're mostly focusing on the high level of moving a video processing job through our pipeline.
creating the queue
Our queue is largely made of three core entities: the queue itself, the jobs it processes, and the runners that execute those jobs. We want to be able to push and pull jobs to and from our queue, and we want to be able to run several jobs concurrently.
We're going to use Postgres to store the jobs to be processed, and when we query Postgres we'll take advantage of its locking mechanisms to get some concurrency.
dependencies
We'll mostly be using sqlx for its simplicity here, but this should be fairly interchangeable with whatever library you're using.
Here are our `Cargo.toml` dependencies. A lot of these are preference and not strictly required to make this work. Again, this is largely an extension of Sylvain Kerkour's tutorial on queues.
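Something along these lines covers everything used below; treat the exact versions and feature flags as a reasonable starting point rather than a strict requirement:

```toml
[dependencies]
# async runtime + stream utilities for the worker
tokio = { version = "1", features = ["full"] }
futures = "0.3"

# Postgres access; json, uuid, and chrono features map our column types
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid", "chrono", "json", "migrate"] }

# (de)serializing job payloads
serde = { version = "1", features = ["derive"] }
serde_json = "1"

# ids, timestamps, errors
uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "1"
anyhow = "1"

# object-safe async trait for our Queue
async-trait = "0.1"
```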
db module
This module holds our function for creating a database connection pool. We'll create a connection for our workers to use for updating the processing status and job queue in our database.
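A minimal version of that module, built on sqlx's `PgPoolOptions` (the pool size and timeout are arbitrary picks):

```rust
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration;

/// Create a Postgres connection pool shared by the API, the queue, and the workers.
pub async fn connect(database_url: &str) -> Result<PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(10)
        .acquire_timeout(Duration::from_secs(30))
        .connect(database_url)
        .await
}
```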
migrations
We'll want to create two tables, our main one is for managing the jobs, but we also want to update our videos. In our videos table, we want to track where the raw video is located, whether it's been processed, and then standard stuff like an ID and timestamps.
Tip: You can create a top-level folder called `migrations` and then use the `sqlx` CLI to run them with `sqlx migrate run`.
For the queue table, which holds the scheduled jobs, we want to track a timestamp that allows us to schedule each job, how many failed attempts there are (if any), its current status, and the message payload, plus the standard IDs and timestamps.
We'll also add some indexes on the `status` and `scheduled_for` columns for faster querying, as most of the time we're going to be querying for rows that are `Queued`.
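A migration along these lines covers it; the integer `status` column will map onto a small Rust enum later, and the exact column types are a matter of taste:

```sql
CREATE TABLE queue (
    id UUID PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    scheduled_for TIMESTAMPTZ NOT NULL,
    failed_attempts INT NOT NULL,
    status INT NOT NULL,
    message JSONB NOT NULL
);

-- Most queries filter on status and scheduled_for, so index both.
CREATE INDEX index_queue_on_status ON queue (status);
CREATE INDEX index_queue_on_scheduled_for ON queue (scheduled_for);
```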
Next, we need a table for storing videos. These two migrations don't need to be in any particular order, or could be combined into one; I just like to isolate my migrations logic.
For our videos, we want to store the `raw_file_path` where the raw video was uploaded, a `processed_file_path` column representing where the processed `m3u8` playlist is, and a `processing_status` which maps to the current status of processing. Additionally, standard id and timestamp columns.
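And a sketch of the videos migration. The text-based `processing_status` values (`'pending'`, `'processing'`, `'processed'`) are just one way to model it; an integer or a Postgres enum works too:

```sql
CREATE TABLE videos (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    raw_file_path TEXT NOT NULL,
    processed_file_path TEXT,
    processing_status TEXT NOT NULL DEFAULT 'pending'
);
```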
We can run those with `sqlx migrate run`, which will set up the tables in our connected database. Make sure your `DATABASE_URL` environment variable is set correctly.
error module
I like to create a high level error module for my applications to re-use across my workspace. This is optional, but largely used in the examples below.
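A sketch of that module using `thiserror`, with a `From<sqlx::Error>` conversion so `?` works throughout the queue code (the variant names are just what I find convenient):

```rust
use thiserror::Error;

#[derive(Error, Debug, Clone)]
pub enum Error {
    #[error("Internal error: {0}")]
    Internal(String),
    #[error("Not found: {0}")]
    NotFound(String),
    #[error("Bad config: {0}")]
    BadConfig(String),
    #[error("Invalid job message: {0}")]
    InvalidJobMessage(String),
}

impl From<sqlx::Error> for Error {
    fn from(err: sqlx::Error) -> Self {
        match err {
            sqlx::Error::RowNotFound => Error::NotFound("row not found".into()),
            _ => Error::Internal(err.to_string()),
        }
    }
}
```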
queue module
queue trait and job definitions
Now we enter the meatier parts of this article. Before we define the queue, which will hold jobs, we need to define what a `Job` is. A job should have a unique identifier and some kind of message with a payload to run it. For now, we only need a `ProcessRawVideo` payload, but we'll make it a member of an enum representing all payloads, called `Message`.
Our specific message for processing a video will accept a `video_id` and a `path`. The path should be a path to the raw video. We included our path as a column on our `videos` table, but including it here saves us a query. Then we can use the `video_id` to update the row after we're done processing.
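Here's what that can look like. `Job` carries the queue row's id plus its payload, and `Message` is the enum of payloads:

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// A job pulled from the queue: its row id plus the payload to run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    pub id: Uuid,
    pub message: Message,
}

/// All job payloads the queue knows how to run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    ProcessRawVideo { video_id: Uuid, path: String },
}
```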
Now, we can define our queue. We're going to define a trait that's our base queue, and then implement that trait for a `PostgresQueue`. By having the queue itself be a trait, we could use different configurations, like if we moved to a different backend for managing the queue.
Our queue needs to be able to push jobs, pull jobs, fail jobs, delete jobs, and clear the queue. All of these work together to manage the queue itself.
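A sketch of the trait, using `async-trait` so it can sit behind a `dyn Queue` later; the exact signatures here are one reasonable shape for it:

```rust
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use uuid::Uuid;

use crate::error::Error;
// Job and Message are the types defined above in this module.

#[async_trait]
pub trait Queue: Send + Sync {
    /// Schedule a new job, optionally delayed until `scheduled_for`.
    async fn push(&self, job: Message, scheduled_for: Option<DateTime<Utc>>) -> Result<(), Error>;
    /// Pull at most `number_of_jobs` queued jobs and mark them as running.
    async fn pull(&self, number_of_jobs: u32) -> Result<Vec<Job>, Error>;
    /// Remove a finished job from the queue.
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// Mark a job as failed and bump its failed_attempts counter.
    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// Wipe the queue entirely.
    async fn clear(&self) -> Result<(), Error>;
}
```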
postgres queue impl
Next, we're going to implement our new queue trait for a `PostgresQueue`, which, as you might guess, is a queue backed by Postgres (yeeeeehaw).
First, we'll create our basic queue structs and implement a function for spawning one.
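Here's a sketch of those structs. `PostgresJobStatus` mirrors the integer `status` column, `PostgresJob` mirrors a `queue` row so it can be mapped back into a `Job`, and `PostgresQueue` holds the pool plus a max attempt count; all of these names are conventions, not requirements:

```rust
use sqlx::{types::Json, PgPool};
use uuid::Uuid;

/// Status values stored in the integer `status` column.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(i32)]
pub enum PostgresJobStatus {
    Queued = 0,
    Running = 1,
    Failed = 2,
}

/// The columns we read back from a `queue` row.
#[derive(Debug, Clone, sqlx::FromRow)]
struct PostgresJob {
    id: Uuid,
    message: Json<Message>,
}

impl From<PostgresJob> for Job {
    fn from(row: PostgresJob) -> Self {
        Job {
            id: row.id,
            message: row.message.0,
        }
    }
}

#[derive(Debug, Clone)]
pub struct PostgresQueue {
    db: PgPool,
    max_attempts: u32,
}

impl PostgresQueue {
    pub fn new(db: PgPool) -> PostgresQueue {
        PostgresQueue { db, max_attempts: 5 }
    }
}
```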
There's not too much to this. A maximum of 5 attempts is just my preference; you can safely adjust it to whatever works for your system. Aside from that, we just need to make sure we have a pool connection to use.
Next, we are going to get into the beef 🥩. Admittedly, there's not a lot of fancy stuff happening here.
- For the pull, we need to pull jobs whose status is `Queued` and set them to `Running` as we return them. It's important to do the update as part of the pull so the rows get locked appropriately. Finally, we'll use `SKIP LOCKED` to skip any locked rows, which is how we achieve our concurrency
- For the push, we just need to create a new row whose status is `Queued`
- Delete, fail, and clear are pretty self-explanatory
We'll start with `delete`, `clear`, and `fail` since they're pretty simple:
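Here's one way to write those three. Note that `fail_job` puts the job back to `Queued` and bumps `failed_attempts`, so a job gets retried until it hits the maximum:

```rust
use uuid::Uuid;

#[async_trait::async_trait]
impl Queue for PostgresQueue {
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error> {
        let query = "DELETE FROM queue WHERE id = $1";

        sqlx::query(query).bind(job_id).execute(&self.db).await?;
        Ok(())
    }

    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
        // Put the job back to Queued with one more failed attempt;
        // pull stops picking it up once it reaches max_attempts.
        let query = "UPDATE queue
            SET status = $1, updated_at = now(), failed_attempts = failed_attempts + 1
            WHERE id = $2";

        sqlx::query(query)
            .bind(PostgresJobStatus::Queued as i32)
            .bind(job_id)
            .execute(&self.db)
            .await?;
        Ok(())
    }

    async fn clear(&self) -> Result<(), Error> {
        let query = "DELETE FROM queue";

        sqlx::query(query).execute(&self.db).await?;
        Ok(())
    }

    // push and pull follow in the next snippets
}
```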
Not much to those. Now we can implement our push and pull methods to bring it all together.
First, we'll do the `push` method. This method should:
- Create a unique ID for the job
- Calculate the scheduled date
- Wrap the message payload in JSON
- Insert all of the above into a new row in the `queue` table

Here's how that looks:
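A sketch of `push` following those steps; the `Json` wrapper takes care of serializing the payload into the `message` column:

```rust
// This lives in the same `impl Queue for PostgresQueue` block as above.
// (chrono::Utc, sqlx::types::Json, and uuid::Uuid are imported at the top of the module.)
async fn push(&self, job: Message, scheduled_for: Option<DateTime<Utc>>) -> Result<(), Error> {
    let now = Utc::now();
    let job_id = Uuid::new_v4();
    let scheduled_for = scheduled_for.unwrap_or(now);
    let failed_attempts: i32 = 0;
    let message = Json(job);

    let query = "INSERT INTO queue
        (id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
        VALUES ($1, $2, $3, $4, $5, $6, $7)";

    sqlx::query(query)
        .bind(job_id)
        .bind(now)
        .bind(now)
        .bind(scheduled_for)
        .bind(failed_attempts)
        .bind(PostgresJobStatus::Queued as i32)
        .bind(message)
        .execute(&self.db)
        .await?;
    Ok(())
}
```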
You're going to notice a pattern to all of this: it's actually not too bad complexity-wise. The design of this implementation mainly relies on Postgres to do the complicated locking and unlocking of jobs. Which is great, because that lets us largely focus on our application!
Last, but not least, the `pull` method. This uses a maximum limit of 100 jobs. That's an artifact of the original implementation, but I think it's a solid number to cap at. I imagine you could increase this limit; you'd be surprised what you can do with one database.
What `pull` needs to do, based on the number of jobs requested (with a max of 100), is query the database and update each returned row's status to `Running`. We'll also have it skip locked rows, so we can run multiple queues if we want (concurrency). We'll also want to limit the query so it doesn't pull any jobs that have failed more times than our maximum retry count. Here's how it all looks put together:
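And a sketch of `pull`. The inner `SELECT ... FOR UPDATE SKIP LOCKED` grabs candidate rows while skipping anything another worker has locked, and the outer `UPDATE ... RETURNING` flips them to `Running` in the same statement:

```rust
// Also inside the same `impl Queue for PostgresQueue` block.
async fn pull(&self, number_of_jobs: u32) -> Result<Vec<Job>, Error> {
    let number_of_jobs = number_of_jobs.min(100);
    let now = Utc::now();

    let query = "UPDATE queue
        SET status = $1, updated_at = $2
        WHERE id IN (
            SELECT id
            FROM queue
            WHERE status = $3
                AND scheduled_for <= $4
                AND failed_attempts < $5
            ORDER BY scheduled_for
            FOR UPDATE SKIP LOCKED
            LIMIT $6
        )
        RETURNING id, message";

    let rows: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
        .bind(PostgresJobStatus::Running as i32)
        .bind(now)
        .bind(PostgresJobStatus::Queued as i32)
        .bind(now)
        .bind(self.max_attempts as i32)
        .bind(number_of_jobs as i64)
        .fetch_all(&self.db)
        .await?;

    Ok(rows.into_iter().map(Job::from).collect())
}
```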
Great, our queue is made. All that's left is to create a function for running our video processor, and then wrap it all together.
runner module
This module defines our functions for running our jobs. We'll define one function that pulls up to `number_of_jobs` jobs at a time and then concurrently passes each one to a handler that does the actual processing.
First, our runner, which pulls our jobs in and passes them to handlers concurrently:
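Here's a sketch of that runner. It loops forever: pull a batch, fan the jobs out with `for_each_concurrent`, delete the ones that succeed, and fail the ones that don't (the `run_worker` and `handle_job` names are just placeholders):

```rust
use std::{sync::Arc, time::Duration};

use futures::{stream, StreamExt};
use sqlx::PgPool;

use crate::queue::Queue;

/// Pull jobs forever, handing each one to `handle_job` with bounded concurrency.
pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db: PgPool) {
    loop {
        let jobs = match queue.pull(concurrency as u32).await {
            Ok(jobs) => jobs,
            Err(err) => {
                println!("run_worker: error pulling jobs: {}", err);
                tokio::time::sleep(Duration::from_millis(500)).await;
                Vec::new()
            }
        };

        stream::iter(jobs)
            .for_each_concurrent(concurrency, |job| async {
                let job_id = job.id;

                // Delete the job on success, otherwise mark it failed so it can retry.
                let res = match handle_job(job, &db).await {
                    Ok(_) => queue.delete_job(job_id).await,
                    Err(err) => {
                        println!("run_worker: handling job {}: {}", job_id, err);
                        queue.fail_job(job_id).await
                    }
                };

                if let Err(err) = res {
                    println!("run_worker: updating job {}: {}", job_id, err);
                }
            })
            .await;

        // Small pause so an empty queue doesn't hammer the database.
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
}
```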
And then, a slightly updated version of our ffmpeg function from the last post, mostly just wrapped in a handler that takes the job itself.
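Since the exact ffmpeg invocation came from the previous post, treat this as a sketch: a `handle_job` that matches on the message, shells out to ffmpeg to build an HLS playlist, and updates the `videos` row. The output directory, playlist name, and status strings here are assumptions:

```rust
use std::process::Stdio;

use sqlx::PgPool;
use tokio::process::Command;
use uuid::Uuid;

use crate::error::Error;
use crate::queue::{Job, Message};

async fn handle_job(job: Job, db: &PgPool) -> Result<(), Error> {
    match job.message {
        Message::ProcessRawVideo { video_id, path } => process_raw_video(video_id, &path, db).await,
    }
}

async fn process_raw_video(video_id: Uuid, raw_path: &str, db: &PgPool) -> Result<(), Error> {
    // Mark the video as processing so the API can report progress.
    sqlx::query("UPDATE videos SET processing_status = 'processing', updated_at = now() WHERE id = $1")
        .bind(video_id)
        .execute(db)
        .await?;

    // Segment the raw file into an HLS playlist under a per-video directory.
    let output_dir = format!("processed_videos/{video_id}");
    tokio::fs::create_dir_all(&output_dir)
        .await
        .map_err(|e| Error::Internal(e.to_string()))?;
    let playlist_path = format!("{output_dir}/playlist.m3u8");

    let status = Command::new("ffmpeg")
        .args([
            "-i", raw_path,
            "-codec", "copy",
            "-start_number", "0",
            "-hls_time", "10",
            "-hls_list_size", "0",
            "-f", "hls",
            playlist_path.as_str(),
        ])
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .await
        .map_err(|e| Error::Internal(e.to_string()))?;

    if !status.success() {
        return Err(Error::Internal(format!("ffmpeg failed for video {video_id}: {status}")));
    }

    // Record where the playlist lives and flag the video as done.
    sqlx::query(
        "UPDATE videos
         SET processed_file_path = $1, processing_status = 'processed', updated_at = now()
         WHERE id = $2",
    )
    .bind(&playlist_path)
    .bind(video_id)
    .execute(db)
    .await?;

    Ok(())
}
```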
And that's our entire library for the queue! We can now create our queues! Yeehaw
Finalizing
Finally, we run our queue. The way this works, in a manner similar to a pub/sub model, is that we can have many producers (pushing to the queue) and a single consumer (pulling from the queue). That doesn't mean we can't run many queues that pull; it just means any one job should only ever go to one of those queue instances, while many producers can push new jobs to the queue.
Now, there are a lot of ways you can have these queues communicate. But the most straightforward way is to run one somewhere for pushing events, and another running somewhere for pulling events. For example, I create a queue instance in my API and allow it to push items to the queue, then I have a separate binary running the queue as a worker and executing the jobs.
Here's a simple binary that will run our jobs. It spawns an async task for running the queue worker. This is what would act as a consumer to our queue.
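A sketch of that worker binary; the `videoqueue` crate name and module paths are placeholders for however your workspace is laid out:

```rust
use std::sync::Arc;

use videoqueue::{db, queue::{PostgresQueue, Queue}, runner};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let database_url = std::env::var("DATABASE_URL")?;
    let db = db::connect(&database_url).await?;

    // The queue and the worker share the same pool.
    let queue: Arc<dyn Queue> = Arc::new(PostgresQueue::new(db.clone()));
    let concurrency = 5; // how many jobs to process at once

    // Consume jobs in the background until the process is stopped.
    tokio::spawn(async move { runner::run_worker(queue, concurrency, db).await });

    tokio::signal::ctrl_c().await?;
    Ok(())
}
```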
And for the producer, it's even simpler. Just spawn a queue (make sure it's pointed at the same database) and push:
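A sketch of the producer side. The inline `videos` insert is only there to have a row to point the job at; in a real setup that would live in your API's upload handler, and the example file path is made up:

```rust
use uuid::Uuid;
use videoqueue::{db, queue::{Message, PostgresQueue, Queue}};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let database_url = std::env::var("DATABASE_URL")?;
    let db = db::connect(&database_url).await?;
    let queue = PostgresQueue::new(db.clone());

    // Pretend we just received a raw upload and recorded it in the videos table.
    let video_id = Uuid::new_v4();
    let raw_path = String::from("uploads/raw/example.mp4");
    sqlx::query("INSERT INTO videos (id, raw_file_path) VALUES ($1, $2)")
        .bind(video_id)
        .bind(&raw_path)
        .execute(&db)
        .await?;

    // Push the processing job; `None` schedules it to run as soon as a worker pulls it.
    queue
        .push(Message::ProcessRawVideo { video_id, path: raw_path }, None)
        .await?;

    println!("queued processing job for video {video_id}");
    Ok(())
}
```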
And that's a wrap! We now have a queue that we can push jobs to, backed by Postgres, and a way to run those jobs in a separate process. Next up, we'll create a way to `LISTEN` to updates on the status of a video processing job so we can update our clients when it's done.