diff --git a/services/tunnelbroker/src/amqp_client/amqp.rs b/services/tunnelbroker/src/amqp_client/amqp.rs --- a/services/tunnelbroker/src/amqp_client/amqp.rs +++ b/services/tunnelbroker/src/amqp_client/amqp.rs @@ -1,18 +1,12 @@ -use crate::constants::{error_types, CLIENT_RMQ_MSG_PRIORITY}; -use crate::database::DatabaseClient; +use crate::constants::error_types; use crate::CONFIG; -use comm_lib::aws::ddb::error::SdkError; -use comm_lib::aws::ddb::operation::put_item::PutItemError; use comm_lib::database::batch_operations::ExponentialBackoffConfig; -use lapin::{options::BasicPublishOptions, BasicProperties}; use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; -use tunnelbroker_messages::MessageToDevice; -use uuid; static AMQP_URI: Lazy = Lazy::new(|| { let mut amqp_uri = CONFIG @@ -137,6 +131,7 @@ /// /// TODO: Add support for restoring channel topology (queues and consumers) /// (`lapin` has this built-in, but it's internal crate feature) +#[derive(Clone)] pub struct AmqpChannel { conn: AmqpConnection, channel: Arc>>, @@ -204,48 +199,3 @@ fn from_env(var_name: &str) -> Option { std::env::var(var_name).ok().filter(|s| !s.is_empty()) } - -#[derive( - Debug, derive_more::Display, derive_more::From, derive_more::Error, -)] -pub enum SendMessageError { - PersistenceError(SdkError), - SerializationError(serde_json::Error), - AmqpError(lapin::Error), -} -pub async fn send_message_to_device( - database_client: &DatabaseClient, - amqp_channel: &AmqpChannel, - device_id: String, - payload: String, -) -> Result<(), SendMessageError> { - debug!("Received message for {}", &device_id); - - let client_message_id = uuid::Uuid::new_v4().to_string(); - - let message_id = database_client - .persist_message(&device_id, &payload, &client_message_id) - .await?; - - let message_to_device = MessageToDevice { - device_id: device_id.clone(), - payload, - message_id, - }; - - let serialized_message = serde_json::to_string(&message_to_device)?; - - amqp_channel - .get() - .await? - .basic_publish( - "", - &device_id, - BasicPublishOptions::default(), - serialized_message.as_bytes(), - BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), - ) - .await?; - - Ok(()) -} diff --git a/services/tunnelbroker/src/amqp_client/mod.rs b/services/tunnelbroker/src/amqp_client/mod.rs --- a/services/tunnelbroker/src/amqp_client/mod.rs +++ b/services/tunnelbroker/src/amqp_client/mod.rs @@ -17,6 +17,7 @@ use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest}; pub mod amqp; +pub mod utils; pub struct AmqpClient { db_client: DatabaseClient, diff --git a/services/tunnelbroker/src/amqp_client/utils.rs b/services/tunnelbroker/src/amqp_client/utils.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/amqp_client/utils.rs @@ -0,0 +1,108 @@ +use comm_lib::aws::ddb::{error::SdkError, operation::put_item::PutItemError}; +use lapin::{options::BasicPublishOptions, BasicProperties}; +use tunnelbroker_messages::{MessageToDevice, MessageToDeviceRequest}; + +use crate::{constants::CLIENT_RMQ_MSG_PRIORITY, database::DatabaseClient}; + +use super::amqp::AmqpChannel; + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +pub enum SendMessageError { + PersistenceError(SdkError), + SerializationError(serde_json::Error), + AmqpError(lapin::Error), +} + +#[derive(Clone)] +pub struct BasicMessageSender { + db_client: DatabaseClient, + amqp_channel: AmqpChannel, +} + +impl BasicMessageSender { + pub fn new( + database_client: &DatabaseClient, + amqp_channel: AmqpChannel, + ) -> Self { + Self { + db_client: database_client.clone(), + amqp_channel, + } + } + + pub async fn send_message_to_device( + &self, + message_request: &MessageToDeviceRequest, + ) -> Result<(), SendMessageError> { + let MessageToDeviceRequest { + client_message_id, + device_id, + payload, + } = message_request; + + send_message_to_device( + &self.db_client, + &self.amqp_channel, + device_id.clone(), + payload.clone(), + Some(client_message_id), + ) + .await + } + + pub async fn simple_send_message_to_device( + &self, + recipient_device_id: &str, + payload: String, + ) -> Result<(), SendMessageError> { + self + .send_message_to_device(&tunnelbroker_messages::MessageToDeviceRequest { + client_message_id: uuid::Uuid::new_v4().to_string(), + device_id: recipient_device_id.to_string(), + payload, + }) + .await + } +} + +pub async fn send_message_to_device( + database_client: &DatabaseClient, + amqp_channel: &AmqpChannel, + device_id: String, + payload: String, + client_message_id: Option<&String>, +) -> Result<(), SendMessageError> { + tracing::debug!("Received message for {}", &device_id); + + let client_message_id = client_message_id + .cloned() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + + let message_id = database_client + .persist_message(&device_id, &payload, &client_message_id) + .await?; + + let message_to_device = MessageToDevice { + device_id: device_id.clone(), + payload, + message_id, + }; + + let serialized_message = serde_json::to_string(&message_to_device)?; + + amqp_channel + .get() + .await? + .basic_publish( + "", + &device_id, + BasicPublishOptions::default(), + serialized_message.as_bytes(), + BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), + ) + .await?; + + Ok(()) +} diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -2,9 +2,8 @@ tonic::include_proto!("tunnelbroker"); } -use crate::amqp_client::amqp::{ - send_message_to_device, AmqpChannel, AmqpConnection, SendMessageError, -}; +use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection}; +use crate::amqp_client::utils::{send_message_to_device, SendMessageError}; use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; @@ -44,6 +43,7 @@ &self.amqp_channel, message.device_id, message.payload, + None, ) .await .map_err(|e| match e { diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -1,6 +1,5 @@ -use crate::amqp_client::amqp::{ - send_message_to_device, AmqpChannel, AmqpConnection, -}; +use crate::amqp_client::amqp::{AmqpChannel, AmqpConnection}; +use crate::amqp_client::utils::BasicMessageSender; use crate::config::CONFIG; use crate::constants::MEDIA_MIRROR_TIMEOUT; use crate::database::DatabaseClient; @@ -31,7 +30,7 @@ token_data: String, amqp_connection: AmqpConnection, grpc_client: ChainedInterceptedServicesAuthClient, - amqp_channel: AmqpChannel, + message_sender: BasicMessageSender, blob_client: S2SAuthedBlobClient, } @@ -46,6 +45,9 @@ grpc_client: ChainedInterceptedServicesAuthClient, auth_service: &AuthService, ) { + let message_sender = + BasicMessageSender::new(&db, AmqpChannel::new(&amqp_connection)); + let connection = Self { db: db.clone(), config: config.clone(), @@ -53,11 +55,11 @@ token_data, amqp_connection: amqp_connection.clone(), grpc_client, - amqp_channel: AmqpChannel::new(&amqp_connection), blob_client: S2SAuthedBlobClient::new( auth_service, CONFIG.blob_service_url.clone(), ), + message_sender, }; tokio::spawn(async move { @@ -543,28 +545,24 @@ message: direct_cast_message, }; if let Some(user_devices) = user_devices_option { - for (device_id, platform_details) in - &user_devices.devices_platform_details - { - let payload = serde_json::to_string(&message).map_err(|e| { - TokenConnectionError::MessageHandlingFailed(format!( - "Failed to serialize: {}", - e - )) - })?; - send_message_to_device( - &self.db, - &self.amqp_channel, - device_id.to_string(), - payload, - ) - .await - .map_err(|e| { - TokenConnectionError::MessageHandlingFailed(format!( - "Failed to send a message: {}", - e - )) - })?; + let message_payload = serde_json::to_string(&message).map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to serialize: {}", + e + )) + })?; + + for device_id in user_devices.devices_platform_details.keys() { + self + .message_sender + .simple_send_message_to_device(device_id, message_payload.clone()) + .await + .map_err(|e| { + TokenConnectionError::MessageHandlingFailed(format!( + "Failed to send a message: {}", + e + )) + })?; } }