diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -166,6 +166,18 @@ self.inner.read().unwrap().get_channel(id_hash) } + /// Triggers reconnecting in background, without awaiting + pub fn trigger_reconnect(&self) { + if !self.is_connected() && !self.is_connecting() { + let this = self.clone(); + tokio::spawn(async move { + if let Err(err) = this.reset_conn().await { + tracing::warn!("AMQP background reconnect failed: {:?}", err); + } + }); + } + } + async fn reset_conn(&self) -> Result<(), lapin::Error> { if let Ok(false) = self.is_connecting.compare_exchange( false, diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -1,4 +1,4 @@ -use crate::amqp::AmqpConnection; +use crate::amqp::{is_connection_error, AmqpConnection}; use crate::constants::{ error_types, CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY, MAX_RMQ_MSG_PRIORITY, RMQ_CONSUMER_TAG, @@ -756,10 +756,15 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to cancel consumer: {}", e - ); + if is_connection_error(&e) { + warn!("AMQP connection dead when closing WS session."); + self.amqp.trigger_reconnect(); + } else { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to cancel consumer: {}", e + ); + } } if let Err(e) = self @@ -770,10 +775,15 @@ ) .await { - error!( - errorType = error_types::AMQP_ERROR, - "Failed to delete queue: {}", e - ); + if is_connection_error(&e) { + warn!("AMQP connection dead when closing WS session."); + self.amqp.trigger_reconnect(); + } else { + error!( + errorType = error_types::AMQP_ERROR, + "Failed to delete queue: {}", e + ); + } } }