Skip to content

Commit

Permalink
[BugFix] Capture resource group for scan task (backport #51121) (#51493)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu authored Sep 27, 2024
1 parent 464d29c commit c672fc6
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
int32_t driver_id = CurrentThread::current().get_driver_id();

workgroup::ScanTask task;
task.workgroup = _workgroup.get();
task.workgroup = _workgroup;
// TODO: consider more factors, such as scan bytes and i/o time.
task.priority = OlapScanNode::compute_priority(_submit_task_counter->value());
const auto io_task_start_nano = MonotonicNanos();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/workgroup/scan_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void ScanExecutor::worker_thread() {
if (current_thread != nullptr) {
current_thread->inc_finished_tasks();
}
_task_queue->update_statistics(task.workgroup, time_spent_ns);
_task_queue->update_statistics(task.workgroup.get(), time_spent_ns);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/workgroup/scan_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ StatusOr<ScanTask> WorkGroupScanTaskQueue::take() {
bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
std::lock_guard<std::mutex> lock(_global_mutex);

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false);

Expand All @@ -112,7 +112,6 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {

void WorkGroupScanTaskQueue::update_statistics(WorkGroup* wg, int64_t runtime_ns) {
std::lock_guard<std::mutex> lock(_global_mutex);

auto* wg_entity = _sched_entity(wg);

// Update bandwidth control information.
Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/workgroup/scan_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <queue>
#include <set>
#include <unordered_set>
#include <utility>

#include "common/statusor.h"
#include "exec/workgroup/work_group_fwd.h"
Expand All @@ -33,8 +34,8 @@ struct ScanTask {

ScanTask() : ScanTask(nullptr, nullptr) {}
explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function)
: workgroup(workgroup), work_function(std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function)
: workgroup(std::move(workgroup)), work_function(std::move(work_function)) {}
~ScanTask() = default;

DISALLOW_COPY(ScanTask);
Expand All @@ -49,7 +50,7 @@ struct ScanTask {
}

public:
WorkGroup* workgroup;
WorkGroupPtr workgroup;
WorkFunction work_function;
int priority = 0;
};
Expand Down

0 comments on commit c672fc6

Please sign in to comment.