// Patch summary (preamble text before the first `diff --git` header):
//
// Wires the client-to-server `MessagesToSend` request through to the C++ core.
//
// * cxx_bridge.rs / Tunnelbroker.h: declare a new FFI function `sendMessages`
//   that takes a vector of message items and returns a vector of message IDs.
// * Tunnelbroker.cpp: implements `sendMessages`. For every input message it
//   generates an ID with `generateUUID()`, builds a database `MessageItem`
//   from `fromDeviceID`, `toDeviceID`, `payload` and `blobHashes`, and records
//   the ID. The collected items are then written with
//   `putMessageItemsByBatch`, each item is passed to `AmqpManager::send`, and
//   the list of generated IDs is returned.
// * DatabaseManager.h/.cpp: `putMessageItemsByBatch` now takes its vector by
//   const reference, and its loop iterates the items by value instead of by
//   mutable reference.
// * server/mod.rs: the `MessagesToSend` arm is no longer a no-op. It builds one
//   `MessageItem` per incoming message (empty `messageID` and `blobHashes`,
//   `deliveryTag` 0, `fromDeviceID` taken from `session_item.deviceID`), calls
//   `sendMessages`, and writes the returned IDs back to the client as a
//   `ProcessedMessages` message via `tx_writer`. A `sendMessages` error is
//   logged with `error!` and the enclosing function returns; a `tx_writer`
//   error is only logged with `debug!`.
//
// NOTE(review): this copy of the patch looks extraction-damaged. Line breaks
//   are collapsed and generic arguments appear stripped (e.g. `Result>`,
//   `rust::Vec sendMessages`, `std::vector vectorOfMessages`). Restore from
//   the original patch before applying.
// NOTE(review): `for (auto message : vectorOfMessages)` iterates by value, so
//   `send(&message)` receives the address of a per-iteration copy. Confirm
//   that `send` does not retain the pointer, or iterate by reference.
// NOTE(review): `MessageItem{MessageItem{...}}` in `sendMessages` constructs
//   an item from an item; the outer wrapper looks redundant. TODO confirm.
// NOTE(review): `for (MessageItem messageItem : messageItems)` copies every
//   item. Presumably `populatePutRequestFromMessageItem` could take a const
//   reference instead; verify against its declaration.
//
diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -74,6 +74,7 @@ newNotifToken: &str, ) -> Result<()>; pub fn getMessagesFromDatabase(deviceID: &str) -> Result>; + pub fn sendMessages(messages: &Vec) -> Result>; pub fn eraseMessagesFromAMQP(deviceID: &str) -> Result<()>; pub fn ackMessageFromAMQP(deliveryTag: u64) -> Result<()>; pub fn waitMessageFromDeliveryBroker(deviceID: &str) diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -19,6 +19,7 @@ void updateSessionItemIsOnline(rust::Str sessionID, bool isOnline); void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken); rust::Vec getMessagesFromDatabase(rust::Str deviceID); +rust::Vec sendMessages(const rust::Vec &messages); void eraseMessagesFromAMQP(rust::Str deviceID); void ackMessageFromAMQP(uint64_t deliveryTag); MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -239,3 +239,26 @@ comm::network::DeliveryBroker::DeliveryBroker::getInstance() .deleteQueueIfEmpty(stringDeviceID); } + +rust::Vec sendMessages(const rust::Vec &messages) { + std::vector vectorOfMessages; + rust::Vec messagesIDs; + for (auto &message : messages) { + std::string messageID = comm::network::tools::generateUUID(); + vectorOfMessages.push_back(comm::network::database::MessageItem{ + comm::network::database::MessageItem{ + messageID, + std::string{message.fromDeviceID}, + std::string{message.toDeviceID}, + std::string{message.payload}, + std::string{message.blobHashes}, + }}); 
+ messagesIDs.push_back(rust::String{messageID}); + }; + comm::network::database::DatabaseManager::getInstance() + .putMessageItemsByBatch(vectorOfMessages); + for (auto message : vectorOfMessages) { + comm::network::AmqpManager::getInstance().send(&message); + } + return messagesIDs; +} diff --git a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.h b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.h --- a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.h +++ b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.h @@ -57,7 +57,7 @@ void removePublicKeyItem(const std::string &deviceID); void putMessageItem(const MessageItem &item); - void putMessageItemsByBatch(std::vector &messageItems); + void putMessageItemsByBatch(const std::vector &messageItems); std::shared_ptr findMessageItem(const std::string &toDeviceID, const std::string &messageID); std::vector> diff --git a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp --- a/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp +++ b/services/tunnelbroker/src/libcpp/src/Database/DatabaseManager.cpp @@ -255,9 +255,9 @@ } void DatabaseManager::putMessageItemsByBatch( - std::vector &messageItems) { + const std::vector &messageItems) { std::vector writeRequests; - for (MessageItem &messageItem : messageItems) { + for (MessageItem messageItem : messageItems) { Aws::DynamoDB::Model::PutRequest putRequest; putRequest = this->populatePutRequestFromMessageItem(putRequest, messageItem); diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,9 +1,11 @@ +use crate::cxx_bridge::ffi::MessageItem; + use super::constants; use super::cxx_bridge::ffi::{ ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase, - getSessionItem, 
newSessionHandler, removeMessages, sessionSignatureHandler, - updateSessionItemDeviceToken, updateSessionItemIsOnline, - waitMessageFromDeliveryBroker, GRPCStatusCodes, + getSessionItem, newSessionHandler, removeMessages, sendMessages, + sessionSignatureHandler, updateSessionItemDeviceToken, + updateSessionItemIsOnline, waitMessageFromDeliveryBroker, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; @@ -300,7 +302,45 @@ }; } } - MessagesToSend(_) => (), + MessagesToSend(messages_to_send) => { + let mut messages_vec = vec![]; + for message in messages_to_send.messages { + messages_vec.push(MessageItem { + messageID: String::new(), + fromDeviceID: session_item.deviceID.clone(), + toDeviceID: message.to_device_id, + payload: message.payload, + blobHashes: String::new(), + deliveryTag: 0, + }); + } + let messages_ids = match sendMessages(&messages_vec) { + Err(err) => { + error!("Error on sending messages: {}", err.what()); + return; + } + Ok(ids) => ids, + }; + if let Err(err) = tx_writer( + &session_id, + &tx, + Ok(tunnelbroker::MessageToClient { + data: Some( + tunnelbroker::message_to_client::Data::ProcessedMessages( + tunnelbroker::ProcessedMessages { + message_id: messages_ids, + }, + ), + ), + }), + ) + .await + { + debug!( + "Error on sending back processed messages IDs to the stream: {}", + err); + }; + } ProcessedMessages(processed_messages) => { if let Err(err) = removeMessages( &session_item.deviceID,