diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -3,7 +3,7 @@ use once_cell::sync::Lazy; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, info, warn}; use crate::constants::error_types; @@ -127,6 +127,40 @@ } } +/// Wrapper over [`lapin::Channel`] that automatically recreates AMQP channel +/// in case of errors. The channel is initialized on first use. +/// +/// TODO: Add support for restoring channel topology (queues and consumers) +/// (`lapin` has this built-in, but it's internal crate feature) +pub struct AmqpChannel { + conn: AmqpConnection, + channel: Arc>>, +} + +impl AmqpChannel { + pub fn new(amqp_connection: &AmqpConnection) -> Self { + let channel = Arc::new(Mutex::new(None)); + Self { + conn: amqp_connection.clone(), + channel, + } + } + + pub async fn get(&self) -> Result { + let mut channel = self.channel.lock().await; + match channel.as_ref() { + Some(ch) if ch.status().connected() => Ok(ch.clone()), + _ => { + let new_channel = self.conn.new_channel().await?; + let channel_id = new_channel.id(); + debug!(channel_id, "Instantiated lazy AMQP channel."); + *channel = Some(new_channel.clone()); + Ok(new_channel) + } + } + } +} + fn should_ignore_error(err: &lapin::Error) -> bool { use lapin::Error as E; use std::io::ErrorKind; diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -11,14 +11,14 @@ use tracing::debug; use tunnelbroker_messages::MessageToDevice; -use crate::amqp::AmqpConnection; +use crate::amqp::{AmqpChannel, AmqpConnection}; use crate::constants::{CLIENT_RMQ_MSG_PRIORITY, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::{handle_ddb_error, DatabaseClient}; use crate::{constants, CONFIG}; struct TunnelbrokerGRPC { client: DatabaseClient, - amqp: AmqpConnection, + amqp_channel: AmqpChannel, } pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status { @@ -58,8 +58,8 @@ .map_err(|_| tonic::Status::invalid_argument("Invalid argument"))?; self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? .basic_publish( @@ -85,8 +85,8 @@ debug!("Connection close request for device {}", &message.device_id); self - .amqp - .new_channel() + .amqp_channel + .get() .await .map_err(handle_amqp_error)? .basic_publish( @@ -141,7 +141,7 @@ .http2_keepalive_timeout(Some(constants::GRPC_KEEP_ALIVE_PING_TIMEOUT)) .add_service(TunnelbrokerServiceServer::new(TunnelbrokerGRPC { client, - amqp: amqp_connection.clone(), + amqp_channel: AmqpChannel::new(amqp_connection), })) .serve(addr) .await