January 21st, 2023

Thread safe queue in Rust 🦀

A simple, thread-safe FIFO queue implemented in Rust

I started my Rust journey about a year ago, and I feel it has helped me become a better developer, even though I primarily work with React and TypeScript.

Today I'd like to share what I've learned about thread safety in Rust by implementing a simple data structure: a FIFO queue.

Let's get started by declaring a trait to describe our queues:

// queue/prelude.rs
trait Queue<T> {
	/// Create a new, empty queue
	fn new() -> Self;
	
	/// Enqueue a new value
	fn push(&mut self, value: T);
	
	/// Dequeue a value 
	/// Returns None if the queue is empty
	fn pop(&mut self) -> Option<T>;
	
	/// Returns the number of elements enqueued
	fn len(&self) -> usize;
	
	/// Checks whether the queue is empty (i.e. `len()` returns 0)
	fn is_empty(&self) -> bool;
}

This trait can then be implemented by any struct or data type to create a queue. The actual implementation of the methods depends on the kind of queue you want to build.
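
As a quick illustration, here's a hypothetical drain_all helper (not part of the queue itself) showing that code written against the trait works with any implementation:

/// Drains any Queue<T> implementation into a Vec
fn drain_all<T, Q: Queue<T>>(queue: &mut Q) -> Vec<T> {
	let mut out = Vec::new();
	// pop() returns None once the queue is empty
	while let Some(value) = queue.pop() {
		out.push(value);
	}
	out
}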

Implementation

OK, now that we have our generic trait, we can start writing some code for our first queue: the FIFO (First In, First Out).

// queue/mod.rs
use std::collections::VecDeque;
 
/// A FIFO queue implemented using a VecDeque
struct FifoQueue<T> {
	/// The underlying data structure of the queue
	data: VecDeque<T>,
}

It's a simple generic struct with a data field holding the enqueued values. We use VecDeque instead of Vec for some convenience methods that we'll see later on.

Now let's implement our Queue<T> trait for FifoQueue<T>:

// queue/mod.rs
impl<T> Queue<T> for FifoQueue<T> {
	fn new() -> Self {
		Self {
			data: VecDeque::new()
		}
	}
 
	/// Adds an element to the back of the queue
	fn push(&mut self, value: T) {
		self.data.push_back(value)
	}
 
	/// Removes an element from the front of the queue
	fn pop(&mut self) -> Option<T> {
		self.data.pop_front()
	}
	
	fn len(&self) -> usize {
		self.data.len()
	}
 
	fn is_empty(&self) -> bool {
		self.data.is_empty()
	}
}

The reason we chose VecDeque over a plain Vec is its efficient double-ended operations: pop_front / push_back here, and pop_back / push_front if you ever want a LIFO.
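
To see why this matters, here's a minimal standalone comparison (a sketch, not part of the queue):

use std::collections::VecDeque;

fn main() {
	let mut v = vec![1, 2, 3];
	// Vec: removing the front is O(n), every remaining element shifts left
	assert_eq!(v.remove(0), 1);

	let mut d = VecDeque::from(vec![1, 2, 3]);
	// VecDeque: a ring buffer, so popping the front is O(1)
	assert_eq!(d.pop_front(), Some(1));
}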

Thread Safe? How?

Our queue is now complete, but it is not thread-safe: it would be subject to race conditions and errors if accessed from multiple threads at the same time.

Note that making a queue thread-safe has a performance cost, since synchronization mechanisms slow down access to the queue. Evaluate whether a thread-safe queue is really necessary for your program.

To make our queue struct thread-safe, we wrap the data field holding our values in a Mutex (std::sync::Mutex), pair it with a Condvar (std::sync::Condvar), and acquire the lock on every operation.

- data: VecDeque<T>
+ data: Mutex<VecDeque<T>>
+ cv: Condvar

The Mutex alone protects the data, but the Condvar lets a consumer wait for the queue to be populated before popping an element. This way we can run, say, 100 pushes and 100 pops in parallel and still end up with an empty queue.

First of all, let's update the trait: pop now returns the element (T) itself instead of an Option, and since the Mutex gives us interior mutability, push and pop can take &self instead of &mut self.

- fn push(&mut self, value: T);
- fn pop(&mut self) -> Option<T>;
+ fn push(&self, value: T);
+ fn pop(&self) -> T;

The pop method in the trait (queue/prelude.rs) now always returns an element, because with the Condvar we wait for the queue to be populated before popping.

// queue/mod.rs
use std::{collections::VecDeque, sync::{Condvar, Mutex}};
 
/// includes common traits
pub mod prelude;
 
// import our Queue<T> trait
use self::prelude::*; 
 
struct FifoQueue<T> {
	data: Mutex<VecDeque<T>>,
	/// The condvar used to signal when the queue is not empty
	cv: Condvar
}
 
impl<T> Queue<T> for FifoQueue<T> {
	fn new() -> Self {
		Self {
			data: Mutex::new(VecDeque::new()),
			cv: Condvar::new()
		}
	}
 
	/// push input on the back of the queue
	/// - unrecoverable if lock fails so just unwrap
	fn push(&self, value: T) {
		let mut data = self.data.lock().unwrap();
		data.push_back(value);
		
		// notify the thread that the queue has been populated
		self.cv.notify_one();
	}
 
	/// pop element from the queue
	/// - unrecoverable if the lock fails so just unwrap
	/// - same for the Condvar wait
	fn pop(&self) -> T {
		let mut data = self.data.lock().unwrap();
 
		// wait for the notification if the queue is empty
		while data.is_empty() {
			data = self.cv.wait(data).unwrap();
		}
 
		data.pop_front().unwrap()
	}
 
	fn len(&self) -> usize {
		let data = self.data.lock().unwrap();
		data.len()
	}
 
	fn is_empty(&self) -> bool {
		let data = self.data.lock().unwrap();
		data.is_empty()
	}
}
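
To see the Condvar in action, here's a minimal producer/consumer sketch (assuming FifoQueue and the trait are made pub as needed):

use std::{sync::Arc, thread, time::Duration};

fn main() {
	let queue = Arc::new(FifoQueue::<i32>::new());

	// the consumer blocks inside pop() until something is pushed
	let q = queue.clone();
	let consumer = thread::spawn(move || {
		let value = q.pop(); // waits on the Condvar while the queue is empty
		println!("got {value}");
	});

	// simulate the producer arriving late
	thread::sleep(Duration::from_millis(100));
	queue.push(42);

	consumer.join().unwrap();
}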

Testing

In the following test, we wrap the queue in an Arc (atomically reference counted pointer), which lets us share the queue instance safely between threads. Once the threads have completed, the test checks that the queue contains all the elements inserted by both threads.

// queue/mod.rs
 
#[cfg(test)]
mod test {
    use crate::queue::prelude::*;
    use crate::queue::FifoQueue;
    use std::{sync::Arc, thread};
 
    #[test]
    fn test_queue_thread_safety() {
        // create a queue of numbers
        let queue = Arc::new(FifoQueue::<i32>::new());
 
        let q1 = queue.clone();
        let t1 = thread::spawn(move || {
            q1.push(1);
            q1.push(2);
        });
 
        let q2 = queue.clone();
        let t2 = thread::spawn(move || {
            q2.push(3);
            q2.push(4);
        });
 
        t1.join().unwrap();
        t2.join().unwrap();
 
        assert_eq!(queue.len(), 4);
    }
}

This is just one way to test the queue. As always, the code can be exercised in many ways: you could add tests for concurrent pushes and pops, or for mixed operations, as sketched below.

Hint: you can also add some delays using thread::sleep() to simulate load.
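
For example, here's a sketch of such a test (same imports as the test module above): it runs pushes and pops in parallel and, thanks to the blocking pop, ends with an empty queue.

    #[test]
    fn test_concurrent_push_and_pop() {
        let queue = Arc::new(FifoQueue::<i32>::new());
        const N: i32 = 100;

        let producer_q = queue.clone();
        let producer = thread::spawn(move || {
            for i in 0..N {
                producer_q.push(i);
            }
        });

        let consumer_q = queue.clone();
        let consumer = thread::spawn(move || {
            for _ in 0..N {
                // pop() blocks on the Condvar, so it's fine if the
                // consumer briefly outpaces the producer
                consumer_q.pop();
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        // every pushed element was popped
        assert!(queue.is_empty());
    }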

TLDR

Thread-safe queues are data structures that allow multiple threads to safely add and remove elements from a queue without the risk of data corruption or race conditions. In Rust, thread-safe queues can be implemented using a variety of techniques, including using a mutex to synchronize access to the queue, using an atomic reference counter to track shared ownership of the queue, or using a lock-free queue data structure. It is important to carefully consider the trade-offs of each approach and choose the one that is most appropriate for the specific needs of the application.
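
Worth mentioning: if all you need is a thread-safe FIFO between threads, the standard library's mpsc channel already provides one out of the box. A minimal sketch:

use std::{sync::mpsc, thread};

fn main() {
	let (tx, rx) = mpsc::channel::<i32>();

	let producer = thread::spawn(move || {
		for i in 0..4 {
			tx.send(i).unwrap();
		}
	});
	producer.join().unwrap();

	// recv() would block like our pop(); try_recv() returns
	// immediately with whatever is buffered
	while let Ok(value) = rx.try_recv() {
		println!("received {value}");
	}
}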

You can find example code at rawnly/queue-rs


Originally posted here