diff --git a/services/tunnelbroker/src/amqp.rs b/services/tunnelbroker/src/amqp.rs --- a/services/tunnelbroker/src/amqp.rs +++ b/services/tunnelbroker/src/amqp.rs @@ -134,6 +134,14 @@ } } +pub fn is_connection_error(err: &lapin::Error) -> bool { + matches!( + err, + lapin::Error::InvalidChannelState(_) + | lapin::Error::InvalidConnectionState(_) + ) +} + fn from_env(var_name: &str) -> Option { std::env::var(var_name).ok().filter(|s| !s.is_empty()) } diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -19,7 +19,7 @@ use std::pin::Pin; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; use tunnelbroker_messages::{ ConnectionInitializationStatus, DeviceToTunnelbrokerRequestStatus, Heartbeat, MessageSentStatus, @@ -229,16 +229,28 @@ loop { trace!("Polling for messages from: {}", addr); tokio::select! { - Some(Ok(delivery)) = session.next_amqp_message() => { - if let Ok(message) = std::str::from_utf8(&delivery.data) { - if message == WS_SESSION_CLOSE_AMQP_MSG { - debug!("Connection to {} closed by server.", addr); - break; - } else { - session.send_message_to_device(Message::Text(message.to_string())).await; + Some(delivery_result) = session.next_amqp_message() => { + match delivery_result { + Ok(delivery) => { + if let Ok(message) = std::str::from_utf8(&delivery.data) { + if message == WS_SESSION_CLOSE_AMQP_MSG { + debug!("Connection to {} closed by server.", addr); + break; + } else { + session.send_message_to_device(Message::Text(message.to_string())).await; + } + } else { + error!("Invalid payload"); + } + }, + Err(err) => { + warn!("Session AMQP error: {:?}", err); + if let Err(e) = session.reset_failed_amqp().await { + warn!("Connection to {} closed due to failed AMQP restoration: {:?}", addr, e); + break; + } + continue; } - } else { - error!("Invalid payload"); } }, device_message = incoming.next() => { diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -62,6 +62,7 @@ tx: SplitSink, Message>, db_client: DatabaseClient, pub device_info: DeviceInfo, + amqp: AmqpConnection, amqp_channel: lapin::Channel, // Stream of messages from AMQP endpoint amqp_consumer: lapin::Consumer, @@ -234,6 +235,7 @@ tx, db_client, device_info, + amqp, amqp_channel, amqp_consumer, notif_client, @@ -275,6 +277,21 @@ Ok((amqp_channel, amqp_consumer)) } + pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> { + debug!( + "Resetting failed amqp for session with {}", + &self.device_info.device_id + ); + + let (amqp_channel, amqp_consumer) = + Self::init_amqp(&self.device_info, &self.db_client, &self.amqp).await?; + + self.amqp_channel = amqp_channel; + self.amqp_consumer = amqp_consumer; + + Ok(()) + } + pub async fn handle_message_to_device( &self, message_request: &MessageToDeviceRequest,