Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dal,sdf): Auto-enqueue update functions for impacted components #5511

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,85 @@ impl Component {
Ok(modified)
}

/// Based on the attribute value being updated, enqueue update actions for components
/// impacted if they have an update action and they have a resource
pub async fn enqueue_relevant_update_actions(
ctx: &DalContext,
attribute_value_id: AttributeValueId,
) -> ComponentResult<Vec<Action>> {
let mut enqueued_actions = Vec::new();

// first check the initial component
let component_id = AttributeValue::component_id(ctx, attribute_value_id).await?;
let actions = Self::enqueue_update_action_if_applicable(ctx, component_id).await?;
enqueued_actions.extend(actions);

// Next, calculate dependency graph to get all of the downstream components that are now also
// potentially needing an update action
// NOTE: We have no way to guarantee that a component's value will actually be different after DVU
// so for now, just enqueue if the value is enqueued to change. More work here is likely needed
let dependency_graph = DependentValueGraph::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can likely be refined to look for updates only to domain, or something to that effect, but as it's behind a flag, we can iterate here to work out the permutations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I'm changing this up now. stay tuned.

ctx,
vec![DependentValueRoot::Unfinished(attribute_value_id.into())],
)
.await?;
for impacted_attribute_value_id in dependency_graph.all_value_ids() {
// check if the attribute value is in the domain tree for the component first
if let Some(prop_for_value) =
AttributeValue::prop_opt(ctx, impacted_attribute_value_id).await?
{
if prop_for_value
.path(ctx)
.await?
.is_descendant_of(&PropPath::new(["root", "domain"]))
{
// first get the component for this attribute value
let component_id =
AttributeValue::component_id(ctx, impacted_attribute_value_id).await?;
let actions =
Self::enqueue_update_action_if_applicable(ctx, component_id).await?;
enqueued_actions.extend(actions);
}
}
}
Ok(enqueued_actions)
}

async fn enqueue_update_action_if_applicable(
ctx: &DalContext,
component_id: ComponentId,
) -> ComponentResult<Vec<Action>> {
let mut enqueued_actions = Vec::new();
if Component::resource_by_id(ctx, component_id)
.await?
.is_some()
{
// then if the current component has an update action, enqueue it
let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?;
let prototypes_for_variant = SchemaVariant::find_action_prototypes_by_kind(
ctx,
schema_variant_id,
ActionKind::Update,
)
.await?;

for prototype_id in prototypes_for_variant {
// don't enqueue the same action twice!
if Action::find_equivalent(ctx, prototype_id, Some(component_id))
.await
.map_err(|err| ComponentError::Action(Box::new(err)))?
.is_none()
{
let new_action = Action::new(ctx, prototype_id, Some(component_id))
.await
.map_err(|err| ComponentError::Action(Box::new(err)))?;
enqueued_actions.push(new_action);
}
}
}
Ok(enqueued_actions)
}

/// `AttributeValueId`s of all input sockets connected to any output socket of this component.
async fn downstream_attribute_value_ids(
&self,
Expand Down
57 changes: 10 additions & 47 deletions lib/dal/tests/integration_test/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,8 @@ async fn auto_queue_creation(ctx: &mut DalContext) {
assert!(action_ids.is_empty());
}

// TODO This test is a stub that should be fixed after actions v2 is done
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODONE

// Right now, the workspace for tests does not have the actions flag set so this won't yield any results
// The tests cases are valid
#[test]
async fn auto_queue_update_and_destroy(ctx: &mut DalContext) {
async fn auto_queue_update(ctx: &mut DalContext) {
// ======================================================
// Creating a component should enqueue a create action
// ======================================================
Expand All @@ -252,6 +249,11 @@ async fn auto_queue_update_and_destroy(ctx: &mut DalContext) {
.await
.expect("apply changeset to base");

// wait for actions to run
ChangeSetTestHelpers::wait_for_actions_to_run(ctx)
.await
.expect("deadline for actions to run exceeded");

ChangeSetTestHelpers::fork_from_head_change_set(ctx)
.await
.expect("fork from head");
Expand All @@ -271,6 +273,9 @@ async fn auto_queue_update_and_destroy(ctx: &mut DalContext) {
AttributeValue::update(ctx, av_id, Some(serde_json::json!("whomever")))
.await
.expect("override domain/name attribute value");
Component::enqueue_relevant_update_actions(ctx, av_id)
.await
.expect("could enqueue update func");

let action_ids = Action::list_topologically(ctx)
.await
Expand All @@ -296,49 +301,7 @@ async fn auto_queue_update_and_destroy(ctx: &mut DalContext) {
};
}
}

// TODO: fix this, update actions have been disabled for now so they wont be automatically enqueued
// As they were being enqueued in the wrong place in AttributeValue, causing actions to be enqueued and immediately run by DVU's running on headg
assert_eq!(update_action_count, 0);

// ======================================================
// Deleting a component with resource should queue the Destroy action
// ======================================================
component.delete(ctx).await.expect("delete component");
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update snapshot to visibility");

// TODO Fix the following section
// Since the creation action never actually runs on the test (or at least we can't wait for it)
// The resource never gets created. A Destroy action only gets queued
// (implicitly by component.delete above) if the component has a resource,
// So the check below is failing

// let action_ids = Action::list_topologically(ctx)
// .await
// .expect("find action ids");
//
// let mut deletion_action_count = 0;
// for action_id in action_ids {
// let action = dbg!(Action::get_by_id(ctx, action_id)
// .await
// .expect("find action by id"));
// if action.state() == ActionState::Queued {
// let prototype_id = Action::prototype_id(ctx, action_id)
// .await
// .expect("get prototype id from action");
// let prototype = ActionPrototype::get_by_id(ctx, prototype_id)
// .await
// .expect("get action prototype by id");
//
// if prototype.kind == ActionKind::Destroy {
// deletion_action_count += 1;
// }
// }
// }

// assert_eq!(deletion_action_count, 1);
assert_eq!(update_action_count, 1);
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions lib/sdf-server/src/service/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use dal::{
};
use dal::{attribute::value::AttributeValueError, component::debug::ComponentDebugViewError};
use dal::{ChangeSetError, TransactionsError};
use si_posthog::PosthogError;
use telemetry::prelude::*;
use thiserror::Error;
use tokio::task::JoinError;
Expand Down Expand Up @@ -82,6 +83,8 @@ pub enum ComponentError {
NotFound(ComponentId),
#[error(transparent)]
ParseInt(#[from] ParseIntError),
#[error("posthog error: {0}")]
Posthog(#[from] PosthogError),
#[error(transparent)]
Prop(#[from] PropError),
#[error("property editor error: {0}")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ pub async fn update_property_editor_value(
}
} else {
AttributeValue::update(&ctx, request.attribute_value_id, request.value.to_owned()).await?;
// this is feature flagged! Check if the current workspace has the flag enabled, if so, do this!
// If anything goes wrong, just skip this and let the route handler continue
if posthog_client
.check_feature_flag(
"auto-enqueue-update-function".to_owned(),
ctx.workspace_pk()?.to_string(),
)
.await
.unwrap_or(false)
{
Component::enqueue_relevant_update_actions(&ctx, request.attribute_value_id).await?;
}
}

let component = Component::get_by_id(&ctx, request.component_id).await?;
Expand Down
1 change: 1 addition & 0 deletions lib/si-posthog-rs/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl PosthogApiClient {
"{api_endpoint}/decide?v=3",
api_endpoint = self.api_endpoint
))
.header("Content-Type", "application/json")
.json(&serde_json::json!(payload))
.send()
.await?;
Expand Down