diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -1,10 +1,12 @@ use comm_lib::database::batch_operations::ExponentialBackoffConfig; use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; +use std::sync::Arc; use std::time::Duration; -use tracing::info; +use tokio::sync::RwLock; +use tracing::{debug, error, info}; -use crate::constants::error_types; +use crate::constants::{error_types, NUM_AMQP_CHANNELS}; use crate::CONFIG; static AMQP_URI: Lazy = Lazy::new(|| { @@ -69,6 +71,65 @@ conn } +/// Inner connection that is a direct wrapper over [`lapin::Connection`] +/// This should be instantiated only once to establish connection +/// New instances can be created to reconnect +struct ConnectionInner { + conn: lapin::Connection, + // channel pool + channels: [lapin::Channel; NUM_AMQP_CHANNELS], +} + +impl ConnectionInner { + async fn new() -> Result { + let conn = create_connection().await?; + conn.on_error(|err| { + // TODO: we should filter out some IOErrors here to avoid spamming alerts + error!(errorType = error_types::AMQP_ERROR, "Lapin error: {err:?}"); + }); + + debug!("Creating channels..."); + let mut channels = Vec::with_capacity(NUM_AMQP_CHANNELS); + for idx in 0..NUM_AMQP_CHANNELS { + let channel = conn.create_channel().await?; + tracing::trace!("Creating channel ID={} at index={}", channel.id(), idx); + channels.push(channel); + } + + Ok(Self { + conn, + channels: channels + .try_into() + .expect("Channels vec size doesn't match array size"), + }) + } + + fn is_connected(&self) -> bool { + self.conn.status().connected() + } +} + +/// Thread safe connection wrapper that is `Clone + Send + Sync` +/// and can be shared wherever needed. +#[derive(Clone)] +pub struct AmqpConnection { + inner: Arc>, +} + +impl AmqpConnection { + pub async fn connect() -> Result { + let conn = ConnectionInner::new().await?; + let inner = Arc::new(RwLock::new(conn)); + info!("Connected to AMQP endpoint: {}", &CONFIG.amqp_uri); + + Ok(Self { inner }) + } + + async fn is_connected(&self) -> bool { + self.inner.read().await.is_connected() + } +} + fn from_env(var_name: &str) -> Option { std::env::var(var_name).ok().filter(|s| !s.is_empty()) } diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -7,6 +7,7 @@ pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); +pub const NUM_AMQP_CHANNELS: usize = 8; pub const MAX_RMQ_MSG_PRIORITY: u8 = 10; pub const DDB_RMQ_MSG_PRIORITY: u8 = 10; pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1;