diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -278,16 +278,10 @@ ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT)); let Some(message_status) = session.handle_websocket_frame_from_device(msg).await else { + // There is no response back to the client continue; }; - let request_status = DeviceToTunnelbrokerRequestStatus { - client_message_ids: vec![message_status] - }; - if let Ok(response) = serde_json::to_string(&request_status) { - session.send_message_to_device(Message::text(response)).await; - } else { - break; - } + session.send_message_to_device(Message::text(message_status)).await; } _ => { error!("Client sent invalid message type"); diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -25,13 +25,13 @@ use tokio::io::AsyncWrite; use tracing::{debug, error, info, trace, warn}; use tunnelbroker_messages::bad_device_token::BadDeviceToken; -use tunnelbroker_messages::Platform; use tunnelbroker_messages::{ message_to_device_request_status::Failure, message_to_device_request_status::MessageSentStatus, session::DeviceTypes, DeviceToTunnelbrokerMessage, Heartbeat, MessageToDevice, MessageToDeviceRequest, MessageToTunnelbroker, }; +use tunnelbroker_messages::{DeviceToTunnelbrokerRequestStatus, Platform}; use web_push::WebPushError; use crate::database::{self, DatabaseClient, MessageToDeviceExt}; @@ -412,14 +412,36 @@ pub async fn handle_websocket_frame_from_device( &mut self, msg: String, - ) -> Option { - let Ok(serialized_message) = - serde_json::from_str::(&msg) - else { - return Some(MessageSentStatus::SerializationError(msg)); + ) -> Option { + let serialized_message = match serde_json::from_str::< + DeviceToTunnelbrokerMessage, + >(&msg) + { + Ok(message) => message, + Err(_) => { + error!("Error parsing {}", msg); + let request_status = DeviceToTunnelbrokerRequestStatus { + // Information to the client about which message failed to be serialized. + client_message_ids: vec![MessageSentStatus::SerializationError(msg)], + }; + return serde_json::to_string(&request_status).ok(); + } + }; + + let message_status = + self.handle_message_from_device(serialized_message).await?; + + let request_status = DeviceToTunnelbrokerRequestStatus { + client_message_ids: vec![message_status], }; + serde_json::to_string(&request_status).ok() + } - match serialized_message { + pub async fn handle_message_from_device( + &mut self, + device_to_tunnelbroker_message: DeviceToTunnelbrokerMessage, + ) -> Option { + match device_to_tunnelbroker_message { DeviceToTunnelbrokerMessage::Heartbeat(Heartbeat {}) => { trace!("Received heartbeat from: {}", self.device_info.device_id); None