Page MenuHomePhabricator

D5174.diff
No OneTemporary

D5174.diff

diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
--- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
+++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
@@ -13,6 +13,11 @@
#include <glog/logging.h>
+#include <atomic>
+#include <chrono>
+#include <mutex>
+#include <thread>
+
namespace comm {
namespace network {
@@ -212,6 +217,34 @@
grpc::ServerContext *context,
const tunnelbroker::GetRequest *request,
grpc::ServerWriter<tunnelbroker::GetResponse> *writer) {
+
+ std::mutex writerMutex;
+ std::atomic_bool writerIsReady{true};
+
+ // Thread-safe response writer
+ auto respondToWriter = [&](tunnelbroker::GetResponse response) {
+ std::lock_guard<std::mutex> lock(writerMutex);
+ if (!writerIsReady) {
+ return false;
+ }
+ if (!writer->Write(response)) {
+ writerIsReady = false;
+ return false;
+ }
+ return true;
+ };
+
+ // Keep-alive detection pinging thread
+ tunnelbroker::GetResponse pingResponse;
+ pingResponse.mutable_ping();
+ auto sendingPings = [&]() {
+ while (respondToWriter(pingResponse)) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(DEVICE_ONLINE_PING_INTERVAL_MS));
+ }
+ LOG(INFO) << "gRPC 'Get' handler write error on sending ping to the client";
+ };
+
try {
const std::string sessionID = request->sessionid();
if (!tools::validateSessionID(sessionID)) {
@@ -262,26 +295,35 @@
DeliveryBroker::getInstance().erase(clientDeviceID);
}
- auto respondToWriter =
- [&writer, &response](std::string fromDeviceID, std::string payload) {
- response.mutable_responsemessage()->set_fromdeviceid(fromDeviceID);
- response.mutable_responsemessage()->set_payload(payload);
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC: 'Get' writer error on sending data to the client");
- }
- response.Clear();
- };
for (auto &messageFromDatabase : messagesFromDatabase) {
- respondToWriter(
- messageFromDatabase->getFromDeviceID(),
+ tunnelbroker::GetResponse response;
+ response.mutable_responsemessage()->set_fromdeviceid(
+ messageFromDatabase->getFromDeviceID());
+ response.mutable_responsemessage()->set_payload(
messageFromDatabase->getPayload());
+ if (!respondToWriter(response)) {
+ return grpc::Status(
+ grpc::StatusCode::INTERNAL, "Channel writer is unavailable");
+ }
database::DatabaseManager::getInstance().removeMessageItem(
clientDeviceID, messageFromDatabase->getMessageID());
}
+ // Start the pinging thread only after the messages from the database have
+ // been delivered; while we wait for new messages to arrive, the pings are
+ // used to check whether the connection is still alive.
+ std::thread pingThread(sendingPings);
+
while (1) {
messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
- respondToWriter(messageToDeliver.fromDeviceID, messageToDeliver.payload);
+ tunnelbroker::GetResponse response;
+ response.mutable_responsemessage()->set_fromdeviceid(
+ messageToDeliver.fromDeviceID);
+ response.mutable_responsemessage()->set_payload(messageToDeliver.payload);
+ if (!respondToWriter(response)) {
+ pingThread.join();
+ return grpc::Status(
+ grpc::StatusCode::INTERNAL, "Channel writer is unavailable");
+ }
comm::network::AmqpManager::getInstance().ack(
messageToDeliver.deliveryTag);
database::DatabaseManager::getInstance().removeMessageItem(
@@ -293,9 +335,12 @@
DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID);
}
} catch (std::runtime_error &e) {
- LOG(ERROR) << "gRPC: "
- << "Error while processing 'Get' request: " << e.what();
+ LOG(ERROR) << "gRPC: Runtime error while processing 'Get' request: "
+ << e.what();
return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
+ } catch (...) {
+ LOG(ERROR) << "gRPC: Unknown error while processing 'Get' request";
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Unknown error");
}
return grpc::Status::OK;
};

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 2:32 AM (6 h, 43 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2592466
Default Alt Text
D5174.diff (4 KB)

Event Timeline