Skip to content

Commit

Permalink
fix: background mview without backfilling should not tracked in barri…
Browse files Browse the repository at this point in the history
…er runtime info (#19355)
  • Loading branch information
yezizp2012 authored Nov 12, 2024
1 parent 337235d commit 98aa20b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
10 changes: 8 additions & 2 deletions src/meta/src/barrier/context/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl GlobalBarrierWorkerContextImpl {
Ok(())
}

// FIXME: didn't consider Values here
async fn recover_background_mv_progress(
&self,
) -> MetaResult<HashMap<TableId, (String, TableFragments)>> {
Expand All @@ -82,7 +81,14 @@ impl GlobalBarrierWorkerContextImpl {
.get_job_fragments_by_id(mview.table_id)
.await?;
let table_fragments = TableFragments::from_protobuf(table_fragments);
mview_map.insert(table_id, (mview.definition.clone(), table_fragments));
if table_fragments.tracking_progress_actor_ids().is_empty() {
// If there's no tracking actor in the mview, we can finish the job directly.
mgr.catalog_controller
.finish_streaming_job(mview.table_id, None)
.await?;
} else {
mview_map.insert(table_id, (mview.definition.clone(), table_fragments));
}
}

// If failed, enter recovery mode.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ impl MetadataManager {
}

impl MetadataManager {
/// Wait for job finishing notification in `TrackingJob::pre_finish`.
/// Wait for job finishing notification in `TrackingJob::finish`.
/// The progress is updated per barrier.
pub(crate) async fn wait_streaming_job_finished(
&self,
Expand Down
8 changes: 0 additions & 8 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,6 @@ impl TableFragments {
.cloned()
}

/// Returns actors that contains backfill executors.
pub fn backfill_actor_ids(&self) -> HashSet<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0
})
.collect()
}

pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
Self::filter_actor_ids(self, |mask| {
(mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
Expand Down

0 comments on commit 98aa20b

Please sign in to comment.