It’s useful to reuse objects that are expensive to create.

Doing so can increase the efficiency of your programs & make it easier to reason about them.


A resource pool is a construct that allows multiple clients to share the same objects.

I recently needed to pool UdpSockets when following dnsguide.

The pool allowed me to reuse sockets that were already bound to a port & share them concurrently.

The Recipe

A resource pool needs three mechanisms:

  1. Concurrency control
  2. Object storage
  3. Object reuse

Concurrency Control

The pool needs to give exclusive access to the objects it manages; an object should have only one owner at a time.

Object Storage

The pool needs to store objects somewhere so that it knows how to retrieve them when needed.

Object Reuse

The pool needs to recycle objects that are no longer being used.

RAII Guards are a great way to implement object reuse. Std’s Mutex & MutexGuard are exemplary for this pattern.

Alternatively, a library designer may allow their clients to manually add & remove entries from the pool. This is error-prone.

Implementation

This section will describe the implementation of tub, an async pool crate.

Pool is a construct that allows clients to retrieve an exclusive reference to an object from the pool.

Clients call Pool::acquire to retrieve a future that returns a Guard. The future will resolve when an object is available.

The Guard is a construct that allows object access and automatic recycling + reuse. It uses RAII to ensure that the object is returned to the pool when the guard is dropped.

The Pool is built on top of tokio::sync::Notify and crossbeam_queue::ArrayQueue. These dependencies provide concurrency control and object storage.

Structs

pub struct Pool<T> {
    inner: Arc<PoolInner<T>>,
}

struct PoolInner<T> {
    queue: ArrayQueue<T>,
    notify: Notify,
}

pub struct Guard<T> {
    value: Option<T>,
    inner: Arc<PoolInner<T>>,
}

PoolInner is used to hold the queue and notify. We wrap PoolInner in an Arc so that we can clone it and share it between the Pool and Guard.

Impl

You can refer to tub/src/lib.rs for the full implementation.

Pool

impl<T> Pool<T> {
    #[inline]
    pub async fn acquire(&self) -> Guard<T> {
        let inner = self. inner.clone();
        loop {
            if let Some(value) = inner.queue.pop() {
                return Guard {
                    value: Some(value),
                    inner,
                };
            }

            inner.notify.notified().await;
        }
    }
}

This block of code returns a future that resolves into a Guard when a shared object is available.

It first checks if there are any available objects. If there are, it returns a Guard with the object. Otherwise, it waits for a notification that an object is available.

ArrayQueue::pop is used to retrieve an object from the queue. This queue is safe in the presence of concurrent access & does not block the current thread.

Notify::notified is used to wake the task up when an object is available.

The method loops until an object is acquired (it acts similar to a compare-and-swap loop that you see frequently in concurrent data structures).

Guard

impl<T> Drop for Guard<T> {
    #[inline]
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            // Safety: The queue will never be full when a Guard is alive
            let _ = self.inner.queue.push(value);
            self.inner.notify.notify_one();
        }
    }
}

impl<T> Deref for Guard<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // Safety: The value is always Some
        self.value.as_ref().unwrap()
    }
}

impl<T> DerefMut for Guard<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Safety: The value is always Some
        self.value.as_mut().unwrap()
    }
}

The Guard allows clients to access the object and automatically return it to the pool when it is dropped. The value is wrapped in an Option so that it can be taken out of the Guard when it is dropped.

Object access is achieved by implementing Deref & DerefMut for Guard. The implementation simply calls as_ref & as_mut on the Option and unwraps the result. This is safe because the Option is always Some when the Guard is alive.

Object reclamation is achieved by implementing Drop for Guard. When the Guard is dropped, it will return the object to the pool by calling ArrayQueue::push and Notify::notify_one.

It is safe to call ArrayQueue::push because the queue will never be full when a Guard is alive.

Notify::notify_one is used instead of Notify::notify_all because only one client will be able to “win” the race for the recycled object.

Using Notify::notify_all would result in all clients being woken up, even if only one of them can actually acquire the object. This causes contention, which results in wasted CPU cycles and higher latency.

Benchmarking

It’s time for a showdown!

We will put a few random async pool crates from crates.io to the test.

The crates

The following crates are analyzed:

The following crates were omitted due to issues when benchmarking:

  • deadpool - appears to deadlock using the unmanaged pool. I may have misused the crate.
  • qp - appears to deadlock
  • lease - heap corruption detected
  • async-resource - ran out of resources in comparison benchmark

Methodology

The code used for benchmarking is publicly available.

By following Criterion’s Async Benchmarking Guide, I was able to create a benchmark powered by the Tokio runtime.

The tests exercise resource acquisition and release by spawning a number of tasks that acquire a resource from the pool and then release it.

All benchmarks were written in a similar way, namely:

  1. Create a pool with 1 object
  2. Spawn iters tasks that each acquire a resource from the pool
  3. Wait for all tasks to complete

The following code snippet shows the benchmarking method for tub:

async fn run_tub(pool: tub::Pool<u32>, iters: usize) {
    let pool = Arc::new(pool);
    join_all(
        (0..iters)
            .map(|_| {
                let pool = pool.clone();
                tokio::spawn(async move {
                    // Acquire a resource from the pool
                    let guard = pool.acquire().await;
                    
                    // Return the resource to the pool
                    drop(guard)
                })
            })
            .collect::<Vec<_>>(),
    )
    .await;
}

This method is run multiple times during the benchmarks. For more information on the benchmarking suite, see Criterion.

Results

The benchmarks help us understand how efficient the underlying mechanisms for concurrency control, object storage, and object reuse are.

Here is a benchmark between tub, simple-pool, and async-object-pool.

all

It seems that async-object-pool is orders of magnitude slower, so we take a close look at tub and simple-pool.

two

100,000 acquires & releases

In the following sections I show benchmarks for running 100,000 acquire & release operations across tasks.

tub

Benchmarks for tub:

tub-pdf

async-object-pool

Benchmarks for async-object-pool:

async-object-pool-pdf

simple-pool

Benchmarks for simple-pool:

simple-pool-pdf

async-resource

Benchmarks for async-resource:

async-resource-pdf

Conclusion

In this post we have seen how to create an async pool by building the necessary mechanisms for concurrency control, object storage, and object reuse.

The benchmarks show that tub performs well relative to similar crates.

And just like that another no name async-pool crate is born!

Addendum

It may be possible that I’ve used the API for these crates incorrectly, causing some performance degradation (I have this suspicion for async-object-pool).

If I did, feel free to raise a pull request addressing the benchmarks on https://github.com/wcygan/tub/.