Page Menu | Home | Phabricator

D13600.diff
No One | Temporary

D13600.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -1,5 +1,5 @@
use comm_lib::database::batch_operations::ExponentialBackoffConfig;
-use lapin::{uri::AMQPUri, Connection, ConnectionProperties};
+use lapin::{uri::AMQPUri, ConnectionProperties};
use once_cell::sync::Lazy;
use std::time::Duration;
use tracing::info;
@@ -24,7 +24,7 @@
amqp_uri
});
-pub async fn connect() -> Connection {
+async fn create_connection() -> Result<lapin::Connection, lapin::Error> {
let options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
@@ -37,23 +37,27 @@
let mut retry_counter = retry_config.new_counter();
tracing::debug!("Attempting to connect to AMQP...");
- let conn_result = loop {
+ loop {
let amqp_uri = Lazy::force(&AMQP_URI).clone();
match lapin::Connection::connect_uri(amqp_uri, options.clone()).await {
- Ok(conn) => break Ok(conn),
+ Ok(conn) => return Ok(conn),
Err(err) => {
let attempt = retry_counter.attempt();
tracing::warn!(attempt, "AMQP connection attempt failed: {err}.");
if retry_counter.sleep_and_retry().await.is_err() {
tracing::error!("Unable to connect to AMQP: {err}");
- break Err(err);
+ return Err(err);
}
}
}
- };
+ }
+}
- let conn = conn_result.expect("Unable to connect to AMQP. Exiting.");
+pub async fn connect() -> lapin::Connection {
+ let conn = create_connection()
+ .await
+ .expect("Unable to connect to AMQP. Exiting.");
conn.on_error(|error| {
tracing::error!(
errorType = error_types::AMQP_ERROR,

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 5:25 PM (17 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2563332
Default Alt Text
D13600.diff (1 KB)

Event Timeline