Page MenuHomePhabricator

D5431.id17839.diff
No OneTemporary

D5431.id17839.diff

diff --git a/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.h b/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.h
deleted file mode 100644
--- a/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include <tunnelbroker.grpc.pb.h>
-#include <tunnelbroker.pb.h>
-
-#include <grpcpp/grpcpp.h>
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-class TunnelBrokerServiceImpl final
- : public tunnelbroker::TunnelbrokerService::Service {
-
-public:
- TunnelBrokerServiceImpl();
- virtual ~TunnelBrokerServiceImpl();
-
- grpc::Status SessionSignature(
- grpc::ServerContext *context,
- const tunnelbroker::SessionSignatureRequest *request,
- tunnelbroker::SessionSignatureResponse *reply) override;
-
- grpc::Status NewSession(
- grpc::ServerContext *context,
- const tunnelbroker::NewSessionRequest *request,
- tunnelbroker::NewSessionResponse *reply) override;
-
- grpc::Status Send(
- grpc::ServerContext *context,
- const tunnelbroker::SendRequest *request,
- google::protobuf::Empty *reply) override;
-
- grpc::Status
- Get(grpc::ServerContext *context,
- const tunnelbroker::GetRequest *request,
- grpc::ServerWriter<tunnelbroker::GetResponse> *stream) override;
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.cpp
deleted file mode 100644
--- a/services/tunnelbroker/src/libcpp/src/Service/TunnelbrokerServiceImpl.cpp
+++ /dev/null
@@ -1,361 +0,0 @@
-#include "TunnelbrokerServiceImpl.h"
-#include "AmqpManager.h"
-#include "AwsTools.h"
-#include "ConfigManager.h"
-#include "CryptoTools.h"
-#include "DatabaseManager.h"
-#include "DeliveryBroker.h"
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include "rust-lib/src/lib.rs.h"
-#include "rust/cxx.h"
-
-#include <glog/logging.h>
-
-#include <atomic>
-#include <chrono>
-#include <mutex>
-#include <thread>
-
-namespace comm {
-namespace network {
-
-TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() {
- Aws::InitAPI({});
- // List of AWS DynamoDB tables to check if they are created and can be
- // accessed before any AWS API methods
- const std::list<std::string> tablesList = {
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE),
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE),
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE),
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)};
- for (const std::string &table : tablesList) {
- if (!database::DatabaseManager::getInstance().isTableAvailable(table)) {
- throw std::runtime_error(
- "Error: AWS DynamoDB table '" + table + "' is not available");
- }
- };
-};
-
-TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() {
- Aws::ShutdownAPI({});
-};
-
-grpc::Status TunnelBrokerServiceImpl::SessionSignature(
- grpc::ServerContext *context,
- const tunnelbroker::SessionSignatureRequest *request,
- tunnelbroker::SessionSignatureResponse *reply) {
- const std::string deviceID = request->deviceid();
- if (!tools::validateDeviceID(deviceID)) {
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Format validation failed for deviceID");
- }
- const std::string toSign =
- tools::generateRandomString(SIGNATURE_REQUEST_LENGTH);
- std::shared_ptr<database::SessionSignItem> SessionSignItem =
- std::make_shared<database::SessionSignItem>(toSign, deviceID);
- database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem);
- reply->set_tosign(toSign);
- return grpc::Status::OK;
-};
-
-grpc::Status TunnelBrokerServiceImpl::NewSession(
- grpc::ServerContext *context,
- const tunnelbroker::NewSessionRequest *request,
- tunnelbroker::NewSessionResponse *reply) {
-
- std::shared_ptr<database::DeviceSessionItem> deviceSessionItem;
- std::shared_ptr<database::SessionSignItem> sessionSignItem;
- std::shared_ptr<database::PublicKeyItem> publicKeyItem;
- const std::string deviceID = request->deviceid();
- if (!tools::validateDeviceID(deviceID)) {
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Format validation failed for deviceID");
- }
- const std::string signature = request->signature();
- const std::string publicKey = request->publickey();
- const std::string newSessionID = tools::generateUUID();
- try {
- sessionSignItem =
- database::DatabaseManager::getInstance().findSessionSignItem(deviceID);
- if (sessionSignItem == nullptr) {
- return grpc::Status(
- grpc::StatusCode::NOT_FOUND, "Session sign request not found");
- }
- publicKeyItem =
- database::DatabaseManager::getInstance().findPublicKeyItem(deviceID);
- if (publicKeyItem == nullptr) {
- std::shared_ptr<database::PublicKeyItem> newPublicKeyItem =
- std::make_shared<database::PublicKeyItem>(deviceID, publicKey);
- database::DatabaseManager::getInstance().putPublicKeyItem(
- *newPublicKeyItem);
- } else if (publicKey != publicKeyItem->getPublicKey()) {
- return grpc::Status(
- grpc::StatusCode::PERMISSION_DENIED,
- "The public key doesn't match for deviceID");
- }
- const std::string verificationMessage = sessionSignItem->getSign();
- if (!comm::network::crypto::rsaVerifyString(
- publicKey, verificationMessage, signature)) {
- return grpc::Status(
- grpc::StatusCode::PERMISSION_DENIED,
- "Signature for the verification message is not valid");
- }
- database::DatabaseManager::getInstance().removeSessionSignItem(deviceID);
-
- deviceSessionItem = std::make_shared<database::DeviceSessionItem>(
- newSessionID,
- deviceID,
- request->publickey(),
- request->notifytoken(),
- tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()),
- request->deviceappversion(),
- request->deviceos());
- database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem);
- } catch (std::runtime_error &e) {
- LOG(ERROR) << "gRPC: "
- << "Error while processing 'NewSession' request: " << e.what();
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- reply->set_sessionid(newSessionID);
- return grpc::Status::OK;
-};
-
-grpc::Status TunnelBrokerServiceImpl::Send(
- grpc::ServerContext *context,
- const tunnelbroker::SendRequest *request,
- google::protobuf::Empty *reply) {
- try {
- const std::string sessionID = request->sessionid();
- if (!tools::validateSessionID(sessionID)) {
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Format validation failed for sessionID");
- }
- std::shared_ptr<database::DeviceSessionItem> sessionItem =
- database::DatabaseManager::getInstance().findSessionItem(sessionID);
- if (sessionItem == nullptr) {
- return grpc::Status(
- grpc::StatusCode::PERMISSION_DENIED,
- "No such session found. SessionID: " + sessionID);
- }
- const std::string clientDeviceID = sessionItem->getDeviceID();
- const std::string messageID = tools::generateUUID();
- const std::string notifyToken = sessionItem->getNotifyToken();
- const std::string deviceOs = sessionItem->getDeviceOs();
-
- const database::MessageItem message(
- messageID,
- clientDeviceID,
- request->todeviceid(),
- request->payload(),
- "");
- database::DatabaseManager::getInstance().putMessageItem(message);
- if (!AmqpManager::getInstance().send(&message)) {
- LOG(ERROR) << "gRPC: "
- << "Error while publish the message to AMQP";
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Error while publish the message to AMQP");
- }
-
- if (!sessionItem->getIsOnline()) {
- const std::string notificationMessageTitle = "New message";
- const std::string notificationMessageText = "You have a new message";
- if (deviceOs == "iOS" && !notifyToken.empty()) {
- typedef rust::notifications::apnsReturnStatus apnsReturnStatus;
- const apnsReturnStatus apnsResult =
- rust::notifications::sendNotifToAPNS(
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_NOTIFS_APNS_P12_CERT_PATH),
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::
- OPTION_NOTIFS_APNS_P12_CERT_PASSWORD),
- notifyToken,
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_NOTIFS_APNS_TOPIC),
- notificationMessageText,
- false);
- if ((apnsResult == apnsReturnStatus::Unregistered ||
- apnsResult == apnsReturnStatus::BadDeviceToken) &&
- !database::DatabaseManager::getInstance()
- .updateSessionItemDeviceToken(sessionID, "")) {
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Can't clear the device token in database");
- }
- } else if (deviceOs == "Android" && !notifyToken.empty()) {
- typedef rust::notifications::fcmReturnStatus fcmReturnStatus;
- const fcmReturnStatus fcmResult = rust::notifications::sendNotifToFCM(
- config::ConfigManager::getInstance().getParameter(
- config::ConfigManager::OPTION_NOTIFS_FCM_SERVER_KEY),
- notifyToken,
- notificationMessageTitle,
- notificationMessageText);
- if ((fcmResult == fcmReturnStatus::InvalidRegistration ||
- fcmResult == fcmReturnStatus::NotRegistered) &&
- !database::DatabaseManager::getInstance()
- .updateSessionItemDeviceToken(sessionID, "")) {
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Can't clear the device token in database");
- }
- }
- }
- } catch (std::runtime_error &e) {
- LOG(ERROR) << "gRPC: "
- << "Error while processing 'Send' request: " << e.what();
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- return grpc::Status::OK;
-};
-
-grpc::Status TunnelBrokerServiceImpl::Get(
- grpc::ServerContext *context,
- const tunnelbroker::GetRequest *request,
- grpc::ServerWriter<tunnelbroker::GetResponse> *writer) {
-
- std::mutex writerMutex;
- std::atomic_bool writerIsReady{true};
-
- const std::string sessionID = request->sessionid();
- if (!tools::validateSessionID(sessionID)) {
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Format validation failed for sessionID");
- }
- // Thread-safe response writer
- auto respondToWriter = [&](tunnelbroker::GetResponse response) {
- std::lock_guard<std::mutex> lock(writerMutex);
- if (!writerIsReady) {
- return false;
- }
- if (!writer->Write(response)) {
- writerIsReady = false;
- database::DatabaseManager::getInstance().updateSessionItemIsOnline(
- sessionID, false);
- return false;
- }
- return true;
- };
-
- // Keep-alive detection pinging thread
- tunnelbroker::GetResponse pingResponse;
- pingResponse.mutable_ping();
- auto sendingPings = [&]() {
- while (respondToWriter(pingResponse)) {
- std::this_thread::sleep_for(
- std::chrono::milliseconds(DEVICE_ONLINE_PING_INTERVAL_MS));
- }
- LOG(INFO) << "gRPC 'Get' handler write error on sending ping to the client";
- };
-
- try {
- std::shared_ptr<database::DeviceSessionItem> sessionItem =
- database::DatabaseManager::getInstance().findSessionItem(sessionID);
- if (sessionItem == nullptr) {
- return grpc::Status(
- grpc::StatusCode::PERMISSION_DENIED,
- "No such session found. SessionID: " + sessionID);
- }
-
- tunnelbroker::GetResponse response;
- // Handling of device notification token expiration and update
- if (request->has_newnotifytoken()) {
- if (!database::DatabaseManager::getInstance()
- .updateSessionItemDeviceToken(
- sessionID, request->newnotifytoken())) {
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Can't update device token in the database");
- }
- sessionItem =
- database::DatabaseManager::getInstance().findSessionItem(sessionID);
- } else if (sessionItem->getNotifyToken().empty()) {
- response.mutable_newnotifytokenrequired();
- if (!writer->Write(response)) {
- throw std::runtime_error(
- "gRPC writer error on sending data to the client");
- }
- response.Clear();
- }
-
- const std::string clientDeviceID = sessionItem->getDeviceID();
- DeliveryBrokerMessage messageToDeliver;
-
- std::vector<std::shared_ptr<database::MessageItem>> messagesFromDatabase =
- database::DatabaseManager::getInstance().findMessageItemsByReceiver(
- clientDeviceID);
- if (messagesFromDatabase.size() > 0) {
- // When a client connects and requests GET for the messages first we check
- // if there are undelivered messages in the database. If so, we are
- // erasing the messages to deliver from rabbitMQ which are handled by
- // DeliveryBroker.
- DeliveryBroker::getInstance().erase(clientDeviceID);
- }
- database::DatabaseManager::getInstance().updateSessionItemIsOnline(
- sessionID, true);
-
- for (auto &messageFromDatabase : messagesFromDatabase) {
- tunnelbroker::GetResponse response;
- response.mutable_responsemessage()->set_fromdeviceid(
- messageFromDatabase->getFromDeviceID());
- response.mutable_responsemessage()->set_payload(
- messageFromDatabase->getPayload());
- if (!respondToWriter(response)) {
- return grpc::Status(
- grpc::StatusCode::INTERNAL, "Channel writer is unavailable");
- }
- database::DatabaseManager::getInstance().removeMessageItem(
- clientDeviceID, messageFromDatabase->getMessageID());
- }
- // We are starting the pinging thread and sending pings only after
- // messages from the database was delivered and we are waiting for the new
- // messages to come to check is connection alive.
- std::thread pingThread(sendingPings);
-
- while (1) {
- messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID);
- tunnelbroker::GetResponse response;
- response.mutable_responsemessage()->set_fromdeviceid(
- messageToDeliver.fromDeviceID);
- response.mutable_responsemessage()->set_payload(messageToDeliver.payload);
- if (!respondToWriter(response)) {
- pingThread.join();
- return grpc::Status(
- grpc::StatusCode::INTERNAL, "Channel writer is unavailable");
- }
- comm::network::AmqpManager::getInstance().ack(
- messageToDeliver.deliveryTag);
- database::DatabaseManager::getInstance().removeMessageItem(
- clientDeviceID, messageToDeliver.messageID);
- // If messages queue for `clientDeviceID` is empty we don't need to store
- // `folly::MPMCQueue` for it and need to free memory to fix possible
- // 'ghost' queues in DeliveryBroker.
- // We call `deleteQueueIfEmpty()` for this purpose here.
- DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID);
- }
- } catch (std::runtime_error &e) {
- LOG(ERROR) << "gRPC: Runtime error while processing 'Get' request: "
- << e.what();
- database::DatabaseManager::getInstance().updateSessionItemIsOnline(
- sessionID, false);
- return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- } catch (...) {
- LOG(ERROR) << "gRPC: Unknown error while processing 'Get' request";
- database::DatabaseManager::getInstance().updateSessionItemIsOnline(
- sessionID, false);
- return grpc::Status(grpc::StatusCode::INTERNAL, "Unknown error");
- }
- return grpc::Status::OK;
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/tunnelbroker/src/libcpp/src/server.cpp b/services/tunnelbroker/src/libcpp/src/server.cpp
deleted file mode 100644
--- a/services/tunnelbroker/src/libcpp/src/server.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-#include "AmqpManager.h"
-#include "ConfigManager.h"
-#include "GlobalTools.h"
-#include "TunnelbrokerServiceImpl.h"
-
-#include "GlobalConstants.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <string>
-#include <thread>
-
-namespace comm {
-namespace network {
-
-void RunServer() {
- TunnelBrokerServiceImpl service;
- grpc::EnableDefaultHealthCheckService(true);
- grpc::ServerBuilder builder;
- // Listen on the given address without any authentication mechanism.
- builder.AddListeningPort(
- SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials());
- // Register "service" as the instance through which we'll communicate with
- // clients. In this case it corresponds to an *synchronous* service.
- builder.RegisterService(&service);
- std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
- LOG(INFO) << "server listening at :" << SERVER_LISTEN_ADDRESS;
- // Wait for the server to shutdown. Note that some other thread must be
- // responsible for shutting down the server for this call to ever return.
- server->Wait();
-}
-
-} // namespace network
-} // namespace comm
-
-int main(int argc, char **argv) {
- comm::network::tools::InitLogging("tunnelbroker");
- comm::network::config::ConfigManager::getInstance().load();
- comm::network::AmqpManager::getInstance().init();
- std::thread grpcThread(comm::network::RunServer);
- grpcThread.join();
- return 0;
-}

File Metadata

Mime Type
text/plain
Expires
Mon, Oct 7, 2:35 AM (20 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2252174
Default Alt Text
D5431.id17839.diff (17 KB)

Event Timeline