diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock --- a/services/tunnelbroker/Cargo.lock +++ b/services/tunnelbroker/Cargo.lock @@ -853,6 +853,12 @@ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "derive_more" version = "0.99.17" @@ -1217,6 +1223,19 @@ "tokio-io-timeout", ] +[[package]] +name = "hyper-tungstenite" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-tungstenite 0.20.0", + "tungstenite 0.20.0", +] + [[package]] name = "idna" version = "0.3.0" @@ -1570,9 +1589,9 @@ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -2274,7 +2293,19 @@ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.18.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.20.0", ] [[package]] @@ -2466,6 +2497,25 @@ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tunnelbroker" version = "0.5.0" @@ -2476,12 +2526,14 @@ "clap", "derive_more", "futures-util", + "hyper", + "hyper-tungstenite", "lapin", "once_cell", "prost", "serde_json", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.18.0", "tonic", "tonic-build", "tracing", diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -13,6 +13,8 @@ aws-sdk-dynamodb = "0.27" clap = { version = "4.2", features = ["derive", "env"] } futures-util = "0.3" +hyper = "0.14" +hyper-tungstenite = "0.11" once_cell = "1.17" prost = "0.11" serde_json = "1.0" diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -5,7 +5,10 @@ use crate::CONFIG; use futures_util::stream::SplitSink; use futures_util::StreamExt; +use hyper::{Body, Request, Response, StatusCode}; +use std::future::Future; use std::net::SocketAddr; +use std::pin::Pin; use std::{env, io::Error}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; @@ -13,8 +16,67 @@ use tokio_tungstenite::WebSocketStream; use tracing::{debug, error, info}; +type BoxedError = Box; + use self::session::WebsocketSession; +/// Hyper HTTP service that handles incoming HTTP and websocket connections +/// It handles the initial websocket upgrade request and spawns a task to +/// handle the websocket connection. +/// It also handles regular HTTP requests (currently health check) +struct WebsocketService { + addr: SocketAddr, + channel: lapin::Channel, + db_client: DatabaseClient, +} + +impl hyper::service::Service> for WebsocketService { + type Response = Response; + type Error = BoxedError; + type Future = + Pin> + Send>>; + + // This function is called to check if the service is ready to accept + // connections. Since we don't have any state to check, we're always ready. + fn poll_ready( + &mut self, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + let addr = self.addr; + let db_client = self.db_client.clone(); + let channel = self.channel.clone(); + + let future = async move { + // Check if the request is a websocket upgrade request. + if hyper_tungstenite::is_upgrade_request(&req) { + let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?; + + // Spawn a task to handle the websocket connection. + tokio::spawn(async move { + // TODO: Accept connection here - call accept_connection() + }); + + // Return the response so the spawned future can continue. + return Ok(response); + } + + // A simple router for regular HTTP requests + let response = match req.uri().path() { + "/health" => Response::new(Body::from("OK")), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found"))?, + }; + Ok(response) + }; + Box::pin(future) + } +} + pub async fn run_server( db_client: DatabaseClient, amqp_connection: &lapin::Connection,