

In our previous post we created a function that processes a video into a stream, and some routes for serving those static files. The problem with doing this within the API is that it can take up a lot of resources. Ideally, we want to offload this processing.

So, what we're going to do in this article is create a queue system that can run our jobs. We'll build a basic queue with an initial job type of ProcessRawVideo, representing the processing of a raw video into our stream. This queue system will use Postgres to manage jobs, and will take advantage of SKIP LOCKED when querying for jobs to add some concurrency.

This is largely inspired by Sylvain Kerkour's post on creating a job queue system in Rust, but slightly modified for our purposes. We're mostly focusing on the high-level flow of moving a video processing job through our pipeline.

creating the queue

Our queue is largely made of three core entities: the queue itself, the jobs it processes, and the runners that execute those jobs. We want to be able to push and pull jobs to and from our queue, and we want to be able to run several jobs concurrently.

We're going to use Postgres to store the jobs to be processed, and when we query it we'll take advantage of its locking mechanisms to get some concurrency.

dependencies

We'll mostly be using sqlx for its simplicity here, but this should be fairly interchangeable with whatever library you're using.

Here are our Cargo.toml dependencies. A lot of these are preference and not strictly required to make it work. Again, this is largely an extension of Sylvain Kerkour's tutorial on queues.

tokio = { version = "1", features = ["full"] }
sqlx = { version = "0.7", features = [
    "runtime-tokio-rustls",
    "postgres",
    "chrono",
    "uuid",
    "json",
] }
chrono = "0.4"
uuid = { version = "1", features = ["serde", "v4"] }
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
thiserror = "1"
anyhow = "1"
ulid = { version = "1", features = ["uuid"] }
futures = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

db module

This module holds our function for creating a database connection pool. We'll create a connection for our workers to use for updating the processing status and job queue in our database.

use sqlx::postgres::PgPool;

/// Function to establish a connection to the PostgreSQL database
pub async fn connect_to_database() -> Result<PgPool, sqlx::Error> {
    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    tracing::debug!("Creating DB connection Pool");
    let pool = PgPool::connect(&database_url).await?;

    Ok(pool)
}

migrations

We'll want to create two tables: the main one is for managing the jobs, but we also want to update our videos. In the videos table, we want to track where the raw video is located, whether it's been processed, and then standard stuff like an ID and timestamps.

Tip: You can create a top-level folder called migrations and then use the sqlx CLI to run them with sqlx migrate run.

For the queue table, which holds the scheduled jobs, we want to track a timestamp that allows us to schedule each job, how many failed attempts (if any) there have been, its current status, and the message payload. Additionally, the standard ID and timestamps.

We'll also add some indexes on the status and scheduled_for columns for faster querying, as most of the time we're going to be querying for rows that are Queued.

CREATE TABLE queue (
  id UUID PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL,
  updated_at TIMESTAMP WITH TIME ZONE NOT NULL,

  scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL,
  failed_attempts INT NOT NULL,
  status INT NOT NULL,
  message JSONB NOT NULL
);
CREATE INDEX index_queue_on_scheduled_for ON queue (scheduled_for);
CREATE INDEX index_queue_on_status ON queue (status);

Next, we need a table for storing videos. These two migrations don't need to be in any particular order, or could be combined into one. I just like to isolate my migrations logic.

For our videos, we want to store a raw_file_path pointing to where the raw video was uploaded, a processed_file_path column representing where the processed m3u8 playlist is, and a processing_status which maps to the current status of processing. Additionally, standard id and timestamp columns.

CREATE TABLE videos (
    id TEXT PRIMARY KEY,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL,

    raw_file_path TEXT NOT NULL,
    processed_file_path TEXT,
    processing_status TEXT NOT NULL DEFAULT 'pending'
);

We can run those with sqlx migrate run, which will set up the tables in our connected database. Make sure your DATABASE_URL environment variable is set correctly.
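
As an aside, sqlx can also embed these same migration files and run them from code instead of the CLI. Here's a minimal sketch of that approach; note it assumes you add sqlx's migrate feature, which isn't in the dependency list above:

use sqlx::postgres::PgPool;

/// Run the SQL files in ./migrations against the connected database
/// (hypothetical helper; requires sqlx's `migrate` feature)
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> {
    // The migrate! macro embeds the migration files at compile time
    sqlx::migrate!("./migrations").run(pool).await
}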

error module

I like to create a high-level error module for my applications to reuse across my workspace. This is optional, but it's used throughout the examples below.

/// Various errors that our queue can have
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    #[error("Bad config: {0}")]
    BadConfig(String),
    #[error("Connecting to database: {0}")]
    ConnectingToDatabase(String),
    #[error("Internal error: {0}")]
    Internal(String),
    #[error("Not found: {0}")]
    NotFound(String),
    #[error("Migrating database: {0}")]
    DatabaseMigration(String),
    #[error("Invalid job message: {0}")]
    InvalidJobMessage(String),
    #[error("Video Processing: {0}")]
    VideoProcessingError(String),
}

/// Conversion from an sqlx error into our error type
impl std::convert::From<sqlx::Error> for Error {
    fn from(err: sqlx::Error) -> Self {
        match err {
            sqlx::Error::RowNotFound => Error::NotFound("row not found".into()),
            _ => Error::Internal(err.to_string()),
        }
    }
}

queue module

queue trait and job definitions

Now we enter the meatier parts of this article. Before we define the queue, which will hold jobs, we need to define what a Job is. A job should have a unique identifier and some kind of message payload describing what to run. For now we only need a ProcessRawVideo payload, but we'll make it a variant of an enum called Message that represents all payloads.

Our specific message for processing a video will accept a video_id and a path. The path should point to the raw video. We already store the path as a column on our videos table, but including it here saves us a query. We can then use the video_id to update the row after we're done processing.

use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// The job to be processed, containing the message payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    pub id: Uuid,
    pub message: Message,
}

/// The payload of the job, containing the different jobs and their required data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Our primary processing job
    /// Takes in a raw video (mp4) and converts it into an HLS stream
    // NOTE: We intentionally have the caller provide the path to save us a db query to fetch it,
    // but we still need a db query after processing to update the video's status
    ProcessRawVideo { path: String, video_id: String },
}

Now we can define our queue. We're going to define a trait that acts as our base queue, and then implement that trait for a PostgresQueue. By having the queue itself be a trait, we could swap in different configurations, like if we moved to a different backend for managing the queue.

Our queue needs to be able to push jobs, pull jobs, fail jobs, delete jobs, and clear the queue. Together, these operations manage the queue itself.

#[async_trait::async_trait]
pub trait Queue: Send + Sync + Debug {
    /// pushes a job to the queue
    async fn push(
        &self,
        job: Message,
        scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Result<(), Error>;
    /// pull fetches at most `number_of_jobs` from the queue.
    async fn pull(&self, number_of_jobs: i32) -> Result<Vec<Job>, Error>;
    /// deletes a job from the queue
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// fails a job in the queue
    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// clears the queue
    async fn clear(&self) -> Result<(), Error>;
}

postgres queue impl

Next, we're going to implement our new queue trait for a PostgresQueue, which, as you might guess, is a queue backed by Postgres (yeeeeehaw).

First, we'll create our basic queue struct and implement a function for constructing one.

/// A queue that uses Postgres as its backend
#[derive(Debug, Clone)]
pub struct PostgresQueue {
    db: PgPool,
    max_attempts: u32,
}

/// Implementation of our Postgres-based queue
impl PostgresQueue {
    pub fn new(db: PgPool) -> PostgresQueue {
        let queue = PostgresQueue {
            db,
            max_attempts: 5,
        };

        queue
    }
}

There's not too much to this. The maximum of 5 attempts is just my preference; you can safely adjust it to whatever fits your system. Aside from that, we just need to make sure we have a connection pool to use.
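
One thing the snippets below reference but that we haven't shown yet is a PostgresJob row type and a PostgresJobStatus enum, which map between the database representation and our Job. Here's a minimal sketch of what those could look like, modeled on Kerkour's original design; treat the exact derives and field visibility as assumptions:

use sqlx::types::Json;
use uuid::Uuid;

/// Status stored as an INT in the `status` column
#[derive(Debug, Clone, sqlx::Type, PartialEq)]
#[repr(i32)]
pub enum PostgresJobStatus {
    Queued,
    Running,
    Failed,
}

/// Raw row shape of the `queue` table
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct PostgresJob {
    id: Uuid,
    created_at: chrono::DateTime<chrono::Utc>,
    updated_at: chrono::DateTime<chrono::Utc>,
    scheduled_for: chrono::DateTime<chrono::Utc>,
    failed_attempts: i32,
    status: PostgresJobStatus,
    message: Json<Message>,
}

/// Convert a database row into the public-facing Job
impl From<PostgresJob> for Job {
    fn from(item: PostgresJob) -> Self {
        Job {
            id: item.id,
            message: item.message.0,
        }
    }
}

The From impl is what lets pull call .map(Into::into) later on to hand back plain Job values.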

Next, we are going to get into the beef 🥩. Admittedly, there's not a lot of fancy stuff happening here.

  • For the pull, we need to pull jobs whose status is Queued, and set them to Running as we return them. It's important to do the update as we pull so the rows get locked appropriately. Finally, we'll use SKIP LOCKED to skip any locked rows, which is how we achieve our concurrency
  • For the push, we just need to create a new row whose status is Queued
  • Delete, fail, and clear are pretty self-explanatory

We'll start with delete, clear, and fail since they're pretty simple:

#[async_trait::async_trait]
impl Queue for PostgresQueue {
    /// Delete an item from the queue based on its job ID
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error> {
        let query = "DELETE FROM queue WHERE id = $1";

        sqlx::query(query).bind(job_id).execute(&self.db).await?;
        Ok(())
    }
    /// Fail the job based on its ID: increments the failed attempts by 1
    /// and puts it back into the Queued status so it can be retried
    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
        let now = chrono::Utc::now();
        let query = "UPDATE queue
            SET status = $1, updated_at = $2, failed_attempts = failed_attempts + 1
            WHERE id = $3";

        sqlx::query(query)
            .bind(PostgresJobStatus::Queued)
            .bind(now)
            .bind(job_id)
            .execute(&self.db)
            .await?;
        Ok(())
    }
    /// Clear the entire queue of all jobs (delete all rows in the table)
    async fn clear(&self) -> Result<(), Error> {
        let query = "DELETE FROM queue";

        sqlx::query(query).execute(&self.db).await?;
        Ok(())
    }
}

Not much to those; now we can implement our push and pull methods to bring it all together.

First, we'll do the push method. This method should:

  • Create a unique ID for the job
  • Calculate the scheduled date
  • Wrap the message payload in JSON
  • Insert all the above into a new row in the queue table

Here's how that looks:

// ...omitted for brevity
/// Push a new job with the included message payload to the queue storage
async fn push(
    &self,
    job: Message,
    date: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<(), Error> {
    let scheduled_for = date.unwrap_or(chrono::Utc::now());
    let failed_attempts: i32 = 0;
    let message = Json(job);
    let status = PostgresJobStatus::Queued;
    let now = chrono::Utc::now();
    let job_id: Uuid = Ulid::new().into();
    let query = "INSERT INTO queue
        (id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
        VALUES ($1, $2, $3, $4, $5, $6, $7)";

    sqlx::query(query)
        .bind(job_id)
        .bind(now)
        .bind(now)
        .bind(scheduled_for)
        .bind(failed_attempts)
        .bind(status)
        .bind(message)
        .execute(&self.db)
        .await?;
    Ok(())
}
// ... omitted for brevity

You're going to find a pattern to all of this: it's actually not too bad complexity-wise. The design mainly relies on Postgres to do the complicated locking and unlocking of jobs, which is great because that lets us largely focus on our application!

Last, but not least, the pull method. This uses a maximum limit of 100 jobs per pull. That cap is an artifact of the original implementation, but I think it's a solid number. You could probably increase it; you'd be surprised what you can do with one database.

What pull needs to do is query the database for up to the number of jobs requested (capped at 100) and update each returned row's status to Running. We'll also have it skip locked rows, so we can run multiple workers if we want (concurrency), and filter out any jobs that have already failed more times than our maximum number of attempts. Here's how it all looks put together:

// ... omitted for brevity
/// Pulls <number_of_jobs> from the queue (maximum 100)
/// This updates all jobs pulled into a `Running` status
async fn pull(&self, number_of_jobs: i32) -> Result<Vec<Job>, Error> {
    let number_of_jobs = if number_of_jobs > 100 {
        100
    } else {
        number_of_jobs
    };
    let now = chrono::Utc::now();
    let query = "UPDATE queue
        SET status = $1, updated_at = $2
        WHERE id IN (
            SELECT id
            FROM queue
            WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
            ORDER BY scheduled_for
            FOR UPDATE SKIP LOCKED
            LIMIT $6
        )
        RETURNING *";

    let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
        .bind(PostgresJobStatus::Running)
        .bind(now)
        .bind(PostgresJobStatus::Queued)
        .bind(now)
        .bind(self.max_attempts as i32)
        .bind(number_of_jobs)
        .fetch_all(&self.db)
        .await?;
    Ok(jobs.into_iter().map(Into::into).collect())
}
// ... omitted for brevity

Great, our queue is made. All that's left is to create a function for running our video processor, and then wrap it all together.
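
Before wiring this into the video pipeline, a quick smoke test can confirm the push/pull round trip works. This is just a hedged sketch: it assumes DATABASE_URL points at a database with the migrations applied, and the use paths are placeholders for wherever these types live in your crate.

// Hypothetical module paths; adjust to your crate layout
use crate::db;
use crate::queue::{Message, PostgresQueue, Queue};

/// Push one job, then pull it back out
#[tokio::test]
async fn push_then_pull_round_trip() -> Result<(), anyhow::Error> {
    let db = db::connect_to_database().await?;
    // The Queue trait must be in scope to call push/pull on PostgresQueue
    let queue = PostgresQueue::new(db);

    queue
        .push(
            Message::ProcessRawVideo {
                path: "videos/test.mp4".into(),
                video_id: "test-video".into(),
            },
            None, // schedule for "now"
        )
        .await?;

    let jobs = queue.pull(1).await?;
    assert_eq!(jobs.len(), 1);
    Ok(())
}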

runner module

This module defines our functions for running our jobs. We'll define one function that pulls up to `concurrency` jobs each loop and concurrently passes each one into a handler that does the actual processing.

First, our runner, which will pull jobs in and pass them to handlers concurrently:

/// Runs a loop that pulls jobs from the queue and runs <concurrency> jobs each loop
pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Pool<Postgres>) {
    loop {
        // Pull jobs from the queue
        let jobs = match queue.pull(concurrency as i32).await {
            Ok(jobs) => jobs,
            Err(err) => {
                // Trace the error
                tracing::error!("runner: error pulling jobs {}", err);
                // Go to sleep and try again
                tokio::time::sleep(Duration::from_millis(500)).await;
                Vec::new()
            }
        };
        // Just for debugging the amount of jobs a queue has pulled in
        let number_of_jobs = jobs.len();
        if number_of_jobs > 0 {
            tracing::debug!("Fetched {} jobs", number_of_jobs);
        }
        // Run each job concurrently
        stream::iter(jobs)
            .for_each_concurrent(concurrency, |job| async {
                tracing::debug!("Starting job {}", job.id);
                let job_id = job.id;

                let res = match handle_job(job, db_conn).await {
                    Ok(_) => queue.delete_job(job_id).await,
                    Err(err) => {
                        println!("run_worker: handling job({}): {}", job_id, &err);
                        queue.fail_job(job_id).await
                    }
                };

                match res {
                    Ok(_) => {}
                    Err(err) => {
                        println!("run_worker: deleting / failing job: {}", &err);
                    }
                }
            })
            .await;
        // Take a break for a bit, we don't need to run every moment (our jobs are unlikely to complete that quickly)
        tokio::time::sleep(Duration::from_millis(125)).await;
    }
}

And then a slightly updated version of our ffmpeg function from the last post, mostly just wrapped in the job handling itself.

/// Individually processes a single job, based on its Job message type
async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
    match job.message {
        // TODO: If you want to do other kinds of processing, you can define their jobs here
        // Process Raw Videos into HLS streams
        message @ Message::ProcessRawVideo { .. } => {
            tracing::debug!("Processing raw video: {:?}", &message);
            // Get the required data to parse the video
            let (input_path, video_id) = match &message {
                Message::ProcessRawVideo { path, video_id } => (path, video_id),
            };
            tracing::debug!(
                "Processing video: input_path={}, video_id={}",
                input_path,
                video_id
            );
            // Create our HLS stream from the mp4
            let output_path = format!("{}.m3u8", input_path.trim_end_matches(".mp4"));
            let output = std::process::Command::new("ffmpeg")
                .args(&[
                    "-i",
                    input_path,
                    "-c:v",
                    "libx264",
                    "-c:a",
                    "aac",
                    "-f",
                    "hls",
                    "-hls_time",
                    "10",
                    "-hls_list_size",
                    "0",
                    &output_path,
                ])
                .output()
                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;

            if !output.status.success() {
                tracing::error!("Error processing video into hls");
                let error = String::from_utf8_lossy(&output.stderr);
                return Err(Error::VideoProcessingError(error.to_string()));
            }
            // Update the video ID status
            sqlx::query("UPDATE videos SET processing_status = 'processed' WHERE id = $1")
                .bind(&video_id)
                .execute(db)
                .await
                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
            tracing::debug!("Successfully processed video {}", &video_id);
        }
    };

    Ok(())
}
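
One caveat with the handler above: std::process::Command::output() is a blocking call, so a long ffmpeg run ties up a runtime thread. If that ever becomes a problem, one option is tokio's async process API (already available since we enabled tokio's full feature set). Here's a hedged sketch of just the command portion, pulled into its own hypothetical helper:

// Hypothetical helper using tokio's async process API instead of std::process
async fn run_ffmpeg(input_path: &str, output_path: &str) -> Result<(), Error> {
    let output = tokio::process::Command::new("ffmpeg")
        .args([
            "-i", input_path,
            "-c:v", "libx264",
            "-c:a", "aac",
            "-f", "hls",
            "-hls_time", "10",
            "-hls_list_size", "0",
            output_path,
        ])
        .output()
        .await
        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;

    if !output.status.success() {
        return Err(Error::VideoProcessingError(
            String::from_utf8_lossy(&output.stderr).to_string(),
        ));
    }
    Ok(())
}

The handler itself would stay the same shape; only the command invocation changes.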

And that's our entire library for the queue! We can now create our queues! Yeehaw

finalizing

Finally, we run our queue. The way this works, in a manner similar to a pub/sub model, is that we can have many producers (pushing to the queue) and a single consumer (pulling from the queue). That doesn't mean we can't run many queues that pull; it just means one job should only ever go to one of those queue instances, while many producers can push new jobs to the queue.

Now, there are a lot of ways you can have these queues communicate, but the most straightforward is to run one instance somewhere for pushing events and another somewhere for pulling them. For example, I create a queue instance in my API and allow it to push items to the queue, then I have a separate binary running the queue as a worker and executing the jobs.

Here's a simple binary that will run our jobs. It spawns an async task for running the queue worker. This is what would act as a consumer of our queue.

const CONCURRENCY: usize = 5;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Start the tracer
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "event_processor=debug,db=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    // Create a database connection
    let db = db::connect_to_database().await?;
    // Initialize a queue
    tracing::debug!("Initializing queue");
    let queue = Arc::new(PostgresQueue::new(db.clone()));
    // Pass our queue (shared ref) to our runner
    let worker_queue = queue.clone();
    let worker = tokio::spawn(async move { run_worker(worker_queue, CONCURRENCY, &db).await });
    // Wait on the worker task (it loops forever) so main doesn't exit immediately
    worker.await?;
    Ok(())
}
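
Because pull uses FOR UPDATE SKIP LOCKED, nothing stops you from running more than one worker loop against the same table; each job will still only be handed to one of them. A hedged sketch of what that could look like inside the same consumer binary:

// Replace the single tokio::spawn above with a small pool of workers
let mut workers = Vec::new();
for _ in 0..2 {
    // Arc and PgPool are both cheap to clone
    let worker_queue = queue.clone();
    let worker_db = db.clone();
    workers.push(tokio::spawn(async move {
        run_worker(worker_queue, CONCURRENCY, &worker_db).await
    }));
}
// Wait on the workers (they loop forever)
for worker in workers {
    worker.await?;
}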

And for the producer, it's even simpler. Just create a queue (make sure it's pointed at the same database) and push:

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Start the tracer
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "event_processor=debug,db=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    // Create a database connection
    let db = db::connect_to_database().await?;
    // Initialize a queue
    tracing::debug!("Initializing queue");
    let queue = PostgresQueue::new(db.clone());
    // Spawn a message and push it to our queue
    let path = "some/path/to/an/file.mp4".to_string();
    let video_id = "some_id_we_already_created".to_string();
    let job = Message::ProcessRawVideo { path, video_id };
    queue
        .push(job, None) // If you want to schedule your job for later, replace `None`
        .await
        .expect("Could not push job to queue");
    Ok(())
}

And that's a wrap! We now have a queue that we can push jobs to, backed by Postgres, and a way to run those jobs in a separate process. Next up, we'll create a way to LISTEN for updates on the status of a video processing job so we can update our clients when it's done.