Page Menu · Home · Phabricator

D13594.id44930.diff
No One · Temporary

D13594.id44930.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,9 +1,13 @@
-use crate::constants::error_types;
-use crate::CONFIG;
+use comm_lib::database::batch_operations::ExponentialBackoffConfig;
use lapin::{uri::AMQPUri, Connection, ConnectionProperties};
+use once_cell::sync::Lazy;
+use std::time::Duration;
use tracing::info;
-pub async fn connect() -> Connection {
+use crate::constants::error_types;
+use crate::CONFIG;
+
+static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
let mut amqp_uri = CONFIG
.amqp_uri
.parse::<AMQPUri>()
@@ -17,13 +21,39 @@
amqp_uri.authority.userinfo.password = amqp_pass;
}
+ amqp_uri
+});
+
+pub async fn connect() -> Connection {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
- let conn = Connection::connect_uri(amqp_uri, options)
- .await
- .expect("Unable to connect to AMQP endpoint");
+ let retry_config = ExponentialBackoffConfig {
+ max_attempts: 5,
+ base_duration: Duration::from_millis(500),
+ ..Default::default()
+ };
+ let mut retry_counter = retry_config.new_counter();
+
+ tracing::debug!("Attempting to connect to AMQP...");
+ let conn_result = loop {
+ let amqp_uri = Lazy::force(&AMQP_URI).clone();
+ match lapin::Connection::connect_uri(amqp_uri, options.clone()).await {
+ Ok(conn) => break Ok(conn),
+ Err(err) => {
+ let attempt = retry_counter.attempt();
+ tracing::warn!(attempt, "AMQP connection attempt failed: {err}.");
+
+ if retry_counter.sleep_and_retry().await.is_err() {
+ tracing::error!("Unable to connect to AMQP: {err}");
+ break Err(err);
+ }
+ }
+ }
+ };
+
+ let conn = conn_result.expect("Unable to connect to AMQP. Exiting.");
conn.on_error(|error| {
tracing::error!(
errorType = error_types::AMQP_ERROR,
diff --git a/shared/comm-lib/src/database.rs b/shared/comm-lib/src/database.rs
--- a/shared/comm-lib/src/database.rs
+++ b/shared/comm-lib/src/database.rs
@@ -706,6 +706,11 @@
self.attempt = 0;
}
+ /// Returns the 1-based attempt number (stored counter + 1): 1 before the first retry, then 2, 3, ... after subsequent retries.
+ pub fn attempt(&self) -> u32 {
+ self.attempt + 1
+ }
+
/// increase counter and sleep in case of failure
pub async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 3:02 AM (20 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2582221
Default Alt Text
D13594.id44930.diff (2 KB)

Event Timeline