TL;DR: I published a parallel processing library for Rust that works differently from rayon and is especially useful in cases where rayon falls a bit short. I'll talk a little bit about it and show you when it can be helpful.

rayon is a well-known and well-regarded library for Rust:

Rayon is a data-parallelism library for Rust. It is extremely lightweight and makes it easy to convert a sequential computation into a parallel one.

In a nutshell, rayon is a "change your .iter() into .par_iter(), see your data being processed on multiple threads, and enjoy your program getting faster" kind of library.
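
For example, a typical conversion looks roughly like this (a minimal sketch, assuming rayon is added as a dependency; the data and closure are just placeholders):

    use rayon::prelude::*;

    let inputs: Vec<u64> = (0..1_000_000).collect();

    // Sequential: processed on a single thread.
    let sequential: Vec<u64> = inputs.iter().map(|x| x * x).collect();

    // Parallel: the same chain, with .iter() swapped for .par_iter().
    let parallel: Vec<u64> = inputs.par_iter().map(|x| x * x).collect();

    assert_eq!(sequential, parallel);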

As useful as rayon usually is, I have often been hitting one of its shortcomings: it is designed to use a collection as the starting point of parallelism. If you look at the documentation for its IntoParallelIterator trait, the types that support turning into parallel iterators are pretty much only collections. But what do you do when you have an iterator and you would like to start processing it using multiple threads?

Well, you could always .collect() it into a collection and then process it using rayon. But very often that defeats the whole purpose!

Let's say we have a piece of code that iterates over the file system using walkdir and makes network requests for some of the entries:

    let results: Vec<_> = WalkDir::new(".")
        .into_iter()
        .map(|entry| entry.unwrap()) // for brevity
        .filter(|entry| should_make_a_request(&entry))
        .map(|entry| make_a_request(&entry))
        .collect();

It quickly becomes apparent that if make_a_request takes any non-trivial time, making requests in parallel could provide a significant speedup. But if we were to collect() all the results returned by WalkDir and the should_make_a_request filter, we might also be leaving performance on the table, because sending requests won't start until WalkDir is completely finished and all the results are sitting neatly in a collection.
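
To make the trade-off concrete, here is a rough sketch of that collect-first workaround (assuming rayon is available; should_make_a_request and make_a_request are the same hypothetical functions as above). Nothing runs in parallel until the directory walk has completely finished:

    use rayon::prelude::*;

    // First, walk the whole filesystem and buffer every matching entry...
    let entries: Vec<_> = WalkDir::new(".")
        .into_iter()
        .map(|entry| entry.unwrap()) // for brevity
        .filter(|entry| should_make_a_request(&entry))
        .collect();

    // ...and only then start making requests, in parallel.
    let results: Vec<_> = entries
        .into_par_iter()
        .map(|entry| make_a_request(&entry))
        .collect();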

What we would ideally want is to call make_a_request on multiple threads while WalkDir is still scanning the filesystem and looking for the next entries!

I have been hitting variations of this problem in many projects. Usually, I already have a long-ish sequence of chained operators that works correctly, and as I move from "make it work, make it right" to the final "make it fast", I want to either parallelize processing in one or more steps of such an iterator, or introduce pipelining between steps, while preserving all the other important aspects like the ordering of elements.

That's why I have finally written and published dpc-pariter. The above code can be easily converted to:

    let results: Vec<_> = WalkDir::new(".")
        .into_iter()
        .map(|entry| entry.unwrap()) // for brevity
        .filter(|entry| should_make_a_request(&entry))
        .map_parallel(|entry| make_a_request(&entry))
        .collect();

and job done. make_a_request will be called in parallel from multiple threads as walkdir returns results. The default number of threads is equal to the number of CPUs present, which is not ideal for an operation that is not CPU-bound, but that can be easily adjusted with:

    let results: Vec<_> = WalkDir::new(".")
        .into_iter()
        .map(|entry| entry.unwrap()) // for brevity
        .filter(|entry| should_make_a_request(&entry))
        .map_parallel(|entry| make_a_request(&entry))
        .threads(16)
        .collect();

Under the hood, dpc-pariter uses channels and a thread pool.
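
For the curious, here is a heavily simplified, std-only sketch of the general pattern (illustrative only, not dpc-pariter's actual code): each item is tagged with its index, fanned out to worker threads over a bounded channel, and the results are reordered at the end so the original ordering is preserved:

    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;

    // Illustrative sketch only -- not dpc-pariter's actual implementation.
    fn map_parallel_sketch<I, T, R, F>(iter: I, f: F, threads: usize) -> Vec<R>
    where
        I: Iterator<Item = T>,
        T: Send + 'static,
        R: Send + 'static,
        F: Fn(T) -> R + Send + Clone + 'static,
    {
        // Bounded channel for work: provides back-pressure so the source
        // iterator is not drained far ahead of the workers.
        let (work_tx, work_rx) = mpsc::sync_channel::<(usize, T)>(threads);
        let (result_tx, result_rx) = mpsc::channel::<(usize, R)>();
        let work_rx = Arc::new(Mutex::new(work_rx));

        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let work_rx = Arc::clone(&work_rx);
                let result_tx = result_tx.clone();
                let f = f.clone();
                thread::spawn(move || loop {
                    // Take the next item; stop once the sender is dropped.
                    let next = {
                        let rx = work_rx.lock().unwrap();
                        rx.recv()
                    };
                    match next {
                        Ok((i, item)) => result_tx.send((i, f(item))).unwrap(),
                        Err(_) => break,
                    }
                })
            })
            .collect();
        drop(result_tx);

        // Feed items to the workers, tagging each with its position.
        for (i, item) in iter.enumerate() {
            work_tx.send((i, item)).unwrap();
        }
        drop(work_tx);

        // Gather the results and restore the original order.
        let mut results: Vec<(usize, R)> = result_rx.iter().collect();
        for handle in handles {
            handle.join().unwrap();
        }
        results.sort_by_key(|&(i, _)| i);
        results.into_iter().map(|(_, r)| r).collect()
    }

A real implementation additionally has to deal with lazy consumption, panics in worker threads, and the iterator being dropped early, which a sketch like this glosses over.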

Parallel execution of a single iteration step can be considered a way to parallelize "horizontally", but quite often there's also a need to parallelize an iterator "vertically" (aka pipeline it).

If we have an iterator chain that looks somewhat like this:

    some_iter
        .filter(step_a)
        .map(step_b)
        .map(step_c)
        .filter(step_d)

and we would like step_c and step_d to process one item while step_a and step_b are already being executed in parallel for the next element from some_iter, it's as simple as:

    some_iter
        .filter(step_a)
        .map(step_b)
        .readahead(0)
        .map(step_c)
        .filter(step_d)

Under the hood, the some_iter.filter(step_a).map(step_b) part will be moved to and executed on a separate, newly spawned thread, and the results will be fed to the rest of the iterator chain through a zero-sized (rendezvous) channel.
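
The idea can be sketched in a few lines of std-only code (again, illustrative rather than dpc-pariter's actual implementation):

    use std::sync::mpsc;
    use std::thread;

    // Illustrative sketch: move an iterator to its own thread and feed its
    // items to the consumer through a bounded channel. With a bound of 0 the
    // channel acts as a rendezvous point, so the producer runs at most one
    // item ahead of the consumer.
    fn readahead_sketch<I>(iter: I, bound: usize) -> impl Iterator<Item = I::Item>
    where
        I: Iterator + Send + 'static,
        I::Item: Send + 'static,
    {
        let (tx, rx) = mpsc::sync_channel(bound);
        thread::spawn(move || {
            for item in iter {
                // Blocks until the consumer is ready -- this is the back-pressure.
                if tx.send(item).is_err() {
                    break; // the consumer side was dropped
                }
            }
        });
        rx.into_iter()
    }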

Combined, pipelining and parallel processing should make optimizing the performance of iterator chains a breeze, and hopefully, I'll even get to go back to some of my past projects and refactor code that had to implement some of these optimizations through way less elegant means.

If you're interested, head to dpc-pariter and give it a shot. Feedback and (especially) PRs are always welcome!

#rust #programming