Skip to content

Commit

Permalink
use FileScanConfig API
Browse files (browse the repository at this point in the history)
  • Loading branch information
alamb committed May 27, 2024
1 parent d83adb2 commit 8d5237e
Showing 1 changed file with 15 additions and 27 deletions.
42 changes: 15 additions & 27 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics,
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -242,41 +242,29 @@ impl TableProvider for IndexTableProvider {
// will not be returned.
let files = self.index.get_files(predicate.clone())?;

let object_store_url = ObjectStoreUrl::parse("file://")?;
let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
.with_projection(projection.cloned())
.with_limit(limit);

// Transform to the format needed to pass to ParquetExec
// Create one file group per file (default to scanning them all in parallel)
let file_groups = files
.into_iter()
.map(|(file_name, file_size)| {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
Ok(vec![PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
)])
})
.collect::<Result<Vec<_>>>()?;

// for now, simply use ParquetExec
// TODO make a builder for FileScanConfig
let object_store_url = ObjectStoreUrl::parse("file://")?;
let base_config = FileScanConfig {
object_store_url,
file_schema: self.schema(),
file_groups,
statistics: Statistics::new_unknown(self.index.schema()),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: vec![],
};
for (file_name, file_size) in files {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
}

let metadata_size_hint = None;

let table_parquet_options = TableParquetOptions::default();

// TODO make a builder for ParquetExec
let exec = ParquetExec::new(
base_config,
file_scan_config,
Some(predicate),
metadata_size_hint,
table_parquet_options,
Expand Down

0 comments on commit 8d5237e

Please sign in to comment.