Skip to content

Commit

Permalink
Make module sync rely on local cache only
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Gerace <[email protected]>
  • Loading branch information
nickgerace committed Feb 20, 2025
1 parent 4184ba7 commit 479ff6e
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 150 deletions.
1 change: 1 addition & 0 deletions app/web/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export default (opts: { mode: string }) => {
},
},
server: {
allowedHosts: ["arch"],
host: config.DEV_HOST,
port: parseInt(config.DEV_PORT),
strictPort: true,
Expand Down
134 changes: 108 additions & 26 deletions lib/dal/src/cached_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use si_pkg::{SiPkg, SiPkgError};

pub use si_id::CachedModuleId;

const PLACEHOLDER_OWNER_USER_ID: &str = "-";

#[remain::sorted]
#[derive(Error, Debug)]
pub enum CachedModuleError {
Expand Down Expand Up @@ -82,6 +84,25 @@ impl From<CachedModule> for si_frontend_types::UninstalledVariant {
}
}

// NOTE(nick): the frontend type's shape might be able to be refactored now that syncing only
// relies on the cache.
impl From<CachedModule> for si_frontend_types::LatestModule {
fn from(value: CachedModule) -> Self {
Self {
id: value.id.to_string(),
name: value.schema_name,
description: value.description,
owner_user_id: PLACEHOLDER_OWNER_USER_ID.to_string(),
owner_display_name: None,
metadata: serde_json::Value::Null,
latest_hash: value.latest_hash,
latest_hash_created_at: value.created_at,
created_at: value.created_at,
schema_id: Some(value.schema_id.to_string()),
}
}
}

impl TryFrom<PgRow> for CachedModule {
type Error = CachedModuleError;

Expand Down Expand Up @@ -167,16 +188,31 @@ impl CachedModule {
Ok(result)
}

/// Backfills all cached modules that are not present. This is useful for determining a lineage
/// of modules back to their origin.
pub async fn backfill_cached_modules(ctx: &DalContext) -> CachedModuleResult<()> {
Self::update_cached_modules_inner(ctx, true).await?;
Ok(())
}

/// Calls out to the module index server to fetch the latest module set, and
/// updates the cache for any new builtin modules
pub async fn update_cached_modules(ctx: &DalContext) -> CachedModuleResult<Vec<CachedModule>> {
let services_context = ctx.services_context();
let module_index_url = services_context
.module_index_url()
.ok_or(CachedModuleError::ModuleIndexUrlNotSet)?;
Self::update_cached_modules_inner(ctx, false).await
}

let module_index_client =
ModuleIndexClient::unauthenticated_client(module_index_url.try_into()?);
async fn update_cached_modules_inner(
ctx: &DalContext,
backfill: bool,
) -> CachedModuleResult<Vec<CachedModule>> {
let module_index_client = {
let services_context = ctx.services_context();
let module_index_url = services_context
.module_index_url()
.ok_or(CachedModuleError::ModuleIndexUrlNotSet)?;

ModuleIndexClient::unauthenticated_client(module_index_url.try_into()?)
};

let modules: HashMap<_, _> = module_index_client
.list_builtins()
Expand All @@ -187,23 +223,7 @@ impl CachedModule {
.collect();

// We need to remove any schemas that are in the cache but no longer in the builtin list
let builtin_schema_ids: HashSet<SchemaId> = modules
.values()
.filter_map(|module| {
module
.schema_id
.as_ref()
.and_then(|id_string| Ulid::from_string(id_string.as_str()).ok())
})
.map(Into::into)
.collect();

let current_modules = CachedModule::latest_modules(ctx).await?;
for cached in current_modules {
if !builtin_schema_ids.contains(&cached.schema_id) {
CachedModule::remove_by_schema_id(ctx, cached.schema_id).await?;
}
}
Self::remove_unused(ctx, &modules).await?;

let ctx_clone = ctx.clone();
ctx_clone.commit_no_rebase().await?;
Expand Down Expand Up @@ -252,7 +272,37 @@ impl CachedModule {
Ok(new_modules)
}

pub async fn latest_by_schema_id(
async fn remove_unused(
ctx: &DalContext,
module_details_by_hash: &HashMap<String, ModuleDetailsResponse>,
) -> CachedModuleResult<()> {
let builtin_schema_ids: HashSet<SchemaId> = module_details_by_hash
.values()
.filter_map(|module| {
module
.schema_id
.as_ref()
.and_then(|id_string| Ulid::from_string(id_string.as_str()).ok())
})
.map(Into::into)
.collect();
let cached_schema_ids: Vec<SchemaId> = CachedModule::latest_modules(ctx)
.await?
.iter()
.map(|lm| lm.schema_id)
.collect();

// Look at all schema IDs in the cache and determine if any of them are no longer builtins.
// If they aren't, ALL modules corresponding to them get remove.
for schema_id in cached_schema_ids {
if !builtin_schema_ids.contains(&schema_id) {
CachedModule::remove_all_for_schema_id(ctx, schema_id).await?;
}
}
Ok(())
}

pub async fn find_latest_for_schema_id(
ctx: &DalContext,
schema_id: SchemaId,
) -> CachedModuleResult<Option<CachedModule>> {
Expand Down Expand Up @@ -288,6 +338,38 @@ impl CachedModule {
})
}

pub async fn list_for_schema_id(
ctx: &DalContext,
schema_id: SchemaId,
) -> CachedModuleResult<Vec<CachedModule>> {
let query = "
SELECT
id,
schema_id,
schema_name,
display_name,
category,
link,
color,
description,
component_type,
latest_hash,
created_at,
NULL::bytea AS package_data
FROM cached_modules
WHERE schema_id = $1
ORDER BY schema_id, created_at DESC
";

let rows = ctx.txns().await?.pg().query(query, &[&schema_id]).await?;

let mut result = Vec::with_capacity(rows.len());
for row in rows {
result.push(row.try_into()?);
}
Ok(result)
}

pub async fn latest_modules(ctx: &DalContext) -> CachedModuleResult<Vec<CachedModule>> {
let query = "
SELECT DISTINCT ON (schema_id)
Expand Down Expand Up @@ -318,7 +400,7 @@ impl CachedModule {
Ok(result)
}

pub async fn insert(
async fn insert(
ctx: &DalContext,
module_details: &ModuleDetailsResponse,
pkg_bytes: Arc<Vec<u8>>,
Expand Down Expand Up @@ -423,7 +505,7 @@ impl CachedModule {
Ok(Some(row.try_into()?))
}

pub async fn remove_by_schema_id(
pub async fn remove_all_for_schema_id(
ctx: &DalContext,
schema_id: SchemaId,
) -> CachedModuleResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/management/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl ManagementPrototype {
Some(schema) => schema.name().to_owned(),
None => {
let Some(cached_module) =
CachedModule::latest_by_schema_id(ctx, schema_id).await?
CachedModule::find_latest_for_schema_id(ctx, schema_id).await?
else {
continue;
};
Expand Down
Loading

0 comments on commit 479ff6e

Please sign in to comment.