diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -10,6 +10,10 @@ #include +#include +#include +#include + namespace comm { namespace network { @@ -194,27 +198,51 @@ // DeliveryBroker. DeliveryBroker::getInstance().erase(clientDeviceID); } - tunnelbroker::GetResponse response; - auto respondToWriter = - [&writer, &response](std::string fromDeviceID, std::string payload) { - response.mutable_messageresponse()->set_fromdeviceid(fromDeviceID); - response.mutable_messageresponse()->set_payload(payload); - if (!writer->Write(response)) { - throw std::runtime_error( - "gRPC: 'Get' writer error on sending data to the client"); - } - response.Clear(); - }; + + std::mutex writerMutex; + auto respondToWriter = [&](tunnelbroker::GetResponse response) { + std::lock_guard lock(writerMutex); + if (context->IsCancelled()) { + return false; + } + if (!writer->Write(response)) { + LOG(ERROR) << "gRPC: 'Get' writer error on sending data to the client"; + return false; + } + return true; + }; + + auto sendPings = [&]() { + tunnelbroker::GetResponse response; + response.set_ping(true); + while (true) { + if (!respondToWriter(response)) { + context->TryCancel(); + return; + } + std::this_thread::sleep_for( + std::chrono::milliseconds(DEVICE_ONLINE_PING_INTERVAL_MS)); + } + }; + std::thread ping_thread(sendPings); + for (auto &messageFromDatabase : messagesFromDatabase) { - respondToWriter( - messageFromDatabase->getFromDeviceID(), + tunnelbroker::GetResponse response; + response.mutable_messageresponse()->set_fromdeviceid( + messageFromDatabase->getFromDeviceID()); + response.mutable_messageresponse()->set_payload( messageFromDatabase->getPayload()); + respondToWriter(response); database::DatabaseManager::getInstance().removeMessageItem( clientDeviceID, messageFromDatabase->getMessageID()); } while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); - respondToWriter(messageToDeliver.fromDeviceID, messageToDeliver.payload); + tunnelbroker::GetResponse response; + response.mutable_messageresponse()->set_fromdeviceid( + messageToDeliver.fromDeviceID); + response.mutable_messageresponse()->set_payload(messageToDeliver.payload); + respondToWriter(response); comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); database::DatabaseManager::getInstance().removeMessageItem( @@ -225,6 +253,7 @@ // We call `deleteQueueIfEmpty()` for this purpose here. DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } + ping_thread.join(); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Get' request: " << e.what();