Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/websockets/mod.rs
mod session; | mod session; | ||||
use crate::database::DatabaseClient; | |||||
use crate::CONFIG; | use crate::CONFIG; | ||||
use futures::future; | |||||
use futures_util::stream::SplitSink; | use futures_util::stream::SplitSink; | ||||
use futures_util::SinkExt; | use futures_util::SinkExt; | ||||
use futures_util::{StreamExt, TryStreamExt}; | use futures_util::{StreamExt, TryStreamExt}; | ||||
use std::net::SocketAddr; | use std::net::SocketAddr; | ||||
use std::{env, io::Error}; | use std::{env, io::Error}; | ||||
use tokio::net::{TcpListener, TcpStream}; | use tokio::net::{TcpListener, TcpStream}; | ||||
use tokio::sync::mpsc; | use tokio::sync::mpsc; | ||||
use tokio_tungstenite::tungstenite::Message; | use tokio_tungstenite::tungstenite::Message; | ||||
use tokio_tungstenite::WebSocketStream; | use tokio_tungstenite::WebSocketStream; | ||||
use tracing::{debug, error, info}; | use tracing::{debug, error, info}; | ||||
use tunnelbroker_messages::messages::Messages; | use tunnelbroker_messages::messages::Messages; | ||||
use crate::ACTIVE_CONNECTIONS; | use crate::ACTIVE_CONNECTIONS; | ||||
pub async fn run_server() -> Result<(), Error> { | pub async fn run_server(db_client: DatabaseClient) -> Result<(), Error> { | ||||
let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") | let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") | ||||
.unwrap_or_else(|_| format!("127.0.0.1:{}", &CONFIG.http_port)); | .unwrap_or_else(|_| format!("127.0.0.1:{}", &CONFIG.http_port)); | ||||
let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); | let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); | ||||
info!("Listening on: {}", addr); | info!("Listening on: {}", addr); | ||||
while let Ok((stream, addr)) = listener.accept().await { | while let Ok((stream, addr)) = listener.accept().await { | ||||
tokio::spawn(accept_connection(stream, addr)); | tokio::spawn(accept_connection(stream, addr, db_client.clone())); | ||||
} | } | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
/// Handler for any incoming websocket connections | /// Handler for any incoming websocket connections | ||||
async fn accept_connection(raw_stream: TcpStream, addr: SocketAddr) { | async fn accept_connection( | ||||
raw_stream: TcpStream, | |||||
addr: SocketAddr, | |||||
db_client: DatabaseClient, | |||||
) { | |||||
debug!("Incoming connection from: {}", addr); | debug!("Incoming connection from: {}", addr); | ||||
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { | let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { | ||||
Ok(stream) => stream, | Ok(stream) => stream, | ||||
Err(e) => { | Err(e) => { | ||||
info!( | info!( | ||||
"Failed to establish connection with {}. Reason: {}", | "Failed to establish connection with {}. Reason: {}", | ||||
addr, e | addr, e | ||||
); | ); | ||||
return; | return; | ||||
} | } | ||||
}; | }; | ||||
let (mut outgoing, incoming) = ws_stream.split(); | let (mut outgoing, incoming) = ws_stream.split(); | ||||
// Create channel for messages to be passed to this connection | // Create channel for messages to be passed to this connection | ||||
let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | ||||
let session = session::WebsocketSession::new(tx.clone()); | let session = session::WebsocketSession::new(tx.clone(), db_client.clone()); | ||||
let handle_incoming = incoming.try_for_each(|msg| { | let handle_incoming = incoming.try_for_each(|msg| async { | ||||
debug!("Received message from {}", addr); | debug!("Received message from {}", addr); | ||||
match msg { | match msg { | ||||
Message::Text(text) => { | Message::Text(text) => { | ||||
match session.handle_message_from_device(&text) { | match session.handle_message_from_device(&text).await { | ||||
Ok(_) => { | Ok(_) => { | ||||
debug!("Successfully handled message: {}", text) | debug!("Successfully handled message: {}", text) | ||||
} | } | ||||
Err(e) => { | Err(e) => { | ||||
error!("Failed to process message: {}", e); | error!("Failed to process message: {}", e); | ||||
} | } | ||||
}; | }; | ||||
} | } | ||||
_ => { | _ => { | ||||
error!("Invalid message was received"); | error!("Invalid message was received"); | ||||
} | } | ||||
} | } | ||||
Ok(()) | |||||
future::ok(()) | |||||
}); | }); | ||||
debug!("Polling for messages from: {}", addr); | debug!("Polling for messages from: {}", addr); | ||||
// Poll for messages either being sent to the device (rx) | // Poll for messages either being sent to the device (rx) | ||||
// or messages being received from the device (handle_incoming) | // or messages being received from the device (handle_incoming) | ||||
tokio::select! { | tokio::select! { | ||||
Some(message) = rx.recv() => { handle_message_from_service(message, &mut outgoing).await; }, | Some(message) = rx.recv() => { handle_message_from_service(message, &mut outgoing).await; }, | ||||
Ok(_) = handle_incoming => { debug!("Received message from websocket") }, | Ok(_) = handle_incoming => { debug!("Received message from websocket") }, | ||||
Show All 15 Lines |