Skip to content

Commit

Permalink
[Crash Fix] Shutdown and drain completion queue when stopping server (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruhao Gao authored May 26, 2023
1 parent 1a65f92 commit b2f5ba0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
46 changes: 35 additions & 11 deletions src/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace grpc_labview
LabVIEWgRPCServer::LabVIEWgRPCServer() :
_shutdown(false),
_genericMethodEvent(0)
{
{
}

//---------------------------------------------------------------------
Expand Down Expand Up @@ -89,7 +89,7 @@ namespace grpc_labview
//---------------------------------------------------------------------
//---------------------------------------------------------------------
int LabVIEWgRPCServer::ListeningPort()
{
{
return _listeningPort;
}

Expand All @@ -98,7 +98,7 @@ namespace grpc_labview
int LabVIEWgRPCServer::Run(std::string address, std::string serverCertificatePath, std::string serverKeyPath)
{
FinalizeMetadata();

auto serverStarted = new ServerStartEventData;
_runThread = std::make_unique<std::thread>(StaticRunServer, this, address, serverCertificatePath, serverKeyPath, serverStarted);
serverStarted->WaitForComplete();
Expand All @@ -108,7 +108,10 @@ namespace grpc_labview
{
// If we weren't able to start the gRPC server then the _runThread has nothing to do.
// So do an immediate join on the thread.
_runThread->join();
if (_runThread->joinable())
{
_runThread->join();
}
}
return result;
}
Expand Down Expand Up @@ -139,15 +142,16 @@ namespace grpc_labview
bool ok;
while (true)
{
if (_shutdown)
{
break;
}

// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq->Next(&tag, &ok);
static_cast<CallDataBase*>(tag)->Proceed(ok);
if (_shutdown)
{
break;
}
}
}

Expand Down Expand Up @@ -211,15 +215,15 @@ namespace grpc_labview

_rpcService = std::unique_ptr<grpc::AsyncGenericService>(new grpc::AsyncGenericService());
builder.RegisterAsyncGenericService(_rpcService.get());
auto cq = builder.AddCompletionQueue();
_cq = builder.AddCompletionQueue();

_server = builder.BuildAndStart();
if (_server != nullptr)
{
std::cout << "Server listening on " << server_address << std::endl;
serverStarted->NotifyComplete();

HandleRpcs(cq.get());
HandleRpcs(_cq.get());
_server->Wait();
}
else
Expand All @@ -239,7 +243,27 @@ namespace grpc_labview
// We need shutdown passing a deadline so that any RPC calls in progress are terminated as well.
_server->Shutdown(std::chrono::system_clock::now());
_server->Wait();
_runThread->join();

// Always shutdown the completion queue after the server.
if (_cq != nullptr)
{
_cq->Shutdown();
}

if (_runThread->joinable())
{
_runThread->join();
}

// Drain the complete queue before deleting the server.
// Otherwise, server might fail on the assertion that the completion queue must be empty.
if (_cq != nullptr)
{
void *tag;
bool ok;
while (_cq->Next(&tag, &ok)) {}
}

_server = nullptr;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ using grpc::Server;
using grpc::ServerBuilder;
using grpc::Status;

namespace grpc_labview
namespace grpc_labview
{
//---------------------------------------------------------------------
//---------------------------------------------------------------------
Expand Down Expand Up @@ -103,6 +103,7 @@ namespace grpc_labview
private:
std::mutex _mutex;
std::unique_ptr<Server> _server;
std::unique_ptr<grpc::ServerCompletionQueue> _cq;
std::map<std::string, LVEventData> _registeredServerMethods;
LVUserEventRef _genericMethodEvent;
std::unique_ptr<grpc::AsyncGenericService> _rpcService;
Expand Down

0 comments on commit b2f5ba0

Please sign in to comment.