Support S3 storage for self-hosted backend (#34680)
Add S3 support for the self-hosted backend. Includes a refactor to reduce code duplication when initializing storage: the files, modules, search, exports, and snapshot-imports storages move into an `ApplicationStorage` struct that gets passed around instead of re-creating each storage at different layers.

GitOrigin-RevId: f24bfb0af21abeecc9d6ee580bf72e98882911a3
emmaling27 authored and Convex, Inc. committed Feb 26, 2025
1 parent 8255dc1 commit 161e326
Showing 8 changed files with 202 additions and 128 deletions.
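
Before the file diffs, a minimal sketch of the pattern this refactor introduces: the per-use-case storage handles are built once by dispatching on the configured storage type, bundled into a single cloneable `ApplicationStorage`, and passed to whatever needs them. The types below are simplified stand-ins, not the repo's real APIs; the actual `create_storage` and `Application::initialize_storage` (shown in the lib.rs diff) are async, take the runtime and a `StorageUseCase`, and read the storage type from the database globals.

```rust
use std::sync::Arc;

// Stand-ins for the repo's storage abstractions (hypothetical, simplified).
trait Storage: Send + Sync {}

struct LocalDirStorage;
struct S3Storage;
impl Storage for LocalDirStorage {}
impl Storage for S3Storage {}

// Which backend the deployment is configured to use.
enum StorageType {
    S3 { s3_prefix: String },
    Local { dir: String },
}

// One handle per use case, created once and cloned wherever storage is needed.
#[derive(Clone)]
struct ApplicationStorage {
    files_storage: Arc<dyn Storage>,
    modules_storage: Arc<dyn Storage>,
    search_storage: Arc<dyn Storage>,
    exports_storage: Arc<dyn Storage>,
    snapshot_imports_storage: Arc<dyn Storage>,
}

// Dispatch on the configured backend. The real `create_storage` in the diff
// below is async and also takes the runtime and a `StorageUseCase`.
fn create_storage(storage_type: &StorageType) -> Arc<dyn Storage> {
    match storage_type {
        StorageType::S3 { .. } => Arc::new(S3Storage),
        StorageType::Local { .. } => Arc::new(LocalDirStorage),
    }
}

fn initialize_storage(storage_type: &StorageType) -> ApplicationStorage {
    ApplicationStorage {
        files_storage: create_storage(storage_type),
        modules_storage: create_storage(storage_type),
        search_storage: create_storage(storage_type),
        exports_storage: create_storage(storage_type),
        snapshot_imports_storage: create_storage(storage_type),
    }
}

fn main() {
    let app_storage = initialize_storage(&StorageType::Local {
        dir: "/tmp/convex".into(),
    });
    // The whole bundle is cheap to clone: every field is an Arc.
    let _for_worker = app_storage.clone();
}
```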
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions crates/application/Cargo.toml
@@ -16,6 +16,7 @@ async_lru = { path = "../async_lru" }
async_zip = { workspace = true }
async_zip_reader = { version = "0.1.0", path = "../async_zip_reader" }
authentication = { path = "../../crates/authentication" }
aws_s3 = { path = "../aws_s3" }
bytes = { workspace = true }
chrono = { workspace = true }
cmd_util = { path = "../cmd_util" }
162 changes: 121 additions & 41 deletions crates/application/src/lib.rs
@@ -31,6 +31,7 @@ use authentication::{
validate_id_token,
Auth0IdToken,
};
use aws_s3::storage::S3Storage;
use bytes::Bytes;
use chrono::{
DateTime,
@@ -208,6 +209,10 @@ use model::{
},
ConfigModel,
},
database_globals::{
types::StorageTagInitializer,
DatabaseGlobalsModel,
},
deployment_audit_log::{
types::DeploymentAuditLogEvent,
DeploymentAuditLogModel,
@@ -287,9 +292,11 @@ use storage::{
BufferedUpload,
ClientDrivenUploadPartToken,
ClientDrivenUploadToken,
LocalDirStorage,
Storage,
StorageExt,
StorageGetStream,
StorageUseCase,
Upload,
};
use sync_types::{
@@ -503,17 +510,22 @@ pub enum EnvVarChange {
Set(EnvironmentVariable),
}

#[derive(Clone)]
pub struct ApplicationStorage {
pub files_storage: Arc<dyn Storage>,
pub modules_storage: Arc<dyn Storage>,
search_storage: Arc<dyn Storage>,
pub exports_storage: Arc<dyn Storage>,
snapshot_imports_storage: Arc<dyn Storage>,
}

pub struct Application<RT: Runtime> {
runtime: RT,
database: Database<RT>,
runner: Arc<ApplicationFunctionRunner<RT>>,
function_log: FunctionExecutionLog<RT>,
file_storage: FileStorage<RT>,
files_storage: Arc<dyn Storage>,
modules_storage: Arc<dyn Storage>,
search_storage: Arc<dyn Storage>,
exports_storage: Arc<dyn Storage>,
snapshot_imports_storage: Arc<dyn Storage>,
application_storage: ApplicationStorage,
usage_tracking: UsageCounter,
key_broker: KeyBroker,
instance_name: String,
@@ -544,11 +556,7 @@ impl<RT: Runtime> Clone for Application<RT> {
runner: self.runner.clone(),
function_log: self.function_log.clone(),
file_storage: self.file_storage.clone(),
files_storage: self.files_storage.clone(),
modules_storage: self.modules_storage.clone(),
search_storage: self.search_storage.clone(),
exports_storage: self.exports_storage.clone(),
snapshot_imports_storage: self.snapshot_imports_storage.clone(),
application_storage: self.application_storage.clone(),
usage_tracking: self.usage_tracking.clone(),
key_broker: self.key_broker.clone(),
instance_name: self.instance_name.clone(),
@@ -573,16 +581,79 @@ impl<RT: Runtime> Clone for Application<RT> {
}
}

/// Create storage based on the storage type configuration
pub async fn create_storage<RT: Runtime>(
runtime: RT,
storage_type: &model::database_globals::types::StorageType,
use_case: StorageUseCase,
) -> anyhow::Result<Arc<dyn Storage>> {
Ok(match storage_type {
model::database_globals::types::StorageType::S3 { s3_prefix } => {
Arc::new(S3Storage::for_use_case(use_case, s3_prefix.clone(), runtime).await?)
},
model::database_globals::types::StorageType::Local { dir } => {
let storage = LocalDirStorage::for_use_case(runtime, dir, use_case)?;
tracing::info!("{use_case} storage path: {:?}", storage.path());
Arc::new(storage)
},
})
}

impl<RT: Runtime> Application<RT> {
pub async fn initialize_storage(
runtime: RT,
database: &Database<RT>,
storage_tag_initializer: StorageTagInitializer,
instance_name: String,
) -> anyhow::Result<ApplicationStorage> {
let storage_type = {
let mut tx = database.begin_system().await?;
let storage_type = DatabaseGlobalsModel::new(&mut tx)
.initialize_storage_tag(storage_tag_initializer, instance_name)
.await?;
database
.commit_with_write_source(tx, "init_storage")
.await?;
storage_type
};

let files_storage =
create_storage(runtime.clone(), &storage_type, StorageUseCase::Files).await?;
let modules_storage =
create_storage(runtime.clone(), &storage_type, StorageUseCase::Modules).await?;
let search_storage = create_storage(
runtime.clone(),
&storage_type,
StorageUseCase::SearchIndexes,
)
.await?;
let exports_storage =
create_storage(runtime.clone(), &storage_type, StorageUseCase::Exports).await?;
let snapshot_imports_storage = create_storage(
runtime.clone(),
&storage_type,
StorageUseCase::SnapshotImports,
)
.await?;

// Search storage needs to be set for Database to be fully initialized
database.set_search_storage(search_storage.clone());
tracing::info!("{:?} storage is configured.", storage_type);

Ok(ApplicationStorage {
files_storage,
modules_storage,
search_storage,
exports_storage,
snapshot_imports_storage,
})
}

pub async fn new(
runtime: RT,
database: Database<RT>,
file_storage: FileStorage<RT>,
files_storage: Arc<dyn Storage>,
modules_storage: Arc<dyn Storage>,
search_storage: Arc<dyn Storage>,
exports_storage: Arc<dyn Storage>,
snapshot_imports_storage: Arc<dyn Storage>,
application_storage: ApplicationStorage,
usage_tracking: UsageCounter,
key_broker: KeyBroker,
instance_name: String,
@@ -598,7 +669,8 @@ impl<RT: Runtime> Application<RT> {
app_auth: Arc<ApplicationAuth>,
cache: QueryCache,
) -> anyhow::Result<Self> {
let module_cache = ModuleCache::new(runtime.clone(), modules_storage.clone()).await;
let module_cache =
ModuleCache::new(runtime.clone(), application_storage.modules_storage.clone()).await;
let module_loader = Arc::new(module_cache.clone());

let system_env_vars = btreemap! {
@@ -622,7 +694,7 @@ impl<RT: Runtime> Application<RT> {
runtime.clone(),
database.clone(),
persistence.reader(),
search_storage.clone(),
application_storage.search_storage.clone(),
searcher,
segment_term_metadata_fetcher,
);
@@ -639,7 +711,7 @@ impl<RT: Runtime> Application<RT> {
let system_table_cleanup_worker = SystemTableCleanupWorker::new(
runtime.clone(),
database.clone(),
exports_storage.clone(),
application_storage.exports_storage.clone(),
);
let system_table_cleanup_worker = Arc::new(Mutex::new(
runtime.spawn("system_table_cleanup_worker", system_table_cleanup_worker),
@@ -657,7 +729,7 @@ impl<RT: Runtime> Application<RT> {
function_runner.clone(),
node_actions,
file_storage.transactional_file_storage.clone(),
modules_storage.clone(),
application_storage.modules_storage.clone(),
module_loader,
function_log.clone(),
system_env_vars.clone(),
@@ -687,8 +759,8 @@ impl<RT: Runtime> Application<RT> {
let export_worker = ExportWorker::new(
runtime.clone(),
database.clone(),
exports_storage.clone(),
files_storage.clone(),
application_storage.exports_storage.clone(),
application_storage.files_storage.clone(),
database.usage_counter().clone(),
instance_name.clone(),
);
@@ -697,7 +769,7 @@ impl<RT: Runtime> Application<RT> {
let snapshot_import_worker = SnapshotImportWorker::start(
runtime.clone(),
database.clone(),
snapshot_imports_storage.clone(),
application_storage.snapshot_imports_storage.clone(),
file_storage.clone(),
database.usage_counter().clone(),
);
@@ -709,7 +781,7 @@ impl<RT: Runtime> Application<RT> {
runtime.clone(),
persistence.clone(),
database.clone(),
modules_storage.clone(),
application_storage.modules_storage.clone(),
);
let migration_worker = Arc::new(Mutex::new(Some(
runtime.spawn("migration_worker", migration_worker.go()),
@@ -721,11 +793,7 @@ impl<RT: Runtime> Application<RT> {
runner,
function_log,
file_storage,
files_storage,
modules_storage,
search_storage,
exports_storage,
snapshot_imports_storage,
application_storage,
usage_tracking,
key_broker,
scheduled_job_runner,
@@ -754,7 +822,7 @@ impl<RT: Runtime> Application<RT> {
}

pub fn modules_storage(&self) -> &Arc<dyn Storage> {
&self.modules_storage
&self.application_storage.modules_storage
}

pub fn modules_cache(&self) -> &ModuleCache<RT> {
@@ -1442,14 +1510,15 @@ impl<RT: Runtime> Application<RT> {
},
}
};
let storage_get_stream =
self.exports_storage
.get(&object_key)
.await?
.context(ErrorMetadata::not_found(
"ExportNotFound",
format!("The requested export {snapshot_ts}/{object_key:?} was not found"),
))?;
let storage_get_stream = self
.application_storage
.exports_storage
.get(&object_key)
.await?
.context(ErrorMetadata::not_found(
"ExportNotFound",
format!("The requested export {snapshot_ts}/{object_key:?} was not found"),
))?;

let filename = format!(
// This should match the format in SnapshotExport.tsx.
@@ -1461,7 +1530,9 @@ impl<RT: Runtime> Application<RT> {

/// Returns the cloud export key - fully qualified to the instance.
pub fn cloud_export_key(&self, zip_export_key: ObjectKey) -> FullyQualifiedObjectKey {
self.exports_storage.fully_qualified_key(&zip_export_key)
self.application_storage
.exports_storage
.fully_qualified_key(&zip_export_key)
}

pub async fn update_environment_variables(
@@ -2035,6 +2106,7 @@ impl<RT: Runtime> Application<RT> {
));
}
let upload = self
.application_storage
.snapshot_imports_storage
.start_client_driven_upload()
.await?;
@@ -2055,6 +2127,7 @@ impl<RT: Runtime> Application<RT> {
));
}
let part_token = self
.application_storage
.snapshot_imports_storage
.upload_part(upload_token, part_number, part)
.await?;
@@ -2077,10 +2150,12 @@ impl<RT: Runtime> Application<RT> {
));
}
let object_key = self
.application_storage
.snapshot_imports_storage
.finish_client_driven_upload(upload_token, part_tokens)
.await?;
let fq_key = self
.application_storage
.snapshot_imports_storage
.fully_qualified_key(&object_key);
start_stored_import(
@@ -2099,13 +2174,18 @@ impl<RT: Runtime> Application<RT> {
&self,
body_stream: BoxStream<'_, anyhow::Result<Bytes>>,
) -> anyhow::Result<FullyQualifiedObjectKey> {
let mut upload: Box<BufferedUpload> = self.snapshot_imports_storage.start_upload().await?;
let mut upload: Box<BufferedUpload> = self
.application_storage
.snapshot_imports_storage
.start_upload()
.await?;
// unclear why this reassignment is necessary
let mut body_stream = body_stream;
upload.try_write_parallel(&mut body_stream).await?;
drop(body_stream);
let object_key = upload.complete().await?;
Ok(self
.application_storage
.snapshot_imports_storage
.fully_qualified_key(&object_key))
}
@@ -2148,7 +2228,7 @@ impl<RT: Runtime> Application<RT> {
};
let (storage_key, sha256, package_size) = upload_package(
package,
self.modules_storage.clone(),
self.application_storage.modules_storage.clone(),
external_deps_pkg.map(|pkg| pkg.storage_key),
)
.await?;
@@ -3016,7 +3096,7 @@ impl<RT: Runtime> Application<RT> {
}

pub fn files_storage(&self) -> Arc<dyn Storage> {
self.files_storage.clone()
self.application_storage.files_storage.clone()
}

/// Add hidden primary key indexes for the given tables. Developers do not
(Diffs for the remaining 5 changed files are not shown.)
