diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -132,6 +132,26 @@
   async fn is_connected(&self) -> bool {
     self.inner.read().await.is_connected()
   }
+
+  /// Spawns a detached Tokio task that runs `reset_conn` on a clone of
+  /// this handle, so the caller can request a reconnect without awaiting.
+  pub fn maybe_reconnect_in_background(&self) {
+    let this = self.clone();
+    // NOTE(review): the JoinHandle is dropped, so whatever `reset_conn`
+    // returns is discarded; if it is a `Result`, consider logging errors.
+    tokio::spawn(async move { this.reset_conn().await });
+  }
+}
+
+/// Returns `true` if `err` is lapin's `InvalidChannelState` or
+/// `InvalidConnectionState`, i.e. the operation was rejected because the
+/// channel or connection was not in a usable state.
+pub fn is_connection_error(err: &lapin::Error) -> bool {
+  matches!(
+    err,
+    lapin::Error::InvalidChannelState(_)
+      | lapin::Error::InvalidConnectionState(_)
+  )
 }
 
 fn from_env(var_name: &str) -> Option<String> {
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -1,4 +1,4 @@
-use crate::amqp::AmqpConnection;
+use crate::amqp::{is_connection_error, AmqpConnection};
 use crate::constants::{
   error_types, CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY,
   MAX_RMQ_MSG_PRIORITY, RMQ_CONSUMER_TAG,
@@ -25,7 +25,7 @@
 use reqwest::Url;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
 use tunnelbroker_messages::bad_device_token::BadDeviceToken;
 use tunnelbroker_messages::Platform;
 use tunnelbroker_messages::{
@@ -277,6 +277,12 @@
     Ok((amqp_channel, amqp_consumer))
   }
 
+  /// Returns `true` when the session's AMQP channel does not report a
+  /// connected status.
+  fn is_amqp_channel_dead(&self) -> bool {
+    !self.amqp_channel.status().connected()
+  }
+
   pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> {
     if self.amqp_channel.status().connected()
       && self.amqp_consumer.state().is_active()
@@ -741,6 +747,14 @@
       debug!("Failed to close WebSocket session: {}", e);
     }
 
+    // Skip consumer/queue cleanup when the channel is already down and ask
+    // `self.amqp` to reconnect in the background instead.
+    if self.is_amqp_channel_dead() {
+      warn!("AMQP channel or connection dead when closing WS session.");
+      self.amqp.maybe_reconnect_in_background();
+      return;
+    }
+
     if let Err(e) = self
       .amqp_channel
       .basic_cancel(
@@ -749,6 +763,13 @@
       )
       .await
     {
+      if is_connection_error(&e) {
+        // The channel or connection failed after the liveness check above;
+        // skip the remaining cleanup and ask `self.amqp` to reconnect.
+        debug!("AMQP channel unavailable while cancelling consumer: {}", e);
+        self.amqp.maybe_reconnect_in_background();
+        return;
+      }
       error!(
         errorType = error_types::AMQP_ERROR,
         "Failed to cancel consumer: {}", e
@@ -763,6 +784,11 @@
       )
       .await
     {
+      if is_connection_error(&e) {
+        debug!("AMQP channel unavailable while deleting queue: {}", e);
+        self.amqp.maybe_reconnect_in_background();
+        return;
+      }
       error!(
         errorType = error_types::AMQP_ERROR,
         "Failed to delete queue: {}", e