Page MenuHomePhabricator

D13606.id44886.diff
No OneTemporary

D13606.id44886.diff

diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs
--- a/services/tunnelbroker/src/amqp.rs
+++ b/services/tunnelbroker/src/amqp.rs
@@ -206,6 +206,14 @@
}
}
+pub fn is_connection_error(err: &lapin::Error) -> bool { // True only when `err` is one of the two lapin "invalid state" variants matched below; every other error yields false.
+ matches!(
+ err,
+ lapin::Error::InvalidChannelState(_) // invalid-state error reported for a channel; payload is ignored
+ | lapin::Error::InvalidConnectionState(_) // invalid-state error reported for the connection; payload is ignored
+ )
+}
+
fn from_env(var_name: &str) -> Option<String> {
std::env::var(var_name).ok().filter(|s| !s.is_empty())
}
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -19,7 +19,7 @@
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
use tunnelbroker_messages::{
ConnectionInitializationStatus, DeviceToTunnelbrokerRequestStatus, Heartbeat,
MessageSentStatus,
@@ -229,16 +229,31 @@
loop {
trace!("Polling for messages from: {}", addr);
tokio::select! {
- Some(Ok(delivery)) = session.next_amqp_message() => {
- if let Ok(message) = std::str::from_utf8(&delivery.data) {
- if message == WS_SESSION_CLOSE_AMQP_MSG {
- debug!("Connection to {} closed by server.", addr);
+ Some(delivery_result) = session.next_amqp_message() => {
+ match delivery_result {
+ Ok(delivery) => {
+ if let Ok(message) = std::str::from_utf8(&delivery.data) {
+ if message == WS_SESSION_CLOSE_AMQP_MSG {
+ debug!("Connection to {} closed by server.", addr);
+ break;
+ } else {
+ session.send_message_to_device(Message::Text(message.to_string())).await;
+ }
+ } else {
+ error!("Invalid payload");
+ }
+ },
+ Err(ref err) if crate::amqp::is_connection_error(err) => {
+ if let Err(e) = session.reset_failed_amqp().await {
+ warn!("Connection to {} closed due to failed AMQP restoration: {:?}", addr, e);
+ break;
+ }
+ continue;
+ }
+ Err(err) => {
+ warn!("Connection to {} closed due to AMQP error: {:?}", addr, err);
break;
- } else {
- session.send_message_to_device(Message::Text(message.to_string())).await;
}
- } else {
- error!("Invalid payload");
}
},
device_message = incoming.next() => {
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -62,6 +62,7 @@
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
pub device_info: DeviceInfo,
+ amqp: AmqpConnection,
amqp_channel: lapin::Channel,
// Stream of messages from AMQP endpoint
amqp_consumer: lapin::Consumer,
@@ -234,6 +235,7 @@
tx,
db_client,
device_info,
+ amqp,
amqp_channel,
amqp_consumer,
notif_client,
@@ -287,6 +289,21 @@
Ok((amqp_channel, amqp_consumer))
}
+ pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> { // Replaces this session's AMQP channel and consumer with fresh ones from `init_amqp`; returns the `init_amqp` error if that fails.
+ debug!(
+ "Resetting failed amqp for session with {}",
+ &self.device_info.device_id
+ );
+ // Re-run `init_amqp` with the device info, DB client and AMQP connection stored on the session. On error `?` returns early, before either field below is reassigned.
+ let (amqp_channel, amqp_consumer) =
+ Self::init_amqp(&self.device_info, &self.db_client, &self.amqp).await?;
+
+ self.amqp_channel = amqp_channel; // swap in the newly created channel
+ self.amqp_consumer = amqp_consumer; // swap in the newly created consumer
+
+ Ok(())
+ }
+
pub async fn handle_message_to_device(
&self,
message_request: &MessageToDeviceRequest,

File Metadata

Mime Type
text/plain
Expires
Sat, Oct 5, 5:47 AM (10 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2236198
Default Alt Text
D13606.id44886.diff (3 KB)

Event Timeline