Page MenuHomePhabricator

D5584.id18309.diff
No OneTemporary

D5584.id18309.diff

diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -57,5 +57,9 @@
pub fn ackMessageFromAMQP(deliveryTag: u64) -> Result<()>;
pub fn waitMessageFromDeliveryBroker(deviceID: &str)
-> Result<MessageItem>;
+ pub fn removeMessages(
+ deviceID: &str,
+ messagesIDs: &Vec<String>,
+ ) -> Result<()>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -20,3 +20,6 @@
void eraseMessagesFromAMQP(rust::Str deviceID);
void ackMessageFromAMQP(uint64_t deliveryTag);
MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID);
+void removeMessages(
+ rust::Str deviceID,
+ const rust::Vec<rust::String> &messagesIDs);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -205,3 +205,23 @@
.payload = message.payload,
.deliveryTag = message.deliveryTag};
}
+
+// Removes the messages listed in `messagesIDs` for `deviceID` through the
+// DatabaseManager, then asks DeliveryBroker to drop the device queue if empty.
+void removeMessages(
+  rust::Str deviceID,
+  const rust::Vec<rust::String> &messagesIDs) {
+  std::string stringDeviceID = std::string{deviceID};
+  std::vector<std::string> vectorOfmessagesIDs;
+  vectorOfmessagesIDs.reserve(messagesIDs.size());
+  // Bind by const reference so each rust::String is converted, not copied.
+  for (const auto &id : messagesIDs) {
+    vectorOfmessagesIDs.push_back(std::string{id});
+  }
+  comm::network::database::DatabaseManager::getInstance()
+      .removeMessageItemsByIDsForDeviceID(vectorOfmessagesIDs, stringDeviceID);
+
+  // An empty `folly::MPMCQueue` left for `deviceID` would be a 'ghost' queue.
+  comm::network::DeliveryBroker::DeliveryBroker::getInstance()
+      .deleteQueueIfEmpty(stringDeviceID);
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,7 +1,7 @@
use super::constants;
use super::cxx_bridge::ffi::{
ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase,
- getSessionItem, newSessionHandler, sessionSignatureHandler,
+ getSessionItem, newSessionHandler, removeMessages, sessionSignatureHandler,
updateSessionItemDeviceToken, updateSessionItemIsOnline,
waitMessageFromDeliveryBroker,
};
@@ -284,7 +284,17 @@
}
}
MessagesToSend(_) => (),
- ProcessedMessages(_) => (),
+ ProcessedMessages(processed_messages) => {
+ if let Err(err) = removeMessages(
+ &session_item.deviceID,
+ &processed_messages.message_id,
+ ) {
+ error!(
+ "Error removing messages from the database: {}",
+ err.what()
+ );
+ };
+ }
}
}
}

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 5:41 PM (21 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2585624
Default Alt Text
D5584.id18309.diff (3 KB)

Event Timeline