Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3373461
D9158.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Referenced Files
None
Subscribers
None
D9158.diff
View Options
diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock
--- a/services/tunnelbroker/Cargo.lock
+++ b/services/tunnelbroker/Cargo.lock
@@ -853,6 +853,12 @@
"typenum",
]
+[[package]]
+name = "data-encoding"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
+
[[package]]
name = "derive_more"
version = "0.99.17"
@@ -1217,6 +1223,19 @@
"tokio-io-timeout",
]
+[[package]]
+name = "hyper-tungstenite"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
+dependencies = [
+ "hyper",
+ "pin-project-lite",
+ "tokio",
+ "tokio-tungstenite 0.20.0",
+ "tungstenite 0.20.0",
+]
+
[[package]]
name = "idna"
version = "0.3.0"
@@ -1570,9 +1589,9 @@
[[package]]
name = "pin-project-lite"
-version = "0.2.9"
+version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
+checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
@@ -2274,7 +2293,19 @@
"futures-util",
"log",
"tokio",
- "tungstenite",
+ "tungstenite 0.18.0",
+]
+
+[[package]]
+name = "tokio-tungstenite"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite 0.20.0",
]
[[package]]
@@ -2466,6 +2497,25 @@
"utf-8",
]
+[[package]]
+name = "tungstenite"
+version = "0.20.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
[[package]]
name = "tunnelbroker"
version = "0.5.0"
@@ -2476,12 +2526,14 @@
"clap",
"derive_more",
"futures-util",
+ "hyper",
+ "hyper-tungstenite",
"lapin",
"once_cell",
"prost",
"serde_json",
"tokio",
- "tokio-tungstenite",
+ "tokio-tungstenite 0.18.0",
"tonic",
"tonic-build",
"tracing",
diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml
--- a/services/tunnelbroker/Cargo.toml
+++ b/services/tunnelbroker/Cargo.toml
@@ -13,6 +13,8 @@
aws-sdk-dynamodb = "0.27"
clap = { version = "4.2", features = ["derive", "env"] }
futures-util = "0.3"
+hyper = "0.14"
+hyper-tungstenite = "0.11"
once_cell = "1.17"
prost = "0.11"
serde_json = "1.0"
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -5,7 +5,10 @@
use crate::CONFIG;
use futures_util::stream::SplitSink;
use futures_util::StreamExt;
+use hyper::{Body, Request, Response, StatusCode};
+use std::future::Future;
use std::net::SocketAddr;
+use std::pin::Pin;
use std::{env, io::Error};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
@@ -13,8 +16,73 @@
use tokio_tungstenite::WebSocketStream;
use tracing::{debug, error, info};
+type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
use self::session::WebsocketSession;
+/// Hyper HTTP service that handles incoming HTTP and websocket connections
+/// It handles the initial websocket upgrade request and spawns a task to
+/// handle the websocket connection.
+/// It also handles regular HTTP requests (currently health check)
+struct WebsocketService {
+ addr: SocketAddr,
+ channel: lapin::Channel,
+ db_client: DatabaseClient,
+}
+
+impl hyper::service::Service<Request<Body>> for WebsocketService {
+ type Response = Response<Body>;
+ type Error = BoxedError;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+ // This function is called to check if the service is ready to accept
+ // connections. Since we don't have any state to check, we're always ready.
+ fn poll_ready(
+ &mut self,
+ _: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ std::task::Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, mut req: Request<Body>) -> Self::Future {
+ let addr = self.addr;
+ let db_client = self.db_client.clone();
+ let channel = self.channel.clone();
+
+ let future = async move {
+ // Check if the request is a websocket upgrade request.
+ if hyper_tungstenite::is_upgrade_request(&req) {
+ let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?;
+
+ // Spawn a task to handle the websocket connection.
+ tokio::spawn(async move {
+ // TODO: Accept connection here - call accept_connection()
+ });
+
+ // Return the response so the spawned future can continue.
+ return Ok(response);
+ }
+
+ debug!(
+ "Incoming HTTP request on WebSocket port: {} {}",
+ req.method(),
+ req.uri().path()
+ );
+
+ // A simple router for regular HTTP requests
+ let response = match req.uri().path() {
+ "/health" => Response::new(Body::from("OK")),
+ _ => Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Not found"))?,
+ };
+ Ok(response)
+ };
+ Box::pin(future)
+ }
+}
+
pub async fn run_server(
db_client: DatabaseClient,
amqp_connection: &lapin::Connection,
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Nov 27, 9:49 AM (20 h, 23 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2590078
Default Alt Text
D9158.diff (5 KB)
Attached To
Mode
D9158: [tunnelbroker] Hyper service for WebSocket connections
Attached
Detach File
Event Timeline
Log In to Comment