diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -78,5 +78,9 @@ pub fn ackMessageFromAMQP(deliveryTag: u64) -> Result<()>; pub fn waitMessageFromDeliveryBroker(deviceID: &str) -> Result; + pub fn removeMessages( + deviceID: &str, + messagesIDs: &Vec, + ) -> Result<()>; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -22,3 +22,6 @@ void eraseMessagesFromAMQP(rust::Str deviceID); void ackMessageFromAMQP(uint64_t deliveryTag); MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID); +void removeMessages( + rust::Str deviceID, + const rust::Vec &messagesIDs); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -219,3 +219,23 @@ .payload = message.payload, .deliveryTag = message.deliveryTag}; } + +void removeMessages( + rust::Str deviceID, + const rust::Vec &messagesIDs) { + std::vector vectorOfmessagesIDs; + std::string stringDeviceID = std::string{deviceID}; + for (auto id : messagesIDs) { + vectorOfmessagesIDs.push_back(std::string{id}); + }; + comm::network::database::DatabaseManager::getInstance() + .removeMessageItemsByIDsForDeviceID(vectorOfmessagesIDs, stringDeviceID); + + // If messages queue for `deviceID` is empty we don't need to store + // `folly::MPMCQueue` for it and need to free memory to fix possible + // 'ghost' queues in DeliveryBroker. + // We call `deleteQueueIfEmpty()` for this purpose here after removing + // messages. + comm::network::DeliveryBroker::DeliveryBroker::getInstance() + .deleteQueueIfEmpty(stringDeviceID); +} diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -1,7 +1,7 @@ use super::constants; use super::cxx_bridge::ffi::{ ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase, - getSessionItem, newSessionHandler, sessionSignatureHandler, + getSessionItem, newSessionHandler, removeMessages, sessionSignatureHandler, updateSessionItemDeviceToken, updateSessionItemIsOnline, waitMessageFromDeliveryBroker, GRPCStatusCodes, }; @@ -301,7 +301,17 @@ } } MessagesToSend(_) => (), - ProcessedMessages(_) => (), + ProcessedMessages(processed_messages) => { + if let Err(err) = removeMessages( + &session_item.deviceID, + &processed_messages.message_id, + ) { + error!( + "Error removing messages from the database: {}", + err.what() + ); + }; + } } } }