diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -281,6 +281,27 @@ !self.amqp_channel.status().connected() } + async fn publish_amqp_message_to_device( + &mut self, + device_id: &str, + payload: &[u8], + ) -> Result { + if self.is_amqp_channel_dead() { + self.reset_failed_amqp().await?; + } + let publish_result = self + .amqp_channel + .basic_publish( + "", + device_id, + BasicPublishOptions::default(), + payload, + BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), + ) + .await?; + Ok(publish_result) + } + pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> { if self.amqp_channel.status().connected() && self.amqp_consumer.state().is_active() @@ -302,7 +323,7 @@ } pub async fn handle_message_to_device( - &self, + &mut self, message_request: &MessageToDeviceRequest, ) -> Result<(), SessionError> { let message_id = self @@ -323,23 +344,19 @@ let serialized_message = serde_json::to_string(&message_to_device)?; let publish_result = self - .amqp_channel - .basic_publish( - "", + .publish_amqp_message_to_device( &message_request.device_id, - BasicPublishOptions::default(), serialized_message.as_bytes(), - BasicProperties::default().with_priority(CLIENT_RMQ_MSG_PRIORITY), ) .await; - if let Err(publish_error) = publish_result { + if let Err(amqp_session_error) = publish_result { self .db_client .delete_message(&self.device_info.device_id, &message_id) .await .expect("Error deleting message"); - return Err(SessionError::AmqpError(publish_error)); + return Err(amqp_session_error); } Ok(()) } @@ -830,7 +847,7 @@ } async fn invalidate_device_token( - &self, + &mut self, device_id: String, invalidated_token: String, ) -> Result<(), SessionError> {