Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Internal Query Execution Status (Issue #1555) (#1561)
Browse files Browse the repository at this point in the history
  • Loading branch information
17zhangw authored May 5, 2021
1 parent 63eafbd commit e8fd9b0
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 25 deletions.
18 changes: 14 additions & 4 deletions src/include/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ class TaskManager;
/**
 * Enum class used to describe the types of tasks available.
 * The underlying storage type is uint8_t.
 */
enum class TaskType : uint8_t { DDL_TASK, DML_TASK };

/**
 * Empty placeholder type used as the result of a Future when a task has no
 * meaningful value to hand back to the waiter.
 * A Future<> cannot be specified with (void), so this dummy result type is
 * used instead.
 */
struct DummyResult {};

/**
* Abstract class for defining a task
*/
Expand Down Expand Up @@ -52,8 +59,10 @@ class TaskDDL : public Task {
* TaskDDL constructor
* @param db_oid Database to execute task within
* @param query_text Query text of DDL/SET
* @param sync Future for the caller to block on
*/
TaskDDL(catalog::db_oid_t db_oid, std::string query_text) : db_oid_(db_oid), query_text_(std::move(query_text)) {}
TaskDDL(catalog::db_oid_t db_oid, std::string query_text, common::ManagedPointer<common::Future<DummyResult>> sync)
: db_oid_(db_oid), query_text_(std::move(query_text)), sync_(sync) {}

void Execute(common::ManagedPointer<util::QueryExecUtil> query_exec_util,
common::ManagedPointer<task::TaskManager> task_manager) override;
Expand All @@ -64,6 +73,7 @@ class TaskDDL : public Task {
private:
catalog::db_oid_t db_oid_;
std::string query_text_;
common::ManagedPointer<common::Future<DummyResult>> sync_;
};

/**
Expand Down Expand Up @@ -113,7 +123,7 @@ class TaskDML : public Task {
std::vector<std::vector<parser::ConstantValueExpression>> &&params, std::vector<type::TypeId> &&param_types,
util::TupleFunction tuple_fn, common::ManagedPointer<metrics::MetricsManager> metrics_manager,
bool force_abort, bool skip_query_cache, std::optional<execution::query_id_t> override_qid,
common::ManagedPointer<common::Future<bool>> sync)
common::ManagedPointer<common::Future<DummyResult>> sync)
: db_oid_(db_oid),
query_text_(std::move(query_text)),
cost_model_(std::move(cost_model)),
Expand All @@ -138,7 +148,7 @@ class TaskDML : public Task {
* @param sync Future for the caller to block on
*/
TaskDML(catalog::db_oid_t db_oid, std::string query_text, std::unique_ptr<optimizer::AbstractCostModel> cost_model,
bool skip_query_cache, util::TupleFunction tuple_fn, common::ManagedPointer<common::Future<bool>> sync)
bool skip_query_cache, util::TupleFunction tuple_fn, common::ManagedPointer<common::Future<DummyResult>> sync)
: db_oid_(db_oid),
query_text_(std::move(query_text)),
cost_model_(std::move(cost_model)),
Expand Down Expand Up @@ -168,7 +178,7 @@ class TaskDML : public Task {
bool force_abort_;
bool skip_query_cache_;
std::optional<execution::query_id_t> override_qid_;
common::ManagedPointer<common::Future<bool>> sync_;
common::ManagedPointer<common::Future<DummyResult>> sync_;
};

} // namespace noisepage::task
12 changes: 12 additions & 0 deletions src/include/util/query_exec_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,14 @@ class QueryExecUtil {
/** Erases all cached plans */
void ClearPlans();

/**
 * Returns the most recent error message.
 * Should only be invoked if PlanStatement or Execute has failed; the stored
 * message is reset at the start of each planning/compilation/execution call,
 * so it is empty when no failure has been recorded since then.
 * @return a copy of the most recently recorded error message
 */
std::string GetError() { return error_msg_; }

private:
void ResetError();
void SetDatabase(catalog::db_oid_t db_oid);

common::ManagedPointer<transaction::TransactionManager> txn_manager_;
Expand All @@ -222,6 +229,11 @@ class QueryExecUtil {
*/
std::unordered_map<std::string, std::unique_ptr<planner::OutputSchema>> schemas_;
std::unordered_map<std::string, std::unique_ptr<execution::compiler::ExecutableQuery>> exec_queries_;

/**
* Stores the most recently encountered error.
*/
std::string error_msg_;
};

} // namespace noisepage::util
12 changes: 9 additions & 3 deletions src/main/db_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#include "settings/settings_defs.h" // NOLINT
#undef __SETTING_GFLAGS_DEFINE__ // NOLINT

#include "common/future.h"
#include "execution/execution_util.h"
#include "loggers/common_logger.h"
#include "optimizer/cost_model/trivial_cost_model.h"
#include "storage/recovery/replication_log_provider.h"
#include "task/task.h"

namespace noisepage {

Expand Down Expand Up @@ -39,10 +41,14 @@ void DBMain::TryLoadStartupDDL() {

if (!startup_ddls.empty() && task_manager_ != nullptr) {
for (auto &ddl : startup_ddls) {
task_manager_->AddTask(std::make_unique<task::TaskDDL>(catalog::INVALID_DATABASE_OID, ddl));
}
common::Future<task::DummyResult> sync;
task_manager_->AddTask(
std::make_unique<task::TaskDDL>(catalog::INVALID_DATABASE_OID, ddl, common::ManagedPointer(&sync)));

task_manager_->WaitForFlush();
auto future_result = sync.DangerousWait();
NOISEPAGE_ASSERT(future_result.second, "Error encountered executing startup DDL.");
(void)future_result;
}
} else if (task_manager_ == nullptr) {
COMMON_LOG_WARN("TryLoadStartupDDL() invoked without TaskManager");
}
Expand Down
12 changes: 6 additions & 6 deletions src/self_driving/planning/pilot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ std::pair<WorkloadMetadata, bool> Pilot::RetrieveWorkloadMetadata(

bool result = true;
{
common::Future<bool> sync;
common::Future<task::DummyResult> sync;

// Metadata query
auto to_row_fn = [&metadata, types_conv](const std::vector<execution::sql::Val *> &values) {
Expand Down Expand Up @@ -187,11 +187,11 @@ std::pair<WorkloadMetadata, bool> Pilot::RetrieveWorkloadMetadata(
if (!future_result.has_value()) {
throw PILOT_EXCEPTION("Future timed out.", common::ErrorCode::ERRCODE_IO_ERROR);
}
result &= future_result->first;
result &= future_result->second;
}

{
common::Future<bool> sync;
common::Future<task::DummyResult> sync;
auto to_row_fn = [&metadata, cves_conv](const std::vector<execution::sql::Val *> &values) {
auto qid = execution::query_id_t(static_cast<execution::sql::Integer *>(values[1])->val_);
auto *param_val = static_cast<execution::sql::StringVal *>(values[2]);
Expand All @@ -213,7 +213,7 @@ std::pair<WorkloadMetadata, bool> Pilot::RetrieveWorkloadMetadata(
if (!future_result.has_value()) {
throw PILOT_EXCEPTION("Future timed out.", common::ErrorCode::ERRCODE_IO_ERROR);
}
result &= future_result->first;
result &= future_result->second;
}

return std::make_pair(std::move(metadata), result);
Expand Down Expand Up @@ -247,7 +247,7 @@ std::unordered_map<int64_t, std::vector<double>> Pilot::GetSegmentInformation(st
auto query = fmt::format("SELECT * FROM noisepage_forecast_frequencies WHERE ts >= {} AND ts <= {} ORDER BY ts",
bounds.first, bounds.second);

common::Future<bool> sync;
common::Future<task::DummyResult> sync;
task_manager_->AddTask(std::make_unique<task::TaskDML>(catalog::INVALID_DATABASE_OID, query,
std::make_unique<optimizer::TrivialCostModel>(), false,
to_row_fn, common::ManagedPointer(&sync)));
Expand All @@ -256,7 +256,7 @@ std::unordered_map<int64_t, std::vector<double>> Pilot::GetSegmentInformation(st
if (!future_result.has_value()) {
throw PILOT_EXCEPTION("Future timed out.", common::ErrorCode::ERRCODE_IO_ERROR);
}
*success = future_result->first;
*success = future_result->second;

NOISEPAGE_ASSERT(segment_number <= (((bounds.second - bounds.first) / interval) + 1),
"Incorrect data retrieved from internal tables");
Expand Down
2 changes: 1 addition & 1 deletion src/self_driving/planning/pilot_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ const std::list<metrics::PipelineMetricRawData::PipelineData> &PilotUtil::Collec
pipeline_qids->push_back(qid);

// Forcefully reoptimize all the queries and set the query identifier to use
common::Future<bool> sync;
common::Future<task::DummyResult> sync;
pilot->task_manager_->AddTask(
std::make_unique<task::TaskDML>(db_oid, query_text, std::make_unique<optimizer::TrivialCostModel>(),
std::move(params), std::move(param_types), nullptr, metrics_manager, true, true,
Expand Down
12 changes: 11 additions & 1 deletion src/task/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ void TaskDDL::Execute(common::ManagedPointer<util::QueryExecUtil> query_exec_uti
query_exec_util->BeginTransaction(db_oid_);
bool status = query_exec_util->ExecuteDDL(query_text_);
query_exec_util->EndTransaction(status);

if (sync_) {
if (status)
sync_->Success(DummyResult{});
else
sync_->Fail(query_exec_util->GetError());
}
}

void TaskDML::Execute(common::ManagedPointer<util::QueryExecUtil> query_exec_util,
Expand Down Expand Up @@ -57,7 +64,10 @@ void TaskDML::Execute(common::ManagedPointer<util::QueryExecUtil> query_exec_uti

query_exec_util->EndTransaction(result && !force_abort_);
if (sync_) {
sync_->Success(result);
if (result)
sync_->Success(DummyResult{});
else
sync_->Fail(query_exec_util->GetError());
}
}

Expand Down
33 changes: 31 additions & 2 deletions src/util/query_exec_util.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "util/query_exec_util.h"

#include <mutex> // NOLINT
#include <sstream>

#include "binder/bind_node_visitor.h"
#include "catalog/catalog.h"
Expand Down Expand Up @@ -53,6 +54,8 @@ void QueryExecUtil::ClearPlan(const std::string &query) {
exec_queries_.erase(query);
}

/** Discards any previously recorded error message, leaving it empty. */
void QueryExecUtil::ResetError() { error_msg_.clear(); }

void QueryExecUtil::SetDatabase(catalog::db_oid_t db_oid) {
if (db_oid != catalog::INVALID_DATABASE_OID) {
db_oid_ = db_oid;
Expand Down Expand Up @@ -93,6 +96,7 @@ std::pair<std::unique_ptr<network::Statement>, std::unique_ptr<planner::Abstract
const std::string &query, common::ManagedPointer<std::vector<parser::ConstantValueExpression>> params,
common::ManagedPointer<std::vector<type::TypeId>> param_types, std::unique_ptr<optimizer::AbstractCostModel> cost) {
NOISEPAGE_ASSERT(txn_ != nullptr, "Transaction must have been started");
ResetError();
auto txn = common::ManagedPointer<transaction::TransactionContext>(txn_);
auto accessor = catalog_->GetAccessor(txn, db_oid_, DISABLED);

Expand All @@ -102,8 +106,13 @@ std::pair<std::unique_ptr<network::Statement>, std::unique_ptr<planner::Abstract
auto parse_tree = parser::PostgresParser::BuildParseTree(query_tmp);
statement = std::make_unique<network::Statement>(std::move(query_tmp), std::move(parse_tree));
} catch (std::exception &e) {
std::ostringstream ss;
ss << "QueryExecUtil::PlanStatement caught error ";
ss << e.what() << " when parsing " << query << "\n";
error_msg_ = ss.str();

// Caught a parsing error
COMMON_LOG_ERROR("QueryExecUtil::PlanStatement caught error {} when parsing {}", e.what(), query);
COMMON_LOG_ERROR(error_msg_);
return {nullptr, nullptr};
}

Expand All @@ -117,8 +126,13 @@ std::pair<std::unique_ptr<network::Statement>, std::unique_ptr<planner::Abstract
auto binder = binder::BindNodeVisitor(common::ManagedPointer(accessor), db_oid_);
binder.BindNameToNode(statement->ParseResult(), params, param_types);
} catch (std::exception &e) {
std::ostringstream ss;
ss << "QueryExecUtil::PlanStatement caught error ";
ss << e.what() << " when binding " << query << "\n";
error_msg_ = ss.str();

// Caught a binding exception
COMMON_LOG_ERROR("QueryExecUtil::PlanStatement caught error {} when binding {}", e.what(), query);
COMMON_LOG_ERROR(error_msg_);
return {nullptr, nullptr};
}

Expand All @@ -130,6 +144,7 @@ std::pair<std::unique_ptr<network::Statement>, std::unique_ptr<planner::Abstract

bool QueryExecUtil::ExecuteDDL(const std::string &query) {
NOISEPAGE_ASSERT(txn_ != nullptr, "Requires BeginTransaction() or UseTransaction()");
ResetError();
auto txn = common::ManagedPointer<transaction::TransactionContext>(txn_);
auto accessor = catalog_->GetAccessor(txn, db_oid_, DISABLED);
auto result = PlanStatement(query, nullptr, nullptr, std::make_unique<optimizer::TrivialCostModel>());
Expand Down Expand Up @@ -186,6 +201,11 @@ bool QueryExecUtil::ExecuteDDL(const std::string &query) {
}
}

if (!status) {
// Construct an error message indicating the query has failed.
error_msg_ = query + " failed to execute.";
}

return status;
}

Expand All @@ -195,6 +215,7 @@ bool QueryExecUtil::CompileQuery(const std::string &statement,
std::unique_ptr<optimizer::AbstractCostModel> cost,
std::optional<execution::query_id_t> override_qid,
const execution::exec::ExecutionSettings &exec_settings) {
ResetError();
if (exec_queries_.find(statement) != exec_queries_.end()) {
// We have already optimized and compiled this query before
return true;
Expand Down Expand Up @@ -225,6 +246,7 @@ bool QueryExecUtil::ExecuteQuery(const std::string &statement, TupleFunction tup
const execution::exec::ExecutionSettings &exec_settings) {
NOISEPAGE_ASSERT(exec_queries_.find(statement) != exec_queries_.end(), "Cached query not found");
NOISEPAGE_ASSERT(txn_ != nullptr, "Requires BeginTransaction() or UseTransaction()");
ResetError();
auto txn = common::ManagedPointer<transaction::TransactionContext>(txn_);
planner::OutputSchema *schema = schemas_[statement].get();

Expand Down Expand Up @@ -259,7 +281,14 @@ bool QueryExecUtil::ExecuteQuery(const std::string &statement, TupleFunction tup

exec_ctx->SetParams(common::ManagedPointer<const std::vector<parser::ConstantValueExpression>>(params.Get()));

NOISEPAGE_ASSERT(!txn->MustAbort(), "Transaction should not be in must-abort state prior to executing");
exec_queries_[statement]->Run(common::ManagedPointer(exec_ctx), execution::vm::ExecutionMode::Interpret);
if (txn->MustAbort()) {
// Return false to indicate that the query encountered a runtime error.
error_msg_ = statement + " encountered runtime exception.";
return false;
}

return true;
}

Expand Down
4 changes: 2 additions & 2 deletions test/self_driving/query_trace_logging_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ TEST_F(QueryTraceLogging, BasicLogging) {
std::vector<std::vector<parser::ConstantValueExpression>> params;
std::vector<type::TypeId> param_types;

common::Future<bool> sync;
common::Future<task::DummyResult> sync;
task_manager->AddTask(std::make_unique<task::TaskDML>(
catalog::INVALID_DATABASE_OID, "SELECT * FROM noisepage_forecast_frequencies",
std::make_unique<optimizer::TrivialCostModel>(), std::move(params), std::move(param_types), freq_check, nullptr,
false, true, std::nullopt, common::ManagedPointer(&sync)));

auto sync_result = sync.DangerousWait();
bool result = sync_result.first;
bool result = sync_result.second;
EXPECT_TRUE(result && "SELECT frequencies should have succeeded");
EXPECT_TRUE(seen == combined && "Incorrect number recorded");
EXPECT_TRUE(qid_map.size() == qids.size() && "Incorrect number qids recorded");
Expand Down
Loading

0 comments on commit e8fd9b0

Please sign in to comment.