Page MenuHomePhabricator

D7768.id26431.diff
No OneTemporary

D7768.id26431.diff

diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,3 +1,5 @@
+mod session;
+
use crate::CONFIG;
use futures::future;
use futures_util::stream::SplitSink;
@@ -47,11 +49,12 @@
// Create channel for messages to be passed to this connection
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
+ let session = session::WebsocketSession::new(tx.clone());
let handle_incoming = incoming.try_for_each(|msg| {
debug!("Received message from {}", addr);
match msg {
Message::Text(text) => {
- match handle_message_from_device(&text, &tx) {
+ match session.handle_message_from_device(&text) {
Ok(_) => {
debug!("Successfully handled message: {}", text)
}
@@ -81,22 +84,6 @@
}
}
-fn handle_message_from_device(
- message: &str,
- tx: &tokio::sync::mpsc::UnboundedSender<std::string::String>,
-) -> Result<(), serde_json::Error> {
- match serde_json::from_str::<Messages>(message)? {
- Messages::SessionRequest(session_info) => {
- ACTIVE_CONNECTIONS.insert(session_info.device_id, tx.clone());
- }
- _ => {
- debug!("Received invalid request");
- }
- }
-
- Ok(())
-}
-
async fn handle_message_from_service(
incoming_payload: String,
outgoing: &mut SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -0,0 +1,32 @@
+use tracing::debug;
+use tunnelbroker_messages::Messages;
+
+use crate::ACTIVE_CONNECTIONS;
+
+pub struct WebsocketSession {
+ tx: tokio::sync::mpsc::UnboundedSender<std::string::String>,
+}
+
+impl WebsocketSession {
+ pub fn new(
+ tx: tokio::sync::mpsc::UnboundedSender<std::string::String>,
+ ) -> WebsocketSession {
+ WebsocketSession { tx }
+ }
+
+ pub fn handle_message_from_device(
+ &self,
+ message: &str,
+ ) -> Result<(), serde_json::Error> {
+ match serde_json::from_str::<Messages>(message)? {
+ Messages::SessionRequest(session_info) => {
+ ACTIVE_CONNECTIONS.insert(session_info.device_id, self.tx.clone());
+ }
+ _ => {
+ debug!("Received invalid request");
+ }
+ }
+
+ Ok(())
+ }
+}

File Metadata

Mime Type
text/plain
Expires
Sat, Jan 11, 1:07 AM (19 h, 55 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2858104
Default Alt Text
D7768.id26431.diff (2 KB)

Event Timeline