diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -5,7 +5,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::RwLock;
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
 
 use crate::constants::{error_types, NUM_AMQP_CHANNELS};
 use crate::CONFIG;
@@ -150,6 +150,48 @@
     Ok(Self { inner })
   }
 
+  /// Looks up a channel for `id_hash` via the inner connection's
+  /// `get_channel`, first attempting to re-establish the connection if it
+  /// reports as disconnected.
+  ///
+  /// # Errors
+  ///
+  /// Propagates a failure from `reset_conn` or from `get_channel`.
+  // NOTE(review): the generic parameters of the return type were missing in
+  // the reviewed text. The error type follows from the `?` on `reset_conn`;
+  // `lapin::Channel` is inferred -- confirm against `get_channel`.
+  pub async fn channel(
+    &self,
+    id_hash: impl std::hash::Hash,
+  ) -> Result<lapin::Channel, lapin::Error> {
+    if !self.is_connected().await {
+      warn!("AMQP disconnected while retrieving channel");
+      self.reset_conn().await?;
+    }
+    self.inner.read().await.get_channel(id_hash)
+  }
+
+  /// Replaces the inner connection with a newly created one if it still
+  /// reports as disconnected once the write lock is held.
+  ///
+  /// # Errors
+  ///
+  /// Returns the error from `ConnectionInner::new` if reconnecting fails; the
+  /// existing inner connection is left in place in that case.
+  async fn reset_conn(&self) -> Result<(), lapin::Error> {
+    debug!("Resetting connection...");
+    let mut inner = self.inner.write().await;
+    // `channel` checks connectivity before calling this; the check is
+    // repeated here with the write lock held, presumably so that callers
+    // racing to reconnect do not each create a new connection.
+    if !inner.is_connected() {
+      let new_conn = ConnectionInner::new().await?;
+      *inner = new_conn;
+      info!("AMQP Connection restored.");
+    }
+    Ok(())
+  }
+
   async fn is_connected(&self) -> bool {
     self.inner.read().await.is_connected()
   }