diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -1,4 +1,4 @@ -use crate::amqp::AmqpConnection; +use crate::amqp::{is_connection_error, AmqpConnection}; use crate::constants::{ error_types, CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY, RMQ_CONSUMER_TAG, @@ -289,6 +289,10 @@ Ok((amqp_channel, amqp_consumer)) } + fn is_amqp_channel_dead(&self) -> bool { + !self.amqp_channel.status().connected() + } + pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> { debug!( "Resetting failed amqp for session with {}", @@ -748,6 +752,24 @@ debug!("Failed to close WebSocket session: {}", e); } + if self.is_amqp_channel_dead() { + warn!( + "AMQP channel or connection dead when closing WS session. \ + Attempting to restore..." + ); + if let Err(error) = self.reset_failed_amqp().await { + error!( + ?error, + errorType = error_types::AMQP_ERROR, + "Could not restore AMQP after closing WS session. \ + Queue and consumer for device '{}' won't be properly removed.", + &self.device_info.device_id + ); + return; + } + info!("AMQP restored."); + } + if let Err(e) = self .amqp_channel .basic_cancel( @@ -756,10 +778,12 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to cancel consumer: {}", e - ); + if !is_connection_error(&e) { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to cancel consumer: {}", e + ); + } } if let Err(e) = self @@ -770,10 +794,12 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to delete queue: {}", e - ); + if !is_connection_error(&e) { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to delete queue: {}", e + ); + } } }