Skip to content

Commit

Permalink
Rewrite via loop
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanUkhov committed Feb 20, 2024
1 parent 371b55f commit 0ecc84c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 91 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "folder"
version = "0.5.1"
version = "0.6.0"
edition = "2021"
license = "Apache-2.0/MIT"
authors = ["Ivan Ukhov <[email protected]>"]
Expand All @@ -12,4 +12,5 @@ categories = ["filesystem"]
keywords = ["directory", "parallel", "recursive", "scan", "walk"]

[dependencies]
loop = "0.1"
walkdir = "2.3"
14 changes: 5 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ The package allows for scanning directories in parallel.
## Example

```rust
let results: Vec<_> = folder::scan(
"src",
|path| true,
|path, _| Ok(path.exists()),
(),
1,
)
.collect();
assert_eq!(format!("{results:?}"), r#"[("src/lib.rs", Ok(true))]"#);
use std::path::Path;

let filter = |_: &Path| true;
let map = |path: &Path, _| Ok(path.exists());
let (paths, results): (Vec<_>, Vec<_>) = folder::scan("src", filter, map, (), None).unzip();
```

## Contribution
Expand Down
115 changes: 34 additions & 81 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1,48 @@
//! Scanning directories in parallel.
//!
//! # Arguments
//!
//! * `path` is the location to scan;
//! * `filter` is a function for choosing files, which is invoked sequentially;
//! * `map` is a function for processing files, which is invoked in parallel;
//! * `context` is a context passed to the processing function; and
//! * `workers` is the number of threads to use.
//!
//! # Examples
//!
//! ```
//! use std::path::Path;
//!
//! let filter = |_: &Path| true;
//! let map = |path: &Path, _| Ok(path.exists());
//! let (paths, results): (Vec<_>, Vec<_>) = folder::scan("src", filter, map, (), None).unzip();
//! ```
use std::io::Result;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::path::PathBuf;

use walkdir::WalkDir;

/// Process an iterator in parallel.
///
/// Every element of `iterator` is sent to a pool of worker threads. Each worker calls `map` on
/// the elements it receives and sends back the element together with the outcome.
///
/// # Arguments
///
/// * `iterator` is the source of elements, which is drained completely before the function
///   returns;
/// * `map` is a function for processing elements, which is invoked on the worker threads;
/// * `context` is a value cloned once per worker and once more for each invocation of `map`; and
/// * `workers` is the number of threads to spawn, with zero treated as one so that the returned
///   iterator can always make progress.
///
/// # Returns
///
/// An iterator yielding one `(element, result)` pair per input element. The pairs arrive in the
/// order in which the workers finish, which need not be the input order.
///
/// # Panics
///
/// Advancing the returned iterator panics if a result can no longer be received, which happens
/// when a worker terminated without delivering it (for instance, because `map` panicked).
pub fn parallelize<I, M, E, C, V>(
    iterator: I,
    map: M,
    context: C,
    workers: usize,
) -> impl Iterator<Item = (E, Result<V>)> + DoubleEndedIterator
where
    I: Iterator<Item = E>,
    M: Fn(&E, C) -> Result<V> + Copy + Send + 'static,
    E: Send + 'static,
    C: Clone + Send + 'static,
    V: Send + 'static,
{
    let (forward_sender, forward_receiver) = mpsc::channel::<E>();
    let (backward_sender, backward_receiver) = mpsc::channel::<(E, Result<V>)>();
    // The standard receiver has a single consumer; the mutex lets the workers take turns.
    let forward_receiver = Arc::new(Mutex::new(forward_receiver));

    // Without any worker nothing would ever be sent back, and the returned iterator would
    // panic on its first element; hence at least one thread is always spawned.
    for _ in 0..workers.max(1) {
        let forward_receiver = Arc::clone(&forward_receiver);
        let backward_sender = backward_sender.clone();
        let context = context.clone();
        // The threads are detached; they stop once either channel is closed.
        thread::spawn(move || loop {
            // The lock guard is a temporary of this statement, so it is released before `map`
            // runs, and the other workers are not blocked while this one is processing.
            let entry = match forward_receiver.lock().unwrap().recv() {
                Ok(entry) => entry,
                // The sending side is gone, and the queue is empty.
                Err(_) => break,
            };
            let result = map(&entry, context.clone());
            // The consumer might have dropped the iterator early; stop quietly in that case
            // instead of panicking in a background thread.
            if backward_sender.send((entry, result)).is_err() {
                break;
            }
        });
    }
    // Only the workers should keep the result channel open.
    drop(backward_sender);

    let mut count: usize = 0;
    for entry in iterator {
        forward_sender
            .send(entry)
            .expect("the receiving end is kept alive by this function");
        count += 1;
    }
    // Closing the queue lets the workers finish once all entries have been processed.
    drop(forward_sender);

    (0..count).map(move |_| backward_receiver.recv().unwrap())
}

/// Process a path in parallel.
///
/// The function traverses files in a given path, selects those satisfying a criterion, and
/// processes the chosen ones in parallel, returning the corresponding results.
///
/// # Arguments
///
/// * `path` is the location to scan;
/// * `filter` is a function for choosing files, which is invoked sequentially;
/// * `map` is a function for processing files, which is invoked in parallel;
/// * `context` is a context passed to the processing function; and
/// * `workers` is the number of threads to use.
///
/// # Examples
///
/// ```
/// let results: Vec<_> = folder::scan(
/// "src",
/// |path| true,
/// |path, _| Ok(path.exists()),
/// (),
/// 1,
/// )
/// .collect();
/// assert_eq!(format!("{results:?}"), r#"[("src/lib.rs", Ok(true))]"#);
/// ```
pub fn scan<P, F, M, C, V>(
path: P,
filter: F,
map: M,
context: C,
workers: usize,
) -> impl Iterator<Item = (PathBuf, Result<V>)> + DoubleEndedIterator
pub fn scan<Path, Filter, Map, Context, Value>(
path: Path,
filter: Filter,
map: Map,
context: Context,
workers: Option<usize>,
) -> impl DoubleEndedIterator<Item = (PathBuf, Result<Value>)>
where
P: AsRef<Path>,
F: Fn(&Path) -> bool + Copy,
M: Fn(&Path, C) -> Result<V> + Copy + Send + 'static,
C: Clone + Send + 'static,
V: Send + 'static,
Path: AsRef<std::path::Path>,
Filter: Fn(&std::path::Path) -> bool + Copy,
Map: Fn(&std::path::Path, Context) -> Result<Value> + Copy + Send + 'static,
Context: Clone + Send + 'static,
Value: Send + 'static,
{
parallelize(
r#loop::parallelize(
WalkDir::new(path)
.follow_links(true)
.into_iter()
Expand All @@ -110,6 +62,7 @@ mod tests {

#[test]
fn nonexistent() {
let _: Vec<_> = super::scan(Path::new("foo"), |_| true, |_, _| Ok(true), (), 1).collect();
let _: Vec<_> =
super::scan(Path::new("foo"), |_| true, |_, _| Ok(true), (), None).collect();
}
}

0 comments on commit 0ecc84c

Please sign in to comment.