diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -46,6 +46,7 @@ toDeviceID: String, payload: String, blobHashes: String, + deliveryTag: u64, } unsafe extern "C++" { @@ -74,5 +75,8 @@ ) -> Result<()>; pub fn getMessagesFromDatabase(deviceID: &str) -> Result>; pub fn eraseMessagesFromAMQP(deviceID: &str) -> Result<()>; + pub fn ackMessageFromAMQP(deliveryTag: u64) -> Result<()>; + pub fn waitMessageFromDeliveryBroker(deviceID: &str) + -> Result; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -20,3 +20,5 @@ void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken); rust::Vec getMessagesFromDatabase(rust::Str deviceID); void eraseMessagesFromAMQP(rust::Str deviceID); +void ackMessageFromAMQP(uint64_t deliveryTag); +MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -205,3 +205,17 @@ void eraseMessagesFromAMQP(rust::Str deviceID) { comm::network::DeliveryBroker::getInstance().erase(std::string{deviceID}); } + +void ackMessageFromAMQP(uint64_t deliveryTag) { + comm::network::AmqpManager::getInstance().ack(deliveryTag); +} + +MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID) { + const auto message = + comm::network::DeliveryBroker::getInstance().pop(std::string{deviceID}); + return MessageItem{ + .messageID = message.messageID, + .fromDeviceID = message.fromDeviceID, + .payload = message.payload, + .deliveryTag = message.deliveryTag}; +} diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,8 +1,9 @@ use super::constants; use super::cxx_bridge::ffi::{ - eraseMessagesFromAMQP, getMessagesFromDatabase, getSessionItem, - newSessionHandler, sessionSignatureHandler, updateSessionItemDeviceToken, - updateSessionItemIsOnline, GRPCStatusCodes, + ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase, + getSessionItem, newSessionHandler, sessionSignatureHandler, + updateSessionItemDeviceToken, updateSessionItemIsOnline, + waitMessageFromDeliveryBroker, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; @@ -215,6 +216,55 @@ } }); + // Spawning asynchronous Tokio task to deliver new messages + // to the client from delivery broker + tokio::spawn({ + let device_id = session_item.deviceID.clone(); + let session_id = session_id.clone(); + let tx = tx.clone(); + async move { + loop { + let message_to_deliver = + match waitMessageFromDeliveryBroker(&device_id) { + Ok(message_item) => message_item, + Err(err) => { + error!( + "Error on waiting messages from DeliveryBroker: {}", + err.what() + ); + return; + } + }; + let writer_result = tx_writer( + &session_id, + &tx, + Ok(tunnelbroker::MessageToClient { + data: Some( + tunnelbroker::message_to_client::Data::MessagesToDeliver( + tunnelbroker::MessagesToDeliver { + messages: vec![tunnelbroker::MessageToClientStruct { + message_id: message_to_deliver.messageID, + from_device_id: message_to_deliver.fromDeviceID, + payload: message_to_deliver.payload, + blob_hashes: vec![message_to_deliver.blobHashes], + }], + }, + ), + ), + }), + ); + if let Err(err) = writer_result.await { + debug!("Error on writing to the stream: {}", err); + return; + }; + if let Err(err) = ackMessageFromAMQP(message_to_deliver.deliveryTag) { + debug!("Error on message acknowledgement in AMQP queue: {}", err); + return; + }; + } + } + }); + let mut input_stream = request.into_inner(); // Spawning asynchronous Tokio task for handling incoming messages from the client tokio::spawn(async move {