Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Optimize merge commit sync mode #54676

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,10 @@ CONF_mInt32(batch_write_default_timeout_ms, "600000");
CONF_mInt32(batch_write_rpc_request_retry_num, "10");
CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500");
CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");
CONF_mInt32(merge_commit_txn_state_cache_capacity, "2048");
CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000");
CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2");

CONF_mBool(enable_load_spill, "false");
// Maximum number of chunk bytes allowed to spill per flush. Default is 10MB.
Expand Down
10 changes: 10 additions & 0 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "storage/compaction_manager.h"
#include "storage/lake/compaction_scheduler.h"
#include "storage/lake/load_spill_block_manager.h"
Expand Down Expand Up @@ -327,6 +329,14 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
_config_callback.emplace("load_spill_max_merge_bytes", [&]() -> Status {
return StorageEngine::instance()->load_spill_block_merge_executor()->refresh_max_thread_num();
});
_config_callback.emplace("merge_commit_txn_state_cache_capacity", [&]() -> Status {
LOG(INFO) << "set merge_commit_txn_state_cache_capacity: " << config::merge_commit_txn_state_cache_capacity;
auto batch_write_mgr = _exec_env->batch_write_mgr();
if (batch_write_mgr) {
batch_write_mgr->txn_state_cache()->set_capacity(config::merge_commit_txn_state_cache_capacity);
}
return Status::OK();
});

#ifdef USE_STAROS
#define UPDATE_STARLET_CONFIG(BE_CONFIG, STARLET_CONFIG) \
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(RUNTIME_FILES
batch_write/isomorphic_batch_write.cpp
batch_write/batch_write_mgr.cpp
batch_write/batch_write_util.cpp
batch_write/txn_state_cache.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
Expand Down
56 changes: 54 additions & 2 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@

namespace starrocks {

BatchWriteMgr::BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)) {}

// Creates the transaction state cache and initializes it.
//
// The cache is sized from config::merge_commit_txn_state_cache_capacity and is handed a
// CONCURRENT-mode token obtained from the executor's thread pool.
// Returns whatever TxnStateCache::init() returns.
Status BatchWriteMgr::init() {
    auto&& thread_pool = _executor->get_thread_pool();
    std::unique_ptr<ThreadPoolToken> cache_token = thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT);
    _txn_state_cache =
            std::make_unique<TxnStateCache>(config::merge_commit_txn_state_cache_capacity, std::move(cache_token));
    return _txn_state_cache->init();
}

Status BatchWriteMgr::register_stream_load_pipe(StreamLoadContext* pipe_ctx) {
BatchWriteId batch_write_id = {
.db = pipe_ctx->db, .table = pipe_ctx->table, .load_params = pipe_ctx->load_parameters};
Expand Down Expand Up @@ -78,7 +87,7 @@ StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const st
return it->second;
}

auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get());
auto batch_write = std::make_shared<IsomorphicBatchWrite>(batch_write_id, _executor.get(), _txn_state_cache.get());
Status st = batch_write->init();
if (!st.ok()) {
LOG(ERROR) << "Fail to init batch write, " << batch_write_id << ", status: " << st;
Expand All @@ -105,6 +114,10 @@ void BatchWriteMgr::stop() {
for (auto& batch_write : stop_writes) {
batch_write->stop();
}
if (_txn_state_cache) {
_txn_state_cache->stop();
}
_executor->get_thread_pool()->shutdown();
}

StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
Expand Down Expand Up @@ -224,7 +237,46 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
ctx->buffer->flip();
ctx->receive_bytes = io_buf.size();
ctx->mc_read_data_cost_nanos = MonotonicNanos() - ctx->start_nanos;
ctx->status = exec_env->batch_write_mgr()->append_data(ctx);
ctx->status = append_data(ctx);
}

// Converts a protobuf transaction status into the corresponding thrift transaction status.
// Any value without an explicit mapping below (including TRANS_UNKNOWN) yields
// TTransactionStatus::UNKNOWN.
TTransactionStatus::type to_thrift_txn_status(TransactionStatusPB status) {
    // Start from UNKNOWN so that unmapped values fall through unchanged.
    TTransactionStatus::type thrift_status = TTransactionStatus::UNKNOWN;
    switch (status) {
    case TRANS_PREPARE:
        thrift_status = TTransactionStatus::PREPARE;
        break;
    case TRANS_COMMITTED:
        thrift_status = TTransactionStatus::COMMITTED;
        break;
    case TRANS_VISIBLE:
        thrift_status = TTransactionStatus::VISIBLE;
        break;
    case TRANS_ABORTED:
        thrift_status = TTransactionStatus::ABORTED;
        break;
    case TRANS_PREPARED:
        thrift_status = TTransactionStatus::PREPARED;
        break;
    case TRANS_UNKNOWN:
    default:
        break;
    }
    return thrift_status;
}

// Applies every transaction state carried by `request` to the txn state cache.
//
// For each state, the protobuf status is converted to its thrift form and pushed into the
// cache together with the reason. The outcome of each push is logged and appended to
// `response` as one result entry per state, in request order.
// `exec_env` and `cntl` are not used by this body.
void BatchWriteMgr::update_transaction_state(ExecEnv* exec_env, brpc::Controller* cntl,
                                             const PUpdateTransactionStateRequest* request,
                                             PUpdateTransactionStateResponse* response) {
    for (const auto& state : request->states()) {
        auto push_status =
                _txn_state_cache->push_state(state.txn_id(), to_thrift_txn_status(state.status()), state.reason());
        if (push_status.ok()) {
            TRACE_BATCH_WRITE << "Update transaction state, txn_id: " << state.txn_id()
                              << ", txn status: " << TransactionStatusPB_Name(state.status())
                              << ", status reason: " << state.reason();
        } else {
            LOG(WARNING) << "Failed to update transaction state, txn_id: " << state.txn_id()
                         << ", txn status: " << TransactionStatusPB_Name(state.status())
                         << ", status reason: " << state.reason() << ", update error: " << push_status;
        }
        // Report the per-state outcome back to the caller regardless of success.
        push_status.to_protobuf(response->add_results());
    }
}

} // namespace starrocks
18 changes: 15 additions & 3 deletions be/src/runtime/batch_write/batch_write_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/statusor.h"
#include "runtime/batch_write/isomorphic_batch_write.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/bthreads/executor.h"
Expand All @@ -32,10 +33,13 @@ class ExecEnv;
class PStreamLoadRequest;
class PStreamLoadResponse;
class StreamLoadContext;
class PUpdateTransactionStateRequest;
class PUpdateTransactionStateResponse;

class BatchWriteMgr {
public:
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor) : _executor(std::move(executor)){};
BatchWriteMgr(std::unique_ptr<bthreads::ThreadPoolExecutor> executor);
Status init();

Status register_stream_load_pipe(StreamLoadContext* pipe_ctx);
void unregister_stream_load_pipe(StreamLoadContext* pipe_ctx);
Expand All @@ -45,19 +49,27 @@ class BatchWriteMgr {

void stop();

bthreads::ThreadPoolExecutor* executor() { return _executor.get(); }
TxnStateCache* txn_state_cache() { return _txn_state_cache.get(); }

static StatusOr<StreamLoadContext*> create_and_register_pipe(
ExecEnv* exec_env, BatchWriteMgr* batch_write_mgr, const string& db, const string& table,
const std::map<std::string, std::string>& load_parameters, const string& label, long txn_id,
const TUniqueId& load_id, int32_t batch_write_interval_ms);

static void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);
void receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl, const PStreamLoadRequest* request,
PStreamLoadResponse* response);

void update_transaction_state(ExecEnv* exec_env, brpc::Controller* cntl,
const PUpdateTransactionStateRequest* request,
PUpdateTransactionStateResponse* response);

private:
StatusOr<IsomorphicBatchWriteSharedPtr> _get_batch_write(const BatchWriteId& batch_write_id,
bool create_if_missing);

std::unique_ptr<bthreads::ThreadPoolExecutor> _executor;
std::unique_ptr<TxnStateCache> _txn_state_cache;
bthreads::BThreadSharedMutex _rw_mutex;
std::unordered_map<BatchWriteId, IsomorphicBatchWriteSharedPtr, BatchWriteIdHash, BatchWriteIdEqual>
_batch_write_map;
Expand Down
99 changes: 30 additions & 69 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ class AsyncAppendDataContext {
std::atomic_int num_retries{-1};
};

IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor)
: _batch_write_id(std::move(batch_write_id)), _executor(executor) {}
IsomorphicBatchWrite::IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor,
TxnStateCache* txn_state_cache)
: _batch_write_id(std::move(batch_write_id)), _executor(executor), _txn_state_cache(txn_state_cache) {}

Status IsomorphicBatchWrite::init() {
TEST_ERROR_POINT("IsomorphicBatchWrite::init::error");
Expand Down Expand Up @@ -220,7 +221,6 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
int64_t start_ts = MonotonicNanos();
AsyncAppendDataContext* async_ctx = new AsyncAppendDataContext(data_ctx);
async_ctx->ref();
async_ctx->create_time_ts.store(MonotonicNanos());
Expand Down Expand Up @@ -258,10 +258,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_batch_write_async) {
return Status::OK();
}
int64_t timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::batch_write_default_timeout_ms;
int64_t left_timeout_ns = std::max((int64_t)0, timeout_ms * 1000 * 1000 - (MonotonicNanos() - start_ts));
return _wait_for_load_status(data_ctx, left_timeout_ns);
return _wait_for_load_finish(data_ctx);
}

int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator<Task>& iter) {
Expand Down Expand Up @@ -418,78 +415,42 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
return st.ok() ? Status(response.status) : st;
}

bool is_final_load_status(const TTransactionStatus::type& status) {
switch (status) {
case TTransactionStatus::VISIBLE:
case TTransactionStatus::ABORTED:
case TTransactionStatus::UNKNOWN:
return true;
default:
return false;
Status IsomorphicBatchWrite::_wait_for_load_finish(StreamLoadContext* data_ctx) {
int64_t total_timeout_ms =
data_ctx->timeout_second > 0 ? data_ctx->timeout_second * 1000 : config::batch_write_default_timeout_ms;
int64_t left_timeout_ms =
std::max((int64_t)0, total_timeout_ms - (MonotonicNanos() - data_ctx->start_nanos) / 1000000);
StatusOr<TxnStateSubscriberPtr> subscriber_status = _txn_state_cache->subscribe_state(
data_ctx->txn_id, data_ctx->label, data_ctx->db, data_ctx->table, data_ctx->auth);
if (!subscriber_status.ok()) {
return Status::InternalError("Failed to create txn state subscriber, " +
subscriber_status.status().to_string());
}
}

// TODO: this just polls the load status periodically. Improve it later, e.g. cache the label and have the FE notify the BE.
Status IsomorphicBatchWrite::_wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns) {
TxnStateSubscriberPtr subscriber = std::move(subscriber_status.value());
int64_t start_ts = MonotonicNanos();
int64_t wait_load_finish_ns = std::max((int64_t)0, data_ctx->mc_left_merge_time_nanos) + 1000000;
bthread_usleep(std::min(wait_load_finish_ns, timeout_ns) / 1000);
TGetLoadTxnStatusRequest request;
request.__set_db(_batch_write_id.db);
request.__set_tbl(_batch_write_id.table);
request.__set_txnId(data_ctx->txn_id);
set_request_auth(&request, data_ctx->auth);
TGetLoadTxnStatusResult response;
Status st;
do {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
#ifndef BE_TEST
int64_t rpc_ts = MonotonicNanos();
TNetworkAddress master_addr = get_master_address();
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) {
client->getLoadTxnStatus(response, request);
},
config::batch_write_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive getLoadTxnStatus response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", txn_id: " << data_ctx->txn_id
<< ", label: " << data_ctx->batch_write_label << ", master: " << master_addr
<< ", cost: " << ((MonotonicNanos() - rpc_ts) / 1000) << "us, status: " << st
<< ", response: " << response;
#else
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::request", &request);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::status", &st);
TEST_SYNC_POINT_CALLBACK("IsomorphicBatchWrite::_wait_for_load_status::response", &response);
#endif
if (st.ok() && is_final_load_status(response.status)) {
break;
}
int64_t left_timeout_ns = timeout_ns - (MonotonicNanos() - start_ts);
if (left_timeout_ns <= 0) {
break;
}
bthread_usleep(
std::min(config::batch_write_poll_load_status_interval_ms * (int64_t)1000, left_timeout_ns / 1000));
} while (true);
StatusOr<TxnState> status_or = subscriber->wait_finished_state(left_timeout_ms * 1000);
data_ctx->mc_wait_finish_cost_nanos = MonotonicNanos() - start_ts;
if (!st.ok()) {
return Status::InternalError("Failed to get load status, " + st.to_string());
TRACE_BATCH_WRITE << "finish to wait load, " << _batch_write_id << ", user label: " << data_ctx->label
<< ", txn_id: " << data_ctx->txn_id << ", load label: " << data_ctx->batch_write_label
<< ", cost: " << (data_ctx->mc_wait_finish_cost_nanos / 1000)
<< "us, wait status: " << status_or.status() << ", "
<< (status_or.ok() ? status_or.value() : subscriber->current_state());
if (!status_or.ok()) {
TxnState current_state = subscriber->current_state();
return Status::InternalError(fmt::format("Failed to get load final status, current status: {}, error: {}",
to_string(current_state.txn_status), status_or.status().to_string()));
}
switch (response.status) {
case TTransactionStatus::PREPARE:
case TTransactionStatus::PREPARED:
return Status::TimedOut("load timeout, txn status: " + to_string(response.status));
switch (status_or.value().txn_status) {
case TTransactionStatus::COMMITTED:
return Status::PublishTimeout("Load has not been published before timeout");
case TTransactionStatus::VISIBLE:
return Status::OK();
case TTransactionStatus::ABORTED:
return Status::InternalError("Load is aborted, reason: " + response.reason);
return Status::InternalError("Load is aborted, reason: " + status_or.value().reason);
case TTransactionStatus::UNKNOWN:
return Status::InternalError("Can't find the transaction, reason: " + status_or.value().reason);
default:
return Status::InternalError("Load status is unknown: " + to_string(response.status));
return Status::InternalError("Load status is not final: " + to_string(status_or.value().txn_status));
}
}

Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/batch_write/isomorphic_batch_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/statusor.h"
#include "runtime/batch_write/batch_write_util.h"
#include "runtime/batch_write/txn_state_cache.h"
#include "util/countdown_latch.h"

namespace starrocks {
Expand All @@ -44,7 +45,8 @@ struct Task {

class IsomorphicBatchWrite {
public:
explicit IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor);
explicit IsomorphicBatchWrite(BatchWriteId batch_write_id, bthreads::ThreadPoolExecutor* executor,
TxnStateCache* txn_state_cache);

Status init();

Expand All @@ -65,10 +67,11 @@ class IsomorphicBatchWrite {
Status _execute_write(AsyncAppendDataContext* async_ctx);
Status _write_data_to_pipe(AsyncAppendDataContext* data_ctx);
Status _send_rpc_request(StreamLoadContext* data_ctx);
Status _wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns);
Status _wait_for_load_finish(StreamLoadContext* data_ctx);

BatchWriteId _batch_write_id;
bthreads::ThreadPoolExecutor* _executor;
TxnStateCache* _txn_state_cache;
bool _batch_write_async{false};

bthread::Mutex _mutex;
Expand Down
Loading
Loading