diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -11,6 +11,7 @@ pub const DDB_RMQ_MSG_PRIORITY: u8 = 10; pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1; pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker"; +pub const WS_SESSION_CLOSE_AMQP_MSG: &str = "SessionClose"; pub const ENV_APNS_CONFIG: &str = "APNS_CONFIG"; pub const ENV_FCM_CONFIG: &str = "FCM_CONFIG"; pub const ENV_WEB_PUSH_CONFIG: &str = "WEB_PUSH_CONFIG"; diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,6 +1,6 @@ pub mod session; -use crate::constants::SOCKET_HEARTBEAT_TIMEOUT; +use crate::constants::{SOCKET_HEARTBEAT_TIMEOUT, WS_SESSION_CLOSE_AMQP_MSG}; use crate::database::DatabaseClient; use crate::notifs::NotifClient; use crate::websockets::session::{initialize_amqp, SessionError}; @@ -233,7 +233,12 @@ tokio::select! { Some(Ok(delivery)) = session.next_amqp_message() => { if let Ok(message) = std::str::from_utf8(&delivery.data) { - session.send_message_to_device(Message::Text(message.to_string())).await; + if message == WS_SESSION_CLOSE_AMQP_MSG { + debug!("Connection to {} closed by server.", addr); + break; + } else { + session.send_message_to_device(Message::Text(message.to_string())).await; + } } else { error!("Invalid payload"); }