Carl Fredrik Samson Programmer located in Norway with an interest in concurrent systems. I'm curious about how things really work, whether it's computers or other areas of interest.


A Practical Guide to Async in Rust

Programming languages have different methods of representing asynchronous operations. The way Rust handles concurrency should be familiar if you’ve ever used async/await in JavaScript. The keywords are the same and the fundamental model is similar, except that in Rust, deferred computations are called futures instead of promises. What is not so familiar is that you need to pick a runtime to actually run your asynchronous code.

Rust targets everything from bare-metal, embedded devices to programs running on advanced operating systems and, like C++, focuses on zero-cost abstractions. This impacts what is and isn’t included in the standard library.

Once you have the know-how, you’ll find it’s really not difficult to get started with async in Rust. If you’re writing an asynchronous program in Rust for the first time or just need to use an asynchronous library and don’t know where to start, this guide is for you. I’ll try to get you going as quickly as possible while introducing you to all the essentials you should know.

Essentials

An async application should pull in at least two crates from Rust's ecosystem:

  1. futures, an official Rust crate that lives in the rust-lang repository
  2. A runtime of your choosing, such as Tokio, async_std, smol, etc.

Some people don’t want to pull in more dependencies than they need to, but these are as essential as the chrono or log crates. The only difference is that these focus on async instead.

We’ll use tokio for the purpose of this tutorial. You should get to know at least one runtime and focus on that first. You can check out other runtimes later. You’ll likely notice that they have a lot in common in terms of functionality, even if the API or implementations may differ.

To provide the best experience when working with async in Rust, you should enable some features. The dependencies section of your Cargo.toml should look like this:

[dependencies]
futures = { version = "0.3.*" }
tokio = {version = "0.2.*", features = ["full"] }

Your main.rs should look as follows.

use futures::prelude::*;
use tokio::prelude::*;

fn main() {
    todo!();
}

Runtimes

‌Contrary to what you might be used to with other languages, Rust doesn’t have a built-in runtime. We won’t discuss the pros and cons of that here, but you’ll need to make a choice and pull that in as a dependency.

Some libraries require you to use a specific runtime because they rely on runtime internals to provide a solid API for you to use or wrap their own API around an existing runtime. One example is the actix_web web framework, which wraps its own API around tokio.

Most of the time, you can choose any runtime you want. But no matter which runtime you choose, there are three basic operations you should figure out how to do before you start coding:‌


  1. How to start the runtime
  2. How to spawn a Future
  3. How to spawn blocking or CPU-intensive tasks

‌You can complete most tasks if you know these basic operations. Let’s walk through all three using Tokio as an example.

1. Starting the runtime

You can explicitly instantiate the runtime and spawn a future onto it. The future you spawn will be the main entry point for your program, so think of it like an asynchronous main function.

async fn app() {
    todo!()
}

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let future = app();
    rt.block_on(future);
}

You could also use the shorter version, which does essentially the same thing.

#[tokio::main]
async fn main() {

}

‌2. Spawning a Future on the runtime

This comes in handy when you want to run futures concurrently (i.e., tasks that are progressing simultaneously).

use tokio::task;

async fn our_async_program() {
    todo!();
}

async fn app() {
    let concurrent_future = task::spawn(our_async_program());
    todo!()
}

‌3. Spawning blocking or CPU-intensive tasks

This is a common problem when writing async code in general. If you want to take advantage of a runtime that runs your code concurrently, you should avoid blocking or running CPU-intensive code inside the futures themselves. Most of the code you write in async Rust actually executes inside a future, which is important to be aware of.

Most runtimes provide a way to offload this work to a different thread, which helps you avoid blocking the thread that is actually driving your futures to completion. In tokio, you can do this via task::spawn_blocking.

Using our example, we can do the following.

use tokio::task;

fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}

async fn app() {
    let threadpool_future = task::spawn_blocking(|| fib_cpu_intensive(30));
    todo!()
}

Each runtime has a slightly different API to accomplish these tasks, but they all support them. If you know what to look for, you’ll have an easier time getting started.

An async project starter template

At this point, we can work with async in Rust with almost the same ease with which we write normal synchronous code, but let's venture a little further and cover some things that might come in handy later on.

I’ll provide a template you can use to start applications where you know you’ll need to write async code. I like to instantiate the runtime explicitly, which is what we’ll do in the template below.

use futures::prelude::*;
use tokio::prelude::*;
use tokio::task;
use log::*;

// Just a generic Result type to ease error handling for us. Errors in multithreaded
// async contexts need the extra `Send + Sync` bounds
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn app() -> Result<()> {
    // I treat this as the `main` function of the async part of our program. 
    todo!()
}

fn main() {
    env_logger::init();
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}

‌I’ve pulled in a few more crates, including:

  • A runtime (tokio)
  • Rust’s logging facade, log
  • A logger (env_logger)

‌Our Cargo.toml now looks like this:

[dependencies]
futures = { version = "0.3.*"}
tokio = {version = "0.2.*", features = ["full"] }
log = "0.4.*"
env_logger = "0.7.*"

Remember that env_logger relies on the RUST_LOG environment variable to determine the log level.

Most of my async application projects start with a main.rs and a Cargo.toml like the ones presented above. I can add better error handling or logging if necessary as the project evolves. A popular crate for handling errors in applications is anyhow, and async-log is useful for improving logging in async contexts.

Throughout this tutorial, we’ll use this basic template for all our code. However, you might notice that I’ve adapted the log output slightly to better suit what I wanted to show. If you want to follow along, you should change your logging initialization in main.rs to the following.

use std::io::Write; // needed for the `writeln!` call below

let start = std::time::Instant::now();
env_logger::Builder::from_default_env().format(move |buf, rec| {
    let t = start.elapsed().as_secs_f32();
    writeln!(buf, "{:.03} [{}] - {}", t, rec.level(), rec.args())
}).init();

Async functions in Rust

‌Async functions in Rust differ somewhat from what you’re used to. When you learned Rust, you probably noticed how it’s very precise about what types the argument of a function has and what type the function returns.

Async functions differ in one important way: all your return types are “wrapped” into a Future.

You might read the documentation about Futures in Rust and think your async function needs to look like this:

async fn our_async_program() -> impl Future<Output = Result<String>> {
    future::ok("Hello world".to_string()).await
}

‌This is wrong! If you’re doing this, you’re overthinking it. An async function already wraps the return type, so you can write functions the way you’re used to.

This is what you actually want:

async fn our_async_program() -> Result<String> {
    future::ok("Hello world".to_string()).await
}

future::ok is one of the convenience functions we get from the futures crate. It wraps a value in a future that resolves immediately.

This might seem a bit strange since Rust is usually extremely rigorous when it comes to declaring the correct types, but it’s actually a huge ergonomic boost because it automatically wraps the return types from our async functions.

You’ll often see examples using async blocks, such as async { ... }. These are similar to async functions in that they evaluate to a special kind of future that wraps whatever the block returns. One drawback with async blocks is that you’ll have to jump through some hoops to return errors from them via ?. The return types can be difficult to reason about, which can cause some unneeded confusion when you’re starting out writing async Rust.

My suggestion is to use async functions if you can, especially if you intend to return anything from the future — at least until you’re comfortable with the different return types and how async in Rust works.

Making a web request

Futures in Rust are lazy. By default, they won’t do anything before they’re polled the first time. The future gets polled when you await it.

For example, if you call a function that returns a future at the start of your program but don’t await it until the end of the program, the actual request won’t be made until you reach the point where you await it.

Let’s put what we’ve learned so far into practice.

Reqwest is a popular client library for creating web requests. We’ll use that together with the Slowwly endpoint, which lets us define a delay for the server response and gives us a little more determinism in our concurrency, making it easier to reason about.

Let’s add reqwest to our Cargo.toml by adding reqwest = "0.10.*" to the [dependencies] section.

Create a few requests and see what happens.

fn slowwly(delay_ms: u32) -> reqwest::Url {
    let url = format!(
        "http://slowwly.robertomurray.co.uk/delay/{}/url/http://www.google.co.uk",
        delay_ms,
    );
    reqwest::Url::parse(&url).unwrap()
}

async fn app() -> Result<()> {
    info!("Starting program!");
    let _resp1 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 1");
    let _resp2 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 2");
    Ok(())
}

‌Running this gives us the following output.

1.264 [INFO] - Got response 1
2.467 [INFO] - Got response 2
2.468 [INFO] - Done

The times are in seconds and milliseconds. At 1.264, we got the first response from our endpoint (remember, we asked for a delay of one second on the first request). Roughly one second later, at 2.467, we got the second response. The whole program took 2.468 seconds to run.

So, our program is working, but this is not really concurrent, is it? Honestly, it’s not much better than a complicated synchronous program.

Let’s actually take advantage of our async runtime and run the requests concurrently.

async fn request(n: usize) -> Result<()> {
    reqwest::get(slowwly(1000)).await?;
    info!("Got response {}", n);
    Ok(())
}

async fn app() -> Result<()> {
    let resp1 = task::spawn(request(1));
    let resp2 = task::spawn(request(2));

    let _ = resp1.await??;
    let _ = resp2.await??;

    Ok(())
}

Notice that we refactored the request into a separate function, for two reasons:

  1. We want our logging to happen after we get the result of each request, so we wrap both the request and the logging in a task that we spawn onto the runtime and await
  2. The alternative would be to use an async { } block, but although it’s possible to specify a return type for our Result<()>, it’s pretty awkward, so we’ll avoid that for the purpose of this tutorial

‌If we run our code, we should get this:

1.247 [INFO] - Got response 2
1.256 [INFO] - Got response 1
1.257 [INFO] - Done

That looks better. Our second request finishes at 1.247 and our first at 1.256. The whole program takes 1.257 seconds, which is less than half the time it took in the first example.

Using spawn enables us to run our requests concurrently. Since Tokio defaults to a multithreaded runtime, tasks spawned this way can also run in parallel on different cores.

CPU-intensive tasks

Now that we’ve gotten our program to run concurrently, we can combine CPU-intensive tasks with I/O-bound tasks to create a more complex scenario.

Let’s expand our example slightly by making 10 requests and doing some analysis of each response as we get them. We’re super excited to see the ratio of ones versus zeroes in the bytes we get from the response, so we’ll return a count for ones and zeros and report the ratio in the end.

use futures::future::join_all;

// Now we want to both fetch some data and do some CPU intensive analysis on it
async fn get_and_analyze(n: usize) -> Result<(u64, u64)> {
    let response: reqwest::Response = reqwest::get(slowwly(1000)).await?;
    info!("Dataset {}", n);

    // we get the body of the request
    let txt = response.text().await?;

    // We send our analysis work to a thread where there is no runtime running
    // so we don't block the runtime by analyzing the data
    let res = task::spawn_blocking(move || analyze(&txt)).await?;
    info!("Processed {}", n);
    Ok(res)
}

// Counting the number of ones and zeros in the bytes we get.
fn analyze(txt: &str) -> (u64, u64) {
    let txt = txt.as_bytes();
    // Let's spend as much time as we can and count them in two passes
    let ones = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_ones() as u64);
    let zeros = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_zeros() as u64);
    (ones, zeros)
}

async fn app() -> Result<()> {

    // This is new. We can collect futures in a collection. Nice to know!
    let mut futures = vec![];

    for i in 1..=10 {
        let fut = task::spawn(get_and_analyze(i));
        futures.push(fut);
    }

    let results = join_all(futures).await;

    let mut total_ones = 0;
    let mut total_zeros = 0;

    // Returning errors using `?` in iterators can be a bit difficult. Using a
    // simple for loop to inspect and work with our results can often be more
    // ergonomic
    for result in results {
        // Awaiting a spawned task yields a `Result` (the task may have
        // panicked or been cancelled), which we need to unwrap first
        let ones_res: Result<(u64, u64)> = result?;
        let (ones, zeros) = ones_res?;
        total_ones += ones;
        total_zeros += zeros;
    }

    info!("Ratio of ones/zeros: {:.02}", total_ones as f64 / total_zeros as f64);
    Ok(())
}

A few things to note:

  • The futures crate has many convenient tools, including join_all, which treats a collection of futures as a single future and drives them all to completion. FuturesUnordered, from the same crate, is another option for this
  • We can collect futures in a normal Vec
  • There can be a lot of error unwrapping when writing async code. This is normal, but it can be a bit confusing at times. A tool such as rust-analyzer can help you keep track of which errors are returned, and an error handling crate such as anyhow can also help here

Let’s take a look at what our program outputs.

1.270 [INFO] - Dataset 7
1.275 [INFO] - Dataset 3
1.285 [INFO] - Dataset 2
1.285 [INFO] - Dataset 4
1.291 [INFO] - Dataset 9
1.297 [INFO] - Dataset 1
1.301 [INFO] - Dataset 5
1.308 [INFO] - Dataset 6
1.312 [INFO] - Dataset 8
1.322 [INFO] - Dataset 10
1.374 [INFO] - Processed 7
1.377 [INFO] - Processed 3
1.378 [INFO] - Processed 4
1.378 [INFO] - Processed 2
1.380 [INFO] - Processed 9
1.384 [INFO] - Processed 1
1.385 [INFO] - Processed 5
1.391 [INFO] - Processed 8
1.391 [INFO] - Processed 6
1.397 [INFO] - Processed 10
1.397 [INFO] - Ratio of ones/zeros: 0.95
1.397 [INFO] - Done

Since we send off all our requests immediately and each one takes roughly a second to return, all our responses come back almost simultaneously. Each response is then sent for analysis on the thread pool.

We can see that datasets are not necessarily finished processing in the order in which they come in since they are processed on separate CPU cores.

To spawn or not to spawn?

Given that futures should be spawned, and blocking or CPU-bound tasks should be spawned onto a thread pool, what can we actually write directly in our async functions?

Normally, you can write most of your code without worrying too much about that, but blocking and CPU-intensive tasks should make you stop and consider whether you should refactor that part of your code so it can be spawned on the thread pool designed for handling these.

If you encounter a situation where you might need one of the following modules, you should check whether your runtime has an async alternative for the task you want to perform.

  • std::sync
  • std::thread
  • std::fs
  • std::net

If your runtime doesn’t have an equivalent, you can use spawn_blocking and do the operation in a thread pool like you would do with CPU-intensive tasks and await the result.

In general, functions that call into your OS and might result in the OS parking the thread you’re calling from are especially harmful to concurrency, since you’ll park the executor as well.

Using thread::sleep is a prime example of a function you should avoid in an async context for this exact reason. It’s tempting to use sleep to delay an operation, but it’s not a good idea because the OS will park the calling thread for the whole duration, thereby stalling all the other tasks running on that thread as well. Most runtimes provide a way to delay or sleep a task without blocking the executor.

For CPU-bound tasks, the lines are much blurrier. In general, I would encourage you to not be paranoid about this. Just remember it’s easier to refactor back if you decide to reduce the number of calls to spawn_blocking than the other way around.

When in doubt, spawn.

By now you should be prepared to write async Rust, and I hope you’ll find it easier to get started on your next async project.


17 Replies to “A practical guide to async in Rust”

  1. 3. Spawning blocking or CPU-intensive tasks
    uses `use tokio::runtime::task;` which causes an error. The example before that uses `use tokio::task;`.
    Is the former a mistake?

    1. Starting the runtime
    with `#[tokio::main]` is there implicitly `app()` called?
    This doesn’t seem to work for me.

  2. You’re right. It should be `use tokio::task;` as in the earlier example. I’ll check if I can get that corrected. Thanks for catching.

    with `#[tokio::main]` is there implicitly `app()` called?

    No, in the first example `app` becomes your “asynchronous main”; in the second example the macro turns your “main” into your “asynchronous main”. What happens behind the scenes is pretty much the same, though you lose access to your “synchronous main” in the latter example.

  3. Looks like the “shorter version” should be

    #[tokio::main]
    async fn main() {
        app().await;
    }

  4. I had to change

    let res = future::ok("Hello world".to_string()).await?;

    to

    let res = future::ok::<String, Box>("Hello world".to_string()).await.unwrap();

    to get rid of compile errors

  5. “CPU-intensive tasks”
    `let res= task::spawn_blocking(move ||analyze(txt)).await?;`
    is missing a `&`
    `let res= task::spawn_blocking(move ||analyze(&txt)).await?;`

  6. I guess the issues were meant as an exercise for the reader 😉
    Thanks for posting. Very helpful.

  7. You could write that but if you `cargo expand` (cargo install cargo-expand) the example I wrote you get something like this:

    fn main() {
        tokio::runtime::Builder::new()
            .basic_scheduler()
            .threaded_scheduler()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async {
                {
                    {
                        todo!();
                    }
                }
            })
    }

    As you see the “app” part is wrapped in an async block and passed to the runtime so “main” would in essence function like the “app” in the example above.

  8. Yeah, you’re right. I was trying to avoid these in this article but it seems I inadvertently introduced it in those examples. I think it’s better to change it to:

    async fn our_async_program() -> Result<String> {
        future::ok("Hello world".to_string()).await
    }

    Since the compiler can infer the rest in this case. Thanks for pointing it out.

  9. Yes, you’re right. analyze took a “String” in an earlier draft but I changed that without catching this one. Should be fixed soon.

  10. Glad you enjoyed it. Well, I couldn’t make it too easy 🙂 Seriously, thanks for posting. The next person testing all the code should have a slightly easier time, though.

  11. Hi Philip. I see where the confusion lies now. You see, `#[tokio::main]` is a macro that rewrites `fn main()` in a way that the code ends up looking like the first example.

    The difference is that the code you write in a main function with `#[tokio::main]` is wrapped in an async block instead of put in a function called `app` but the end result is pretty similar.

    I posted the output from the macro in another answer below but it doesn’t look pretty in the comments section here. If you want to check it out for yourself install `cargo install cargo-expand` and run `cargo expand` in the root of a project with the example code in `main.rs`. You’ll see what it expands into.

  12. > // Returning errors using `?` in iterators can be a bit difficult. Using a
    // simple for loop to inspect and work with our results can often be more
    // ergonomic

    What’s wrong with `try_for_each`?

  13. Hi Daniel.

    That’s a good suggestion, but I feel the example gets harder to understand using Iterators since we can’t simply unwrap using `?` as we do in the rest of the examples.

    `results` is of type `Vec<Result<Result<(u64, u64), Box<dyn std::error::Error + Send + Sync>>, JoinError>>`, which is a nested result, making the iterator version pretty difficult to understand for people not intimately familiar with functional style programming.

    We might as well use `try_fold` instead of `try_for_each` if we were to do this operation using a functional style so the code would look something like this:

    let (total_ones, total_zeros) = results
        .into_iter()
        .flatten()
        .try_fold((0, 0), |(total_ones, total_zeros), res| {
            res.map(|(ones, zeros)| (total_ones + ones, total_zeros + zeros))
        })?;

    I feel it makes things more complicated than it needs to be in an example where the main focus is on keeping the code easy to read for a wide audience without really giving a lot of benefit in return 🙂
