Page Menu | Home | Phabricator

D13606.diff
No One · Temporary

D13606.diff

diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -19,7 +19,7 @@
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
use tunnelbroker_messages::{
ConnectionInitializationStatus, DeviceToTunnelbrokerRequestStatus, Heartbeat,
MessageSentStatus,
@@ -229,16 +229,28 @@
loop {
trace!("Polling for messages from: {}", addr);
tokio::select! {
- Some(Ok(delivery)) = session.next_amqp_message() => {
- if let Ok(message) = std::str::from_utf8(&delivery.data) {
- if message == WS_SESSION_CLOSE_AMQP_MSG {
- debug!("Connection to {} closed by server.", addr);
- break;
- } else {
- session.send_message_to_device(Message::Text(message.to_string())).await;
+ Some(delivery_result) = session.next_amqp_message() => {
+ match delivery_result {
+ Ok(delivery) => {
+ if let Ok(message) = std::str::from_utf8(&delivery.data) {
+ if message == WS_SESSION_CLOSE_AMQP_MSG {
+ debug!("Connection to {} closed by server.", addr);
+ break;
+ } else {
+ session.send_message_to_device(Message::Text(message.to_string())).await;
+ }
+ } else {
+ error!("Invalid payload");
+ }
+ },
+ Err(err) => {
+ warn!("Session AMQP error: {:?}", err);
+ if let Err(e) = session.reset_failed_amqp().await {
+ warn!("Connection to {} closed due to failed AMQP restoration: {:?}", addr, e);
+ break;
+ }
+ continue;
}
- } else {
- error!("Invalid payload");
}
},
device_message = incoming.next() => {
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -62,6 +62,7 @@
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
pub device_info: DeviceInfo,
+ amqp: AmqpConnection,
amqp_channel: lapin::Channel,
// Stream of messages from AMQP endpoint
amqp_consumer: lapin::Consumer,
@@ -234,6 +235,7 @@
tx,
db_client,
device_info,
+ amqp,
amqp_channel,
amqp_consumer,
notif_client,
@@ -275,6 +277,26 @@
Ok((amqp_channel, amqp_consumer))
}
+  /// Re-creates this session's AMQP channel and consumer when either of them
+  /// is no longer usable.
+  ///
+  /// Nothing is changed when the channel reports itself connected and the
+  /// consumer reports itself active. Otherwise a new channel/consumer pair is
+  /// requested from `Self::init_amqp` and stored on the session.
+  ///
+  /// # Errors
+  ///
+  /// Propagates any error returned by `Self::init_amqp`; in that case the
+  /// existing channel and consumer fields are left as they were.
+  pub async fn reset_failed_amqp(&mut self) -> Result<(), SessionError> {
+    // The consumer state is only consulted when the channel is still
+    // connected (short-circuit), mirroring a "channel first" health check.
+    let needs_reset = !self.amqp_channel.status().connected()
+      || !self.amqp_consumer.state().is_active();
+
+    if needs_reset {
+      debug!(
+        "Resetting failed amqp for session with {}",
+        &self.device_info.device_id
+      );
+
+      // Build both replacements before assigning, so a failed initialization
+      // does not leave the session with a half-updated pair.
+      let (fresh_channel, fresh_consumer) =
+        Self::init_amqp(&self.device_info, &self.db_client, &self.amqp).await?;
+      self.amqp_channel = fresh_channel;
+      self.amqp_consumer = fresh_consumer;
+    }
+
+    Ok(())
+  }
+
pub async fn handle_message_to_device(
&self,
message_request: &MessageToDeviceRequest,

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 22, 5:19 PM (17 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2563138
Default Alt Text
D13606.diff (3 KB)

Event Timeline