Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/websockets/mod.rs
mod session; | |||||
use crate::CONFIG; | use crate::CONFIG; | ||||
use futures::future; | use futures::future; | ||||
use futures_util::stream::SplitSink; | use futures_util::stream::SplitSink; | ||||
use futures_util::SinkExt; | use futures_util::SinkExt; | ||||
use futures_util::{StreamExt, TryStreamExt}; | use futures_util::{StreamExt, TryStreamExt}; | ||||
use std::net::SocketAddr; | use std::net::SocketAddr; | ||||
use std::{env, io::Error}; | use std::{env, io::Error}; | ||||
use tokio::net::{TcpListener, TcpStream}; | use tokio::net::{TcpListener, TcpStream}; | ||||
Show All 33 Lines | Err(e) => { | ||||
return; | return; | ||||
} | } | ||||
}; | }; | ||||
let (mut outgoing, incoming) = ws_stream.split(); | let (mut outgoing, incoming) = ws_stream.split(); | ||||
// Create channel for messages to be passed to this connection | // Create channel for messages to be passed to this connection | ||||
let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | ||||
let session = session::WebsocketSession::new(tx.clone()); | |||||
let handle_incoming = incoming.try_for_each(|msg| { | let handle_incoming = incoming.try_for_each(|msg| { | ||||
debug!("Received message from {}", addr); | debug!("Received message from {}", addr); | ||||
match msg { | match msg { | ||||
Message::Text(text) => { | Message::Text(text) => { | ||||
match handle_message_from_device(&text, &tx) { | match session.handle_message_from_device(&text) { | ||||
Ok(_) => { | Ok(_) => { | ||||
debug!("Successfully handled message: {}", text) | debug!("Successfully handled message: {}", text) | ||||
} | } | ||||
Err(e) => { | Err(e) => { | ||||
error!("Failed to process message: {}", e); | error!("Failed to process message: {}", e); | ||||
} | } | ||||
}; | }; | ||||
} | } | ||||
Show All 13 Lines | tokio::select! { | ||||
Ok(_) = handle_incoming => { debug!("Received message from websocket") }, | Ok(_) = handle_incoming => { debug!("Received message from websocket") }, | ||||
else => { | else => { | ||||
info!("Connection with {} closed.", addr); | info!("Connection with {} closed.", addr); | ||||
ACTIVE_CONNECTIONS.remove("test"); | ACTIVE_CONNECTIONS.remove("test"); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
fn handle_message_from_device( | |||||
message: &str, | |||||
tx: &tokio::sync::mpsc::UnboundedSender<std::string::String>, | |||||
) -> Result<(), serde_json::Error> { | |||||
match serde_json::from_str::<Messages>(message)? { | |||||
Messages::SessionRequest(session_info) => { | |||||
ACTIVE_CONNECTIONS.insert(session_info.device_id, tx.clone()); | |||||
} | |||||
_ => { | |||||
debug!("Received invalid request"); | |||||
} | |||||
} | |||||
Ok(()) | |||||
} | |||||
async fn handle_message_from_service( | async fn handle_message_from_service( | ||||
incoming_payload: String, | incoming_payload: String, | ||||
outgoing: &mut SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, | outgoing: &mut SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, | ||||
) { | ) { | ||||
if let Err(e) = outgoing.send(Message::Text(incoming_payload)).await { | if let Err(e) = outgoing.send(Message::Text(incoming_payload)).await { | ||||
error!("Failed to send message to device: {}", e); | error!("Failed to send message to device: {}", e); | ||||
} | } | ||||
} | } |