Page MenuHomePhabricator

D13601.id44891.diff
No OneTemporary

D13601.id44891.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,10 +1,12 @@
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
+use std::sync::Arc;
use std::time::Duration;
-use tracing::info;
+use tokio::sync::RwLock;
+use tracing::{debug, error, info};
-use crate::constants::error_types;
+use crate::constants::{error_types, NUM_AMQP_CHANNELS};
use crate::CONFIG;
static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
@@ -69,6 +71,65 @@
conn
}
+/// Inner connection that is a direct wrapper over [`lapin::Connection`]
+/// This should be instantiated only once to establish connection
+/// New instances can be created to reconnect
+struct ConnectionInner {
+ conn: lapin::Connection,
+ // channel pool
+ channels: [lapin::Channel; NUM_AMQP_CHANNELS],
+}
+
+impl ConnectionInner {
+ async fn new() -> Result<Self, lapin::Error> {
+ let conn = create_connection().await?;
+ conn.on_error(|err| {
+ // TODO: we should filter out some IOErrors here to avoid spamming alerts
+ error!(errorType = error_types::AMQP_ERROR, "Lapin error: {err:?}");
+ });
+
+ debug!("Creating channels...");
+ let mut channels = Vec::with_capacity(NUM_AMQP_CHANNELS);
+ for idx in 0..NUM_AMQP_CHANNELS {
+ let channel = conn.create_channel().await?;
+ tracing::trace!("Creating channel ID={} at index={}", channel.id(), idx);
+ channels.push(channel);
+ }
+
+ Ok(Self {
+ conn,
+ channels: channels
+ .try_into()
+ .expect("Channels vec size doesn't match array size"),
+ })
+ }
+
+ fn is_connected(&self) -> bool {
+ self.conn.status().connected()
+ }
+}
+
+/// Thread safe connection wrapper that is `Clone + Send + Sync`
+/// and can be shared wherever needed.
+#[derive(Clone)]
+pub struct AmqpConnection {
+ inner: Arc<RwLock<ConnectionInner>>,
+}
+
+impl AmqpConnection {
+ pub async fn connect() -> Result<Self, lapin::Error> {
+ let conn = ConnectionInner::new().await?;
+ let inner = Arc::new(RwLock::new(conn));
+ info!("Connected to AMQP endpoint: {}", &CONFIG.amqp_uri);
+
+ Ok(Self { inner })
+ }
+
+ async fn is_connected(&self) -> bool {
+ self.inner.read().await.is_connected()
+ }
+}
+
fn from_env(var_name: &str) -> Option<String> {
std::env::var(var_name).ok().filter(|s| !s.is_empty())
}
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -7,6 +7,7 @@
pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3);
+pub const NUM_AMQP_CHANNELS: usize = 8;
pub const MAX_RMQ_MSG_PRIORITY: u8 = 10;
pub const DDB_RMQ_MSG_PRIORITY: u8 = 10;
pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1;

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 11:11 PM (17 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2566363
Default Alt Text
D13601.id44891.diff (2 KB)

Event Timeline