feat(R): expose to_record_batch_reader (#498)
Exposes `to_record_batch_reader` on the result object so its contents can be streamed to other arrow compatible tools.

#497
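A minimal sketch of the intended usage, assuming the R package is named `exonr` (inferred from the repository layout) and using the session/query API and DuckDB hand-off shown in the package tests below; the GFF path and table name are illustrative:

# Hedged sketch: stream an exon query result to an Arrow-compatible consumer.
library(exonr)

session <- ExonRSessionContext$new()
session$execute("CREATE EXTERNAL TABLE gene_annotations STORED AS GFF LOCATION 'test.gff'")

rdf <- session$sql("SELECT seqname, source, type FROM gene_annotations")

# New in this change: get a RecordBatchStreamReader instead of a materialized table,
# so results can be handed off batch by batch without collecting them in R first.
reader <- rdf$to_record_batch_reader()

# Example consumer: register the stream with DuckDB and query it.
con <- DBI::dbConnect(duckdb::duckdb())
arrow::to_duckdb(reader, table_name = "gene_annotations", con = con)
DBI::dbGetQuery(con, "SELECT count(*) AS n FROM gene_annotations")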
tshauck authored May 3, 2024
1 parent 62eeea1 commit 8b5b5f7
Showing 9 changed files with 78 additions and 35 deletions.
48 changes: 24 additions & 24 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -38,7 +38,7 @@ arrow = { version = "51.0.0" }
async-trait = "0.1.80"
datafusion = { version = "37", features = ["compression", "parquet"] }
futures = "0.3"
-noodles = { version = "0.70" }
+noodles = { version = "0.71" }
object_store = { version = "0.9" }
tokio = { version = "1", features = ["io-util"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
12 changes: 9 additions & 3 deletions exon-r/exonr/R/main.R
@@ -190,14 +190,20 @@ ExonDataFrame <- R6Class("ExonDataFrame",
#'
#' @return An Arrow table.
to_arrow = function() {
+record_batch_stream <- self$to_record_batch_reader()
+
+arrow::arrow_table(record_batch_stream)
+},
+#' @description Convert the ExonDataFrame to a stream of record batches.
+#'
+#' @return A stream of record batches.
+to_record_batch_reader = function() {
stream <- nanoarrow::nanoarrow_allocate_array_stream()
pointer_addr <- nanoarrow::nanoarrow_pointer_addr_chr(stream)

private$data_frame$to_arrow(pointer_addr)

-record_batch_stream <- arrow::RecordBatchStreamReader$import_from_c(pointer_addr)
-
-arrow::arrow_table(record_batch_stream)
+arrow::RecordBatchStreamReader$import_from_c(pointer_addr)
}
),
private = list(
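Because `to_record_batch_reader()` now returns the imported `RecordBatchStreamReader` directly (and `to_arrow()` simply collects that same stream into a table), callers can also pull batches incrementally. A small sketch, assuming `rdf` is an ExonDataFrame from `session$sql()` and using arrow's RecordBatchReader methods:

# Hedged sketch: iterate over the stream batch by batch; read_next_batch()
# returns NULL once the stream is exhausted.
reader <- rdf$to_record_batch_reader()

total_rows <- 0L
while (!is.null(batch <- reader$read_next_batch())) {
  total_rows <- total_rows + batch$num_rows
}
total_rows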
14 changes: 14 additions & 0 deletions exon-r/exonr/man/ExonDataFrame.Rd

(Generated file; diff not rendered by default.)

25 changes: 24 additions & 1 deletion exon-r/exonr/tests/testthat/test_read.R
@@ -141,7 +141,7 @@ test_that("querying an exon session works", {
session$execute("CREATE EXTERNAL TABLE gene_annotations STORED AS GFF LOCATION '../../../../exon/exon-core/test-data/datasources/gff/test.gff'")

rdf <- session$sql("SELECT seqname, source, type, start, \"end\", score, strand, phase FROM gene_annotations")
-batch_reader <- rdf$to_arrow()
+batch_reader <- rdf$to_record_batch_reader()

con <- dbConnect(duckdb::duckdb())

@@ -153,6 +153,29 @@

expect_equal(colnames(df), c("seqname", "source", "type", "start", "end", "score", "strand", "phase"))
expect_equal(nrow(df), 5000)
})

test_that("converting an exon query result to an arrow table works", {
skip_if_not(requireNamespace("duckdb", quietly = F))

library(duckdb)

session <- ExonRSessionContext$new()
session$execute("CREATE EXTERNAL TABLE gene_annotations STORED AS GFF LOCATION '../../../../exon/exon-core/test-data/datasources/gff/test.gff'")

rdf <- session$sql("SELECT seqname, source, type, start, \"end\", score, strand, phase FROM gene_annotations")
table <- rdf$to_arrow()

con <- dbConnect(duckdb::duckdb())

arrow::to_duckdb(table, table_name = "gene_annotations", con = con)

result <- dbGetQuery(con, "SELECT * FROM gene_annotations")

df <- as.data.frame(result)

expect_equal(colnames(df), c("seqname", "source", "type", "start", "end", "score", "strand", "phase"))
expect_equal(nrow(df), 5000)

result <- dbGetQuery(con, "SELECT * FROM gene_annotations")

2 changes: 1 addition & 1 deletion exon/exon-bam/src/indexed_async_batch_stream.rs
@@ -169,7 +169,7 @@ where

async fn read_record(&mut self, record: &mut RecordBuf) -> std::io::Result<Option<()>> {
if let Some(max_bytes) = self.max_bytes {
-if self.reader.virtual_position().uncompressed() >= max_bytes {
+if self.reader.get_ref().virtual_position().uncompressed() >= max_bytes {
return Ok(None);
}
}
2 changes: 1 addition & 1 deletion exon/exon-core/src/datasources/bam/indexed_file_opener.rs
@@ -59,7 +59,7 @@ impl FileOpener for IndexedBAMOpener {
let mut first_bam_reader = noodles::bam::AsyncReader::new(stream_reader);

let header = first_bam_reader.read_header().await?;
-let header_offset = first_bam_reader.virtual_position();
+let header_offset = first_bam_reader.get_ref().virtual_position();

let offsets = if let Some(ref ext) = file_meta.extensions {
ext.downcast_ref::<BGZFIndexedOffsets>()
@@ -73,7 +73,7 @@ impl FileOpener for IndexedVCFOpener {
// We save this header for later to pass to the batch reader for record deserialization.
let header = vcf_reader.read_header().await?;

-let header_offset = vcf_reader.virtual_position();
+let header_offset = vcf_reader.get_ref().virtual_position();

let batch_stream = match file_meta.extensions {
Some(ref ext) => {
@@ -193,8 +193,8 @@ impl FileOpener for IndexedVCFOpener {
let mut async_reader = AsyncBGZFReader::from_reader(stream_reader);

// If we're at the start of the file, we need to seek to the header offset.
-if vcf_reader.virtual_position().compressed() == 0
-&& vcf_reader.virtual_position().uncompressed() == 0
+if vcf_reader.get_ref().virtual_position().compressed() == 0
+&& vcf_reader.get_ref().virtual_position().uncompressed() == 0
{
tracing::debug!("Seeking to header offset: {:?}", header_offset);
async_reader.scan_to_virtual_position(header_offset).await?;
2 changes: 1 addition & 1 deletion exon/exon-vcf/src/indexed_async_batch_stream.rs
@@ -72,7 +72,7 @@ where
}

async fn read_record(&mut self) -> std::io::Result<Option<noodles::vcf::Record>> {
-if self.reader.virtual_position().uncompressed() as usize >= self.max_bytes {
+if self.reader.get_ref().virtual_position().uncompressed() as usize >= self.max_bytes {
return Ok(None);
}

