Joshua Cooper I'm a full-stack web developer with a passion for well-written and efficient code. On the frontend, I use Typescript with React and on the backend I use Rust or Node.js with Typescript.

Asynchronous I/O and async/await packages in Rust

7 min read 1969

Asynchronous I/O and async/await Packages in Rust

Rust’s async story has been a work in progress for a while now. But since the stabilization of the async/await syntax late last year, things have really started coming together.

Async works differently in Rust than in other languages, such as JavaScript. One major difference is that async in Rust requires an executor to work. That isn’t even available in the standard library.

In this tutorial, we’ll look at three Rust async packages, evaluate their production-readiness, and demonstrate how to build fast, reliable, and highly concurrent applications.

  1. Tokio
  2. async-std
  3. futures

How does async work in Rust?

At its core, async/await in Rust is built on top of Futures. If you’re coming from JavaScript, you can think of futures like promises: they’re values that haven’t finished computing yet. Unlike promises, however, futures won’t make any progress until they are explicitly polled. This is where executors come in. An executor is a runtime that manages futures for you by polling them when they’re ready to make progress. Like I mentioned earlier, the standard library doesn’t include one. So to even get started with async Rust we need to use an external executor crate.

You might think this adds a layer of unnecessary complexity, but it allows us to have more control and use an executor that is tuned to our application’s needs rather than being forced to use a single solution like with most other languages.

Async is used as an alternative to OS threads for applications that are primarily IO-bound. Unlike OS threads, spawning a future is cheap and you can have millions of them running concurrently.

Let’s look at some of the top async crates for Rust.

1. Tokio

Tokio is the most popular crate for dealing with async Rust. In addition to an executor, Tokio provides async versions of many standard library types.

Much of the functionality in this crate is behind optional features that you’ll need to enable. This helps to keep compile time and binary size down when the features aren’t needed. For the sake of demonstration, we’ll use the "full" feature to enable them all.

[dependencies]
tokio = { version = "0.3", features = ["full"] }

Now you can spawn a Tokio runtime and give it a future to run.

We made a custom demo for .
No really. Click here to check it out.

fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        println!("Hello world");
    })
}

Here our application code could go inside the async block in the block_on method. This boilerplate can be replaced with a macro that lets you use an async main function instead.

Here’s how you would normally bootstrap the Tokio runtime.

#[tokio::main]
async fn main() {
    println!("Hello world");
}

Since this is now an async function, we should try using await too. To keep things simple, let’s use sleep to return a future to use await on.

use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (v1, v2, v3) = tokio::join!(
        async {
            sleep(Duration::from_millis(1500)).await;
            println!("Value 1 ready");
            "Value 1"
        },
        async {
            sleep(Duration::from_millis(2800)).await;
            println!("Value 2 ready");
            "Value 2"
        },
        async {
            sleep(Duration::from_millis(600)).await;
            println!("Value 3 ready");
            "Value 3"
        },
    );

    assert_eq!(v1, "Value 1");
    assert_eq!(v2, "Value 2");
    assert_eq!(v3, "Value 3");
}

Here, we used tokio::join! to run multiple futures concurrently. It will wait for all of the futures to complete and return the result of each in a tuple. In this case, the third future will complete first, followed by the first and, finally, the second.

Blocking code

Using sleep also affords us a good opportunity to look at what happens when you block in an async context without using await.

Change the sleep in the second future to this:

std::thread::sleep(Duration::from_millis(2800));

Here we used the standard library sleep, which doesn’t return a future and blocks an entire thread. If you run this now, you’ll see that all futures are blocked from making progress as long as the second future is blocked. Since we didn’t use await, the second future can’t know to yield and give control back to the tokio runtime. If the runtime is blocked, all its futures will be blocked too.

Fortunately, Tokio has our back here. The tokio::task module contains an implementation of green threads, similar to Go’s goroutines. With spawn_blocking, you can get the Tokio runtime to run blocking code inside a dedicated thread pool, allowing other futures to continue making progress.

If you use this for the blocking sleep, your second future will look like this:

async {
    tokio::task::spawn_blocking(|| {
        std::thread::sleep(Duration::from_millis(2800));
    })
    .await
    .unwrap();
    println!("Value 2 ready");
    "Value 2"
},

Now the code runs as expected once again. Of course, this is a contrived example, but the blocking sleep can be replaced with any CPU-heavy blocking code and Tokio will take care of the rest.

Tokio tasks

Although you can do just fine by spawning blocking code in Tokio’s thread pool, to take full advantage of futures and async/await, let’s use asynchronous code from top to bottom. We’ll spawn futures into their own background task using tokio::task::spawn, an async version of std::thread::spawn. We’ll test this out by making a pool of async workers that can receive jobs to run in the background.

Start by defining the list of jobs using an enum.

#[derive(Debug)]
enum Message {
    SendWelcomeEmail { to: String },
    DownloadVideo { id: usize },
    GenerateReport,
    Terminate,
}

The main thing to notice here is the Terminate variant of Message, which tells workers to stop processing jobs when they are no longer needed. We’ve also derived Debug so we can print out the messages later.

Next, we need to use one of the channels provided by Tokio to communicate with the workers.

use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let (sender, receiver) = unbounded_channel();
    let receiver = Arc::new(Mutex::new(receiver));
}

Here, we’re using an unbounded channel, which is an async alternative to the MPSC channel in the standard library. In production, I’d strongly recommend using tokio::sync::mpsc::channel, a limited-size channel that provides back pressure when your application is under load to prevent it from being overwhelmed. The receiver is also wrapped in an Arc and a Tokio Mutex because it will be shared between multiple workers. This won’t compile yet because it can’t infer the type of values we’re going to send through the channel.

Now let’s can spawn a few workers.

let size = 5;
let mut workers = Vec::with_capacity(size);

for id in 0..size {
    let receiver = Arc::clone(&receiver);
    let worker = tokio::spawn(async move { /* ... */ });
    workers.push(worker);
}

This spawns some workers and pushes their JoinHandle onto a Vec. Now all that’s left is to fill out the message handlers .

use std::time::Duration;
use tokio::time::sleep;

// ...

let worker = tokio::spawn(async move {
    loop {
        let message = receiver
            .lock()
            .await
            .recv()
            .await
            .unwrap_or_else(|| Message::Terminate);
        println!("Worker {}: {:?}", id, message);
        match message {
            Message::Terminate => break,
            _ => sleep(Duration::from_secs(1 + id as u64)).await,
        }
    }
});

This just loops forever, printing out each message. If it gets a Terminate message, it breaks the loop. For any other message, it sleeps. In your own application, you’d match against each message you want to handle and put some real logic here.

Be careful with the lifetime of the mutex guard. The reason you don’t use while let Some(message) = receiver.lock()... is that it wouldn’t drop the mutex guard until after the content of the while loop executes. That means the mutex would be locked while you process the message and only one worker could work at a time.

To allow the workers to cleanly terminate, it’s important to send a Terminate message to all of them before the main function finishes. If you don’t do this, jobs will be interrupted and potentially left in a bad state.

for _ in &workers {
    let _ = sender.send(Message::Terminate);
}
for worker in workers {
    let _ = worker.await;
}

The first loop sends the message as many times as there are workers. The second then waits for each worker to complete. We’re using let _ = ... here to discard the results. This kind of code should be placed just before the point where you no longer need the workers. In our case, that’s at the very end of main.

Note that we didn’t use await after the send method here. That’s because we used an unbounded channel, which will never block a sender. Bounded channels can block the sender so you would need to use await when sending. You could also use try_send.

Above this cleanup code, let’s send jobs to the workers.

sender.send(Message::DownloadVideo { id: 10 }).unwrap();
sender.send(Message::GenerateReport).unwrap();
sender.send(Message::SendWelcomeEmail { to: "hi@example.com".into() }).unwrap();
sender.send(Message::DownloadVideo { id: 25 }).unwrap();

In this example, we used channels, a mutex, and spawned async tasks. Each of these have recognizable APIs if you’ve used the standard library equivalents.

It’s my strong opinion that Tokio is ready for production already. That said, the current version is being treated as a beta release for a soon-to-be released version 1.0. Once that’s released, the Tokio team will commit to maintaining it for a minimum of five years. I have found the entire Tokio stack to be of similarly high quality and is a real pleasure to use.

2. async-std

Like the name suggests, async-std attempts to be an asynchronous version of the Rust standard library. It has similar goals as Tokio, which is its main competitor, but is less popular and therefore less battle-tested in the wild.

While the APIs of Tokio and async-std aren’t identical, they are fairly similar and most concepts transfer between the two. Again, both of these libraries will already feel familiar if you have experience with the Rust standard library. For anyone still curious, I’ve published the code for the last example we did, along with an async-std port, on GitHub.

The most important thing to know is that Tokio and async-std are not 100 percent compatible. Libraries built on top of one might not work in the runtime of the other.

For now, the core language and standard library only provide the bare minimum for async/await support; the rest is built by community-written crates. Over time, as the right abstractions are found, some of the foundational pieces may be merged into the standard library and more libraries may become runtime-agnostic. Until then, be sure to check if any async libraries you use depend on a particular runtime.

If stability is a top priority, I would recommend Tokio over async-std because it’s simply more mature and has more libraries built on top of it. For anyone exploring Rust async, it’s still a good idea to try out async-std. Who knows, maybe you’ll find it suits your needs.

3. futures

The futures-rs crate provides much of the shared foundational pieces for async in Rust. Both Tokio and async-std use parts of the futures crate for things like shared traits. In fact, std::future::Future was originally taken from this crate and other parts will likely be moved into the standard library at some point.

A good example of what this crate contains is the stream module. Streams are essentially asynchronous iterators, and this module provides the Stream and StreamExt traits, which include combinator functions similar to the ones available for iterators.

Conclusion

In my opinion, async Rust is in a great place despite some concerns around runtime compatibility. Tokio and async-std are both general-purpose async runtimes that provide async alternatives to standard library types.

For a production application, I’d currently recommend Tokio due to its maturity and the number of crates built on top of it. For people looking for a batteries-included framework, Rust isn’t a good choice yet. But for those that prefer building applications out of smaller, more modular pieces, Tokio and its surrounding ecosystem are brilliant.

LogRocket: Full visibility into production Rust apps

Debugging Rust applications can be difficult, especially when users experience issues that are difficult to reproduce. If you’re interested in monitoring and tracking performance of your Rust apps, automatically surfacing errors, and tracking slow network requests and load time, try LogRocket.

LogRocket is like a DVR for web apps, recording literally everything that happens on your Rust app. Instead of guessing why problems happen, you can aggregate and report on what state your application was in when an issue occurred. LogRocket also monitors your app’s performance, reporting metrics like client CPU load, client memory usage, and more.

Modernize how you debug your Rust apps — .

Joshua Cooper I'm a full-stack web developer with a passion for well-written and efficient code. On the frontend, I use Typescript with React and on the backend I use Rust or Node.js with Typescript.

Leave a Reply