diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,3 +1,5 @@ +mod session; + use crate::CONFIG; use futures::future; use futures_util::stream::SplitSink; @@ -47,11 +49,12 @@ // Create channel for messages to be passed to this connection let (tx, mut rx) = mpsc::unbounded_channel::(); + let session = session::WebsocketSession::new(tx.clone()); let handle_incoming = incoming.try_for_each(|msg| { debug!("Received message from {}", addr); match msg { Message::Text(text) => { - match handle_message_from_device(&text, &tx) { + match session.handle_message_from_device(&text) { Ok(_) => { debug!("Successfully handled message: {}", text) } @@ -81,22 +84,6 @@ } } -fn handle_message_from_device( - message: &str, - tx: &tokio::sync::mpsc::UnboundedSender, -) -> Result<(), serde_json::Error> { - match serde_json::from_str::(message)? { - Messages::SessionRequest(session_info) => { - ACTIVE_CONNECTIONS.insert(session_info.device_id, tx.clone()); - } - _ => { - debug!("Received invalid request"); - } - } - - Ok(()) -} - async fn handle_message_from_service( incoming_payload: String, outgoing: &mut SplitSink, Message>, diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs new file mode 100644 --- /dev/null +++ b/services/tunnelbroker/src/websockets/session.rs @@ -0,0 +1,32 @@ +use tracing::debug; +use tunnelbroker_messages::Messages; + +use crate::ACTIVE_CONNECTIONS; + +pub struct WebsocketSession { + tx: tokio::sync::mpsc::UnboundedSender, +} + +impl WebsocketSession { + pub fn new( + tx: tokio::sync::mpsc::UnboundedSender, + ) -> WebsocketSession { + WebsocketSession { tx } + } + + pub fn handle_message_from_device( + &self, + message: &str, + ) -> Result<(), serde_json::Error> { + match serde_json::from_str::(message)? { + Messages::SessionRequest(session_info) => { + ACTIVE_CONNECTIONS.insert(session_info.device_id, self.tx.clone()); + } + _ => { + debug!("Received invalid request"); + } + } + + Ok(()) + } +}