diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::constants::error_types; use crate::CONFIG; @@ -110,6 +110,25 @@ Ok(Self { inner }) } + pub async fn new_channel(&self) -> Result { + if !self.is_connected().await { + warn!("AMQP disconnected while retrieving channel"); + self.reset_conn().await?; + } + self.inner.read().await.conn.create_channel().await + } + + async fn reset_conn(&self) -> Result<(), lapin::Error> { + let mut inner = self.inner.write().await; + if !inner.is_connected() { + debug!("Resetting AMQP connection..."); + let new_conn = ConnectionInner::new().await?; + *inner = new_conn; + info!("AMQP Connection restored."); + } + Ok(()) + } + async fn is_connected(&self) -> bool { self.inner.read().await.is_connected() }