diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index c0035f0b2bc7d..2121f0a096383 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1447,6 +1447,7 @@ impl CatalogController { .exec(&txn) .await?; + // add new actors for ( PbStreamActor { actor_id, @@ -1554,6 +1555,23 @@ impl CatalogController { actor.update(&txn).await?; } + // Update actor_splits for existing actors + for (actor_id, splits) in actor_splits { + if new_created_actors.contains(&(actor_id as ActorId)) { + continue; + } + + let actor = Actor::find_by_id(actor_id as ActorId) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?; + + let mut actor = actor.into_active_model(); + let splits = splits.iter().map(PbConnectorSplit::from).collect_vec(); + actor.splits = Set(Some((&PbConnectorSplits { splits }).into())); + actor.update(&txn).await?; + } + // fragment update let fragment = Fragment::find_by_id(fragment_id) .one(&txn)