diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -96,12 +96,48 @@
   fn get_channel(
     &self,
     id_hash: impl std::hash::Hash,
-  ) -> Result<lapin::Channel, lapin::Error> {
+  ) -> Option<lapin::Channel> {
     let channel_idx = Self::channel_idx_for_hash(id_hash);
     let channel = self.channels[channel_idx].clone();
     let channel_id = channel.id();
     tracing::trace!(channel_id, channel_idx, "Retrieving AMQP Channel");
-    Ok(channel)
+
+    // Hand the channel out only while it is still connected; `None` tells the
+    // caller that this slot has to be recreated.
+    if channel.status().connected() {
+      return Some(channel);
+    }
+    warn!(channel_id, channel_idx, "Channel is dead!");
+    None
+  }
+
+  /// Returns a connected channel for the slot selected by `id_hash`,
+  /// recreating the slot's channel when the stored one is not connected.
+  ///
+  /// The stored channel is checked again here and returned as-is when it is
+  /// connected, so a slot that was already replaced is not recreated twice.
+  ///
+  /// # Errors
+  /// Returns the error from `create_channel` when the new channel cannot be
+  /// opened on the current connection.
+  async fn reset_channel(
+    &mut self,
+    id_hash: impl std::hash::Hash,
+  ) -> Result<lapin::Channel, lapin::Error> {
+    let channel_idx = Self::channel_idx_for_hash(id_hash);
+
+    let existing_channel = &self.channels[channel_idx];
+    if existing_channel.status().connected() {
+      return Ok(existing_channel.clone());
+    }
+
+    let new_channel = self.conn.create_channel().await?;
+    debug!(
+      "Channel for idx={channel_idx} was recreated, new Id={}",
+      new_channel.id()
+    );
+    self.channels[channel_idx] = new_channel.clone();
+    Ok(new_channel)
   }
 
   fn is_connected(&self) -> bool {
@@ -140,13 +176,33 @@
 
+  /// Returns a connected AMQP channel for the slot selected by `id_hash`.
+  ///
+  /// Resets the connection first when it is reported as disconnected, then
+  /// recreates the slot's channel if the stored one is no longer connected.
+  ///
+  /// # Errors
+  /// Propagates failures from resetting the connection or recreating the
+  /// channel.
   pub async fn channel(
     &self,
     id_hash: impl std::hash::Hash,
   ) -> Result<lapin::Channel, lapin::Error> {
     if !self.is_connected().await {
       warn!("AMQP disconnected while retrieving channel");
       self.reset_conn().await?;
     }
-    self.inner.read().await.get_channel(id_hash)
+
+    {
+      // Scope the read guard so it is released before the write lock below
+      // is requested.
+      let inner = self.inner.read().await;
+      // `&T` implements `Hash` by delegating to `T`, so borrowing selects the
+      // same slot as `id_hash` itself and callers need no `Clone` bound.
+      if let Some(channel) = inner.get_channel(&id_hash) {
+        return Ok(channel);
+      }
+    }
+
+    self.inner.write().await.reset_channel(id_hash).await
   }
 
   async fn reset_conn(&self) -> Result<(), lapin::Error> {