From 0ecc84c0e9a23e6b720367001cfc9c5edb92e4cf Mon Sep 17 00:00:00 2001 From: Ivan Ukhov Date: Tue, 20 Feb 2024 15:23:44 +0100 Subject: [PATCH] Rewrite via loop --- Cargo.toml | 3 +- README.md | 14 +++---- src/lib.rs | 115 ++++++++++++++++------------------------------------- 3 files changed, 41 insertions(+), 91 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a3834e..31c4360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "folder" -version = "0.5.1" +version = "0.6.0" edition = "2021" license = "Apache-2.0/MIT" authors = ["Ivan Ukhov "] @@ -12,4 +12,5 @@ categories = ["filesystem"] keywords = ["directory", "parallel", "recursive", "scan", "walk"] [dependencies] +loop = "0.1" walkdir = "2.3" diff --git a/README.md b/README.md index e082108..4e15b1a 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,11 @@ The package allows for scanning directories in parallel. ## Example ```rust -let results: Vec<_> = folder::scan( - "src", - |path| true, - |path, _| Ok(path.exists()), - (), - 1, -) -.collect(); -assert_eq!(format!("{results:?}"), r#"[("src/lib.rs", Ok(true))]"#); +use std::path::Path; + +let filter = |_: &Path| true; +let map = |path: &Path, _| Ok(path.exists()); +let (paths, results): (Vec<_>, Vec<_>) = folder::scan("src", filter, map, (), None).unzip(); ``` ## Contribution diff --git a/src/lib.rs b/src/lib.rs index 65659e7..65004e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,96 +1,48 @@ //! Scanning directories in parallel. +//! +//! # Arguments +//! +//! * `path` is the location to scan; +//! * `filter` is a function for choosing files, which is be invoked sequentially; +//! * `map` is a function for processing files, which is be invoked in parallel; +//! * `context` is an context passed to the processing function; and +//! * `workers` is the number of threads to use. +//! +//! # Examples +//! +//! ``` +//! use std::path::Path; +//! +//! let filter = |_: &Path| true; +//! let map = |path: &Path, _| Ok(path.exists()); +//! let (paths, results): (Vec<_>, Vec<_>) = folder::scan("src", filter, map, (), None).unzip(); +//! ``` use std::io::Result; use std::ops::Deref; -use std::path::{Path, PathBuf}; -use std::sync::mpsc; -use std::sync::{Arc, Mutex}; -use std::thread; +use std::path::PathBuf; use walkdir::WalkDir; -/// Process an iterator in parallel. -pub fn parallelize( - iterator: I, - map: M, - context: C, - workers: usize, -) -> impl Iterator)> + DoubleEndedIterator -where - I: Iterator, - M: Fn(&E, C) -> Result + Copy + Send + 'static, - E: Send + 'static, - C: Clone + Send + 'static, - V: Send + 'static, -{ - let (forward_sender, forward_receiver) = mpsc::channel::(); - let (backward_sender, backward_receiver) = mpsc::channel::<(E, Result)>(); - let forward_receiver = Arc::new(Mutex::new(forward_receiver)); - - let _: Vec<_> = (0..workers) - .map(|_| { - let forward_receiver = forward_receiver.clone(); - let backward_sender = backward_sender.clone(); - let context = context.clone(); - thread::spawn(move || loop { - let entry = match forward_receiver.lock().unwrap().recv() { - Ok(entry) => entry, - Err(_) => break, - }; - let result = map(&entry, context.clone()); - backward_sender.send((entry, result)).unwrap(); - }) - }) - .collect(); - let mut count = 0; - for entry in iterator { - forward_sender.send(entry).unwrap(); - count += 1; - } - (0..count).map(move |_| backward_receiver.recv().unwrap()) -} - /// Process a path in parallel. /// /// The function traverses files in a given path, selects those satisfying a criterion, and /// processes the chosen ones in parallel, returning the corresponding results. -/// -/// # Arguments -/// -/// * `path` is the location to scan; -/// * `filter` is a function for choosing files, which is be invoked sequentially; -/// * `map` is a function for processing files, which is be invoked in parallel; -/// * `context` is an context passed to the processing function; and -/// * `workers` is the number of threads to use. -/// -/// # Examples -/// -/// ``` -/// let results: Vec<_> = folder::scan( -/// "src", -/// |path| true, -/// |path, _| Ok(path.exists()), -/// (), -/// 1, -/// ) -/// .collect(); -/// assert_eq!(format!("{results:?}"), r#"[("src/lib.rs", Ok(true))]"#); -/// ``` -pub fn scan( - path: P, - filter: F, - map: M, - context: C, - workers: usize, -) -> impl Iterator)> + DoubleEndedIterator +pub fn scan( + path: Path, + filter: Filter, + map: Map, + context: Context, + workers: Option, +) -> impl DoubleEndedIterator)> where - P: AsRef, - F: Fn(&Path) -> bool + Copy, - M: Fn(&Path, C) -> Result + Copy + Send + 'static, - C: Clone + Send + 'static, - V: Send + 'static, + Path: AsRef, + Filter: Fn(&std::path::Path) -> bool + Copy, + Map: Fn(&std::path::Path, Context) -> Result + Copy + Send + 'static, + Context: Clone + Send + 'static, + Value: Send + 'static, { - parallelize( + r#loop::parallelize( WalkDir::new(path) .follow_links(true) .into_iter() @@ -110,6 +62,7 @@ mod tests { #[test] fn nonexistent() { - let _: Vec<_> = super::scan(Path::new("foo"), |_| true, |_, _| Ok(true), (), 1).collect(); + let _: Vec<_> = + super::scan(Path::new("foo"), |_| true, |_, _| Ok(true), (), None).collect(); } }