Skip to content

Commit

Permalink
feat: add read_cram and read_fcs methods (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck authored Apr 23, 2024
1 parent cc03c8d commit 1ba2780
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 29 deletions.
6 changes: 6 additions & 0 deletions exon/exon-core/src/datasources/cram/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ impl ListingCRAMTableOptions {
self
}

/// Set the region filter for the table.
pub fn with_region(mut self, region: Option<Region>) -> Self {
self.region = region;
self
}

/// Infer the schema from the file.
async fn infer_schema_from_object_meta(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl ExonListingTableFactory {

let table_schema = options.infer_schema(state, &table_path).await?;

let config = ListingFCSTableConfig::new(table_path).with_options(options);
let config = ListingFCSTableConfig::new(table_path, options);
let table = ListingFCSTable::try_new(config, table_schema)?;

Ok(Arc::new(table))
Expand Down
41 changes: 16 additions & 25 deletions exon/exon-core/src/datasources/fcs/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,15 @@ use super::scanner::FCSScan;
pub struct ListingFCSTableConfig {
inner: ListingTableConfig,

options: Option<ListingFCSTableOptions>,
options: ListingFCSTableOptions,
}

impl ListingFCSTableConfig {
/// Create a new FCS listing table configuration
pub fn new(table_path: ListingTableUrl) -> Self {
pub fn new(table_path: ListingTableUrl, options: ListingFCSTableOptions) -> Self {
Self {
inner: ListingTableConfig::new(table_path),
options: None,
}
}

/// Set the options for the FCS listing table
pub fn with_options(self, options: ListingFCSTableOptions) -> Self {
Self {
options: Some(options),
..self
options,
}
}
}
Expand Down Expand Up @@ -174,22 +166,17 @@ impl ListingFCSTableOptions {
#[derive(Debug, Clone)]
/// A FCS listing table
pub struct ListingFCSTable {
table_paths: Vec<ListingTableUrl>,

table_schema: TableSchema,

options: ListingFCSTableOptions,
config: ListingFCSTableConfig,
}

impl ListingFCSTable {
/// Create a new FCS listing table
pub fn try_new(config: ListingFCSTableConfig, table_schema: TableSchema) -> Result<Self> {
Ok(Self {
table_paths: config.inner.table_paths,
table_schema,
options: config
.options
.ok_or_else(|| DataFusionError::Internal(String::from("Options must be set")))?,
config,
})
}
}
Expand All @@ -214,7 +201,7 @@ impl TableProvider for ListingFCSTable {
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters
.iter()
.map(|f| filter_matches_partition_cols(f, &self.options.table_partition_cols))
.map(|f| filter_matches_partition_cols(f, &self.config.options.table_partition_cols))
.collect())
}

Expand All @@ -225,7 +212,7 @@ impl TableProvider for ListingFCSTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let object_store_url = if let Some(url) = self.table_paths.first() {
let object_store_url = if let Some(url) = self.config.inner.table_paths.first() {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
Expand All @@ -236,10 +223,10 @@ impl TableProvider for ListingFCSTable {
let file_list = pruned_partition_list(
state,
&object_store,
&self.table_paths[0],
&self.config.inner.table_paths[0],
filters,
self.options.file_extension.as_str(),
&self.options.table_partition_cols,
self.config.options.file_extension.as_str(),
&self.config.options.table_partition_cols,
)
.await?
.try_collect::<Vec<_>>()
Expand All @@ -249,11 +236,15 @@ impl TableProvider for ListingFCSTable {
let file_scan_config =
FileScanConfigBuilder::new(object_store_url.clone(), file_schema, vec![file_list])
.projection_option(projection.cloned())
.table_partition_cols(self.options.table_partition_cols.clone())
.table_partition_cols(self.config.options.table_partition_cols.clone())
.limit_option(limit)
.build();

let plan = self.options.create_physical_plan(file_scan_config).await?;
let plan = self
.config
.options
.create_physical_plan(file_scan_config)
.await?;

Ok(plan)
}
Expand Down
7 changes: 4 additions & 3 deletions exon/exon-core/src/datasources/fcs/udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ impl TableFunctionImpl for FCSScanFunction {
Ok::<TableSchema, datafusion::error::DataFusionError>(schema)
})?;

let listing_table_config =
ListingFCSTableConfig::new(listing_scan_function.listing_table_url)
.with_options(listing_table_options);
let listing_table_config = ListingFCSTableConfig::new(
listing_scan_function.listing_table_url,
listing_table_options,
);

let listing_table = ListingFCSTable::try_new(listing_table_config, schema)?;

Expand Down
124 changes: 124 additions & 0 deletions exon/exon-core/src/session_context/exon_context_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
bcf::table_provider::{ListingBCFTable, ListingBCFTableOptions},
bed::table_provider::{ListingBEDTable, ListingBEDTableOptions},
bigwig,
cram::table_provider::{ListingCRAMTable, ListingCRAMTableConfig, ListingCRAMTableOptions},
exon_listing_table_options::ExonListingConfig,
genbank::table_provider::{ListingGenbankTable, ListingGenbankTableOptions},
gff::table_provider::{ListingGFFTable, ListingGFFTableOptions},
Expand Down Expand Up @@ -319,6 +320,21 @@ pub trait ExonSessionExt {
options: ListingFASTQTableOptions,
) -> Result<DataFrame, ExonError>;

/// Read a CRAM file.
///
/// # Arguments
///
/// * `table_path` - Location of the CRAM data, given as a string.
/// * `options` - CRAM listing table options to read the data with.
///
/// # Errors
///
/// Returns an [`ExonError`] if the data cannot be read into a [`DataFrame`].
async fn read_cram(
&self,
table_path: &str,
options: ListingCRAMTableOptions,
) -> Result<DataFrame, ExonError>;

/// Read an FCS file.
///
/// Only available when the `fcs` feature is enabled.
///
/// # Arguments
///
/// * `table_path` - Location of the FCS data, given as a string.
/// * `options` - FCS listing table options to read the data with.
///
/// # Errors
///
/// Returns an [`ExonError`] if the data cannot be read into a [`DataFrame`].
#[cfg(feature = "fcs")]
async fn read_fcs(
&self,
table_path: &str,
options: crate::datasources::fcs::table_provider::ListingFCSTableOptions,
) -> Result<DataFrame, ExonError>;

/// Read a GENBANK file.
#[cfg(feature = "genbank")]
async fn read_genbank(
Expand Down Expand Up @@ -362,6 +378,47 @@ impl ExonSessionExt for SessionContext {
Ok(table)
}

/// Build a [`DataFrame`] over the FCS data at `table_path`.
///
/// The path is parsed into a listing URL, the schema is inferred through `options` against
/// the current session state, and the resulting FCS listing table is handed to `read_table`.
#[cfg(feature = "fcs")]
async fn read_fcs(
    &self,
    table_path: &str,
    options: crate::datasources::fcs::table_provider::ListingFCSTableOptions,
) -> Result<DataFrame, ExonError> {
    use crate::datasources::fcs::table_provider::{ListingFCSTable, ListingFCSTableConfig};

    let url = ListingTableUrl::parse(table_path)?;
    let schema = options.infer_schema(&self.state(), &url).await?;

    // The config takes ownership of both the URL and the options.
    let provider = ListingFCSTable::try_new(ListingFCSTableConfig::new(url, options), schema)?;

    Ok(self.read_table(Arc::new(provider))?)
}

/// Build a [`DataFrame`] over the CRAM data at `table_path`.
///
/// The path is parsed into a listing URL, the schema is inferred through `options` against
/// the current session state, and the resulting CRAM listing table is handed to `read_table`.
async fn read_cram(
    &self,
    table_path: &str,
    options: ListingCRAMTableOptions,
) -> crate::Result<DataFrame> {
    let url = ListingTableUrl::parse(table_path)?;
    let schema = options.infer_schema(&self.state(), &url).await?;

    // TODO: refactor this to use the new config setup
    let provider = ListingCRAMTable::try_new(ListingCRAMTableConfig::new(url, options), schema)?;

    Ok(self.read_table(Arc::new(provider))?)
}

async fn read_gtf(
&self,
table_path: &str,
Expand Down Expand Up @@ -664,6 +721,7 @@ mod tests {
use crate::{
datasources::{
bcf::table_provider::ListingBCFTableOptions, bigwig,
cram::table_provider::ListingCRAMTableOptions,
fasta::table_provider::ListingFASTATableOptions,
fastq::table_provider::ListingFASTQTableOptions,
},
Expand Down Expand Up @@ -817,6 +875,72 @@ mod tests {
Ok(())
}

/// Checks the row counts produced by `read_cram` for three configurations: default options,
/// a FASTA reference, and a FASTA reference combined with an indexed region query.
#[tokio::test]
async fn test_cram_file() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new_exon();

    // Scenario 1: default options only.
    let standalone_path = exon_test::test_path("cram", "test_input_1_a.cram");
    let standalone_rows = ctx
        .read_cram(
            standalone_path.to_str().unwrap(),
            ListingCRAMTableOptions::default(),
        )
        .await?
        .count()
        .await?;
    assert_eq!(standalone_rows, 15);

    // The remaining scenarios read the same CRAM file together with a FASTA reference.
    let cram_path = exon_test::test_path("two-cram", "twolib.sorted.cram");
    let fasta_reference = exon_test::test_path("two-cram", "rand1k.fa");
    let cram_location = cram_path.to_str().unwrap();
    let reference = fasta_reference.to_str().map(ToString::to_string);

    // Scenario 2: FASTA reference supplied, no region filter.
    let referenced_options =
        ListingCRAMTableOptions::default().with_fasta_reference(reference.clone());
    let referenced_rows = ctx
        .read_cram(cram_location, referenced_options)
        .await?
        .count()
        .await?;
    assert_eq!(referenced_rows, 4);

    // Scenario 3: indexed read restricted to region "1".
    let region = "1".parse()?;
    let region_options = ListingCRAMTableOptions::default()
        .with_fasta_reference(reference)
        .with_indexed(true)
        .with_region(Some(region));
    let region_rows = ctx
        .read_cram(cram_location, region_options)
        .await?
        .count()
        .await?;
    assert_eq!(region_rows, 0);

    Ok(())
}

/// Checks that `read_fcs` yields the expected number of rows for an uncompressed FCS file.
#[cfg(feature = "fcs")]
#[tokio::test]
async fn test_read_fcs() -> Result<(), Box<dyn std::error::Error>> {
    use crate::datasources::fcs::table_provider::ListingFCSTableOptions;

    let ctx = SessionContext::new_exon();
    let fcs_path = exon_test::test_path("fcs", "Guava Muse.fcs");
    let options = ListingFCSTableOptions::new(FileCompressionType::UNCOMPRESSED);

    let row_count = ctx
        .read_fcs(fcs_path.to_str().unwrap(), options)
        .await?
        .count()
        .await?;
    assert_eq!(row_count, 108);

    Ok(())
}

#[tokio::test]
async fn test_read_fasta_gzip() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();
Expand Down

0 comments on commit 1ba2780

Please sign in to comment.