diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock --- a/services/tunnelbroker/Cargo.lock +++ b/services/tunnelbroker/Cargo.lock @@ -1232,8 +1232,8 @@ "hyper", "pin-project-lite", "tokio", - "tokio-tungstenite 0.20.0", - "tungstenite 0.20.0", + "tokio-tungstenite", + "tungstenite", ] [[package]] @@ -2284,18 +2284,6 @@ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.18.0", -] - [[package]] name = "tokio-tungstenite" version = "0.20.0" @@ -2305,7 +2293,7 @@ "futures-util", "log", "tokio", - "tungstenite 0.20.0", + "tungstenite", ] [[package]] @@ -2478,25 +2466,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "tungstenite" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" -dependencies = [ - "base64 0.13.1", - "byteorder", - "bytes", - "http", - "httparse", - "log", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.20.0" @@ -2533,7 +2502,6 @@ "prost", "serde_json", "tokio", - "tokio-tungstenite 0.18.0", "tonic", "tonic-build", "tracing", diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -18,8 +18,7 @@ once_cell = "1.17" prost = "0.11" serde_json = "1.0" -tokio-tungstenite = { version = "0.18.0", features = [ ] } -tokio = { version = "1.24", features = ["rt-multi-thread"]} +tokio = { version = "1.24", features = ["rt-multi-thread"] } tonic = "0.8" tracing = "0.1" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -6,14 +6,15 @@ use futures_util::stream::SplitSink; use futures_util::StreamExt; use hyper::{Body, Request, Response, StatusCode}; +use hyper_tungstenite::tungstenite::Message; +use hyper_tungstenite::HyperWebsocket; +use hyper_tungstenite::WebSocketStream; +use std::env; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use std::{env, io::Error}; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::{TcpListener, TcpStream}; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::WebSocketStream; +use tokio::net::TcpListener; use tracing::{debug, error, info}; type BoxedError = Box; @@ -57,7 +58,7 @@ // Spawn a task to handle the websocket connection. tokio::spawn(async move { - // TODO: Accept connection here - call accept_connection() + accept_connection(websocket, addr, db_client, channel).await; }); // Return the response so the spawned future can continue. @@ -86,19 +87,38 @@ pub async fn run_server( db_client: DatabaseClient, amqp_connection: &lapin::Connection, -) -> Result<(), Error> { +) -> Result<(), BoxedError> { let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") .unwrap_or_else(|_| format!("0.0.0.0:{}", &CONFIG.http_port)); let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); info!("WebSocket listening on: {}", addr); + let mut http = hyper::server::conn::Http::new(); + http.http1_only(true); + http.http1_keep_alive(true); + while let Ok((stream, addr)) = listener.accept().await { let channel = amqp_connection .create_channel() .await - .expect("Unable to create amqp channel"); - tokio::spawn(accept_connection(stream, addr, db_client.clone(), channel)); + .expect("Failed to create AMQP channel"); + let connection = http + .serve_connection( + stream, + WebsocketService { + channel, + db_client: db_client.clone(), + addr, + }, + ) + .with_upgrades(); + + tokio::spawn(async move { + if let Err(err) = connection.await { + error!("Error serving HTTP/WebSocket connection: {:?}", err); + } + }); } Ok(()) @@ -106,14 +126,14 @@ /// Handler for any incoming websocket connections async fn accept_connection( - raw_stream: TcpStream, + hyper_ws: HyperWebsocket, addr: SocketAddr, db_client: DatabaseClient, amqp_channel: lapin::Channel, ) { debug!("Incoming connection from: {}", addr); - let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { + let ws_stream = match hyper_ws.await { Ok(stream) => stream, Err(e) => { info!( diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -2,12 +2,12 @@ use futures_util::stream::SplitSink; use futures_util::SinkExt; use futures_util::StreamExt; +use hyper_tungstenite::{tungstenite::Message, WebSocketStream}; use lapin::message::Delivery; use lapin::options::{BasicConsumeOptions, QueueDeclareOptions}; use lapin::types::FieldTable; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use tracing::{debug, error}; use tunnelbroker_messages::{session::DeviceTypes, Messages};