diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -10,6 +10,11 @@ #include +#include +#include +#include +#include + namespace comm { namespace network { @@ -167,6 +172,38 @@ grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { + + std::mutex writerMutex; + std::atomic_bool writerIsReady{true}; + + // Thread-safe response writer + auto respondToWriter = [&](tunnelbroker::GetResponse response) { + std::lock_guard lock(writerMutex); + if (!writerIsReady) { + return false; + } + if (!writer->Write(response)) { + writerIsReady = false; + return false; + } + return true; + }; + + // Keep-alive detection pinging thread + tunnelbroker::GetResponse pingResponse; + pingResponse.mutable_ping(); + auto sendingPings = [&]() { + while (true) { + std::this_thread::sleep_for( + std::chrono::milliseconds(DEVICE_ONLINE_PING_INTERVAL_MS)); + if (!respondToWriter(pingResponse)) { + LOG(INFO) + << "gRPC 'Get' handler write error on sending ping to the client"; + return; + } + } + }; + try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { @@ -194,27 +231,36 @@ // DeliveryBroker. DeliveryBroker::getInstance().erase(clientDeviceID); } - tunnelbroker::GetResponse response; - auto respondToWriter = - [&writer, &response](std::string fromDeviceID, std::string payload) { - response.mutable_responsemessage()->set_fromdeviceid(fromDeviceID); - response.mutable_responsemessage()->set_payload(payload); - if (!writer->Write(response)) { - throw std::runtime_error( - "gRPC: 'Get' writer error on sending data to the client"); - } - response.Clear(); - }; + for (auto &messageFromDatabase : messagesFromDatabase) { - respondToWriter( - messageFromDatabase->getFromDeviceID(), + tunnelbroker::GetResponse response; + response.mutable_responsemessage()->set_fromdeviceid( + messageFromDatabase->getFromDeviceID()); + response.mutable_responsemessage()->set_payload( messageFromDatabase->getPayload()); + if (!respondToWriter(response)) { + return grpc::Status( + grpc::StatusCode::INTERNAL, "Channel writer is unavailable"); + } database::DatabaseManager::getInstance().removeMessageItem( clientDeviceID, messageFromDatabase->getMessageID()); } + // We are starting the pinging thread and sending pings only after + // messages from the database was delivered and we are waiting for the new + // messages to come to check is connection alive. + std::thread pingThread(sendingPings); + while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); - respondToWriter(messageToDeliver.fromDeviceID, messageToDeliver.payload); + tunnelbroker::GetResponse response; + response.mutable_responsemessage()->set_fromdeviceid( + messageToDeliver.fromDeviceID); + response.mutable_responsemessage()->set_payload(messageToDeliver.payload); + if (!respondToWriter(response)) { + pingThread.join(); + return grpc::Status( + grpc::StatusCode::INTERNAL, "Channel writer is unavailable"); + } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); database::DatabaseManager::getInstance().removeMessageItem( @@ -226,9 +272,12 @@ DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { - LOG(ERROR) << "gRPC: " - << "Error while processing 'Get' request: " << e.what(); + LOG(ERROR) << "gRPC: Runtime error while processing 'Get' request: " + << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } catch (...) { + LOG(ERROR) << "gRPC: Unknown error while processing 'Get' request"; + return grpc::Status(grpc::StatusCode::INTERNAL, "Unknown error"); } return grpc::Status::OK; };