diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -1,5 +1,5 @@ use comm_lib::database::batch_operations::ExponentialBackoffConfig; -use lapin::{uri::AMQPUri, Connection, ConnectionProperties}; +use lapin::{uri::AMQPUri, ConnectionProperties}; use once_cell::sync::Lazy; use std::time::Duration; use tracing::info; @@ -24,7 +24,7 @@ amqp_uri }); -pub async fn connect() -> Connection { +async fn create_connection() -> Result<lapin::Connection, lapin::Error> { let options = ConnectionProperties::default() .with_executor(tokio_executor_trait::Tokio::current()) .with_reactor(tokio_reactor_trait::Tokio); @@ -37,23 +37,27 @@ let mut retry_counter = retry_config.new_counter(); tracing::debug!("Attempting to connect to AMQP..."); - let conn_result = loop { + loop { let amqp_uri = Lazy::force(&AMQP_URI).clone(); match lapin::Connection::connect_uri(amqp_uri, options.clone()).await { - Ok(conn) => break Ok(conn), + Ok(conn) => return Ok(conn), Err(err) => { let attempt = retry_counter.attempt(); tracing::warn!(attempt, "AMQP connection attempt failed: {err}."); if retry_counter.sleep_and_retry().await.is_err() { tracing::error!("Unable to connect to AMQP: {err}"); - break Err(err); + return Err(err); } } } - }; + } +} - let conn = conn_result.expect("Unable to connect to AMQP. Exiting."); +pub async fn connect() -> lapin::Connection { + let conn = create_connection() + .await + .expect("Unable to connect to AMQP. Exiting."); conn.on_error(|error| { tracing::error!( errorType = error_types::AMQP_ERROR,