diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock --- a/services/tunnelbroker/Cargo.lock +++ b/services/tunnelbroker/Cargo.lock @@ -64,7 +64,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -75,7 +75,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -281,7 +281,7 @@ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.103", ] [[package]] @@ -298,7 +298,20 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", +] + +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", ] [[package]] @@ -424,9 +437,9 @@ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", "futures-sink", @@ -434,9 +447,9 @@ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" @@ -451,38 +464,38 @@ [[package]] name = "futures-io" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.15", ] [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-channel", "futures-core", @@ -778,6 +791,16 @@ "cc", ] +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" @@ -882,9 +905,9 @@ [[package]] name = "once_cell" -version = "1.16.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "openssl" @@ -909,7 +932,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -947,6 +970,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot_core" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.45.0", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -980,7 +1016,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1014,14 +1050,14 @@ checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51" dependencies = [ "proc-macro2", - "syn", + "syn 1.0.103", ] [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" dependencies = [ "unicode-ident", ] @@ -1053,7 +1089,7 @@ "prost", "prost-types", "regex", - "syn", + "syn 1.0.103", "tempfile", "which", ] @@ -1068,7 +1104,7 @@ "itertools", "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1083,9 +1119,9 @@ [[package]] name = "quote" -version = "1.0.21" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] @@ -1236,6 +1272,12 @@ "windows-sys 0.36.1", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "scratch" version = "1.0.2" @@ -1277,29 +1319,29 @@ [[package]] name = "serde" -version = "1.0.147" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.147" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.15", ] [[package]] name = "serde_json" -version = "1.0.87" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" +checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" dependencies = [ "itoa", "ryu", @@ -1380,6 +1422,17 @@ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sync_wrapper" version = "0.1.1" @@ -1426,7 +1479,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1501,7 +1554,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1604,7 +1657,7 @@ "proc-macro2", "prost-build", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1679,7 +1732,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", ] [[package]] @@ -1761,15 +1814,19 @@ "base64 0.20.0", "cxx", "cxx-build", + "dashmap", "env_logger", "fcm", "futures", + "futures-util", "glob", "lazy_static", "log", + "once_cell", "openssl", "prost", "prost-types", + "serde_json", "tokio", "tokio-stream", "tokio-tungstenite", @@ -1777,6 +1834,15 @@ "tonic-build", "tracing", "tracing-subscriber", + "tunnelbroker_messages", +] + +[[package]] +name = "tunnelbroker_messages" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", ] [[package]] @@ -1896,7 +1962,7 @@ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.103", "wasm-bindgen-shared", ] @@ -1930,7 +1996,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.103", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2032,19 +2098,43 @@ checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.42.0", - "windows_i686_gnu 0.42.0", - "windows_i686_msvc 0.42.0", - "windows_x86_64_gnu 0.42.0", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.42.0", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.2", ] [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_msvc" @@ -2054,9 +2144,9 @@ [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_i686_gnu" @@ -2066,9 +2156,9 @@ [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_msvc" @@ -2078,9 +2168,9 @@ [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_x86_64_gnu" @@ -2090,15 +2180,15 @@ [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_msvc" @@ -2108,9 +2198,9 @@ [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "winreg" diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -8,24 +8,29 @@ links = "tunnelbroker" [dependencies] -cxx = "1.0" -tracing = "0.1" -tokio = { version = "1.24", features = ["rt-multi-thread"]} -tokio-stream = "0.1" -lazy_static = "1.4" a2 = "0.6" -log = "0.4" +anyhow = "1.0" +base64 = "0.20" +cxx = "1.0" +dashmap = "5.4" env_logger = "0.9" fcm = "0.9" -anyhow = "1.0" +futures = "0.3" +futures-util = "0.3" +lazy_static = "1.4" +log = "0.4" +once_cell = "1.17" openssl = { version = "0.10", features = ["vendored"] } -base64 = "0.20" -tonic = "0.8" prost = "0.11" prost-types = "0.11" -futures = "0.3" +serde_json = "1.0" +tokio-stream = "0.1" tokio-tungstenite = { version = "0.18.0", features = [ ] } +tokio = { version = "1.24", features = ["rt-multi-thread"]} +tonic = "0.8" +tracing = "0.1" tracing-subscriber = "0.3.16" +tunnelbroker_messages = { path = "../../shared/tunnelbroker_messages" } [build-dependencies] cxx-build = "1.0" diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -1,8 +1,16 @@ pub mod constants; pub mod grpc; pub mod websockets; +use dashmap::DashMap; +use once_cell::sync::Lazy; use std::io::{self, Error, ErrorKind}; +use tokio::sync::mpsc::UnboundedSender; use tracing; +use tunnelbroker_messages::Messages; + +pub static ACTIVE_CONNECTIONS: Lazy< + DashMap>, +> = Lazy::new(DashMap::new); #[tokio::main] async fn main() -> Result<(), io::Error> { diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,7 +1,13 @@ +use futures::future; +use futures_util::{StreamExt, TryStreamExt}; +use std::net::SocketAddr; use std::{env, io::Error}; - use tokio::net::{TcpListener, TcpStream}; -use tracing::info; +use tokio::sync::mpsc; +use tracing::{debug, error, info}; +use tunnelbroker_messages::messages::Messages; + +use crate::ACTIVE_CONNECTIONS; pub async fn run_server() -> Result<(), Error> { let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") @@ -10,13 +16,63 @@ let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); info!("Listening on: {}", addr); - while let Ok((stream, _)) = listener.accept().await { - tokio::spawn(accept_connection(stream)); + while let Ok((stream, addr)) = listener.accept().await { + tokio::spawn(accept_connection(stream, addr)); } Ok(()) } -async fn accept_connection(_stream: TcpStream) { - unimplemented!() +/// Handler for any incoming websocket connections +async fn accept_connection(raw_stream: TcpStream, addr: SocketAddr) { + debug!("Incoming connection from: {}", addr); + + let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { + Ok(stream) => stream, + Err(e) => { + info!( + "Failed to establish connection with {}. Reason: {}", + addr, e + ); + return; + } + }; + + let (_outgoing, incoming) = ws_stream.split(); + + let handle_incoming = incoming.try_for_each(|msg| { + let message_text = msg.to_text().unwrap(); + debug!("Received message from {}: {}", addr, message_text); + + match handle_message(message_text) { + Ok(_) => { + debug!("Successfully handled message: {}", message_text) + } + Err(e) => { + error!("Failed to process message: {}", e); + } + }; + + future::ok(()) + }); + + // Create channel for messages to be passed to this connection + let (tx, mut rx) = mpsc::unbounded_channel::(); + // TODO: Use device's public key, once we support the SessionRequest message + ACTIVE_CONNECTIONS.insert("test".to_string(), tx.clone()); + + tokio::select! { + Some(_) = rx.recv() => { debug!("Received message from channel") }, + Ok(_) = handle_incoming => { debug!("Received message from websocket" )}, + else => { + info!("Connection with {} closed.", addr); + ACTIVE_CONNECTIONS.remove("test"); + } + } +} + +fn handle_message(message: &str) -> Result<(), serde_json::Error> { + serde_json::from_str::(message)?; + + Ok(()) } diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -4,3 +4,11 @@ pub use keys::*; pub use session::*; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub enum Messages { + RefreshKeysRequest(RefreshKeyRequest), + SessionRequest(SessionRequest), +}