From 4fd3a63ad9c24fd1a14dc94d361768106c364594 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Mon, 29 Jan 2024 15:59:08 +0800 Subject: [PATCH] remove mutex --- include/rest_rpc/connection.h | 107 ++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 49 deletions(-) diff --git a/include/rest_rpc/connection.h b/include/rest_rpc/connection.h index 7219e74..3050aec 100644 --- a/include/rest_rpc/connection.h +++ b/include/rest_rpc/connection.h @@ -46,17 +46,15 @@ class connection : public std::enable_shared_from_this, void response(uint64_t req_id, std::string data, request_type req_type = request_type::req_res) { - auto len = data.size(); - assert(len < MAX_BUF_LEN); - - std::unique_lock lock(write_mtx_); - write_queue_.emplace_back(message_type{ - req_id, req_type, std::make_shared(std::move(data))}); - if (write_queue_.size() > 1) { - return; - } - - write(); + assert(data.size() < MAX_BUF_LEN); + auto sp_data = std::make_shared(std::move(data)); + std::weak_ptr weak = shared_from_this(); + asio::post([this, weak, sp_data, req_id, req_type] { + auto conn = weak.lock(); + if (conn) { + response_interal(req_id, std::move(sp_data), req_type); + } + }); } template void pack_and_response(uint64_t req_id, T data) { @@ -196,48 +194,61 @@ class connection : public std::enable_shared_from_this, void read_body(uint32_t func_id, std::size_t size) { auto self(this->shared_from_this()); - async_read( - size, [this, func_id, self](asio::error_code ec, std::size_t length) { - cancel_timer(); + async_read(size, [this, func_id, self](asio::error_code ec, + std::size_t length) { + cancel_timer(); - if (!socket_.is_open()) { - if (on_net_err_) { - (*on_net_err_)(self, "socket already closed"); - } - return; - } + if (!socket_.is_open()) { + if (on_net_err_) { + (*on_net_err_)(self, "socket already closed"); + } + return; + } - if (!ec) { - read_head(); - if (req_type_ == request_type::req_res) { - route_result_t ret = router_.route( - func_id, nonstd::string_view{body_.data(), length}, - this->shared_from_this()); - if (delay_) { - delay_ = false; - } else { - response(req_id_, std::move(ret.result)); - } - } else if (req_type_ == request_type::sub_pub) { - try { - msgpack_codec codec; - auto p = codec.unpack>( - body_.data(), length); - callback_(std::move(std::get<0>(p)), std::move(std::get<1>(p)), - this->shared_from_this()); - } catch (const std::exception &ex) { - print(ex); - if (on_net_err_) { - (*on_net_err_)(self, ex.what()); - } - } - } + if (!ec) { + read_head(); + if (req_type_ == request_type::req_res) { + route_result_t ret = router_.route( + func_id, nonstd::string_view{body_.data(), length}, + this->shared_from_this()); + if (delay_) { + delay_ = false; } else { + response_interal( + req_id_, std::make_shared(std::move(ret.result))); + } + } else if (req_type_ == request_type::sub_pub) { + try { + msgpack_codec codec; + auto p = codec.unpack>( + body_.data(), length); + callback_(std::move(std::get<0>(p)), std::move(std::get<1>(p)), + this->shared_from_this()); + } catch (const std::exception &ex) { + print(ex); if (on_net_err_) { - (*on_net_err_)(self, ec.message()); + (*on_net_err_)(self, ex.what()); } } - }); + } + } else { + if (on_net_err_) { + (*on_net_err_)(self, ec.message()); + } + } + }); + } + + void response_interal(uint64_t req_id, std::shared_ptr data, + request_type req_type = request_type::req_res) { + assert(data->size() < MAX_BUF_LEN); + + write_queue_.emplace_back(message_type{req_id, req_type, std::move(data)}); + if (write_queue_.size() > 1) { + return; + } + + write(); } void write() { @@ -269,7 +280,6 @@ class connection : public std::enable_shared_from_this, return; } - std::unique_lock lock(write_mtx_); write_queue_.pop_front(); if (!write_queue_.empty()) { @@ -414,7 +424,6 @@ class connection : public std::enable_shared_from_this, rpc_header header_; uint32_t write_size_ = 0; - std::mutex write_mtx_; asio::steady_timer timer_; std::size_t timeout_seconds_;