Changeset View
Changeset View
Standalone View
Standalone View
services/tunnelbroker/src/websockets/mod.rs
mod session; | mod session; | ||||
use crate::database::DatabaseClient; | use crate::database::DatabaseClient; | ||||
use crate::CONFIG; | use crate::CONFIG; | ||||
use futures_util::stream::SplitSink; | use futures_util::StreamExt; | ||||
use futures_util::SinkExt; | |||||
use futures_util::{StreamExt, TryStreamExt}; | |||||
use std::net::SocketAddr; | use std::net::SocketAddr; | ||||
use std::{env, io::Error}; | use std::{env, io::Error}; | ||||
use tokio::net::{TcpListener, TcpStream}; | use tokio::net::{TcpListener, TcpStream}; | ||||
use tokio::sync::mpsc; | use tokio::sync::mpsc; | ||||
use tokio_tungstenite::tungstenite::Message; | use tracing::{debug, info}; | ||||
use tokio_tungstenite::WebSocketStream; | |||||
use tracing::{debug, error, info}; | |||||
use crate::ACTIVE_CONNECTIONS; | |||||
pub async fn run_server(db_client: DatabaseClient) -> Result<(), Error> { | pub async fn run_server(db_client: DatabaseClient) -> Result<(), Error> { | ||||
let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") | let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") | ||||
.unwrap_or_else(|_| format!("127.0.0.1:{}", &CONFIG.http_port)); | .unwrap_or_else(|_| format!("127.0.0.1:{}", &CONFIG.http_port)); | ||||
let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); | let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); | ||||
info!("Listening on: {}", addr); | info!("Listening on: {}", addr); | ||||
Show All 18 Lines | Err(e) => { | ||||
info!( | info!( | ||||
"Failed to establish connection with {}. Reason: {}", | "Failed to establish connection with {}. Reason: {}", | ||||
addr, e | addr, e | ||||
); | ); | ||||
return; | return; | ||||
} | } | ||||
}; | }; | ||||
let (mut outgoing, incoming) = ws_stream.split(); | let (outgoing, mut incoming) = ws_stream.split(); | ||||
// Create channel for messages to be passed to this connection | // Create channel for messages to be passed to this connection | ||||
let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | let (tx, mut rx) = mpsc::unbounded_channel::<String>(); | ||||
let session = session::WebsocketSession::new(tx.clone(), db_client.clone()); | let mut session = session::WebsocketSession::new(outgoing, db_client.clone()); | ||||
let handle_incoming = incoming.try_for_each(|msg| async { | |||||
debug!("Received message from {}", addr); | |||||
match msg { | |||||
Message::Text(text) => { | |||||
match session.handle_message_from_device(&text).await { | |||||
Ok(_) => { | |||||
debug!("Successfully handled message: {}", text) | |||||
} | |||||
Err(e) => { | |||||
error!("Failed to process message: {}", e); | |||||
} | |||||
}; | |||||
} | |||||
_ => { | |||||
error!("Invalid message was received"); | |||||
} | |||||
} | |||||
Ok(()) | |||||
}); | |||||
debug!("Polling for messages from: {}", addr); | |||||
// Poll for messages either being sent to the device (rx) | // Poll for messages either being sent to the device (rx) | ||||
// or messages being received from the device (handle_incoming) | // or messages being received from the device (incoming) | ||||
loop { | |||||
debug!("Polling for messages from: {}", addr); | |||||
tokio::select! { | tokio::select! { | ||||
Some(message) = rx.recv() => { handle_message_from_service(message, &mut outgoing).await; }, | Some(message) = rx.recv() => { session.send_message_to_device(message).await; }, | ||||
Ok(_) = handle_incoming => { debug!("Received message from websocket") }, | device_message = incoming.next() => { | ||||
else => { | match device_message { | ||||
info!("Connection with {} closed.", addr); | Some(Ok(msg)) => session.handle_websocket_frame_from_device(msg, tx.clone()).await, | ||||
ACTIVE_CONNECTIONS.remove("test"); | _ => { | ||||
debug!("Connection to {} closed remotely.", addr); | |||||
break; | |||||
} | } | ||||
} | } | ||||
}, | |||||
else => { | |||||
debug!("Unhealthy connection for: {}", addr); | |||||
break; | |||||
}, | |||||
} | } | ||||
async fn handle_message_from_service( | |||||
incoming_payload: String, | |||||
outgoing: &mut SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, | |||||
) { | |||||
if let Err(e) = outgoing.send(Message::Text(incoming_payload)).await { | |||||
error!("Failed to send message to device: {}", e); | |||||
} | } | ||||
info!("Unregistering connection to: {}", addr); | |||||
session.close().await | |||||
} | } |