diff --git a/services/tunnelbroker/src/websockets/messages.rs b/services/tunnelbroker/src/websockets/messages.rs deleted file mode 100644 index 8038af168..000000000 --- a/services/tunnelbroker/src/websockets/messages.rs +++ /dev/null @@ -1,79 +0,0 @@ -// This file intends to preserve the original structure of the tunnelbroker -// protobuf RPCs. However, this is converted to simple rust datastructures -// to allow for deserialization and serialization into JSON websocket messages - -// Original Tunnelbroker Service definition: -// -// service TunnelbrokerService { -// rpc SessionSignature(SessionSignatureRequest) returns (SessionSignatureResponse) {} -// rpc NewSession(NewSessionRequest) returns (NewSessionResponse) {} -// rpc MessagesStream(stream MessageToTunnelbroker) returns (stream MessageToClient) {} -// } - -// Session -pub struct SessionSignatureRequest { - pub device_id: String, -} -pub struct SessionSignatureResponse { - pub to_sign: String, -} - -pub enum DeviceTypes { - Mobile, - Web, - Keyserver, -} - -pub struct SessionRequest { - pub device_id: String, - pub public_key: String, - pub signature: String, - pub notify_token: Option, - pub device_type: DeviceTypes, - pub device_app_version: String, - pub device_os: String, -} - -pub struct SessionResponse { - pub session_id: String, -} - -// Common messages structures for the MessagesStream -pub struct ProcessedMessages { - pub message_id: Vec, -} - -// The messages from the Client to the Tunnelbroker -pub struct MessageToTunnelbrokerStruct { - pub to_device_id: String, - pub payload: String, - pub blob_hashes: Vec, -} - -pub struct MessagesToSend { - pub messages: Vec, -} - -pub enum MessageToTunnelbroker { - Messages(MessagesToSend), - ProcessedMessages(ProcessedMessages), - NewNotifyToken(String), -} - -// The messages from the Tunnelbroker to the Client -pub struct MessageToClientStruct { - pub message_id: String, - pub from_device_id: String, - pub payload: String, - pub blob_hashes: Vec, -} - -pub struct MessagesToDeliver { - pub messages: Vec, -} - -pub enum MessageToClient { - Messages(MessagesToDeliver), - ProcessedMessages(ProcessedMessages), - NewNotifyTokenRequired, -} diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs index 7cf30b56d..9fd60135b 100644 --- a/services/tunnelbroker/src/websockets/mod.rs +++ b/services/tunnelbroker/src/websockets/mod.rs @@ -1,24 +1,22 @@ -pub mod messages; - use std::{env, io::Error}; use tokio::net::{TcpListener, TcpStream}; use tracing::info; pub async fn create_server() -> Result<(), Error> { let addr = env::var("COMM_TUNNELBROKER_WEBSOCKET_ADDR") .unwrap_or_else(|_| "127.0.0.1:51001".to_string()); let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); info!("Listening on: {}", addr); while let Ok((stream, _)) = listener.accept().await { tokio::spawn(accept_connection(stream)); } Ok(()) } async fn accept_connection(_stream: TcpStream) { unimplemented!() } diff --git a/shared/tunnelbroker_messages/.gitignore b/shared/tunnelbroker_messages/.gitignore new file mode 100644 index 000000000..2f7896d1d --- /dev/null +++ b/shared/tunnelbroker_messages/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/shared/tunnelbroker_messages/Cargo.lock b/shared/tunnelbroker_messages/Cargo.lock new file mode 100644 index 000000000..e47708a08 --- /dev/null +++ b/shared/tunnelbroker_messages/Cargo.lock @@ -0,0 +1,89 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + +[[package]] +name = "proc-macro2" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" + +[[package]] +name = "serde" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tunnelbroker_messages" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "unicode-ident" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" diff --git a/shared/tunnelbroker_messages/Cargo.toml b/shared/tunnelbroker_messages/Cargo.toml new file mode 100644 index 000000000..c70ec8d25 --- /dev/null +++ b/shared/tunnelbroker_messages/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "tunnelbroker_messages" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0.160", features = [ "derive" ] } +serde_json = "1.0.96" diff --git a/shared/tunnelbroker_messages/src/lib.rs b/shared/tunnelbroker_messages/src/lib.rs new file mode 100644 index 000000000..8f054889b --- /dev/null +++ b/shared/tunnelbroker_messages/src/lib.rs @@ -0,0 +1,3 @@ +pub mod messages; + +pub use messages::*; diff --git a/shared/tunnelbroker_messages/src/messages/keys.rs b/shared/tunnelbroker_messages/src/messages/keys.rs new file mode 100644 index 000000000..28411b7c7 --- /dev/null +++ b/shared/tunnelbroker_messages/src/messages/keys.rs @@ -0,0 +1,28 @@ +// Messages sent between tunnelbroker and a device + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub struct RefreshKeyRequest { + pub device_id: String, + pub number_of_keys: u32, +} + +#[cfg(test)] +mod key_tests { + use super::*; + + #[test] + fn test_refresh_deserialization() { + let example_payload = r#"{ + "type": "RefreshKeyRequest", + "deviceId": "adfjEDFS", + "numberOfKeys": 6 + }"#; + + let request = + serde_json::from_str::(example_payload).unwrap(); + assert_eq!(request.number_of_keys, 6); + } +} diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs new file mode 100644 index 000000000..eb920efab --- /dev/null +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -0,0 +1,6 @@ +// Messages sent between tunnelbroker and a device +pub mod keys; +pub mod session; + +pub use keys::*; +pub use session::*; diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs new file mode 100644 index 000000000..7c9df423f --- /dev/null +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -0,0 +1,68 @@ +// Messages sent between tunnelbroker and a device + +use serde::{Deserialize, Serialize}; + +/// The workflow when estabilishing a tunnelbroker connection: +/// - Client sends SessionRequest +/// - Tunnelbroker validates access_token with identity service +/// - Tunnelbroker emits an AMQP message declaring that it has opened a new +/// connection with a given device, so that the respective tunnelbroker +/// instance can close the existing connection. +/// - Tunnelbroker returns a session_id representing that the connection was +/// accepted +/// - Tunnelbroker will flush all messages related to device from RabbitMQ. +/// This must be done first before flushing DynamoDB to prevent duplicated +/// messages. +/// - Tunnelbroker flushes all messages in DynamoDB +/// - Tunnelbroker orders messages by creation date (oldest first), and sends +/// messages to device +/// - Tunnelbroker then polls for incoming messages from device + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum DeviceTypes { + Mobile, + Web, + Keyserver, +} + +/// Message sent by a client to tunnelbroker to initiate a websocket +/// session. Tunnelbroker will then validate the access token with identity +/// service before continuing with the request. +#[derive(Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub struct SessionRequest { + pub device_id: String, + pub access_token: String, + pub notify_token: Option, + pub device_type: DeviceTypes, + pub device_app_version: Option, + pub device_os: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct SessionResponse { + pub session_id: String, +} + +#[cfg(test)] +mod session_tests { + use super::*; + + #[test] + fn test_session_deserialization() { + let example_payload = r#"{ + "type": "sessionRequest", + "accessToken": "xkdeifjsld", + "deviceId": "foo", + "deviceType": "keyserver" + }"#; + + let request = + serde_json::from_str::(example_payload).unwrap(); + assert_eq!(request.device_id, "foo"); + assert_eq!(request.access_token, "xkdeifjsld"); + assert_eq!(request.device_os, None); + assert_eq!(request.device_type, DeviceTypes::Keyserver); + } +}