Concurrency and parallel computing are hot topics in the world of software development. To be fair, this popularity is well deserved because they offer significant performance boosts to programs. When incorrectly implemented, however, they cause issues that can be hard to debug, such as data races and deadlocks.
Fortunately, the Rust programming language comes equipped with fearless concurrency, which is made possible through Rust’s ownership and borrow checker model. Thus, many concurrency mistakes leading to data races and deadlocks are captured at compile time.
In this article, we will explore how to implement data parallelism in Rust with the rayon crate, and how rayon improves upon the parallel features of the Rust standard library by making it easy to convert sequential iterators into their parallel counterparts.
Before getting started, ensure you are familiar with the Rust programming language and have a basic understanding of Rust iterators. To brush up on your Rust basics, check out our guide to getting up to speed with Rust.
Jump ahead:
- Understanding sequential iterators in Rust
- Implementing parallel iterators
- Using Rayon's `par_bridge` method
- Exploring Rayon's instances
- Working with non-`Send` and non-`Sync` types
Understanding sequential iterators in Rust
The iterators provided by the Rust standard library perform operations sequentially by default. Suppose we have a list of 1,000 names and want to check whether the name `Alice` can be found in the list. To do this, we could use the various iterator methods found in the standard library, as shown below:
```rust
let names = vec![ ... ];
let is_present = names.iter().any(|name| name == "Alice");
```
Although this method works, the names in the list are traversed one after another until `Alice` is found. This may not be the best approach if `Alice` appears late in the list. So, let's look at a more practical example. Say we want to download a large 2,000MB video file. Here's how we might go about it sequentially (prototype):
```rust
/** !! Does not compile !! **/
// Using the reqwest crate
use reqwest::header::RANGE;

fn main() -> reqwest::Result<()> {
    // byte ranges (in MB): eight 250MB chunks of the 2000MB file
    let ranges = &[
        (0, 249),
        (250, 499),
        (500, 749),
        (750, 999),
        (1000, 1249),
        (1250, 1499),
        (1500, 1749),
        (1750, 1999),
    ];
    ranges.iter().enumerate().for_each(|(index, (start, end))| {
        let client = reqwest::blocking::Client::new();
        let response = client
            .get("random_video_url.mp4")
            // hypothetical helper that builds a `bytes=start-end` Range header value
            .header(RANGE, range_header(*start, *end))
            .send()
            .unwrap();
        std::fs::write(format!("file{}", index), response.bytes().unwrap()).unwrap();
    });
    // finally, join all files
    Ok(())
}
```
This method downloads the file sequentially even though we divided it into chunks: the first chunk is downloaded, then the second, then the third, and so on, all on a single thread. This approach has a significant downside. Suppose the server caps each connection at 1MB per second. The download would then take roughly 2,000 seconds even on maximum internet connectivity. That is over 33 minutes!
Implementing parallel iterators
Executing operations in parallel typically improves the speed of a process. Returning to our previous example, each megabyte takes one second to download. Let's divide the file into eight equal chunks of 250MB, with each chunk downloaded on its own thread.

Each thread then takes about 250 seconds to download its chunk. Because the threads run simultaneously (in parallel), the total time is still roughly 250 seconds, or about four minutes! A few extra seconds might be needed to merge the chunks into the complete file.
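The arithmetic above can be sketched quickly. This is a back-of-the-envelope estimate only; the 2,000MB size, 1MB/s cap, and eight threads are the example's hypothetical numbers:

```rust
// Estimated seconds to download `total_mb` at `rate_mb_per_s` per connection,
// split evenly across `threads` parallel connections
fn estimate_secs(total_mb: u32, rate_mb_per_s: u32, threads: u32) -> u32 {
    (total_mb / threads) / rate_mb_per_s
}

fn main() {
    // Sequential: effectively one thread downloading 2000MB at 1MB/s
    println!("sequential: {}s", estimate_secs(2000, 1, 1)); // 2000s, over 33 minutes
    // Parallel: eight 250MB chunks, one per thread
    println!("parallel: {}s", estimate_secs(2000, 1, 8)); // 250s, about 4 minutes
}
```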
To put it in prototype code:
```rust
/** !! Does not compile !! **/
use rayon::prelude::*;
use reqwest::header::RANGE;

// -- Snip -- //
let ranges = &[
    (0, 249),
    (250, 499),
    (500, 749),
    (750, 999),
    (1000, 1249),
    (1250, 1499),
    (1500, 1749),
    (1750, 1999),
];
ranges.par_iter().enumerate().for_each(|(index, (start, end))| {
    let client = reqwest::blocking::Client::new();
    let response = client
        .get("random_video_url.mp4")
        .header(RANGE, range_header(*start, *end))
        .send()
        .unwrap();
    std::fs::write(format!("file{}", index), response.bytes().unwrap()).unwrap();
});
// finally, join all files
```
You may have noticed the use of the `par_iter` method, which is provided by the rayon crate. Rayon allows programmers to easily convert the various iterators provided by the standard library into iterators that execute in parallel. To convert a sequential iterator into a parallel one, first import the rayon crate with `use rayon::prelude::*;`, then replace the `into_iter` or `iter` methods with the corresponding parallel versions, `into_par_iter` or `par_iter`. Here are some examples:
```rust
use rayon::prelude::*;

let names = vec![ ... ];
let is_present = names.par_iter().any(|name| name == "Alice");

// sequential
fn sum_of_char_len(list: &[&str]) -> usize {
    list.iter().map(|word| word.len()).sum()
}

// parallel
fn par_sum_of_char_len(list: &[&str]) -> usize {
    list.par_iter().map(|word| word.len()).sum()
}
```
Using Rayon's `par_bridge` method
The `par_iter` and `into_par_iter` methods work on built-in collections such as vectors and hashmaps, but they do not work on custom iterator types. That is where the `par_bridge` method comes in handy: it lets a custom iterator execute in parallel. To illustrate this, here is a simple prime number generator:
```rust
struct PrimeGenerator {
    start: usize,
    end: usize,
}

fn is_prime(num: usize) -> bool {
    if num <= 1 {
        return false;
    }
    for i in 2..num {
        if num % i == 0 {
            return false;
        }
    }
    true
}

impl Iterator for PrimeGenerator {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        while !is_prime(self.start) {
            self.start += 1;
        }
        if self.start > self.end {
            None
        } else {
            let prime = self.start;
            self.start += 1;
            Some(prime)
        }
    }
}
```
Here, we created a simple struct and implemented the `Iterator` trait for it. The iterator yields the prime numbers within the specified range. To execute this iterator in parallel, we use the `par_bridge` method, as shown below:
```rust
use rayon::prelude::*;

fn get_primes() {
    let primes = PrimeGenerator { start: 0, end: 20 };
    println!("primes are {:?}", primes.par_bridge().collect::<Vec<_>>())
}
```
Note: This prints a list of prime numbers in no particular order.
Because `par_bridge` produces a parallel iterator, several iterator methods can be applied to it, including `map`, `for_each`, `sum`, and so on. You can view the entire list here.
Let’s visualize this below:
```rust
// sum the prime numbers within a particular range
let primes = PrimeGenerator { start: 0, end: 20 };
let sum_of_primes: usize = primes.par_bridge().sum();
```
Exploring Rayon’s instances
Parallel iterators are a flexible way to introduce parallelism into your application without the hassle of manually sharing tasks among several threads. However, rayon also provides the `join` and `scope` functions and the `ThreadPoolBuilder` type, which allow you to create custom tasks. Let's dive into each one of them.
The `join` method
The `join` function allows programmers to potentially execute two closures in parallel. Rayon's parallelization is based on a principle known as work stealing. The first closure is executed on the current thread, while the second is made available for another thread to execute. If there is an idle thread, that thread will execute the second closure.

However, when there are no idle threads, the current thread executes the second closure itself after the first closure returns. If the second closure has already been stolen by an idle thread, the current thread waits for the thief (the thread currently executing the second closure) to complete. For this reason, the closures passed into `join` should be CPU-bound tasks rather than blocking tasks like I/O. You can read more about it here. Here are some usages of `join`:
```rust
fn strings_to_num(slice: &[&str]) -> Vec<usize> {
    slice
        .iter()
        .map(|s| {
            s.parse::<usize>()
                .unwrap_or_else(|_| panic!("{} is not a number", s))
        })
        .collect()
}

fn factorial(n: u128) -> u128 {
    (1..=n).product()
}

fn test_join() {
    let (a, b) = rayon::join(
        || factorial(30),
        || strings_to_num(&["12", "100", "19887870", "56", "9098"]),
    );
    println!("factorial of 30 is {}", a);
    println!("numbers are {:?}", b)
}
```
Note: Because the closures execute on different threads, a panic in one of the closures does not cancel the execution of the other closure.
The `scope` method
The `scope` function allows you to spawn an arbitrary number of tasks asynchronously. Similar to `join`, `scope` returns once all the tasks spawned within it have completed. Let's look at an example:
```rust
use std::collections::HashMap;

fn test_scope() {
    let mut map: Option<HashMap<String, usize>> = None;
    let mut factorial = 1;
    let mut other = None;
    rayon::scope(|s| {
        s.spawn(|_s| {
            let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
            map = Some(HashMap::from_iter(iter));
        });
        s.spawn(|_s| {
            // factorial_sequential: a sequential factorial like the one shown earlier
            factorial = factorial_sequential(30);
        });
        s.spawn(|_s| other = Some("value"));
    });
    println!("map {:?}", map.unwrap());
    println!("factorial {:?}", factorial);
    println!("other {:?}", other);
}
```
There are a few points to note. First, unlike `join`, `scope` doesn't return anything. However, it is possible to write values to local variables that outlive the scope. For example, the `map` variable defined above outlives the `scope` call, which allows the spawned task to write to `map`. The `map` variable can then be read outside `scope`.
Secondly, following Rust's borrow checker rules, multiple spawned closures may not mutably borrow the same variable. Likewise, there cannot be a mutable borrow in one closure and an immutable borrow in another. Thus, the following does not compile:
```rust
// !! Does not compile !! //
// -- Snip -- //
rayon::scope(|s| {
    s.spawn(|_s| {
        let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
        map = Some(HashMap::from_iter(iter)); // first mutable borrow occurs here
    });
    s.spawn(|_s| {
        factorial = factorial_sequential(30);
        println!("my map is {:?}", map) // reading the variable here produces a compile error (immutable borrow)
    });
    s.spawn(|_s| other = Some("value"));
});
// -- Snip -- //
```
Working with non-`Send` and non-`Sync` types
In Rust, types that implement the `Send` and `Sync` traits are generally assumed to be thread-safe; that is, they can be shared among multiple threads without undefined behavior or data races. The compiler implements these traits automatically where appropriate. Thus, in the examples above, `Send` and `Sync` are automatically implemented for `usize` and `&str`.
The earlier `scope` example does not compile because one thread could be reading the `map` variable while another is writing to it, potentially leading to a data race. For custom types, the go-to fix is to wrap the variable in an `Arc`, adding a `Mutex` if the shared variable must also be mutable. For primitive types, such as integers, it is best to use the atomic types provided by `std::sync::atomic`. Now, let's fix the above code so it compiles:
```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn test_scope() {
    let map: Arc<Mutex<Option<HashMap<String, usize>>>> = Arc::new(Mutex::new(None));
    let mut factorial = 1;
    let mut other = None;
    rayon::scope(|s| {
        s.spawn(|_s| {
            let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
            let mut map = map.lock().unwrap();
            *map = Some(HashMap::from_iter(iter)); // note the use of the dereference operator *
        });
        s.spawn(|_s| {
            factorial = factorial_sequential(30);
            println!("my map is {:?}", map.lock().unwrap()) // "map" may or may not be None here
        });
        s.spawn(|_s| other = Some("value"));
    });
    println!("map {:?}", map.lock().unwrap());
    // -- Snip -- //
}
```
Conclusion
Although concurrent and asynchronous tasks are tricky to implement, they have the potential to enhance the performance of the programs you write. Fortunately, Rust removes much of the danger, and rayon comes as an add-on to further simplify the task with its handy parallel iterator implementations. The more you practice with concurrent code, the better you will become. Happy coding!