Page MenuHomePhorge

D15370.1765044668.diff
No OneTemporary

Size
10 KB
Referenced Files
None
Subscribers
None

D15370.1765044668.diff

diff --git a/services/tunnelbroker/src/amqp_client/amqp.rs b/services/tunnelbroker/src/amqp_client/amqp.rs
--- a/services/tunnelbroker/src/amqp_client/amqp.rs
+++ b/services/tunnelbroker/src/amqp_client/amqp.rs
@@ -1,18 +1,12 @@
-use crate::constants::{error_types, CLIENT_RMQ_MSG_PRIORITY};
-use crate::database::DatabaseClient;
+use crate::constants::error_types;
use crate::CONFIG;
-use comm_lib::aws::ddb::error::SdkError;
-use comm_lib::aws::ddb::operation::put_item::PutItemError;
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
-use lapin::{options::BasicPublishOptions, BasicProperties};
use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
-use tunnelbroker_messages::MessageToDevice;
-use uuid;
static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
let mut amqp_uri = CONFIG
@@ -137,6 +131,7 @@
///
/// TODO: Add support for restoring channel topology (queues and consumers)
/// (`lapin` has this built-in, but it's internal crate feature)
+#[derive(Clone)]
pub struct AmqpChannel {
conn: AmqpConnection,
channel: Arc<Mutex<Option<lapin::Channel>>>,
@@ -204,48 +199,3 @@
fn from_env(var_name: &str) -> Option<String> {
std::env::var(var_name).ok().filter(|s| !s.is_empty())
}
-
-#[derive(
- Debug, derive_more::Display, derive_more::From, derive_more::Error,
-)]
-pub enum SendMessageError {
- PersistenceError(SdkError<PutItemError>),
- SerializationError(serde_json::Error),
- AmqpError(lapin::Error),
-}
-pub async fn send_message_to_device(
- database_client: &DatabaseClient,
- amqp_channel: &AmqpChannel,
- device_id: String,
- payload: String,
-) -> Result<(), SendMessageError> {
- debug!("Received message for {}", &device_id);
-
- let client_message_id = uuid::Uuid::new_v4().to_string();
-
- let message_id = database_client
- .persist_message(&device_id, &payload, &client_message_id)
- .await?;
-
- let message_to_device = MessageToDevice {
- device_id: device_id.clone(),
- payload,
- message_id,
- };
-
- let serialized_message = serde_json::to_string(&message_to_device)?;
-
- amqp_channel
- .get()
- .await?
- .basic_publish(
- "",
- &device_id,
- BasicPublishOptions::default(),
- serialized_message.as_bytes(),
- BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
- )
- .await?;
-
- Ok(())
-}
diff --git a/services/tunnelbroker/src/amqp_client/mod.rs b/services/tunnelbroker/src/amqp_client/mod.rs
--- a/services/tunnelbroker/src/amqp_client/mod.rs
+++ b/services/tunnelbroker/src/amqp_client/mod.rs
@@ -17,6 +17,7 @@
use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest};
pub mod amqp;
+pub mod utils;
pub struct AmqpClient {
db_client: DatabaseClient,
diff --git a/services/tunnelbroker/src/amqp_client/utils.rs b/services/tunnelbroker/src/amqp_client/utils.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/amqp_client/utils.rs
@@ -0,0 +1,108 @@
+use comm_lib::aws::ddb::{error::SdkError, operation::put_item::PutItemError};
+use lapin::{options::BasicPublishOptions, BasicProperties};
+use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest};
+
+use crate::{constants::CLIENT_RMQ_MSG_PRIORITY, database::DatabaseClient};
+
+use super::amqp::AmqpChannel;
+
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum SendMessageError {
+ PersistenceError(SdkError<PutItemError>),
+ SerializationError(serde_json::Error),
+ AmqpError(lapin::Error),
+}
+
+#[derive(Clone)]
+pub struct BasicMessageSender {
+ db_client: DatabaseClient,
+ amqp_channel: AmqpChannel,
+}
+
+impl BasicMessageSender {
+ pub fn new(
+ database_client: &DatabaseClient,
+ amqp_channel: AmqpChannel,
+ ) -> Self {
+ Self {
+ db_client: database_client.clone(),
+ amqp_channel,
+ }
+ }
+
+ pub async fn send_message_to_device(
+ &self,
+ message_request: &MessageToDeviceRequest,
+ ) -> Result<(), SendMessageError> {
+ let MessageToDeviceRequest {
+ client_message_id,
+ device_id,
+ payload,
+ } = message_request;
+
+ send_message_to_device(
+ &self.db_client,
+ &self.amqp_channel,
+ device_id.clone(),
+ payload.clone(),
+ Some(client_message_id),
+ )
+ .await
+ }
+
+ pub async fn simple_send_message_to_device(
+ &self,
+ recipient_device_id: &str,
+ payload: String,
+ ) -> Result<(), SendMessageError> {
+ self
+ .send_message_to_device(&tunnelbroker_messages::MessageToDeviceRequest {
+ client_message_id: uuid::Uuid::new_v4().to_string(),
+ device_id: recipient_device_id.to_string(),
+ payload,
+ })
+ .await
+ }
+}
+
+pub async fn send_message_to_device(
+ database_client: &DatabaseClient,
+ amqp_channel: &AmqpChannel,
+ device_id: String,
+ payload: String,
+ client_message_id: Option<&String>,
+) -> Result<(), SendMessageError> {
+ tracing::debug!("Received message for {}", &device_id);
+
+ let client_message_id = client_message_id
+ .cloned()
+ .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+
+ let message_id = database_client
+ .persist_message(&device_id, &payload, &client_message_id)
+ .await?;
+
+ let message_to_device = MessageToDevice {
+ device_id: device_id.clone(),
+ payload,
+ message_id,
+ };
+
+ let serialized_message = serde_json::to_string(&message_to_device)?;
+
+ amqp_channel
+ .get()
+ .await?
+ .basic_publish(
+ "",
+ &device_id,
+ BasicPublishOptions::default(),
+ serialized_message.as_bytes(),
+ BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY),
+ )
+ .await?;
+
+ Ok(())
+}
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -2,9 +2,8 @@
tonic::include_proto!("tunnelbroker");
}
-use crate::amqp_client::amqp::{
- send_message_to_device, AmqpChannel, AmqpConnection, SendMessageError,
-};
+use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection};
+use crate::amqp_client::utils::{send_message_to_device, SendMessageError};
use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG};
use crate::database::{handle_ddb_error, DatabaseClient};
use crate::{constants, CONFIG};
@@ -44,6 +43,7 @@
&self.amqp_channel,
message.device_id,
message.payload,
+ None,
)
.await
.map_err(|e| match e {
diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs
--- a/services/tunnelbroker/src/token_distributor/token_connection.rs
+++ b/services/tunnelbroker/src/token_distributor/token_connection.rs
@@ -1,6 +1,5 @@
-use crate::amqp_client::amqp::{
- send_message_to_device, AmqpChannel, AmqpConnection,
-};
+use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection};
+use crate::amqp_client::utils::BasicMessageSender;
use crate::config::CONFIG;
use crate::constants::MEDIA_MIRROR_TIMEOUT;
use crate::database::DatabaseClient;
@@ -13,6 +12,7 @@
use futures_util::{SinkExt, StreamExt};
use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient;
use grpc_clients::identity::protos::auth::PeersDeviceListsRequest;
+use grpc_clients::identity::DeviceType;
use lapin::{options::*, types::FieldTable, ExchangeKind};
use std::time::Duration;
use tokio::time::{interval, Instant};
@@ -31,7 +31,7 @@
token_data: String,
amqp_connection: AmqpConnection,
grpc_client: ChainedInterceptedServicesAuthClient,
- amqp_channel: AmqpChannel,
+ message_sender: BasicMessageSender,
blob_client: S2SAuthedBlobClient,
}
@@ -46,6 +46,9 @@
grpc_client: ChainedInterceptedServicesAuthClient,
auth_service: &AuthService,
) {
+ let message_sender =
+ BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection));
+
let connection = Self {
db: db.clone(),
config: config.clone(),
@@ -53,11 +56,11 @@
token_data,
amqp_connection: amqp_connection.clone(),
grpc_client,
- amqp_channel: AmqpChannel::new(&amqp_connection),
blob_client: S2SAuthedBlobClient::new(
auth_service,
CONFIG.blob_service_url.clone(),
),
+ message_sender,
};
tokio::spawn(async move {
@@ -543,28 +546,35 @@
message: direct_cast_message,
};
if let Some(user_devices) = user_devices_option {
- for (device_id, platform_details) in
- &user_devices.devices_platform_details
- {
- let payload = serde_json::to_string(&message).map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to serialize: {}",
- e
- ))
- })?;
- send_message_to_device(
- &self.db,
- &self.amqp_channel,
- device_id.to_string(),
- payload,
- )
- .await
- .map_err(|e| {
- TokenConnectionError::MessageHandlingFailed(format!(
- "Failed to send a message: {}",
- e
- ))
- })?;
+ let message_payload = serde_json::to_string(&message).map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to serialize: {}",
+ e
+ ))
+ })?;
+
+ // Filter out keyservers
+ let device_ids = user_devices.devices_platform_details.iter().filter_map(
+ |(device_id, platform_details)| {
+ if platform_details.device_type() == DeviceType::Keyserver {
+ None
+ } else {
+ Some(device_id)
+ }
+ },
+ );
+
+ for device_id in device_ids {
+ self
+ .message_sender
+ .simple_send_message_to_device(device_id, message_payload.clone())
+ .await
+ .map_err(|e| {
+ TokenConnectionError::MessageHandlingFailed(format!(
+ "Failed to send a message: {}",
+ e
+ ))
+ })?;
}
}

File Metadata

Mime Type
text/plain
Expires
Sat, Dec 6, 6:11 PM (16 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5840027
Default Alt Text
D15370.1765044668.diff (10 KB)

Event Timeline