Page MenuHomePhabricator

D13607.diff
No OneTemporary

D13607.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -132,6 +132,19 @@
async fn is_connected(&self) -> bool {
self.inner.read().await.is_connected()
}
+
+ pub fn maybe_reconnect_in_background(&self) {
+ let this = self.clone();
+ tokio::spawn(async move { this.reset_conn().await });
+ }
+}
+
+pub fn is_connection_error(err: &lapin::Error) -> bool {
+ matches!(
+ err,
+ lapin::Error::InvalidChannelState(_)
+ | lapin::Error::InvalidConnectionState(_)
+ )
}
fn from_env(var_name: &str) -> Option<String> {
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -1,4 +1,4 @@
-use crate::amqp::AmqpConnection;
+use crate::amqp::{is_connection_error, AmqpConnection};
use crate::constants::{
error_types, CLIENT_RMQ_MSG_PRIORITY, DDB_RMQ_MSG_PRIORITY,
MAX_RMQ_MSG_PRIORITY, RMQ_CONSUMER_TAG,
@@ -25,7 +25,7 @@
use reqwest::Url;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
use tunnelbroker_messages::bad_device_token::BadDeviceToken;
use tunnelbroker_messages::Platform;
use tunnelbroker_messages::{
@@ -277,6 +277,10 @@
Ok((amqp_channel, amqp_consumer))
}
+ fn is_amqp_channel_dead(&self) -> bool {
+ !self.amqp_channel.status().connected()
+ }
+
pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> {
if self.amqp_channel.status().connected()
&& self.amqp_consumer.state().is_active()
@@ -741,6 +745,12 @@
debug!("Failed to close WebSocket session: {}", e);
}
+ if self.is_amqp_channel_dead() {
+ warn!("AMQP channel or connection dead when closing WS session.");
+ self.amqp.maybe_reconnect_in_background();
+ return;
+ }
+
if let Err(e) = self
.amqp_channel
.basic_cancel(
@@ -749,10 +759,12 @@
)
.await
{
- error!(
- errorType = error_types::AMQP_ERROR,
- "Failed to cancel consumer: {}", e
- );
+ if !is_connection_error(&e) {
+ error!(
+ errorType = error_types::AMQP_ERROR,
+ "Failed to cancel consumer: {}", e
+ );
+ }
}
if let Err(e) = self
@@ -763,10 +775,12 @@
)
.await
{
- error!(
- errorType = error_types::AMQP_ERROR,
- "Failed to delete queue: {}", e
- );
+ if !is_connection_error(&e) {
+ error!(
+ errorType = error_types::AMQP_ERROR,
+ "Failed to delete queue: {}", e
+ );
+ }
}
}

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 6:09 PM (12 h, 19 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2563654
Default Alt Text
D13607.diff (2 KB)

Event Timeline