-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
use super::{ | ||
grpc_compaction_client::{GrpcCompactionClient, GrpcCompactionClientConfig}, | ||
local_compaction_client::LocalCompactionClient, | ||
}; | ||
use chroma_config::{registry::Registry, Configurable}; | ||
use chroma_error::ChromaError; | ||
use chroma_system::System; | ||
use chroma_types::{ManualCompactionError, ManualCompactionRequest, ManualCompactionResponse}; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Debug, Clone)] | ||
pub enum CompactionClient { | ||
Local(LocalCompactionClient), | ||
Check failure on line 13 in rust/frontend/src/compaction_client/compaction_client.rs
|
||
Grpc(GrpcCompactionClient), | ||
Check failure on line 14 in rust/frontend/src/compaction_client/compaction_client.rs
|
||
} | ||
|
||
impl CompactionClient { | ||
pub async fn manually_compact( | ||
&mut self, | ||
request: ManualCompactionRequest, | ||
) -> Result<ManualCompactionResponse, ManualCompactionError> { | ||
match self { | ||
CompactionClient::Local(client) => client.manually_compact(request).await, | ||
CompactionClient::Grpc(client) => client.manually_compact(request).await, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
pub enum CompactionClientConfig { | ||
Local, | ||
Grpc(GrpcCompactionClientConfig), | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Configurable<(CompactionClientConfig, System)> for CompactionClient { | ||
async fn try_from_config( | ||
(config, system): &(CompactionClientConfig, System), | ||
registry: &Registry, | ||
) -> Result<Self, Box<dyn ChromaError>> { | ||
match config { | ||
CompactionClientConfig::Local => { | ||
let client = | ||
LocalCompactionClient::try_from_config(&((), system.clone()), registry).await?; | ||
Ok(CompactionClient::Local(client)) | ||
} | ||
CompactionClientConfig::Grpc(grpc_config) => { | ||
let client = GrpcCompactionClient::try_from_config( | ||
&(grpc_config.clone(), system.clone()), | ||
registry, | ||
) | ||
.await?; | ||
Ok(CompactionClient::Grpc(client)) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
use chroma_config::{registry::Registry, Configurable}; | ||
use chroma_error::ChromaError; | ||
use chroma_sysdb::SysDb; | ||
use chroma_system::System; | ||
use chroma_types::{ | ||
chroma_proto::{self, compactor_client::CompactorClient}, | ||
CollectionUuid, ManualCompactionError, ManualCompactionRequest, ManualCompactionResponse, | ||
}; | ||
use serde::{Deserialize, Serialize}; | ||
use thiserror::Error; | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
pub struct GrpcCompactionClientConfig { | ||
pub url: String, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(super) struct GrpcCompactionClient { | ||
sysdb: SysDb, | ||
client: CompactorClient<tonic::transport::Channel>, | ||
} | ||
|
||
impl GrpcCompactionClient { | ||
pub async fn manually_compact( | ||
&mut self, | ||
request: ManualCompactionRequest, | ||
) -> Result<ManualCompactionResponse, ManualCompactionError> { | ||
async fn get_version(sysdb: &mut SysDb, collection_id: CollectionUuid) -> i32 { | ||
let mut collection = sysdb | ||
.get_collections(Some(collection_id), None, None, None, None, 0) | ||
.await | ||
.unwrap(); // todo | ||
let collection = collection.pop().unwrap(); // todo | ||
return collection.version; | ||
} | ||
|
||
let version_before_compaction = get_version(&mut self.sysdb, request.collection_id).await; | ||
|
||
self.client | ||
.compact(chroma_proto::CompactionRequest { | ||
ids: Some(chroma_proto::CollectionIds { | ||
ids: vec![request.collection_id.to_string()], | ||
}), | ||
}) | ||
.await | ||
.unwrap(); | ||
|
||
loop { | ||
let version_after_compaction = | ||
get_version(&mut self.sysdb, request.collection_id).await; | ||
if version_after_compaction > version_before_compaction { | ||
break; | ||
} | ||
// todo | ||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; | ||
} | ||
|
||
Ok(ManualCompactionResponse {}) | ||
} | ||
} | ||
|
||
#[derive(Debug, Error)] | ||
pub enum FromConfigError { | ||
#[error("Failed to create gRPC client: {0}")] | ||
GrpcClient(#[from] tonic::transport::Error), | ||
} | ||
|
||
impl ChromaError for FromConfigError { | ||
fn code(&self) -> chroma_error::ErrorCodes { | ||
match self { | ||
FromConfigError::GrpcClient(_) => chroma_error::ErrorCodes::Internal, | ||
} | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Configurable<(GrpcCompactionClientConfig, System)> for GrpcCompactionClient { | ||
async fn try_from_config( | ||
(config, _system): &(GrpcCompactionClientConfig, System), | ||
registry: &Registry, | ||
) -> Result<Self, Box<dyn ChromaError>> { | ||
let client = CompactorClient::connect(config.url.clone()) | ||
.await | ||
.map_err(|err| FromConfigError::GrpcClient(err).boxed())?; | ||
Ok(Self { | ||
client, | ||
sysdb: registry.get().map_err(|err| err.boxed())?, | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
use chroma_config::{registry::Registry, Configurable}; | ||
use chroma_error::ChromaError; | ||
use chroma_log::LocalCompactionManager; | ||
use chroma_system::{ComponentHandle, System}; | ||
use chroma_types::{ManualCompactionError, ManualCompactionRequest, ManualCompactionResponse}; | ||
|
||
#[derive(Debug, Clone)] | ||
pub(super) struct LocalCompactionClient { | ||
handle: ComponentHandle<LocalCompactionManager>, | ||
Check failure on line 9 in rust/frontend/src/compaction_client/local_compaction_client.rs
|
||
} | ||
|
||
impl LocalCompactionClient { | ||
pub async fn manually_compact( | ||
&mut self, | ||
request: ManualCompactionRequest, | ||
Check failure on line 15 in rust/frontend/src/compaction_client/local_compaction_client.rs
|
||
) -> Result<ManualCompactionResponse, ManualCompactionError> { | ||
// let compact_message = CompactionMessage { | ||
// collection_id: request.collection_id, | ||
// start_offset: start_log_offset, | ||
// total_records, | ||
// }; | ||
// self.handle | ||
// .request(compact_message, None) | ||
// .await | ||
// .unwrap() | ||
// .unwrap(); | ||
// todo | ||
|
||
Ok(ManualCompactionResponse {}) | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Configurable<((), System)> for LocalCompactionClient { | ||
async fn try_from_config( | ||
(_config, _system): &((), System), | ||
registry: &Registry, | ||
) -> Result<Self, Box<dyn ChromaError>> { | ||
// Assume the registry has a compaction manager handle | ||
let handle = registry | ||
.get::<ComponentHandle<LocalCompactionManager>>() | ||
.map_err(|err| err.boxed())?; | ||
Ok(Self { handle }) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
pub mod compaction_client; | ||
mod grpc_compaction_client; | ||
mod local_compaction_client; |