diff --git a/services/tunnelbroker/rust-lib/src/lib.rs b/services/tunnelbroker/rust-lib/src/lib.rs index ca5179527..a8734e606 100644 --- a/services/tunnelbroker/rust-lib/src/lib.rs +++ b/services/tunnelbroker/rust-lib/src/lib.rs @@ -1,82 +1,85 @@ use crate::ffi::{apns_status, fcm_status}; use anyhow::Result; use env_logger; use lazy_static::lazy_static; use log::info; use tokio::runtime::Runtime; pub mod notifications; #[cxx::bridge] mod ffi { + #[namespace = "rust::notifications"] #[cxx_name = "apnsReturnStatus"] enum apns_status { Ok, Unregistered, BadDeviceToken, } + #[namespace = "rust::notifications"] #[cxx_name = "fcmReturnStatus"] enum fcm_status { Ok, InvalidRegistration, NotRegistered, } + #[namespace = "rust::notifications"] extern "Rust" { #[cxx_name = "sendNotifToAPNS"] fn send_notif_to_apns( certificate_path: &str, certificate_password: &str, device_token: &str, topic: &str, message: &str, sandbox: bool, ) -> Result; #[cxx_name = "sendNotifToFCM"] fn send_notif_to_fcm( fcm_api_key: &str, device_registration_id: &str, message_title: &str, message_body: &str, ) -> Result; } } lazy_static! { pub static ref RUNTIME: Runtime = { env_logger::init(); info!("Tokio runtime initialization"); Runtime::new().unwrap() }; } pub fn send_notif_to_apns( certificate_path: &str, certificate_password: &str, device_token: &str, topic: &str, message: &str, sandbox: bool, ) -> Result { RUNTIME.block_on(notifications::apns::send_by_a2_client( certificate_path, certificate_password, device_token, topic, message, sandbox, )) } pub fn send_notif_to_fcm( fcm_api_key: &str, device_registration_id: &str, message_title: &str, message_body: &str, ) -> Result { RUNTIME.block_on(notifications::fcm::send_by_fcm_client( fcm_api_key, device_registration_id, message_title, message_body, )) } diff --git a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp index 36f6f59b2..1578e552d 100644 --- a/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp +++ b/services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp @@ -1,357 +1,361 @@ #include "TunnelbrokerServiceImpl.h" #include "AmqpManager.h" #include "AwsTools.h" #include "ConfigManager.h" #include "CryptoTools.h" #include "DatabaseManager.h" #include "DeliveryBroker.h" #include "GlobalTools.h" #include "Tools.h" #include "rust-lib/src/lib.rs.h" #include "rust/cxx.h" #include #include #include #include #include namespace comm { namespace network { TunnelBrokerServiceImpl::TunnelBrokerServiceImpl() { Aws::InitAPI({}); // List of AWS DynamoDB tables to check if they are created and can be // accessed before any AWS API methods const std::list tablesList = { config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_VERIFICATION_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_PUBLIC_KEY_TABLE), config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_MESSAGES_TABLE)}; for (const std::string &table : tablesList) { if (!database::DatabaseManager::getInstance().isTableAvailable(table)) { throw std::runtime_error( "Error: AWS DynamoDB table '" + table + "' is not available"); } }; }; TunnelBrokerServiceImpl::~TunnelBrokerServiceImpl() { Aws::ShutdownAPI({}); }; grpc::Status TunnelBrokerServiceImpl::SessionSignature( grpc::ServerContext *context, const tunnelbroker::SessionSignatureRequest *request, tunnelbroker::SessionSignatureResponse *reply) { const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string toSign = tools::generateRandomString(SIGNATURE_REQUEST_LENGTH); std::shared_ptr SessionSignItem = std::make_shared(toSign, deviceID); database::DatabaseManager::getInstance().putSessionSignItem(*SessionSignItem); reply->set_tosign(toSign); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::NewSession( grpc::ServerContext *context, const tunnelbroker::NewSessionRequest *request, tunnelbroker::NewSessionResponse *reply) { std::shared_ptr deviceSessionItem; std::shared_ptr sessionSignItem; std::shared_ptr publicKeyItem; const std::string deviceID = request->deviceid(); if (!tools::validateDeviceID(deviceID)) { return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for deviceID"); } const std::string signature = request->signature(); const std::string publicKey = request->publickey(); const std::string newSessionID = tools::generateUUID(); try { sessionSignItem = database::DatabaseManager::getInstance().findSessionSignItem(deviceID); if (sessionSignItem == nullptr) { return grpc::Status( grpc::StatusCode::NOT_FOUND, "Session sign request not found"); } publicKeyItem = database::DatabaseManager::getInstance().findPublicKeyItem(deviceID); if (publicKeyItem == nullptr) { std::shared_ptr newPublicKeyItem = std::make_shared(deviceID, publicKey); database::DatabaseManager::getInstance().putPublicKeyItem( *newPublicKeyItem); } else if (publicKey != publicKeyItem->getPublicKey()) { return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "The public key doesn't match for deviceID"); } const std::string verificationMessage = sessionSignItem->getSign(); if (!comm::network::crypto::rsaVerifyString( publicKey, verificationMessage, signature)) { return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "Signature for the verification message is not valid"); } database::DatabaseManager::getInstance().removeSessionSignItem(deviceID); deviceSessionItem = std::make_shared( newSessionID, deviceID, request->publickey(), request->notifytoken(), tunnelbroker::NewSessionRequest_DeviceTypes_Name(request->devicetype()), request->deviceappversion(), request->deviceos()); database::DatabaseManager::getInstance().putSessionItem(*deviceSessionItem); } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'NewSession' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } reply->set_sessionid(newSessionID); return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Send( grpc::ServerContext *context, const tunnelbroker::SendRequest *request, google::protobuf::Empty *reply) { try { const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } const std::string clientDeviceID = sessionItem->getDeviceID(); const std::string messageID = tools::generateUUID(); const std::string notifyToken = sessionItem->getNotifyToken(); const std::string deviceOs = sessionItem->getDeviceOs(); const database::MessageItem message( messageID, clientDeviceID, request->todeviceid(), request->payload(), ""); database::DatabaseManager::getInstance().putMessageItem(message); if (!AmqpManager::getInstance().send(&message)) { LOG(ERROR) << "gRPC: " << "Error while publish the message to AMQP"; return grpc::Status( grpc::StatusCode::INTERNAL, "Error while publish the message to AMQP"); } if (!sessionItem->getIsOnline()) { const std::string notificationMessageTitle = "New message"; const std::string notificationMessageText = "You have a new message"; if (deviceOs == "iOS" && !notifyToken.empty()) { - const apnsReturnStatus apnsResult = sendNotifToAPNS( - config::ConfigManager::getInstance().getParameter( - config::ConfigManager::OPTION_NOTIFS_APNS_P12_CERT_PATH), - config::ConfigManager::getInstance().getParameter( - config::ConfigManager::OPTION_NOTIFS_APNS_P12_CERT_PASSWORD), - notifyToken, - config::ConfigManager::getInstance().getParameter( - config::ConfigManager::OPTION_NOTIFS_APNS_TOPIC), - notificationMessageText, - false); + typedef rust::notifications::apnsReturnStatus apnsReturnStatus; + const apnsReturnStatus apnsResult = + rust::notifications::sendNotifToAPNS( + config::ConfigManager::getInstance().getParameter( + config::ConfigManager::OPTION_NOTIFS_APNS_P12_CERT_PATH), + config::ConfigManager::getInstance().getParameter( + config::ConfigManager:: + OPTION_NOTIFS_APNS_P12_CERT_PASSWORD), + notifyToken, + config::ConfigManager::getInstance().getParameter( + config::ConfigManager::OPTION_NOTIFS_APNS_TOPIC), + notificationMessageText, + false); if ((apnsResult == apnsReturnStatus::Unregistered || apnsResult == apnsReturnStatus::BadDeviceToken) && !database::DatabaseManager::getInstance() .updateSessionItemDeviceToken(sessionID, "")) { return grpc::Status( grpc::StatusCode::INTERNAL, "Can't clear the device token in database"); } } else if (deviceOs == "Android" && !notifyToken.empty()) { - const fcmReturnStatus fcmResult = sendNotifToFCM( + typedef rust::notifications::fcmReturnStatus fcmReturnStatus; + const fcmReturnStatus fcmResult = rust::notifications::sendNotifToFCM( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_NOTIFS_FCM_SERVER_KEY), notifyToken, notificationMessageTitle, notificationMessageText); if ((fcmResult == fcmReturnStatus::InvalidRegistration || fcmResult == fcmReturnStatus::NotRegistered) && !database::DatabaseManager::getInstance() .updateSessionItemDeviceToken(sessionID, "")) { return grpc::Status( grpc::StatusCode::INTERNAL, "Can't clear the device token in database"); } } } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: " << "Error while processing 'Send' request: " << e.what(); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; }; grpc::Status TunnelBrokerServiceImpl::Get( grpc::ServerContext *context, const tunnelbroker::GetRequest *request, grpc::ServerWriter *writer) { std::mutex writerMutex; std::atomic_bool writerIsReady{true}; const std::string sessionID = request->sessionid(); if (!tools::validateSessionID(sessionID)) { return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Format validation failed for sessionID"); } // Thread-safe response writer auto respondToWriter = [&](tunnelbroker::GetResponse response) { std::lock_guard lock(writerMutex); if (!writerIsReady) { return false; } if (!writer->Write(response)) { writerIsReady = false; database::DatabaseManager::getInstance().updateSessionItemIsOnline( sessionID, false); return false; } return true; }; // Keep-alive detection pinging thread tunnelbroker::GetResponse pingResponse; pingResponse.mutable_ping(); auto sendingPings = [&]() { while (respondToWriter(pingResponse)) { std::this_thread::sleep_for( std::chrono::milliseconds(DEVICE_ONLINE_PING_INTERVAL_MS)); } LOG(INFO) << "gRPC 'Get' handler write error on sending ping to the client"; }; try { std::shared_ptr sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); if (sessionItem == nullptr) { return grpc::Status( grpc::StatusCode::PERMISSION_DENIED, "No such session found. SessionID: " + sessionID); } tunnelbroker::GetResponse response; // Handling of device notification token expiration and update if (request->has_newnotifytoken()) { if (!database::DatabaseManager::getInstance() .updateSessionItemDeviceToken( sessionID, request->newnotifytoken())) { return grpc::Status( grpc::StatusCode::INTERNAL, "Can't update device token in the database"); } sessionItem = database::DatabaseManager::getInstance().findSessionItem(sessionID); } else if (sessionItem->getNotifyToken().empty()) { response.mutable_newnotifytokenrequired(); if (!writer->Write(response)) { throw std::runtime_error( "gRPC writer error on sending data to the client"); } response.Clear(); } const std::string clientDeviceID = sessionItem->getDeviceID(); DeliveryBrokerMessage messageToDeliver; std::vector> messagesFromDatabase = database::DatabaseManager::getInstance().findMessageItemsByReceiver( clientDeviceID); if (messagesFromDatabase.size() > 0) { // When a client connects and requests GET for the messages first we check // if there are undelivered messages in the database. If so, we are // erasing the messages to deliver from rabbitMQ which are handled by // DeliveryBroker. DeliveryBroker::getInstance().erase(clientDeviceID); } database::DatabaseManager::getInstance().updateSessionItemIsOnline( sessionID, true); for (auto &messageFromDatabase : messagesFromDatabase) { tunnelbroker::GetResponse response; response.mutable_responsemessage()->set_fromdeviceid( messageFromDatabase->getFromDeviceID()); response.mutable_responsemessage()->set_payload( messageFromDatabase->getPayload()); if (!respondToWriter(response)) { return grpc::Status( grpc::StatusCode::INTERNAL, "Channel writer is unavailable"); } database::DatabaseManager::getInstance().removeMessageItem( clientDeviceID, messageFromDatabase->getMessageID()); } // We are starting the pinging thread and sending pings only after // messages from the database was delivered and we are waiting for the new // messages to come to check is connection alive. std::thread pingThread(sendingPings); while (1) { messageToDeliver = DeliveryBroker::getInstance().pop(clientDeviceID); tunnelbroker::GetResponse response; response.mutable_responsemessage()->set_fromdeviceid( messageToDeliver.fromDeviceID); response.mutable_responsemessage()->set_payload(messageToDeliver.payload); if (!respondToWriter(response)) { pingThread.join(); return grpc::Status( grpc::StatusCode::INTERNAL, "Channel writer is unavailable"); } comm::network::AmqpManager::getInstance().ack( messageToDeliver.deliveryTag); database::DatabaseManager::getInstance().removeMessageItem( clientDeviceID, messageToDeliver.messageID); // If messages queue for `clientDeviceID` is empty we don't need to store // `folly::MPMCQueue` for it and need to free memory to fix possible // 'ghost' queues in DeliveryBroker. // We call `deleteQueueIfEmpty()` for this purpose here. DeliveryBroker::getInstance().deleteQueueIfEmpty(clientDeviceID); } } catch (std::runtime_error &e) { LOG(ERROR) << "gRPC: Runtime error while processing 'Get' request: " << e.what(); database::DatabaseManager::getInstance().updateSessionItemIsOnline( sessionID, false); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } catch (...) { LOG(ERROR) << "gRPC: Unknown error while processing 'Get' request"; database::DatabaseManager::getInstance().updateSessionItemIsOnline( sessionID, false); return grpc::Status(grpc::StatusCode::INTERNAL, "Unknown error"); } return grpc::Status::OK; }; } // namespace network } // namespace comm