diff --git a/services/tunnelbroker/src/amqp_client/utils.rs b/services/tunnelbroker/src/amqp_client/utils.rs --- a/services/tunnelbroker/src/amqp_client/utils.rs +++ b/services/tunnelbroker/src/amqp_client/utils.rs @@ -56,14 +56,38 @@ &self, recipient_device_id: &str, payload: String, + persist: bool, ) -> Result<(), SendMessageError> { - self - .send_message_to_device(&tunnelbroker_messages::MessageToDeviceRequest { - client_message_id: uuid::Uuid::new_v4().to_string(), - device_id: recipient_device_id.to_string(), - payload, - }) - .await + if persist { + self + .send_message_to_device( + &tunnelbroker_messages::MessageToDeviceRequest { + client_message_id: uuid::Uuid::new_v4().to_string(), + device_id: recipient_device_id.to_string(), + payload, + }, + ) + .await + } else { + self + .send_message_to_device_without_persist( + recipient_device_id.to_string(), + payload, + ) + .await + } + } + + async fn send_message_to_device_without_persist( + &self, + device_id: String, + payload: String, + ) -> Result<(), SendMessageError> { + tracing::debug!("Sending message for {} without persistence", &device_id); + + let message_id = uuid::Uuid::new_v4().to_string(); + + publish_to_amqp(&self.amqp_channel, device_id, payload, message_id).await } } @@ -84,6 +108,15 @@ .persist_message(&device_id, &payload, &client_message_id) .await?; + publish_to_amqp(amqp_channel, device_id, payload, message_id).await +} + +async fn publish_to_amqp( + amqp_channel: &AmqpChannel, + device_id: String, + payload: String, + message_id: String, +) -> Result<(), SendMessageError> { let message_to_device = MessageToDevice { device_id: device_id.clone(), payload, diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -614,7 +614,11 @@ let message_future = async || { self .message_sender - .simple_send_message_to_device(device_id, message_payload.clone()) + .simple_send_message_to_device( + device_id, + message_payload.clone(), + true, + ) .await .map_err(|e| { TokenConnectionError::MessageHandlingFailed(format!( @@ -760,7 +764,11 @@ for (device_id, _) in &recipient_devices { self .message_sender - .simple_send_message_to_device(device_id, message_payload.clone()) + .simple_send_message_to_device( + device_id, + message_payload.clone(), + false, + ) .await .map_err(|e| { TokenConnectionError::MessageHandlingFailed(format!(