diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -132,6 +132,11 @@ async fn is_connected(&self) -> bool { self.inner.read().await.is_connected() } + + pub fn maybe_reconnect_in_background(&self) { + let this = self.clone(); + tokio::spawn(async move { this.reset_conn().await }); + } } pub fn is_connection_error(err: &lapin::Error) -> bool { diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -1,4 +1,4 @@ -use crate::amqp::AmqpConnection; +use crate::amqp::{is_connection_error, AmqpConnection}; use crate::constants::{ error_types, CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY, RMQ_CONSUMER_TAG, @@ -25,7 +25,7 @@ use reqwest::Url; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; use tunnelbroker_messages::bad_device_token::BadDeviceToken; use tunnelbroker_messages::Platform; use tunnelbroker_messages::{ @@ -277,6 +277,10 @@ Ok((amqp_channel, amqp_consumer)) } + fn is_amqp_channel_dead(&self) -> bool { + !self.amqp_channel.status().connected() + } + pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> { debug!( "Resetting failed amqp for session with {}", @@ -736,6 +740,12 @@ debug!("Failed to close WebSocket session: {}", e); } + if self.is_amqp_channel_dead() { + warn!("AMQP channel or connection dead when closing WS session."); + self.amqp.maybe_reconnect_in_background(); + return; + } + if let Err(e) = self .amqp_channel .basic_cancel( @@ -744,10 +754,12 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to cancel consumer: {}", e - ); + if !is_connection_error(&e) { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to cancel consumer: {}", e + ); + } } if let Err(e) = self @@ -758,10 +770,12 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to delete queue: {}", e - ); + if !is_connection_error(&e) { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to delete queue: {}", e + ); + } } }