Skip to content

Commit

Permalink
refactor: use ExonArrayBuilder for bcf (#484)
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck authored Apr 22, 2024
1 parent 0b50da3 commit b7af849
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 34 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ arrow = { version = "51.0.0" }
async-trait = "0.1.80"
datafusion = { version = "37", features = ["compression", "parquet"] }
futures = "0.3"
noodles = { version = "0.69" }
noodles = { version = "0.70" }
object_store = { version = "0.9" }
tokio = { version = "1", features = ["io-util"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
Expand Down
8 changes: 5 additions & 3 deletions exon/exon-bcf/src/batch_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

use exon_common::ExonArrayBuilder;

use arrow::{error::ArrowError, record_batch::RecordBatch};

use exon_vcf::VCFArrayBuilder;
Expand Down Expand Up @@ -50,7 +52,6 @@ where
reader,
config,
header: Arc::new(header),
// string_maps,
})
}

Expand Down Expand Up @@ -136,7 +137,7 @@ impl BatchAdapter {
let mut record_batch = VCFArrayBuilder::create(
self.config.file_schema.clone(),
self.config.batch_size,
self.config.projection.as_deref(),
self.config.projection.clone(),
self.header.clone(),
)?;

Expand All @@ -152,7 +153,8 @@ impl BatchAdapter {
return Ok(None);
}

let batch = RecordBatch::try_new(self.config.file_schema.clone(), record_batch.finish())?;
let schema = self.config.projected_schema()?;
let batch = record_batch.try_into_record_batch(schema)?;

match &self.config.projection {
Some(projection) => Ok(Some(batch.project(projection)?)),
Expand Down
14 changes: 14 additions & 0 deletions exon/exon-bcf/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,18 @@ impl BCFConfig {
self.projection = projection;
self
}

/// Get the projection, returning the identity projection if none is set.
pub fn projection(&self) -> Vec<usize> {
self.projection
.clone()
.unwrap_or_else(|| (0..self.file_schema.fields().len()).collect())
}

/// Get the projected schema.
pub fn projected_schema(&self) -> arrow::error::Result<SchemaRef> {
let schema = self.file_schema.project(&self.projection())?;

Ok(Arc::new(schema))
}
}
10 changes: 10 additions & 0 deletions exon/exon-core/src/datasources/bcf/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ impl<T: ExonIndexedListingOptions + 'static> TableProvider for ListingBCFTable<T
.limit_option(limit)
.build();

if let Some(region) = self.config.options.regions().first() {
let plan = self
.config
.options
.create_physical_plan_with_regions(file_scan_config, vec![region.clone()])
.await?;

return Ok(plan);
}

let plan = self
.config
.options
Expand Down
41 changes: 40 additions & 1 deletion exon/exon-core/src/session_context/exon_context_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ mod tests {

use crate::{
datasources::{
bigwig, fasta::table_provider::ListingFASTATableOptions,
bcf::table_provider::ListingBCFTableOptions, bigwig,
fasta::table_provider::ListingFASTATableOptions,
fastq::table_provider::ListingFASTQTableOptions,
},
session_context::ExonSessionExt,
Expand Down Expand Up @@ -833,4 +834,42 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_bcf_file() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();

let bcf_path = exon_test::test_path("bcf", "index.bcf");

let df = ctx
.read_bcf(
bcf_path.to_str().unwrap(),
ListingBCFTableOptions::default(),
)
.await?;

assert_eq!(df.count().await?, 621);

Ok(())
}

#[tokio::test]
async fn test_bcf_file_with_region() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();

let bcf_path = exon_test::test_path("bcf", "index.bcf");

let region = "1".parse()?;

let df = ctx
.read_bcf(
bcf_path.to_str().unwrap(),
ListingBCFTableOptions::default().with_regions(vec![region]),
)
.await?;

assert_eq!(df.count().await?, 191);

Ok(())
}
}
32 changes: 15 additions & 17 deletions exon/exon-vcf/src/array_builder/eager_array_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
use std::sync::Arc;

use arrow::{
array::{
ArrayBuilder, ArrayRef, Float32Builder, GenericListBuilder, GenericStringBuilder,
Int64Builder,
},
array::{ArrayRef, Float32Builder, GenericListBuilder, GenericStringBuilder, Int64Builder},
datatypes::SchemaRef,
error::ArrowError,
};
use exon_common::ExonArrayBuilder;
use noodles::vcf::{
variant::record::{AlternateBases, Filters, Ids},
Header,
Expand All @@ -47,14 +45,16 @@ pub struct VCFArrayBuilder {
header: Arc<Header>,

projection: Vec<usize>,

n_rows: usize,
}

impl VCFArrayBuilder {
/// Creates a new `VCFArrayBuilder` from a `Schema`.
pub fn create(
schema: SchemaRef,
capacity: usize,
projection: Option<&[usize]>,
projection: Option<Vec<usize>>,
header: Arc<Header>,
) -> Result<Self, ArrowError> {
let info_field = schema.field_with_name("info")?;
Expand All @@ -66,6 +66,7 @@ impl VCFArrayBuilder {
};

Ok(Self {
n_rows: 0,
chromosomes: GenericStringBuilder::<i32>::new(),
positions: Int64Builder::new(),
ids: GenericListBuilder::<i32, GenericStringBuilder<i32>>::new(GenericStringBuilder::<
Expand All @@ -89,16 +90,6 @@ impl VCFArrayBuilder {
})
}

/// Returns the number of records in the builder.
pub fn len(&self) -> usize {
self.chromosomes.len()
}

/// Returns whether the builder is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Appends a record to the builder.
pub fn append<T>(&mut self, record: T) -> Result<(), ArrowError>
where
Expand Down Expand Up @@ -169,11 +160,14 @@ impl VCFArrayBuilder {
}
}

self.n_rows += 1;

Ok(())
}
}

/// Builds the `ArrayRef`.
pub fn finish(&mut self) -> Vec<ArrayRef> {
impl ExonArrayBuilder for VCFArrayBuilder {
fn finish(&mut self) -> Vec<ArrayRef> {
let mut arrays: Vec<ArrayRef> = vec![];

for col_idx in self.projection.iter() {
Expand All @@ -193,4 +187,8 @@ impl VCFArrayBuilder {

arrays
}

fn len(&self) -> usize {
self.n_rows
}
}

0 comments on commit b7af849

Please sign in to comment.