Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add runtime dataset fetch and parse in-place #186

Merged
merged 14 commits into from
Feb 9, 2022
5 changes: 0 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@
/target
/Cargo.lock

# This is generated by the benchmarks crate build script, do not version with git.
/benchmarks/benches/datasets_paths.rs
/benchmarks/target
/benchmarks/Cargo.lock
3 changes: 3 additions & 0 deletions benchmarks/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/target
/Cargo.lock
/real-roaring-datasets
11 changes: 4 additions & 7 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ publish = false
roaring = { path = ".." }

[dev-dependencies]
once_cell = "1.9.0"
git2 = "0.13.25"
zip = "0.5.13"
indicatif = "0.16.2"
criterion = { version = "0.3", features = ["html_reports"] }
quickcheck = "0.9"
quickcheck_macros = "0.9"

[build-dependencies]
anyhow = "1.0"
bytes = "1.0"
convert_case = "0.4"
reqwest = { version = "0.11.3", features = ["blocking", "rustls-tls"], default-features = false }
zip = "0.5.12"

[features]
simd = ["roaring/simd"]

Expand Down
186 changes: 186 additions & 0 deletions benchmarks/benches/datasets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use std::env;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};

use git2::FetchOptions;
use once_cell::sync::OnceCell as SyncOnceCell;

use roaring::RoaringBitmap;

static INSTANCE: SyncOnceCell<Vec<Dataset>> = SyncOnceCell::new();

pub struct Datasets;

pub struct DatasetsIter {
iter: std::slice::Iter<'static, Dataset>,
}

impl Iterator for DatasetsIter {
type Item = &'static Dataset;

fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

impl IntoIterator for Datasets {
type Item = &'static Dataset;
type IntoIter = DatasetsIter;

fn into_iter(self) -> Self::IntoIter {
DatasetsIter {
iter: INSTANCE
.get_or_init(|| {
init_datasets().and_then(parse_datasets).expect("a collection of datasets")
})
.iter(),
}
}
}

pub struct Dataset {
pub name: String,
pub bitmaps: Vec<RoaringBitmap>,
}

fn init_datasets() -> Result<PathBuf, Box<dyn std::error::Error>> {
let out_dir = env::var_os("CARGO_MANIFEST_DIR").ok_or(env::VarError::NotPresent)?;

let out_path = Path::new(&out_dir);
let repo_path = out_path.join("real-roaring-datasets");

// Setup progress callbacks

let pb_cell = once_cell::unsync::OnceCell::new();
let mut cb = git2::RemoteCallbacks::new();

cb.transfer_progress(|progress| {
let pb = pb_cell.get_or_init(|| {
indicatif::ProgressBar::new(progress.total_objects() as u64)
.with_style(
indicatif::ProgressStyle::default_bar()
.template(&format!(
"{{prefix}}{{msg:.cyan/blue}} [{{bar}}] {{pos}}/{}",
progress.total_objects()
))
.progress_chars("#> "),
)
.with_prefix(" ")
.with_message("Recieving objects")
});

pb.set_position((progress.local_objects() + progress.received_objects()) as u64);
true
});

let mut fetch_opts = FetchOptions::new();
fetch_opts.remote_callbacks(cb);

// Do update

if !Path::new(&repo_path).exists() {
git2::build::RepoBuilder::new()
.fetch_options(fetch_opts)
.clone("git://github.com/RoaringBitmap/real-roaring-datasets.git", &repo_path)?;
} else {
let repo = git2::Repository::open(&repo_path)?;
repo.find_remote("origin")?.fetch(&["master"], Some(&mut fetch_opts), None)?;

let head = repo.head()?.peel_to_commit()?;
let origin_master_head = repo
.find_branch("origin/master", git2::BranchType::Remote)?
.into_reference()
.peel_to_commit()?;

if head.id() != origin_master_head.id() {
repo.reset(origin_master_head.as_object(), git2::ResetType::Hard, None)?;
}
}

if let Some(pb) = pb_cell.get() {
pb.finish()
}

Ok(repo_path)
}

fn parse_datasets<P: AsRef<Path>>(path: P) -> Result<Vec<Dataset>, Box<dyn std::error::Error>> {
const DATASET_FILENAME_WHITELIST: &[&str] = &[
"census-income.zip",
"census-income_srt.zip",
"census1881.zip",
"census1881_srt.zip",
"weather_sept_85.zip",
"weather_sept_85_srt.zip",
"wikileaks-noquotes.zip",
"wikileaks-noquotes_srt.zip",
];

use indicatif::{ProgressBar, ProgressStyle};
use std::io::BufRead;
use zip::ZipArchive;

let dir = path.as_ref().read_dir()?;

let mut datasets = Vec::new();

// Future work: Reuse this buffer to parse croaring bitmaps for comparison
let mut numbers = Vec::new();

for dir_entry_result in dir {
let dir_entry = dir_entry_result?;
let metadata = dir_entry.metadata()?;
let file_name = dir_entry.file_name();
// TODO dont panic
let file_name_str = file_name.to_str().expect("utf-8 filename");

if metadata.is_file() && DATASET_FILENAME_WHITELIST.contains(&file_name_str) {
let file = File::open(dir_entry.path())?;
let name = file_name_str.split_at(file_name_str.len() - ".zip".len()).0.to_string();

let mut zip = ZipArchive::new(file)?;

let mut total_size = 0;
for i in 0..zip.len() {
let file = zip.by_index(i)?;
total_size += file.size();
}

let pb = ProgressBar::new(total_size)
.with_style(
ProgressStyle::default_bar()
.template(" {prefix:.green} [{bar}] {msg}")
.progress_chars("#> "),
)
.with_prefix("Parsing")
.with_message(name.clone());

let mut bitmaps = Vec::with_capacity(zip.len());
for i in 0..zip.len() {
let file = zip.by_index(i)?;
let size = file.size();
let buf = BufReader::new(file);

for bytes in buf.split(b',') {
let bytes = bytes?;
let str = String::from_utf8(bytes)?;
let n = str.trim().parse::<u32>()?;
numbers.push(n);
}

let bitmap = RoaringBitmap::from_sorted_iter(numbers.iter().copied())?;
numbers.clear();
bitmaps.push(bitmap);

pb.set_position(pb.position() + size);
}

pb.finish();
datasets.push(Dataset { name, bitmaps });
}
}
datasets.sort_unstable_by(|a, b| a.name.cmp(&b.name));
println!();
Ok(datasets)
}
Loading