Async Rust: A few examples to get it right

Before we start, I hope you already know how to create a new Rust project, add dependencies, and do other basic stuff. If not, I recommend reading the Book first :)

OK, let's go!

In Rust, to run code asynchronously you mark a block or function with the async keyword. The async keyword transforms the block of code into a state machine that implements a trait called Future. A future represents an asynchronous computation. For example,

async fn one() -> u32 {
   1
}

is the same as

use std::future::Future;

fn one() -> impl Future<Output = u32> {
    async { 1 }
}

Async bodies are lazy: they do nothing until they are run. The most common way to run a Future is to .await it. .await can only be used inside an async function or block; otherwise you need a thread-blocking operation like futures::executor::block_on to run the future. That is not important here, though, since in this article we will work with async code only.
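
For example, outside of an async context the function above could be driven to completion like this (a minimal sketch using futures::executor::block_on; it assumes the futures crate is in your dependencies, which we will add later in this article anyway):

use futures::executor::block_on;

fn main() {
    // Creating the future does nothing yet; it only runs when driven.
    let fut = one();
    // block_on blocks the current thread until the future completes.
    let result = block_on(fut);
    assert_eq!(result, 1);
}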

To allow an async main function, you need to choose a runtime for running our asynchronous program. In this example I will use the Tokio runtime. So let's start by adding this line to the dependencies section of our Cargo.toml file.

tokio = { version = "1.15.0", features = ["full"] }

It is easy to make our main function async with Tokio; we just need to mark the function to be executed by the selected runtime this way:

use tokio;

#[tokio::main]
async fn main() {}
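
Under the hood, the attribute macro roughly turns our async main into a plain main that builds a runtime and drives the async body to completion. A simplified sketch of that expansion, assuming the default multi-threaded runtime, looks like this:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build Tokio runtime")
        .block_on(async {
            // the body of your async main goes here
        });
}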

Let's write a simple async function that we will use to test our code later.

use std::time::Duration;

async fn task(name: &str, long: bool) {
    println!("Executing {}", name);

    // Simulate a long-running operation with a non-blocking sleep.
    if long {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    println!("{} executed", name);
}

The name argument is used for user-friendly output from our program.

The long argument controls whether the function pauses for a second during its execution.

Note that we use tokio::time::sleep here instead of std::thread::sleep, because std::thread::sleep is a thread-blocking operation: it would block the whole thread instead of allowing other tasks to run while the future is unable to make progress.
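
For comparison, here is what the blocking anti-pattern would look like. This is just an illustrative sketch: std::thread::sleep inside an async fn parks the whole worker thread, so no other task scheduled on it can make progress for that second.

// Anti-pattern: don't do this in async code.
async fn blocking_task(name: &str) {
    println!("Executing {}", name);
    // Blocks the entire OS thread instead of yielding to the runtime.
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("{} executed", name);
}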

Update the main function with this code and then try to run it:

use tokio;
use std::time::Duration;

#[tokio::main]
async fn main() {
    task("Long Task 1", true).await;
    task("Short Task 1", false).await;
    task("Long Task 2", true).await;
    task("Short Task 2", false).await;
}

As a result, we get the following output:

Executing Long Task 1
Long Task 1 executed
Executing Short Task 1
Short Task 1 executed
Executing Long Task 2
Long Task 2 executed
Executing Short Task 2
Short Task 2 executed

As you can see, all tasks were executed sequentially: the short tasks were waiting for the long ones to finish. This is definitely not what we wanted.

Let's improve our code to run the tasks concurrently. To achieve that, we need to add a new dependency to our Cargo.toml file.

Add this line to the dependencies section:

futures = "0.3.5"

The futures crate provides a number of core abstractions for writing asynchronous code. In our case we need the futures::future::join_all function. join_all creates a future which represents a collection of the outputs of the futures given to it.

The returned future will drive execution for all of its underlying futures, collecting the results into a destination Vec<T> in the same order as they were provided.
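
As a small illustration of that ordering guarantee (a standalone sketch with a hypothetical double function, run on the Tokio runtime we already set up):

use futures::future::join_all;

async fn double(x: u32) -> u32 {
    x * 2
}

#[tokio::main]
async fn main() {
    // The outputs come back in the same order the futures were given,
    // regardless of which future happens to finish first.
    let results = join_all(vec![double(1), double(2), double(3)]).await;
    assert_eq!(results, vec![2, 4, 6]);
}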

So let's rewrite our main function. Now we need to collect our task futures into a vector and then execute them via the join_all function.

use tokio;
use std::time::Duration;
use futures;

#[tokio::main]
async fn main() {
    let mut futures = Vec::new();

    futures.push(task("Long Task 1", true));
    futures.push(task("Short Task 1", false));
    futures.push(task("Long Task 2", true));
    futures.push(task("Short Task 2", false));

    futures::future::join_all(futures).await;
}

And if we run it, the output will be:

Executing Long Task 1
Executing Short Task 1
Short Task 1 executed
Executing Long Task 2
Executing Short Task 2
Short Task 2 executed
Long Task 1 executed
Long Task 2 executed

As you can see, while the long tasks are sleeping, they yield to the short ones!

Our code now runs concurrently, as we expected!

You can find all working examples on my GitHub repository :)