diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,9 +1,13 @@
-use crate::constants::error_types;
-use crate::CONFIG;
+use comm_lib::database::batch_operations::ExponentialBackoffConfig;
 use lapin::{uri::AMQPUri, Connection, ConnectionProperties};
+use once_cell::sync::Lazy;
+use std::time::Duration;
 use tracing::info;
 
-pub async fn connect() -> Connection {
+use crate::constants::error_types;
+use crate::CONFIG;
+
+static AMQP_URI: Lazy<AMQPUri> = Lazy::new(|| {
   let mut amqp_uri = CONFIG
     .amqp_uri
     .parse::<AMQPUri>()
@@ -17,13 +21,39 @@
     amqp_uri.authority.userinfo.password = amqp_pass;
   }
 
+  amqp_uri
+});
+
+pub async fn connect() -> Connection {
   let options = ConnectionProperties::default()
     .with_executor(tokio_executor_trait::Tokio::current())
     .with_reactor(tokio_reactor_trait::Tokio);
 
-  let conn = Connection::connect_uri(amqp_uri, options)
-    .await
-    .expect("Unable to connect to AMQP endpoint");
+  let retry_config = ExponentialBackoffConfig {
+    max_attempts: 5,
+    base_duration: Duration::from_millis(500),
+    ..Default::default()
+  };
+  let mut retry_counter = retry_config.new_counter();
+
+  tracing::debug!("Attempting to connect to AMQP...");
+  let conn_result = loop {
+    let amqp_uri = Lazy::force(&AMQP_URI).clone();
+    match lapin::Connection::connect_uri(amqp_uri, options.clone()).await {
+      Ok(conn) => break Ok(conn),
+      Err(err) => {
+        let attempt = retry_counter.attempt();
+        tracing::warn!(attempt, "AMQP connection attempt failed: {err}.");
+
+        if retry_counter.sleep_and_retry().await.is_err() {
+          tracing::error!("Unable to connect to AMQP: {err}");
+          break Err(err);
+        }
+      }
+    }
+  };
+
+  let conn = conn_result.expect("Unable to connect to AMQP. Exiting.");
   conn.on_error(|error| {
     tracing::error!(
       errorType = error_types::AMQP_ERROR,
diff --git a/shared/comm-lib/src/database.rs b/shared/comm-lib/src/database.rs
--- a/shared/comm-lib/src/database.rs
+++ b/shared/comm-lib/src/database.rs
@@ -706,6 +706,11 @@
       self.attempt = 0;
     }
 
+    /// Returns 1 before the first retry, then 2,3,... after subsequent retries
+    pub fn attempt(&self) -> u32 {
+      self.attempt + 1
+    }
+
     /// increase counter and sleep in case of failure
     pub async fn sleep_and_retry(&mut self) -> Result<(), super::Error> {
       let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor));