Skip to content

Commit

Permalink
feat(exome): add exome options
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck committed Nov 25, 2023
1 parent 412b264 commit 2b7ed47
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 63 deletions.
2 changes: 1 addition & 1 deletion exon/exon-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn new_exon_config() -> SessionConfig {
}

extensions_options! {
/// My own config options.
/// Exon config options.
pub struct ExonConfigExtension {
pub vcf_parse_info: bool, default = true
pub vcf_parse_formats: bool, default = true
Expand Down
11 changes: 11 additions & 0 deletions exon/exon-exome/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ impl From<tonic::transport::Error> for ExomeError {
}
}

impl From<ExomeError> for DataFusionError {
fn from(error: ExomeError) -> Self {
match error {
ExomeError::DataFusionError(e) => e,
ExomeError::Execution(e) => DataFusionError::Execution(e),
ExomeError::TonicError(e) => DataFusionError::Execution(e.to_string()),
ExomeError::EnvironmentError(e) => DataFusionError::Execution(e),
}
}
}

impl std::fmt::Display for ExomeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
42 changes: 16 additions & 26 deletions exon/exon-exome/src/exome/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type CatalogServiceClient =
/// ExomeCatalogClient is a client for interacting with the Exome Catalog service.
#[derive(Clone)]
pub struct ExomeCatalogClient {
pub(crate) organization_name: String,
pub(crate) catalog_service_client: CatalogServiceClient,
pub(crate) token: String,
}
Expand All @@ -43,17 +42,12 @@ impl ExomeCatalogClient {
/// # Arguments
///
/// * `url` - The URL of the Exome Catalog service.
/// * `organization_name` - The organization name associated with the client.
/// * `token` - The token to use for authentication.
///
/// # Returns
///
/// An instance of ExomeCatalogClient on success, or a boxed error on failure.
pub async fn connect_with_tls(
url: String,
organization_name: String,
token: String,
) -> Result<Self, ExomeError> {
pub async fn connect_with_tls(url: String, token: String) -> Result<Self, ExomeError> {
let tls = tonic::transport::ClientTlsConfig::new();

let channel = tonic::transport::Channel::from_shared(url)
Expand All @@ -78,7 +72,6 @@ impl ExomeCatalogClient {
let _ = health_check_client.check(health_check_request).await?;

let s = Self {
organization_name,
catalog_service_client,
token,
};
Expand All @@ -92,17 +85,12 @@ impl ExomeCatalogClient {
/// # Arguments
///
/// * `url` - The URL of the Exome Catalog service.
/// * `organization_name` - The organization ID associated with the client.
/// * `token` - The token to use for authentication.
///
/// # Returns
///
/// An instance of ExomeCatalogClient on success, or a boxed error on failure.
pub async fn connect(
url: String,
organization_name: String,
token: String,
) -> Result<Self, ExomeError> {
pub async fn connect(url: String, token: String) -> Result<Self, ExomeError> {
let channel = tonic::transport::Channel::from_shared(url)
.map_err(|e| {
tonic::Status::new(
Expand All @@ -117,7 +105,6 @@ impl ExomeCatalogClient {
proto::catalog_service_client::CatalogServiceClient::new(channel);

let s = Self {
organization_name,
catalog_service_client,
token,
};
Expand Down Expand Up @@ -151,12 +138,11 @@ impl ExomeCatalogClient {
/// Retrieves a list of catalogs associated with a specific library from the Exome Catalog service.
///
/// # Arguments
///
/// * `library_id` - The ID of the library to retrieve catalogs for.
/// * `organization_name` - The name of the organization to retrieve catalogs for.
/// * `library_name` - The name of the library to retrieve catalogs for.
///
/// # Returns
///
/// A vector containing the retrieved catalogs on success, or a boxed error on failure.
/// A list of catalogs associated with the specified library.
pub async fn get_catalogs(
&mut self,
organization_name: OrganizationName,
Expand All @@ -180,7 +166,9 @@ impl ExomeCatalogClient {
///
/// # Arguments
///
/// * `catalog_id` - The ID of the catalog to retrieve schemas for.
/// * `organization_name` - The name of the organization to retrieve schemas for.
/// * `library_name` - The name of the library to retrieve schemas for.
/// * `catalog_name` - The name of the catalog to retrieve schemas for.
///
/// # Returns
///
Expand Down Expand Up @@ -219,11 +207,12 @@ impl ExomeCatalogClient {
&mut self,
name: String,
library_name: String,
organization_name: String,
) -> Result<Option<proto::Catalog>, Box<dyn std::error::Error>> {
let request = tonic::Request::new(proto::GetCatalogRequest {
name,
library_name,
organization_name: self.organization_name.clone(),
organization_name,
});

let response = self
Expand All @@ -250,14 +239,13 @@ impl ExomeCatalogClient {
})?;

let token = std::env::var("EXON_EXOME_TOKEN")?;
let organization_id = std::env::var("EXON_EXOME_ORGANIZATION_ID")?;

let use_tls = std::env::var("EXON_EXOME_USE_TLS")?;

if use_tls == "true" {
Self::connect_with_tls(url.to_string(), organization_id, token).await
Self::connect_with_tls(url.to_string(), token).await
} else {
Self::connect(url.to_string(), organization_id, token).await
Self::connect(url.to_string(), token).await
}
}

Expand All @@ -267,12 +255,13 @@ impl ExomeCatalogClient {
schema_name: String,
catalog_name: String,
library_name: String,
organization_name: String,
) -> Result<Vec<proto::Table>, ExomeError> {
let request = self.make_request(proto::ListTablesRequest {
schema_name,
catalog_name,
library_name,
organization_name: self.organization_name.clone(),
organization_name,
})?;

let mut client = self.catalog_service_client.clone();
Expand Down Expand Up @@ -346,12 +335,13 @@ impl ExomeCatalogClient {
name: SchemaName,
catalog_name: CatalogName,
library_name: LibraryName,
organization_name: OrganizationName,
) -> Result<String, ExomeError> {
let request = self.make_request(proto::CreateSchemaRequest {
name: name.to_string(),
catalog_name: catalog_name.to_string(),
library_name: library_name.to_string(),
organization_name: self.organization_name.clone(),
organization_name: organization_name.to_string(),
})?;

let mut client = self.catalog_service_client.clone();
Expand Down
3 changes: 2 additions & 1 deletion exon/exon-exome/src/exome/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ impl Schema {
let library_name = self.inner.library_name.clone();
let catalog_name = self.inner.catalog_name.clone();
let schema_name = self.inner.name.clone();
let organization_name = self.inner.organization_name.clone();

let tables = self
.exome_client
.get_tables(schema_name, catalog_name, library_name)
.get_tables(schema_name, catalog_name, library_name, organization_name)
.await?;

self.tables.clear();
Expand Down
25 changes: 22 additions & 3 deletions exon/exon-exome/src/exome/physical_plan/create_catalog_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use datafusion::{

use futures::stream;

use crate::exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager};
use crate::{
exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager},
exome_config::ExomeConfigExtension,
OrganizationName,
};

use super::CHANGE_SCHEMA;

Expand All @@ -39,6 +43,7 @@ impl CreateCatalogExec {

pub async fn create_catalog(
self,
organization_name: OrganizationName,
manager: Arc<ExomeCatalogManager>,
) -> Result<RecordBatch, DataFusionError> {
let changes = vec![Change::CreateCatalog(CreateCatalog::new(
Expand All @@ -47,7 +52,7 @@ impl CreateCatalogExec {
))];

manager
.apply_changes(changes)
.apply_changes(organization_name, changes)
.await
.map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?;

Expand Down Expand Up @@ -119,8 +124,22 @@ impl ExecutionPlan for CreateCatalogExec {
}
};

let exome_config = match context
.session_config()
.get_extension::<ExomeConfigExtension>()
{
Some(exome_config) => exome_config,
None => {
return Err(DataFusionError::Execution(
"ExomeConfig not found".to_string(),
))
}
};

let this = self.clone();
let stream = stream::once(this.create_catalog(exome_catalog_manager));
let stream = stream::once(
this.create_catalog(exome_config.exome_organization(), exome_catalog_manager),
);

Ok(Box::pin(RecordBatchStreamAdapter::new(
CHANGE_SCHEMA.clone(),
Expand Down
27 changes: 23 additions & 4 deletions exon/exon-exome/src/exome/physical_plan/create_schema_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ use datafusion::{

use futures::stream;

use crate::exome_catalog_manager::{
CatalogName, Change, CreateSchema, ExomeCatalogManager, LibraryName, SchemaName,
use crate::{
exome_catalog_manager::{
CatalogName, Change, CreateSchema, ExomeCatalogManager, LibraryName, SchemaName,
},
exome_config::ExomeConfigExtension,
OrganizationName,
};

use super::CHANGE_SCHEMA;
Expand All @@ -41,6 +45,7 @@ impl CreateSchemaExec {

pub async fn create_schema(
self,
organization_name: OrganizationName,
manager: Arc<ExomeCatalogManager>,
) -> Result<RecordBatch, DataFusionError> {
let changes = vec![Change::CreateSchema(CreateSchema::new(
Expand All @@ -50,7 +55,7 @@ impl CreateSchemaExec {
))];

manager
.apply_changes(changes)
.apply_changes(organization_name, changes)
.await
.map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?;

Expand Down Expand Up @@ -122,8 +127,22 @@ impl ExecutionPlan for CreateSchemaExec {
}
};

let exome_config = match context
.session_config()
.get_extension::<ExomeConfigExtension>()
{
Some(exome_config) => exome_config,
None => {
return Err(DataFusionError::Execution(
"ExomeConfig not found".to_string(),
))
}
};

let this = self.clone();
let stream = stream::once(this.create_schema(exome_catalog_manager));
let stream = stream::once(
this.create_schema(exome_config.exome_organization(), exome_catalog_manager),
);

Ok(Box::pin(RecordBatchStreamAdapter::new(
CHANGE_SCHEMA.clone(),
Expand Down
27 changes: 23 additions & 4 deletions exon/exon-exome/src/exome/physical_plan/create_table_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ use datafusion::{

use futures::stream;

use crate::exome_catalog_manager::{
CatalogName, Change, CreateTable, ExomeCatalogManager, LibraryName, SchemaName, TableName,
use crate::{
exome_catalog_manager::{
CatalogName, Change, CreateTable, ExomeCatalogManager, LibraryName, SchemaName, TableName,
},
exome_config::ExomeConfigExtension,
OrganizationName,
};

use super::CHANGE_SCHEMA;
Expand Down Expand Up @@ -70,6 +74,7 @@ impl CreateTableExec {

pub async fn create_table(
self,
organization_name: OrganizationName,
manager: Arc<ExomeCatalogManager>,
) -> Result<RecordBatch, DataFusionError> {
let changes = vec![Change::CreateTable(CreateTable::new(
Expand All @@ -85,7 +90,7 @@ impl CreateTableExec {
))];

manager
.apply_changes(changes)
.apply_changes(organization_name, changes)
.await
.map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?;

Expand Down Expand Up @@ -164,8 +169,22 @@ impl ExecutionPlan for CreateTableExec {
}
};

let exome_config = match context
.session_config()
.get_extension::<ExomeConfigExtension>()
{
Some(exome_config) => exome_config,
None => {
return Err(DataFusionError::Execution(
"ExomeConfigExtension not found".to_string(),
))
}
};

let this = self.clone();
let stream = stream::once(this.create_table(exome_catalog_manager));
let stream = stream::once(
this.create_table(exome_config.exome_organization(), exome_catalog_manager),
);

Ok(Box::pin(RecordBatchStreamAdapter::new(
CHANGE_SCHEMA.clone(),
Expand Down
Loading

0 comments on commit 2b7ed47

Please sign in to comment.