diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs --- a/services/tunnelbroker/src/cxx_bridge.rs +++ b/services/tunnelbroker/src/cxx_bridge.rs @@ -84,5 +84,6 @@ deviceID: &str, messagesIDs: &Vec, ) -> Result<()>; + pub fn deleteDeliveryBrokerQueueIfEmpty(deviceID: &str) -> Result<()>; } } diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h @@ -27,3 +27,4 @@ void removeMessages( rust::Str deviceID, const rust::Vec &messagesIDs); +void deleteDeliveryBrokerQueueIfEmpty(rust::Str deviceID); diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp --- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp +++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp @@ -231,20 +231,22 @@ rust::Str deviceID, const rust::Vec &messagesIDs) { std::vector vectorOfmessagesIDs; - std::string stringDeviceID = std::string{deviceID}; for (auto id : messagesIDs) { vectorOfmessagesIDs.push_back(std::string{id}); }; comm::network::database::DatabaseManager::getInstance() - .removeMessageItemsByIDsForDeviceID(vectorOfmessagesIDs, stringDeviceID); + .removeMessageItemsByIDsForDeviceID( + vectorOfmessagesIDs, std::string{deviceID}); +} +void deleteDeliveryBrokerQueueIfEmpty(rust::Str deviceID) { // If messages queue for `deviceID` is empty we don't need to store // `folly::MPMCQueue` for it and need to free memory to fix possible // 'ghost' queues in DeliveryBroker. // We call `deleteQueueIfEmpty()` for this purpose here after removing // messages. comm::network::DeliveryBroker::DeliveryBroker::getInstance() - .deleteQueueIfEmpty(stringDeviceID); + .deleteQueueIfEmpty(std::string{deviceID}); } rust::Vec sendMessages(const rust::Vec &messages) { diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -2,11 +2,11 @@ use super::constants; use super::cxx_bridge::ffi::{ - ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase, - getSavedNonceToSign, getSessionItem, isConfigParameterSet, newSessionHandler, - removeMessages, sendMessages, sessionSignatureHandler, - updateSessionItemDeviceToken, updateSessionItemIsOnline, - waitMessageFromDeliveryBroker, GRPCStatusCodes, + ackMessageFromAMQP, deleteDeliveryBrokerQueueIfEmpty, eraseMessagesFromAMQP, + getMessagesFromDatabase, getSavedNonceToSign, getSessionItem, + isConfigParameterSet, newSessionHandler, removeMessages, sendMessages, + sessionSignatureHandler, updateSessionItemDeviceToken, + updateSessionItemIsOnline, waitMessageFromDeliveryBroker, GRPCStatusCodes, }; use anyhow::Result; use futures::Stream; @@ -387,6 +387,14 @@ err.what() ); }; + if let Err(err) = + deleteDeliveryBrokerQueueIfEmpty(&session_item.deviceID) + { + error!( + "Error deleting delivery broker queue when empty: {}", + err.what() + ); + } } } }