Page MenuHomePhabricator

D5580.diff
No OneTemporary

D5580.diff

diff --git a/services/tunnelbroker/src/cxx_bridge.rs b/services/tunnelbroker/src/cxx_bridge.rs
--- a/services/tunnelbroker/src/cxx_bridge.rs
+++ b/services/tunnelbroker/src/cxx_bridge.rs
@@ -46,6 +46,7 @@
toDeviceID: String,
payload: String,
blobHashes: String,
+ deliveryTag: u64,
}
unsafe extern "C++" {
@@ -74,5 +75,8 @@
) -> Result<()>;
pub fn getMessagesFromDatabase(deviceID: &str) -> Result<Vec<MessageItem>>;
pub fn eraseMessagesFromAMQP(deviceID: &str) -> Result<()>;
+ pub fn ackMessageFromAMQP(deliveryTag: u64) -> Result<()>;
+ pub fn waitMessageFromDeliveryBroker(deviceID: &str)
+ -> Result<MessageItem>;
}
}
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.h b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.h
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.h
@@ -20,3 +20,5 @@
void updateSessionItemDeviceToken(rust::Str sessionID, rust::Str newNotifToken);
rust::Vec<MessageItem> getMessagesFromDatabase(rust::Str deviceID);
void eraseMessagesFromAMQP(rust::Str deviceID);
+void ackMessageFromAMQP(uint64_t deliveryTag);
+MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID);
diff --git a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
--- a/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
+++ b/services/tunnelbroker/src/libcpp/Tunnelbroker.cpp
@@ -205,3 +205,17 @@
void eraseMessagesFromAMQP(rust::Str deviceID) {
comm::network::DeliveryBroker::getInstance().erase(std::string{deviceID});
}
+
+void ackMessageFromAMQP(uint64_t deliveryTag) {
+ comm::network::AmqpManager::getInstance().ack(deliveryTag);
+}
+
+MessageItem waitMessageFromDeliveryBroker(rust::Str deviceID) {
+ const auto message =
+ comm::network::DeliveryBroker::getInstance().pop(std::string{deviceID});
+ return MessageItem{
+ .messageID = message.messageID,
+ .fromDeviceID = message.fromDeviceID,
+ .payload = message.payload,
+ .deliveryTag = message.deliveryTag};
+}
diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs
--- a/services/tunnelbroker/src/server/mod.rs
+++ b/services/tunnelbroker/src/server/mod.rs
@@ -1,8 +1,9 @@
use super::constants;
use super::cxx_bridge::ffi::{
- eraseMessagesFromAMQP, getMessagesFromDatabase, getSessionItem,
- newSessionHandler, sessionSignatureHandler, updateSessionItemDeviceToken,
- updateSessionItemIsOnline, GRPCStatusCodes,
+ ackMessageFromAMQP, eraseMessagesFromAMQP, getMessagesFromDatabase,
+ getSessionItem, newSessionHandler, sessionSignatureHandler,
+ updateSessionItemDeviceToken, updateSessionItemIsOnline,
+ waitMessageFromDeliveryBroker, GRPCStatusCodes,
};
use anyhow::Result;
use futures::Stream;
@@ -215,6 +216,55 @@
}
});
+ // Spawning asynchronous Tokio task to deliver new messages
+ // to the client from delivery broker
+ tokio::spawn({
+ let device_id = session_item.deviceID.clone();
+ let session_id = session_id.clone();
+ let tx = tx.clone();
+ async move {
+ loop {
+ let message_to_deliver =
+ match waitMessageFromDeliveryBroker(&device_id) {
+ Ok(message_item) => message_item,
+ Err(err) => {
+ error!(
+ "Error on waiting messages from DeliveryBroker: {}",
+ err.what()
+ );
+ return;
+ }
+ };
+ let writer_result = tx_writer(
+ &session_id,
+ &tx,
+ Ok(tunnelbroker::MessageToClient {
+ data: Some(
+ tunnelbroker::message_to_client::Data::MessagesToDeliver(
+ tunnelbroker::MessagesToDeliver {
+ messages: vec![tunnelbroker::MessageToClientStruct {
+ message_id: message_to_deliver.messageID,
+ from_device_id: message_to_deliver.fromDeviceID,
+ payload: message_to_deliver.payload,
+ blob_hashes: vec![message_to_deliver.blobHashes],
+ }],
+ },
+ ),
+ ),
+ }),
+ );
+ if let Err(err) = writer_result.await {
+ debug!("Error on writing to the stream: {}", err);
+ return;
+ };
+ if let Err(err) = ackMessageFromAMQP(message_to_deliver.deliveryTag) {
+ debug!("Error on message acknowledgement in AMQP queue: {}", err);
+ return;
+ };
+ }
+ }
+ });
+
let mut input_stream = request.into_inner();
// Spawning asynchronous Tokio task for handling incoming messages from the client
tokio::spawn(async move {

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 8:15 PM (8 h, 19 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2564622
Default Alt Text
D5580.diff (4 KB)

Event Timeline