Editor’s Note: This article was updated on 03 July 2023 to include information and use cases for working with large values and practical use cases for parallel iterators.
Concurrency and parallel computing are hot topics in the world of software development. To be fair, this popularity is well deserved because they offer significant performance boosts to programs. When incorrectly implemented, however, they cause issues that can be hard to debug, such as data races and deadlocks.
Fortunately, the Rust programming language comes equipped with fearless concurrency, which is made possible through Rust’s ownership and borrow checker model. As a result, many concurrency mistakes, data races in particular, are caught at compile time.
In this article, we will explore how to implement data parallelism in Rust with Rayon and how the Rayon crate improves upon the parallel features of the Rust programming language by making it easier to convert sequential iterators to their parallel counterparts.
Before getting started, ensure you are familiar with the Rust programming language and have a basic understanding of Rust iterators. To brush up on your Rust basics, check out our guide to getting up to speed with Rust.
The iterators provided by the Rust standard library perform operations sequentially by default. Suppose we have a list of 1,000 names. Now, let’s say we want to check if the name Alice can be found in the list. To do this, we could use the various iterator methods found in the standard library, as shown below:
    let names = vec![ ... ];
    let is_present = names.iter().any(|name| name == "Alice");
Although this method works, the names in the list are traversed one at a time, in order, until Alice is found. This may not be the best method if Alice appears late in the list. So, let’s look at a more practical example. Let’s say we want to download a large 2000MB file. Here’s how we might go about it sequentially (prototype):
    /** Prototype: needs the reqwest crate with the "blocking" feature; the URL is a placeholder **/
    use reqwest::header::RANGE;

    const MB: u64 = 1024 * 1024;

    fn main() {
        // Eight contiguous, inclusive byte ranges of 250MB each
        let ranges: Vec<(u64, u64)> = (0..8)
            .map(|i| (i * 250 * MB, (i + 1) * 250 * MB - 1))
            .collect();
        ranges.iter().enumerate().for_each(|(index, (start, end))| {
            let client = reqwest::blocking::Client::new();
            let response = client
                .get("random_video_url.mp4")
                .header(RANGE, format!("bytes={}-{}", start, end))
                .send()
                .unwrap();
            // Each chunk is written to its own numbered file
            std::fs::write(format!("file{}", index), response.bytes().unwrap()).unwrap();
        });
        // finally, join all files
    }
This method downloads the file sequentially even though we divided it into chunks. The first chunk is downloaded first, then the second, then the third, and so on, all on a single thread. This approach has a clear downside. Suppose the server only allows a maximum of 1MB per second for each connection. That would mean that this download would take roughly 2000 seconds even on the fastest internet connection. That is over 33 minutes!
Executing operations in parallel typically improves the speed of a process. Returning to our previous example, each megabyte takes one second to download. Let’s assume the file is divided into eight equal chunks of 250MB each, and each chunk gets its own thread.
That means that each thread takes about 250 seconds to download its chunk. Because the threads run simultaneously (i.e., in parallel), the total time is also about 250 seconds, or roughly four minutes! A few more seconds might be needed to merge the chunks into the complete file.
To put it in prototype code:
    /** Prototype: needs the rayon crate and reqwest with the "blocking" feature; the URL is a placeholder **/
    use rayon::prelude::*;
    use reqwest::header::RANGE;

    const MB: u64 = 1024 * 1024;

    // -- Snip -- //
    // Eight contiguous, inclusive byte ranges of 250MB each
    let ranges: Vec<(u64, u64)> = (0..8)
        .map(|i| (i * 250 * MB, (i + 1) * 250 * MB - 1))
        .collect();
    ranges.par_iter().enumerate().for_each(|(index, (start, end))| {
        let client = reqwest::blocking::Client::new();
        let response = client
            .get("random_video_url.mp4")
            .header(RANGE, format!("bytes={}-{}", start, end))
            .send()
            .unwrap();
        // Each chunk is written to its own numbered file
        std::fs::write(format!("file{}", index), response.bytes().unwrap()).unwrap();
    });
    // finally, join all files
You may have noticed the use of the par_iter method. This method is provided by the Rayon crate. Rayon allows programmers to easily convert various iterator methods provided by the standard library into iterators that execute in parallel.
To convert your sequential iterator code snippet into a parallel one, first import the Rayon prelude with use rayon::prelude::*; and then replace the into_iter or iter methods with the corresponding parallel versions, into_par_iter or par_iter. Here are some examples:
    use rayon::prelude::*;

    let names = vec![ ... ];
    let is_present = names.par_iter().any(|name| name == "Alice");

    // sequential
    fn sum_of_char_len(list: &[&str]) -> usize {
        list.iter().map(|word| word.len()).sum()
    }

    // parallel
    fn par_sum_of_char_len(list: &[&str]) -> usize {
        list.par_iter().map(|word| word.len()).sum()
    }
For a more complex example, let’s look at an image processing task where we want to apply a filter to each pixel of an image in parallel. Suppose we have an image represented as a 2D-vector of pixels, where each pixel is a three-element tuple representing the red, green, and blue color values:
    use rayon::prelude::*;

    type Pixel = (u8, u8, u8); // Red, Green, Blue
    type Image = Vec<Vec<Pixel>>;

    // simple grayscale filter
    fn apply_filter(pixel: &Pixel) -> Pixel {
        let (r, g, b) = *pixel;
        // Compute the average of the red, green and blue values (use integer arithmetic)
        let grey = (r as u16 + g as u16 + b as u16) / 3;
        // Return a new pixel where all color channels have the grey value
        // The as u8 cast is safe because the average of three u8 values will always be within u8 range.
        (grey as u8, grey as u8, grey as u8)
    }

    fn main() {
        let image: Image = vec![vec![(10, 20, 30); 800]; 600];
        let processed_image: Image = image
            .par_iter()
            .map(|row| row.iter().map(|pixel| apply_filter(pixel)).collect())
            .collect();
        println!(
            "Processed image size: {}x{}",
            processed_image.len(),
            processed_image[0].len()
        );
    }
In this example, we use par_iter to create a parallel iterator over the rows of the image. For each row, we use a sequential iter and map to apply the filter to each pixel. The outer map and collect operations are parallel, so each row of the image is processed in parallel.
This kind of data parallelism can significantly speed up tasks, like image processing, where each data element can be processed independently. Note that this is a simple example; actual image processing code might use more sophisticated algorithms for color conversion, error handling, and performance optimization.
The par_bridge method
The par_iter and into_par_iter methods typically work on built-in collections such as vectors and hash maps. These natively support parallelism. For instance, you could easily process each item in a Vec concurrently since each item is usually independent of the others. However, these methods do not work on custom iterator types.
This is where the par_bridge method comes in handy: it lets a custom iterator execute in parallel. To illustrate this, here’s a simple prime number generator:
    struct PrimeGenerator {
        start: usize,
        end: usize,
    }

    fn is_prime(num: usize) -> bool {
        if num <= 1 {
            return false;
        }
        for i in 2..num {
            if num % i == 0 {
                return false;
            }
        }
        true
    }

    impl Iterator for PrimeGenerator {
        type Item = usize;

        fn next(&mut self) -> Option<Self::Item> {
            // Skip ahead to the next prime at or after the current position
            while !is_prime(self.start) {
                self.start += 1;
            }
            if self.start > self.end {
                None
            } else {
                let prime = self.start;
                self.start += 1;
                Some(prime)
            }
        }
    }
Here, we created a simple struct and implemented the Iterator trait. The iterator yields the prime numbers within the specified range. To execute this iterator in parallel, we would have to use the par_bridge method, as shown below:
    use rayon::prelude::*;

    fn get_primes() {
        let primes = PrimeGenerator { start: 0, end: 20 };
        println!("primes are {:?}", primes.par_bridge().collect::<Vec<_>>())
    }
N.B., this prints a list of prime numbers in no particular order.
Because par_bridge produces a parallel iterator, several iterator methods can be applied to it. These include map, for_each, sum, and so on. The entire list can be found in the documentation for Rayon’s ParallelIterator trait.
For example:
    // sum prime numbers between a particular range
    let primes = PrimeGenerator { start: 0, end: 20 };
    let sum_of_primes: usize = primes.par_bridge().sum();
Normally, lines from a file are read sequentially. With par_bridge, you can process these lines in parallel. This can significantly speed up tasks such as processing large log files or data sets:
    use rayon::prelude::*;
    use std::fs::File;
    use std::io::{self, BufRead, BufReader};

    // Function to open a file behind a buffered reader
    fn read_file(filename: &str) -> io::Result<BufReader<File>> {
        let file = File::open(filename)?;
        Ok(BufReader::new(file))
    }

    fn main() {
        let target_word = "ERROR";
        let reader = read_file("large_file.txt").expect("Could not read file");
        let error_count: usize = reader
            .lines()
            .filter_map(Result::ok)
            .par_bridge()
            .filter(|line| {
                line.contains(target_word) // This is a stand-in for expensive line processing
            })
            .count();
        println!("Number of error logs: {}", error_count);
    }
Since reader.lines returns an iterator that doesn’t natively support parallelism, we use the par_bridge method. This allows us to process each line in parallel, filtering and counting the ones that contain the word "ERROR". Note that reading a line from the disk happens sequentially; i.e., line one is read first, followed by line two, line three, etc.
Parallel iterators are a flexible way to introduce concurrency in your application without the hassle of sharing tasks among several threads. However, Rayon also provides join, scope, and ThreadPoolBuilder, which allow you to create custom tasks. Let’s take a closer look at each one of them.
The join method
The join function allows programmers to potentially execute two closures in parallel. Rayon’s method of parallelization is based on a principle known as work stealing. The first closure is executed on the current thread, while the second closure is made available to be executed on another thread. If there is an idle thread, that thread will execute the closure.
However, when there are no idle threads, the second closure is executed on the current thread once the first closure returns. If the second closure has already been picked up (or “stolen”) by another thread, the current thread waits for the thief (the thread currently executing the second closure) to complete.
With that said, the closures passed into the join method should be CPU-bound tasks and not blocking tasks like I/O. Rayon’s documentation for join covers this in more detail. Here is an example of using join:
    fn strings_to_num(slice: &[&str]) -> Vec<usize> {
        slice
            .iter()
            .map(|s| s.parse::<usize>().expect(&format!("{} is not a number", s)))
            .collect()
    }

    fn factorial(n: u128) -> u128 {
        (1..=n).reduce(|multiple, next| multiple * next).unwrap()
    }

    fn test_join() {
        // The two closures may run on different threads
        let (a, b) = rayon::join(
            || factorial(30),
            || strings_to_num(&["12", "100", "19887870", "56", "9098"]),
        );
        println!("factorial of 30 is {}", a);
        println!("numbers are {:?}", b)
    }
N.B., the closures may execute on different threads, and a panic in one of the closures does not cancel the execution of the other closure. Both closures always run, and join then propagates the panic to its caller.
The scope method
The scope function allows you to spawn an arbitrary number of tasks asynchronously. Similar to join, scope returns only once all the tasks spawned within it have completed. Let’s look at an example:
    use std::collections::HashMap;

    // Helper assumed by this example: a plain sequential factorial
    fn factorial_sequential(n: u128) -> u128 {
        (1..=n).product()
    }

    fn test_scope() {
        let mut map: Option<HashMap<String, usize>> = None;
        let mut factorial = 1;
        let mut other = None;
        rayon::scope(|s| {
            s.spawn(|_s| {
                let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
                map = Some(HashMap::from_iter(iter));
            });
            s.spawn(|_s| {
                factorial = factorial_sequential(30);
            });
            s.spawn(|_s| other = Some("value"))
        });
        println!("map {:?}", map.unwrap());
        println!("factorial {:?}", factorial);
        println!("other {:?}", other);
    }
There are a few points that you should note. First, unlike the closures passed to join, tasks spawned in a scope cannot return values. However, it is possible to write values to local variables that outlive the scope. For example, the map variable defined above outlives the scope call. This allows a spawned task to write to map. Also, the map variable can be read once scope has returned.
Secondly, following Rust borrow checker rules, multiple spawn closures modifying the same variable are not allowed. Additionally, there cannot be a mutable borrow in one closure and an immutable borrow in the other. Thus, the following does not compile:
    // !! Does not compile !!
    // -- Snip -- //
    rayon::scope(|s: &Scope| {
        s.spawn(|_s| {
            let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
            map = Some(HashMap::from_iter(iter)); // first mutable usage occurs here
        });
        s.spawn(|s: &Scope| {
            factorial = factorial_sequential(30);
            println!("my map is {:?}", map) // reading of variable here produces compile error (immutable borrow)
        });
        s.spawn(|s: &Scope| other = Some("value"))
    });
    // -- Snip -- //
Non-Send and non-Sync types
In Rust, types that implement the Send and Sync traits are generally assumed to be thread-safe. In other words, a Send type can be moved to another thread and a Sync type can be shared among multiple threads without undefined behavior or data races. These traits are automatically implemented by the compiler when it is appropriate. Thus, in the examples above, the Send and Sync traits are automatically implemented for usize and &str.
The example above does not compile because one thread could be reading the map variable while another is writing to it, potentially leading to a data race. To fix that issue, the go-to method for custom types is to wrap the variable in an Arc so it can be shared, and to add a Mutex so that the shared value can be mutated safely. For primitive types, such as integers, it is best to use the types provided by std::sync::atomic. Now, let’s fix the above code so it compiles:
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    fn test_scope() {
        let map: Arc<Mutex<Option<HashMap<String, usize>>>> = Arc::new(Mutex::new(None));
        let mut factorial = 1;
        let mut other = None;
        rayon::scope(|s| {
            s.spawn(|_s| {
                let iter = (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
                let mut map = map.lock().unwrap();
                *map = Some(HashMap::from_iter(iter)); // note the use of the "dereference" symbol *
            });
            s.spawn(|_s| {
                factorial = factorial_sequential(30);
                println!("my map is {:?}", map.lock().unwrap()) // "map" may or may not be None at this point
            });
            s.spawn(|_s| other = Some("value"))
        });
        println!("map {:?}", map.lock().unwrap());
        // -- Snip -- //
    }
In computing, working with large values can often present unique challenges due to the constraints imposed by the hardware and the design of programming languages. However, parallel processing, which is possible with Rust’s Rayon library, can make these tasks more manageable by breaking the large values into smaller chunks and processing these chunks simultaneously. Let’s look at a couple of practical use cases.
The dot product of two vectors is a common operation in fields such as machine learning, physics, and engineering. If you’re working with very large vectors, calculating the dot product can take a considerable amount of time. However, this process can be parallelized quite effectively with Rayon.
The dot product of two vectors is the sum of the products of their corresponding components. Here’s how you can calculate it using Rayon:
    use rayon::prelude::*;

    fn main() {
        let vec1 = vec![1, 2, 3, 4, 5];
        let vec2 = vec![6, 7, 8, 9, 10];
        // Pair up matching elements, multiply each pair, and add the products
        let dot_product: i32 = vec1
            .par_iter()
            .zip(vec2.par_iter())
            .map(|(&x, &y)| x * y)
            .sum();
        println!("{}", dot_product); // Prints 130
    }
In this example, the par_iter() function is used to create parallel iterators over vec1 and vec2. The zip() function is then used to pair up the corresponding elements from the two vectors. The map() function multiplies the pairs of elements together, and sum() adds up all these products to compute the dot product.
Calculating the dot product is a great example of a problem that can be easily divided into smaller parts that can be executed concurrently. Each pair of elements can be multiplied independently of all the others, so this operation is perfectly suited to parallel processing. Also, since the sum operation is associative, it can also be done in parallel, which Rayon takes care of automatically.
Another common situation is when you need to process a large file. Let’s take the case of processing a large log file. This is a common task in various domains like web service management and cyber security. In this case, we could parallelize the task of filtering out specific logs based on certain criteria using Rayon.
For example, you might want to count the number of error logs in a large log file:
    use rayon::prelude::*;
    use std::fs::File;
    use std::io::{self, BufRead, BufReader};
    use std::path::Path;

    // Function to read a file into a Vec<String>
    fn read_lines<P>(filename: P) -> io::Result<Vec<String>>
    where
        P: AsRef<Path>,
    {
        let file = File::open(filename)?;
        let reader = BufReader::new(file);
        reader.lines().collect()
    }

    fn main() {
        let lines = read_lines("large_log_file.log").expect("Could not read file");
        let error_count: usize = lines
            .par_iter()
            .filter(|line| line.contains("ERROR"))
            .count();
        println!("Number of error logs: {}", error_count);
    }
In this code, we read the log file line by line into memory, parallelize the operation of checking if a line contains "ERROR", and count the number of such lines.
This code can be faster than a single-threaded version for large log files since the lines are checked concurrently, although reading the file from disk remains sequential. Parallelizing the per-line work in this way can improve the performance of your Rust applications when working with large files.
Working with large values can be daunting, but with Rust and Rayon, these problems can be tackled effectively by taking advantage of parallel processing. It not only improves the performance but also the efficiency of your code.
Although concurrent and parallel tasks are tricky to implement, they have the potential to enhance the performance of the programs you write. Fortunately, Rust removes much of the tricky part and makes writing parallel code far less error-prone.
Rayon comes as an add-on to further simplify this task. Its handy parallel iterator implementation gives you a lot of flexibility. The more you practice with parallel code, the better you become. Happy coding!