Page MenuHomePhabricator

D13601/new/.diff
No OneTemporary

D13601/new/.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,8 +1,10 @@
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
+use std::sync::Arc;
use std::time::Duration;
-use tracing::info;
+use tokio::sync::RwLock;
+use tracing::{debug, error, info};
use crate::constants::error_types;
use crate::CONFIG;
@@ -69,6 +71,50 @@
conn
}
+/// Inner connection that is a direct wrapper over [`lapin::Connection`].
+/// It should be instantiated once to establish the initial connection;
+/// a new instance can be created later to reconnect.
+struct ConnectionInner {
+ conn: lapin::Connection,
+}
+
+impl ConnectionInner {
+ async fn new() -> Result<Self, lapin::Error> {
+ let conn = create_connection().await?;
+ conn.on_error(|err| {
+ // TODO: we should filter out some IOErrors here to avoid spamming alerts
+ error!(errorType = error_types::AMQP_ERROR, "Lapin error: {err:?}");
+ });
+
+ Ok(Self { conn })
+ }
+
+ fn is_connected(&self) -> bool {
+ self.conn.status().connected()
+ }
+}
+
+/// Thread-safe connection wrapper that is `Clone + Send + Sync`
+/// and can be shared wherever needed.
+#[derive(Clone)]
+pub struct AmqpConnection {
+ inner: Arc<RwLock<ConnectionInner>>,
+}
+
+impl AmqpConnection {
+ pub async fn connect() -> Result<Self, lapin::Error> {
+ let conn = ConnectionInner::new().await?;
+ let inner = Arc::new(RwLock::new(conn));
+ info!("Connected to AMQP endpoint: {}", &CONFIG.amqp_uri);
+
+ Ok(Self { inner })
+ }
+
+ async fn is_connected(&self) -> bool {
+ self.inner.read().await.is_connected()
+ }
+}
+
fn from_env(var_name: &str) -> Option<String> {
std::env::var(var_name).ok().filter(|s| !s.is_empty())
}

File Metadata

Mime Type
text/plain
Expires
Sat, Oct 5, 8:34 AM (11 h, 8 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2241635
Default Alt Text
D13601/new/.diff (1 KB)

Event Timeline