diff --git a/src/grpc_server.cc b/src/grpc_server.cc index dec5c387..30d961c0 100644 --- a/src/grpc_server.cc +++ b/src/grpc_server.cc @@ -22,7 +22,7 @@ namespace grpc_labview LabVIEWgRPCServer::LabVIEWgRPCServer() : _shutdown(false), _genericMethodEvent(0) - { + { } //--------------------------------------------------------------------- @@ -89,7 +89,7 @@ namespace grpc_labview //--------------------------------------------------------------------- //--------------------------------------------------------------------- int LabVIEWgRPCServer::ListeningPort() - { + { return _listeningPort; } @@ -98,7 +98,7 @@ namespace grpc_labview int LabVIEWgRPCServer::Run(std::string address, std::string serverCertificatePath, std::string serverKeyPath) { FinalizeMetadata(); - + auto serverStarted = new ServerStartEventData; _runThread = std::make_unique(StaticRunServer, this, address, serverCertificatePath, serverKeyPath, serverStarted); serverStarted->WaitForComplete(); @@ -108,7 +108,10 @@ namespace grpc_labview { // If we weren't able to start the gRPC server then the _runThread has nothing to do. // So do an immediate join on the thread. - _runThread->join(); + if (_runThread->joinable()) + { + _runThread->join(); + } } return result; } @@ -139,15 +142,16 @@ namespace grpc_labview bool ok; while (true) { + if (_shutdown) + { + break; + } + // Block waiting to read the next event from the completion queue. The // event is uniquely identified by its tag, which in this case is the // memory address of a CallData instance. cq->Next(&tag, &ok); static_cast(tag)->Proceed(ok); - if (_shutdown) - { - break; - } } } @@ -211,7 +215,7 @@ namespace grpc_labview _rpcService = std::unique_ptr(new grpc::AsyncGenericService()); builder.RegisterAsyncGenericService(_rpcService.get()); - auto cq = builder.AddCompletionQueue(); + _cq = builder.AddCompletionQueue(); _server = builder.BuildAndStart(); if (_server != nullptr) @@ -219,7 +223,7 @@ namespace grpc_labview std::cout << "Server listening on " << server_address << std::endl; serverStarted->NotifyComplete(); - HandleRpcs(cq.get()); + HandleRpcs(_cq.get()); _server->Wait(); } else @@ -239,7 +243,27 @@ namespace grpc_labview // We need shutdown passing a deadline so that any RPC calls in progress are terminated as well. _server->Shutdown(std::chrono::system_clock::now()); _server->Wait(); - _runThread->join(); + + // Always shutdown the completion queue after the server. + if (_cq != nullptr) + { + _cq->Shutdown(); + } + + if (_runThread->joinable()) + { + _runThread->join(); + } + + // Drain the complete queue before deleting the server. + // Otherwise, server might fail on the assertion that the completion queue must be empty. + if (_cq != nullptr) + { + void *tag; + bool ok; + while (_cq->Next(&tag, &ok)) {} + } + _server = nullptr; } } diff --git a/src/grpc_server.h b/src/grpc_server.h index 177a0015..3a8c786c 100644 --- a/src/grpc_server.h +++ b/src/grpc_server.h @@ -35,7 +35,7 @@ using grpc::Server; using grpc::ServerBuilder; using grpc::Status; -namespace grpc_labview +namespace grpc_labview { //--------------------------------------------------------------------- //--------------------------------------------------------------------- @@ -103,6 +103,7 @@ namespace grpc_labview private: std::mutex _mutex; std::unique_ptr _server; + std::unique_ptr _cq; std::map _registeredServerMethods; LVUserEventRef _genericMethodEvent; std::unique_ptr _rpcService;