Page MenuHomePhabricator

D9158.id31116.diff
No OneTemporary

D9158.id31116.diff

diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock
--- a/services/tunnelbroker/Cargo.lock
+++ b/services/tunnelbroker/Cargo.lock
@@ -853,6 +853,12 @@
"typenum",
]
+[[package]]
+name = "data-encoding"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
+
[[package]]
name = "derive_more"
version = "0.99.17"
@@ -1217,6 +1223,19 @@
"tokio-io-timeout",
]
+[[package]]
+name = "hyper-tungstenite"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
+dependencies = [
+ "hyper",
+ "pin-project-lite",
+ "tokio",
+ "tokio-tungstenite 0.20.0",
+ "tungstenite 0.20.0",
+]
+
[[package]]
name = "idna"
version = "0.3.0"
@@ -1570,9 +1589,9 @@
[[package]]
name = "pin-project-lite"
-version = "0.2.9"
+version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
+checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
@@ -2274,7 +2293,19 @@
"futures-util",
"log",
"tokio",
- "tungstenite",
+ "tungstenite 0.18.0",
+]
+
+[[package]]
+name = "tokio-tungstenite"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite 0.20.0",
]
[[package]]
@@ -2466,6 +2497,25 @@
"utf-8",
]
+[[package]]
+name = "tungstenite"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
[[package]]
name = "tunnelbroker"
version = "0.5.0"
@@ -2476,12 +2526,14 @@
"clap",
"derive_more",
"futures-util",
+ "hyper",
+ "hyper-tungstenite",
"lapin",
"once_cell",
"prost",
"serde_json",
"tokio",
- "tokio-tungstenite",
+ "tokio-tungstenite 0.18.0",
"tonic",
"tonic-build",
"tracing",
diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml
--- a/services/tunnelbroker/Cargo.toml
+++ b/services/tunnelbroker/Cargo.toml
@@ -13,6 +13,8 @@
aws-sdk-dynamodb = "0.27"
clap = { version = "4.2", features = ["derive", "env"] }
futures-util = "0.3"
+hyper = "0.14"
+hyper-tungstenite = "0.11"
once_cell = "1.17"
prost = "0.11"
serde_json = "1.0"
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -5,7 +5,10 @@
use crate::CONFIG;
use futures_util::stream::SplitSink;
use futures_util::StreamExt;
+use hyper::{Body, Request, Response, StatusCode};
+use std::future::Future;
use std::net::SocketAddr;
+use std::pin::Pin;
use std::{env, io::Error};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
@@ -13,8 +16,73 @@
use tokio_tungstenite::WebSocketStream;
use tracing::{debug, error, info};
+type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
use self::session::WebsocketSession;
+/// Hyper HTTP service that handles incoming HTTP and websocket connections
+/// It handles the initial websocket upgrade request and spawns a task to
+/// handle the websocket connection.
+/// It also handles regular HTTP requests (currently health check)
+struct WebsocketService {
+ addr: SocketAddr,
+ channel: lapin::Channel,
+ db_client: DatabaseClient,
+}
+
+impl hyper::service::Service<Request<Body>> for WebsocketService {
+ type Response = Response<Body>;
+ type Error = BoxedError;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+ // This function is called to check if the service is ready to accept
+ // connections. Since we don't have any state to check, we're always ready.
+ fn poll_ready(
+ &mut self,
+ _: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ std::task::Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, mut req: Request<Body>) -> Self::Future {
+ let addr = self.addr;
+ let db_client = self.db_client.clone();
+ let channel = self.channel.clone();
+
+ let future = async move {
+ // Check if the request is a websocket upgrade request.
+ if hyper_tungstenite::is_upgrade_request(&req) {
+ let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?;
+
+ // Spawn a task to handle the websocket connection.
+ tokio::spawn(async move {
+ // TODO: Accept connection here - call accept_connection()
+ });
+
+ // Return the response so the spawned future can continue.
+ return Ok(response);
+ }
+
+ debug!(
+ "Incoming HTTP request on WebSocket port: {} {}",
+ req.method(),
+ req.uri().path()
+ );
+
+ // A simple router for regular HTTP requests
+ let response = match req.uri().path() {
+ "/health" => Response::new(Body::from("OK")),
+ _ => Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Not found"))?,
+ };
+ Ok(response)
+ };
+ Box::pin(future)
+ }
+}
+
pub async fn run_server(
db_client: DatabaseClient,
amqp_connection: &lapin::Connection,

File Metadata

Mime Type
text/plain
Expires
Wed, Nov 27, 11:38 AM (22 h, 13 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2590078
Default Alt Text
D9158.id31116.diff (5 KB)

Event Timeline