Skip to content

Commit

Permalink
Fix bug for limit sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Jan 5, 2024
1 parent 7166c28 commit 997b735
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion src/scheduler/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,6 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
}
case PhysicalOperatorType::kParallelAggregate:
case PhysicalOperatorType::kHash:
case PhysicalOperatorType::kLimit:
case PhysicalOperatorType::kTop: {
if (fragment_type_ != FragmentType::kParallelStream) {
Error<SchedulerException>(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type())));
Expand All @@ -711,6 +710,22 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
}
break;
}
case PhysicalOperatorType::kLimit: {
if (fragment_type_ != FragmentType::kParallelStream) {
Error<SchedulerException>(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type())));
}

if ((i64)tasks_.size() != parallel_count) {
Error<SchedulerException>(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type())));
}

for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) {
auto sink_state = MakeUnique<QueueSinkState>(fragment_ptr_->FragmentID(), task_id);

tasks_[task_id]->sink_state_ = Move(sink_state);
}
break;
}
case PhysicalOperatorType::kMergeParallelAggregate:
case PhysicalOperatorType::kMergeAggregate:
case PhysicalOperatorType::kMergeHash:
Expand Down

0 comments on commit 997b735

Please sign in to comment.