Kofi Otuo Medical student at the University of Ghana and a tech enthusiast. I enjoy programming all systems including Android, UNIX, and Windows. Currently specializing in Rust and blockchain development.

Implementing data parallelism with Rayon Rust


Concurrency and parallel computing are hot topics in the world of software development. To be fair, this popularity is well deserved because they offer significant performance boosts to programs. When incorrectly implemented, however, they cause issues that can be hard to debug, such as data races and deadlocks.

Fortunately, the Rust programming language comes equipped with fearless concurrency, which is made possible through Rust’s ownership and borrow checker model. Thus, many concurrency mistakes leading to data races and deadlocks are captured at compile time.

In this article, we will explore how to implement data parallelism with Rayon in Rust and how the rayon crate improves upon Rust's built-in parallel features by making it easy to convert sequential iterators into their parallel counterparts.

Before getting started, ensure you are familiar with the Rust programming language and have a basic understanding of Rust iterators. To brush up on your Rust basics, check out our guide to getting up to speed with Rust.


Understanding sequential iterators in Rust

The iterators provided by the Rust standard library perform operations sequentially by default. Suppose we have a list of 1000 names. Now, let’s say we want to check if the name Alice can be found in the list. To do this, we could use the various iterator methods found in the standard library, as shown below:

let names = vec![ ... ]; 
let is_present = names.iter().any(|name| name == "Alice"); 

Although this method works, the names in the list are traversed in order until Alice is found. This may not be the best approach if Alice appears late in the list. So, let's look at a more practical example. Let's say we want to download a large 2,000MB video. Here's how we might go about it sequentially (prototype):

/* !! Prototype — does not compile as-is !! */

// Using the reqwest crate
use reqwest::header::RANGE;

fn main() -> reqwest::Result<()> {
    // Byte ranges for eight 250MB chunks (values in MB for brevity)
    let ranges = &[
        (0, 249), (250, 499), (500, 749), (750, 999),
        (1000, 1249), (1250, 1499), (1500, 1749), (1750, 1999),
    ];
    ranges.iter().for_each(|(start, end)| {
        let client = reqwest::blocking::Client::new();
        let response = client
            .get("random_video_url.mp4")
            .header(RANGE, format!("bytes={}-{}", start, end))
            .send()
            .unwrap();
        std::fs::write(format!("file{}", start), response.bytes().unwrap()).unwrap();
    });
    // finally, join all files
    Ok(())
}

This method downloads the file sequentially even though we divided it into chunks: the first chunk downloads first, then the second, then the third, and so on, all on a single thread. This approach has a significant downside. Suppose the server allows a maximum of 1MB per second per connection. This download would then take roughly 2,000 seconds even on a fast connection. That is over 33 minutes!

Implementing parallel iterators

Executing operations in parallel typically improves the speed of a process. Returning to our previous example, each megabyte takes one second to download. Let's divide the file into eight equal chunks of 250MB, with each chunk downloaded on its own thread.

Each thread then takes about 250 seconds to download its chunk. Because the threads run simultaneously (in parallel), the total time is still about 250 seconds, or roughly four minutes. A few extra seconds might be needed to merge the chunks into the complete file.

To put it in prototype code:

/* !! Prototype — does not compile as-is !! */
use rayon::prelude::*;
use reqwest::header::RANGE;

// --- Snip --- //
let ranges = &[
    (0, 249), (250, 499), (500, 749), (750, 999),
    (1000, 1249), (1250, 1499), (1500, 1749), (1750, 1999),
];
ranges.par_iter().for_each(|(start, end)| {
    let client = reqwest::blocking::Client::new();
    let response = client
        .get("random_video_url.mp4")
        .header(RANGE, format!("bytes={}-{}", start, end))
        .send()
        .unwrap();
    std::fs::write(format!("file{}", start), response.bytes().unwrap()).unwrap();
});
// finally, join all files

You may have noticed the use of the par_iter method, which is provided by the rayon crate. Rayon lets programmers easily convert many of the standard library's iterator methods into iterators that execute in parallel. To convert a sequential iterator into a parallel one, first import the crate with use rayon::prelude::*;, then replace the iter or into_iter method with its parallel counterpart, par_iter or into_par_iter. Here are some examples:

use rayon::prelude::*; 
let names = vec![ ... ]; 
let is_present = names.par_iter().any(|name| name == "Alice"); 

// sequential 
fn sum_of_char_len(list: &[&str]) -> usize { 
    list.iter().map(|word| word.len()).sum() 
}

// parallel
fn par_sum_of_char_len(list: &[&str]) -> usize {
    list.par_iter().map(|word| word.len()).sum() 
} 

Using Rayon’s par_bridge method

The par_iter and into_par_iter methods work on built-in collections such as vectors and hashmaps. However, they do not work on custom iterator types. For a custom iterator to execute in parallel, the par_bridge method comes in handy. To illustrate this, here is a simple prime number generator:

struct PrimeGenerator { 
    start: usize, 
    end: usize, 
}

fn is_prime(num: usize) -> bool {
    if num <= 1 { 
        return false; 
    } 
    for i in 2..num { 
        if num % i == 0 { 
            return false; 
        } 
    } 
    true 
}

impl Iterator for PrimeGenerator {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        while !is_prime(self.start) { 
            self.start += 1; 
        } 
        if self.start > self.end { 
            None 
        } else { 
            let prime = self.start; 
            self.start += 1; 
            Some(prime) 
        } 
    } 
} 

Here, we created a simple struct and implemented the Iterator trait for it. The iterator yields the prime numbers within the specified range. To execute this iterator in parallel, we use the par_bridge method, as shown below:

fn get_primes() { 
    let primes = PrimeGenerator { 
        start: 0, 
        end: 20, 
    }; 
    println!("ranges are {:?}", primes.par_bridge().collect::<Vec<_>>()) 
} 

Note: This prints a list of prime numbers in no particular order.

Because par_bridge produces a parallel iterator, several iterator methods can be applied to it, including map, for_each, and sum. You can view the entire list in the rayon documentation.

Let’s visualize this below:

// sum the prime numbers within a particular range
let primes = PrimeGenerator {
    start: 0,
    end: 20,
};
let sum_of_primes: usize = primes.par_bridge().sum();

Exploring Rayon’s instances

Parallel iterators are a flexible way to introduce parallelism into your application without the hassle of distributing tasks among several threads yourself. However, rayon also provides the join and scope functions and the ThreadPoolBuilder type, which allow you to create custom tasks. Let's dive into each one.

The join method

The join function allows programmers to execute two closures, potentially in parallel. Rayon's parallelization is based on a principle known as work stealing. The first closure is executed on the current thread, while the second is made available for other threads to pick up. If there is an idle thread, that thread will execute ("steal") the second closure.

If no thread steals it, the current thread executes the second closure itself once the first one returns. If the second closure has already been stolen, the current thread waits for the thief (the idle thread executing it) to complete. For this reason, the closures passed to join should be CPU-bound tasks rather than blocking tasks like I/O. You can read more in the rayon documentation. Here are some usages of join:

fn strings_to_num(slice: &[&str]) -> Vec<usize> {
    slice.iter().map(|s| {
        s.parse::<usize>().expect(&format!("{} is not a number", s))
    }).collect()
}

fn factorial(n: u128) -> u128 {
    (1..=n).product()
}

fn test_join() {
    let (a, b) = rayon::join(
        || factorial(30),
        || strings_to_num(&["12", "100", "19887870", "56", "9098"]),
    );
    println!("factorial of 30 is {}", a);
    println!("numbers are {:?}", b)
}

Note: Because the closures execute on different threads, a panic in one of the closures does not cancel the execution of the other closure.

The scope method

The scope function allows you to spawn an arbitrary number of tasks asynchronously. Similar to join, scope returns only once all the tasks spawned within it have completed. Let's look at an example:

fn test_scope() { 
    let mut map: Option<HashMap<String, usize>> = None; 
    let mut factorial = 1; 
    let mut other = None; 
    rayon::scope(|s| { 
        s.spawn(|_s| { 
            let iter = 
                (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b)); 
            map = Some(HashMap::from_iter(iter)); 
        }); 
        s.spawn(|_s| { 
            factorial = factorial_sequential(30); 
        }); 
        s.spawn(|_s| { 
            other = Some("value") 
        }) 
    });

    println!("map {:?}", map.unwrap());
    println!("factorial {:?}", factorial); 
    println!("other {:?}", other); 
} 

There are a few points that you should note. First, unlike join, scope doesn't return a value. However, the closures spawned inside it can write to local variables that outlive the scope. For example, the map variable defined above outlives the scope call, so the spawned closure can write to it, and map can be read after scope returns.

Second, following Rust's borrow checker rules, multiple spawned closures modifying the same variable are not allowed. Likewise, there cannot be a mutable borrow in one closure and an immutable borrow in another. Thus, the following does not compile:

// !! Does not compile !!
// --- Snip --- //
rayon::scope(|s| {
    s.spawn(|_s| {
        let iter =
            (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
        map = Some(HashMap::from_iter(iter)); // first mutable borrow occurs here
    });
    s.spawn(|_s| {
        factorial = factorial_sequential(30);
        println!("my map is {:?}", map) // reading the variable here produces a compile error (immutable borrow)
    });
    s.spawn(|_s| {
        other = Some("value")
    })
});
// --- Snip --- //

Working with non-Send and non-Sync types

In Rust, types that implement the Send and Sync traits are generally assumed to be thread-safe. That is, they can be shared among multiple threads without undefined behavior or data races. These traits are automatically implemented by the compiler when it is safe to do so. Thus, in the examples above, Send and Sync are automatically implemented for usize and &str.

The example above does not compile because one thread could be reading the map variable while another is writing to it, potentially leading to data races. The go-to fix for custom types is to wrap the variable in Arc, adding a Mutex if the value also needs to be mutated. For primitive types, such as integers, it is best to use the types provided by std::sync::atomic. Now, let's fix the above code so it compiles:

fn test_scope() {
    let map: Arc<Mutex<Option<HashMap<String, usize>>>> = Arc::new(Mutex::new(None));
    let mut factorial = 1;
    let mut other = None;
    rayon::scope(|s| {
        s.spawn(|_s| {
            let iter =
                (0..10000).enumerate().map(|(a, b)| (format!("index {}", a), b));
            let mut map = map.lock().unwrap();
            *map = Some(HashMap::from_iter(iter)); // note the use of the dereference operator *
        });
        s.spawn(|_s| {
            factorial = factorial_sequential(30);
            println!("my map is {:?}", map.lock().unwrap()) // "map" may or may not be None here
        });
        s.spawn(|_s| {
            other = Some("value")
        })
    });
    println!("map {:?}", map.lock().unwrap());
    // --- Snip --- //
}

Conclusion

Although concurrent and parallel tasks are tricky to implement, they have the potential to significantly enhance the performance of the programs you write. Fortunately, Rust's ownership model removes much of the risk, and Rayon comes as an add-on to further simplify the task. With its handy parallel iterator implementations, converting sequential code to parallel code is often a one-line change. The more you practice with concurrent code, the better you will become. Happy coding!


