diff --git a/exon/exon-core/src/config/mod.rs b/exon/exon-core/src/config/mod.rs index 43b4b2ed..25f5dfcf 100644 --- a/exon/exon-core/src/config/mod.rs +++ b/exon/exon-core/src/config/mod.rs @@ -43,7 +43,7 @@ pub fn new_exon_config() -> SessionConfig { } extensions_options! { - /// My own config options. + /// Exon config options. pub struct ExonConfigExtension { pub vcf_parse_info: bool, default = true pub vcf_parse_formats: bool, default = true diff --git a/exon/exon-exome/src/error.rs b/exon/exon-exome/src/error.rs index f0d002a7..613263aa 100644 --- a/exon/exon-exome/src/error.rs +++ b/exon/exon-exome/src/error.rs @@ -48,6 +48,17 @@ impl From for ExomeError { } } +impl From for DataFusionError { + fn from(error: ExomeError) -> Self { + match error { + ExomeError::DataFusionError(e) => e, + ExomeError::Execution(e) => DataFusionError::Execution(e), + ExomeError::TonicError(e) => DataFusionError::Execution(e.to_string()), + ExomeError::EnvironmentError(e) => DataFusionError::Execution(e), + } + } +} + impl std::fmt::Display for ExomeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/exon/exon-exome/src/exome/catalog.rs b/exon/exon-exome/src/exome/catalog.rs index 3b15fa5c..03ec429c 100644 --- a/exon/exon-exome/src/exome/catalog.rs +++ b/exon/exon-exome/src/exome/catalog.rs @@ -30,7 +30,6 @@ type CatalogServiceClient = /// ExomeCatalogClient is a client for interacting with the Exome Catalog service. #[derive(Clone)] pub struct ExomeCatalogClient { - pub(crate) organization_name: String, pub(crate) catalog_service_client: CatalogServiceClient, pub(crate) token: String, } @@ -43,17 +42,12 @@ impl ExomeCatalogClient { /// # Arguments /// /// * `url` - The URL of the Exome Catalog service. - /// * `organization_name` - The organization name associated with the client. /// * `token` - The token to use for authentication. /// /// # Returns /// /// An instance of ExomeCatalogClient on success, or a boxed error on failure. - pub async fn connect_with_tls( - url: String, - organization_name: String, - token: String, - ) -> Result { + pub async fn connect_with_tls(url: String, token: String) -> Result { let tls = tonic::transport::ClientTlsConfig::new(); let channel = tonic::transport::Channel::from_shared(url) @@ -78,7 +72,6 @@ impl ExomeCatalogClient { let _ = health_check_client.check(health_check_request).await?; let s = Self { - organization_name, catalog_service_client, token, }; @@ -92,17 +85,12 @@ impl ExomeCatalogClient { /// # Arguments /// /// * `url` - The URL of the Exome Catalog service. - /// * `organization_name` - The organization ID associated with the client. /// * `token` - The token to use for authentication. /// /// # Returns /// /// An instance of ExomeCatalogClient on success, or a boxed error on failure. - pub async fn connect( - url: String, - organization_name: String, - token: String, - ) -> Result { + pub async fn connect(url: String, token: String) -> Result { let channel = tonic::transport::Channel::from_shared(url) .map_err(|e| { tonic::Status::new( @@ -117,7 +105,6 @@ impl ExomeCatalogClient { proto::catalog_service_client::CatalogServiceClient::new(channel); let s = Self { - organization_name, catalog_service_client, token, }; @@ -151,12 +138,11 @@ impl ExomeCatalogClient { /// Retrieves a list of catalogs associated with a specific library from the Exome Catalog service. /// /// # Arguments - /// - /// * `library_id` - The ID of the library to retrieve catalogs for. + /// * `organization_name` - The name of the organization to retrieve catalogs for. + /// * `library_name` - The name of the library to retrieve catalogs for. /// /// # Returns - /// - /// A vector containing the retrieved catalogs on success, or a boxed error on failure. + /// A list of catalogs associated with the specified library. pub async fn get_catalogs( &mut self, organization_name: OrganizationName, @@ -180,7 +166,9 @@ impl ExomeCatalogClient { /// /// # Arguments /// - /// * `catalog_id` - The ID of the catalog to retrieve schemas for. + /// * `organization_name` - The name of the organization to retrieve schemas for. + /// * `library_name` - The name of the library to retrieve schemas for. + /// * `catalog_name` - The name of the catalog to retrieve schemas for. /// /// # Returns /// @@ -219,11 +207,12 @@ impl ExomeCatalogClient { &mut self, name: String, library_name: String, + organization_name: String, ) -> Result, Box> { let request = tonic::Request::new(proto::GetCatalogRequest { name, library_name, - organization_name: self.organization_name.clone(), + organization_name, }); let response = self @@ -250,14 +239,13 @@ impl ExomeCatalogClient { })?; let token = std::env::var("EXON_EXOME_TOKEN")?; - let organization_id = std::env::var("EXON_EXOME_ORGANIZATION_ID")?; let use_tls = std::env::var("EXON_EXOME_USE_TLS")?; if use_tls == "true" { - Self::connect_with_tls(url.to_string(), organization_id, token).await + Self::connect_with_tls(url.to_string(), token).await } else { - Self::connect(url.to_string(), organization_id, token).await + Self::connect(url.to_string(), token).await } } @@ -267,12 +255,13 @@ impl ExomeCatalogClient { schema_name: String, catalog_name: String, library_name: String, + organization_name: String, ) -> Result, ExomeError> { let request = self.make_request(proto::ListTablesRequest { schema_name, catalog_name, library_name, - organization_name: self.organization_name.clone(), + organization_name, })?; let mut client = self.catalog_service_client.clone(); @@ -346,12 +335,13 @@ impl ExomeCatalogClient { name: SchemaName, catalog_name: CatalogName, library_name: LibraryName, + organization_name: OrganizationName, ) -> Result { let request = self.make_request(proto::CreateSchemaRequest { name: name.to_string(), catalog_name: catalog_name.to_string(), library_name: library_name.to_string(), - organization_name: self.organization_name.clone(), + organization_name: organization_name.to_string(), })?; let mut client = self.catalog_service_client.clone(); diff --git a/exon/exon-exome/src/exome/catalog/schema.rs b/exon/exon-exome/src/exome/catalog/schema.rs index 9a59c7de..d8b17f63 100644 --- a/exon/exon-exome/src/exome/catalog/schema.rs +++ b/exon/exon-exome/src/exome/catalog/schema.rs @@ -69,10 +69,11 @@ impl Schema { let library_name = self.inner.library_name.clone(); let catalog_name = self.inner.catalog_name.clone(); let schema_name = self.inner.name.clone(); + let organization_name = self.inner.organization_name.clone(); let tables = self .exome_client - .get_tables(schema_name, catalog_name, library_name) + .get_tables(schema_name, catalog_name, library_name, organization_name) .await?; self.tables.clear(); diff --git a/exon/exon-exome/src/exome/physical_plan/create_catalog_exec.rs b/exon/exon-exome/src/exome/physical_plan/create_catalog_exec.rs index 87e86fa7..d3cce007 100644 --- a/exon/exon-exome/src/exome/physical_plan/create_catalog_exec.rs +++ b/exon/exon-exome/src/exome/physical_plan/create_catalog_exec.rs @@ -22,7 +22,11 @@ use datafusion::{ use futures::stream; -use crate::exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager}; +use crate::{ + exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager}, + exome_config::ExomeConfigExtension, + OrganizationName, +}; use super::CHANGE_SCHEMA; @@ -39,6 +43,7 @@ impl CreateCatalogExec { pub async fn create_catalog( self, + organization_name: OrganizationName, manager: Arc, ) -> Result { let changes = vec![Change::CreateCatalog(CreateCatalog::new( @@ -47,7 +52,7 @@ impl CreateCatalogExec { ))]; manager - .apply_changes(changes) + .apply_changes(organization_name, changes) .await .map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?; @@ -119,8 +124,22 @@ impl ExecutionPlan for CreateCatalogExec { } }; + let exome_config = match context + .session_config() + .get_extension::() + { + Some(exome_config) => exome_config, + None => { + return Err(DataFusionError::Execution( + "ExomeConfig not found".to_string(), + )) + } + }; + let this = self.clone(); - let stream = stream::once(this.create_catalog(exome_catalog_manager)); + let stream = stream::once( + this.create_catalog(exome_config.exome_organization(), exome_catalog_manager), + ); Ok(Box::pin(RecordBatchStreamAdapter::new( CHANGE_SCHEMA.clone(), diff --git a/exon/exon-exome/src/exome/physical_plan/create_schema_exec.rs b/exon/exon-exome/src/exome/physical_plan/create_schema_exec.rs index f8d578bb..e9866c2a 100644 --- a/exon/exon-exome/src/exome/physical_plan/create_schema_exec.rs +++ b/exon/exon-exome/src/exome/physical_plan/create_schema_exec.rs @@ -22,8 +22,12 @@ use datafusion::{ use futures::stream; -use crate::exome_catalog_manager::{ - CatalogName, Change, CreateSchema, ExomeCatalogManager, LibraryName, SchemaName, +use crate::{ + exome_catalog_manager::{ + CatalogName, Change, CreateSchema, ExomeCatalogManager, LibraryName, SchemaName, + }, + exome_config::ExomeConfigExtension, + OrganizationName, }; use super::CHANGE_SCHEMA; @@ -41,6 +45,7 @@ impl CreateSchemaExec { pub async fn create_schema( self, + organization_name: OrganizationName, manager: Arc, ) -> Result { let changes = vec![Change::CreateSchema(CreateSchema::new( @@ -50,7 +55,7 @@ impl CreateSchemaExec { ))]; manager - .apply_changes(changes) + .apply_changes(organization_name, changes) .await .map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?; @@ -122,8 +127,22 @@ impl ExecutionPlan for CreateSchemaExec { } }; + let exome_config = match context + .session_config() + .get_extension::() + { + Some(exome_config) => exome_config, + None => { + return Err(DataFusionError::Execution( + "ExomeConfig not found".to_string(), + )) + } + }; + let this = self.clone(); - let stream = stream::once(this.create_schema(exome_catalog_manager)); + let stream = stream::once( + this.create_schema(exome_config.exome_organization(), exome_catalog_manager), + ); Ok(Box::pin(RecordBatchStreamAdapter::new( CHANGE_SCHEMA.clone(), diff --git a/exon/exon-exome/src/exome/physical_plan/create_table_exec.rs b/exon/exon-exome/src/exome/physical_plan/create_table_exec.rs index a0d78566..20a42c6d 100644 --- a/exon/exon-exome/src/exome/physical_plan/create_table_exec.rs +++ b/exon/exon-exome/src/exome/physical_plan/create_table_exec.rs @@ -22,8 +22,12 @@ use datafusion::{ use futures::stream; -use crate::exome_catalog_manager::{ - CatalogName, Change, CreateTable, ExomeCatalogManager, LibraryName, SchemaName, TableName, +use crate::{ + exome_catalog_manager::{ + CatalogName, Change, CreateTable, ExomeCatalogManager, LibraryName, SchemaName, TableName, + }, + exome_config::ExomeConfigExtension, + OrganizationName, }; use super::CHANGE_SCHEMA; @@ -70,6 +74,7 @@ impl CreateTableExec { pub async fn create_table( self, + organization_name: OrganizationName, manager: Arc, ) -> Result { let changes = vec![Change::CreateTable(CreateTable::new( @@ -85,7 +90,7 @@ impl CreateTableExec { ))]; manager - .apply_changes(changes) + .apply_changes(organization_name, changes) .await .map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?; @@ -164,8 +169,22 @@ impl ExecutionPlan for CreateTableExec { } }; + let exome_config = match context + .session_config() + .get_extension::() + { + Some(exome_config) => exome_config, + None => { + return Err(DataFusionError::Execution( + "ExomeConfigExtension not found".to_string(), + )) + } + }; + let this = self.clone(); - let stream = stream::once(this.create_table(exome_catalog_manager)); + let stream = stream::once( + this.create_table(exome_config.exome_organization(), exome_catalog_manager), + ); Ok(Box::pin(RecordBatchStreamAdapter::new( CHANGE_SCHEMA.clone(), diff --git a/exon/exon-exome/src/exome/physical_plan/drop_catalog_exec.rs b/exon/exon-exome/src/exome/physical_plan/drop_catalog_exec.rs index e6c5e880..6288da1f 100644 --- a/exon/exon-exome/src/exome/physical_plan/drop_catalog_exec.rs +++ b/exon/exon-exome/src/exome/physical_plan/drop_catalog_exec.rs @@ -22,7 +22,11 @@ use datafusion::{ use futures::stream; -use crate::exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager}; +use crate::{ + exome_catalog_manager::{Change, CreateCatalog, ExomeCatalogManager}, + exome_config::ExomeConfigExtension, + OrganizationName, +}; use super::CHANGE_SCHEMA; @@ -39,6 +43,7 @@ impl DropCatalogExec { pub async fn drop_catalog( self, + organization_name: OrganizationName, manager: Arc, ) -> Result { let changes = vec![Change::CreateCatalog(CreateCatalog::new( @@ -47,7 +52,7 @@ impl DropCatalogExec { ))]; manager - .apply_changes(changes) + .apply_changes(organization_name, changes) .await .map_err(|e| DataFusionError::Execution(format!("Error applying changes: {}", e)))?; @@ -119,8 +124,22 @@ impl ExecutionPlan for DropCatalogExec { } }; + let exome_config = match context + .session_config() + .get_extension::() + { + Some(exome_config) => exome_config, + None => { + return Err(DataFusionError::Execution( + "ExomeConfig not found".to_string(), + )) + } + }; + let this = self.clone(); - let stream = stream::once(this.drop_catalog(exome_catalog_manager)); + let stream = stream::once( + this.drop_catalog(exome_config.exome_organization(), exome_catalog_manager), + ); Ok(Box::pin(RecordBatchStreamAdapter::new( CHANGE_SCHEMA.clone(), diff --git a/exon/exon-exome/src/exome_catalog_manager.rs b/exon/exon-exome/src/exome_catalog_manager.rs index a7ac05ac..8ccf1da0 100644 --- a/exon/exon-exome/src/exome_catalog_manager.rs +++ b/exon/exon-exome/src/exome_catalog_manager.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; +use std::{fmt, str::FromStr}; -use crate::{error::ExomeResult, exome::ExomeCatalogClient}; +use crate::{ + error::{ExomeError, ExomeResult}, + exome::ExomeCatalogClient, +}; macro_rules! impl_display_for { ($($t:ty),+) => { @@ -43,6 +46,14 @@ pub struct LibraryName(pub String); #[derive(Debug, Clone)] pub struct OrganizationName(pub String); +impl FromStr for OrganizationName { + type Err = ExomeError; + + fn from_str(s: &str) -> Result { + Ok(Self(s.to_string())) + } +} + impl_display_for!( SchemaName, CatalogName, @@ -151,6 +162,7 @@ impl ExomeCatalogManager { pub async fn apply_changes( &self, + organization_name: OrganizationName, change_set: impl IntoIterator, ) -> ExomeResult<()> { for change in change_set { @@ -160,7 +172,7 @@ impl ExomeCatalogManager { .create_catalog( create_catalog.name, create_catalog.library_name, - OrganizationName(self.client.organization_name.clone()), + organization_name.clone(), ) .await?; } @@ -190,6 +202,7 @@ impl ExomeCatalogManager { create_schema.name, create_schema.catalog_name, create_schema.library_name, + organization_name.clone(), ) .await?; } diff --git a/exon/exon-exome/src/exome_config.rs b/exon/exon-exome/src/exome_config.rs new file mode 100644 index 00000000..72291e21 --- /dev/null +++ b/exon/exon-exome/src/exome_config.rs @@ -0,0 +1,99 @@ +// Copyright 2023 WHERE TRUE Technologies. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::str::FromStr; + +use datafusion::config::{ConfigExtension, ExtensionOptions}; + +use crate::{error::ExomeError, OrganizationName}; + +enum ConfigKeys { + ExomeOrganization, + ExomeLibrary, +} + +impl FromStr for ConfigKeys { + type Err = ExomeError; + + fn from_str(s: &str) -> Result { + match s { + "exome_organization" => Ok(Self::ExomeOrganization), + "exome_library" => Ok(Self::ExomeLibrary), + _ => Err(ExomeError::Execution(format!("Unknown config key: {}", s))), + } + } +} + +#[derive(Debug, Clone)] +pub struct ExomeConfigExtension { + exome_organization: String, + exome_library: String, +} + +impl Default for ExomeConfigExtension { + fn default() -> Self { + Self { + exome_organization: "public".to_string(), + exome_library: "example_library".to_string(), + } + } +} + +impl ExomeConfigExtension { + pub fn exome_organization(&self) -> OrganizationName { + OrganizationName(self.exome_organization.clone()) + } +} + +impl ConfigExtension for ExomeConfigExtension { + const PREFIX: &'static str = "exome"; +} + +impl ExtensionOptions for ExomeConfigExtension { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + + fn cloned(&self) -> Box { + Box::new(self.clone()) + } + + fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> { + match ConfigKeys::from_str(key)? { + ConfigKeys::ExomeOrganization => self.exome_organization = value.to_string(), + ConfigKeys::ExomeLibrary => self.exome_library = value.to_string(), + } + + Ok(()) + } + + fn entries(&self) -> Vec { + vec![ + datafusion::config::ConfigEntry { + key: "exome_organization".to_string(), + value: Some(self.exome_organization.clone()), + description: "The organization name for the exome catalog", + }, + datafusion::config::ConfigEntry { + key: "exome_library".to_string(), + value: Some(self.exome_library.clone()), + description: "The library name for the exome catalog", + }, + ] + } +} diff --git a/exon/exon-exome/src/exome_session.rs b/exon/exon-exome/src/exome_session.rs index 3cb4fc23..6a32a8eb 100644 --- a/exon/exon-exome/src/exome_session.rs +++ b/exon/exon-exome/src/exome_session.rs @@ -30,6 +30,7 @@ use crate::{ register_catalog, ExomeCatalogClient, }, exome_catalog_manager::{CatalogName, LibraryName, OrganizationName}, + exome_config::ExomeConfigExtension, exome_extension_planner::DfExtensionNode, exon_client::ExonClient, ExomeCatalogManager, ExomeExtensionPlanner, @@ -40,12 +41,8 @@ pub struct ExomeSession { } impl ExomeSession { - pub async fn connect( - url: String, - organization_name: String, - token: String, - ) -> ExomeResult { - let client = ExomeCatalogClient::connect(url, organization_name, token).await?; + pub async fn connect(url: String, token: String) -> ExomeResult { + let client = ExomeCatalogClient::connect(url, token).await?; let extension_manager = ExomeCatalogManager::new(client.clone()); @@ -148,7 +145,11 @@ impl From for ExomeSession { fn from(client: ExomeCatalogClient) -> Self { let extension_manager = ExomeCatalogManager::new(client.clone()); - let config = new_exon_config().with_extension(Arc::new(extension_manager)); + let mut config = new_exon_config().with_extension(Arc::new(extension_manager)); + config + .options_mut() + .extensions + .insert(ExomeConfigExtension::default()); let session = SessionContext::with_config_exon(config); @@ -162,6 +163,7 @@ impl ExonClient for ExomeSession { &mut self, catalog_name: CatalogName, library_name: LibraryName, + organization_name: OrganizationName, ) -> ExomeResult<()> { let manager = self .session @@ -174,11 +176,7 @@ impl ExonClient for ExomeSession { manager .client .clone() - .create_catalog( - catalog_name, - library_name, - OrganizationName(manager.client.organization_name.clone()), - ) + .create_catalog(catalog_name, library_name, organization_name) .await .map_err(|e| DataFusionError::Execution(format!("Error creating catalog {}", e)))?; @@ -241,12 +239,9 @@ mod tests { #[tokio::test] async fn test_exome_create_catalog() -> Result<(), Box> { - let exome_session = ExomeSession::connect( - "http://localhost:50051".to_string(), - "public".to_string(), - "token".to_string(), - ) - .await?; + let exome_session = + ExomeSession::connect("http://localhost:50051".to_string(), "token".to_string()) + .await?; // let sql = "CREATE DATABASE test_catalog;"; diff --git a/exon/exon-exome/src/exon_client.rs b/exon/exon-exome/src/exon_client.rs index 034a3d24..40322e6e 100644 --- a/exon/exon-exome/src/exon_client.rs +++ b/exon/exon-exome/src/exon_client.rs @@ -29,5 +29,6 @@ pub trait ExonClient { &mut self, catalog_name: CatalogName, library_name: LibraryName, + organization_name: OrganizationName, ) -> Result<(), ExomeError>; } diff --git a/exon/exon-exome/src/lib.rs b/exon/exon-exome/src/lib.rs index f4285d70..0d895bb9 100644 --- a/exon/exon-exome/src/lib.rs +++ b/exon/exon-exome/src/lib.rs @@ -15,6 +15,7 @@ mod error; mod exome; mod exome_catalog_manager; +mod exome_config; mod exome_extension_planner; mod exome_session; mod exon_client;