diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -1,12 +1,22 @@ mod proto { tonic::include_proto!("tunnelbroker"); } + use commtest::identity::device::create_device; +use commtest::identity::olm_account_infos::{ + MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2, +}; use commtest::tunnelbroker::socket::create_socket; -use futures_util::StreamExt; +use futures_util::{SinkExt, StreamExt}; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; -use tunnelbroker_messages::RefreshKeyRequest; +use std::time::Duration; +use tokio::time::sleep; +use tokio_tungstenite::tungstenite::Message; + +use tunnelbroker_messages::{ + MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest, +}; #[tokio::test] async fn send_refresh_request() { @@ -46,3 +56,56 @@ serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } + +#[tokio::test] +async fn test_messages_order() { + let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await; + let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; + + let messages = vec![ + WebsocketMessageToDevice { + device_id: receiver.device_id.clone(), + payload: "first message".to_string(), + }, + WebsocketMessageToDevice { + device_id: receiver.device_id.clone(), + payload: "second message".to_string(), + }, + WebsocketMessageToDevice { + device_id: receiver.device_id.clone(), + payload: "third message".to_string(), + }, + ]; + + let serialized_messages: Vec<_> = messages + .iter() + .map(|message| { + serde_json::to_string(message) + .expect("Failed to serialize message to device") + }) + .map(Message::text) + .collect(); + + let (mut sender_socket, _) = create_socket(&sender).await.split(); + + for msg in serialized_messages.clone() { + sender_socket + .send(msg) + .await + .expect("Failed to send the message over WebSocket"); + } + + // Wait a specified duration to ensure that message had time to persist + sleep(Duration::from_millis(100)).await; + + let mut receiver_socket = create_socket(&receiver).await; + + for msg in messages { + if let Some(Ok(response)) = receiver_socket.next().await { + let received_payload = response.to_text().unwrap(); + assert_eq!(msg.payload, received_payload); + } else { + panic!("Unable to receive message"); + } + } +} diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf --- a/services/terraform/modules/shared/dynamodb.tf +++ b/services/terraform/modules/shared/dynamodb.tf @@ -82,7 +82,7 @@ resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" { name = "tunnelbroker-undelivered-messages" hash_key = "deviceID" - range_key = "createdAt" + range_key = "messageID" billing_mode = "PAY_PER_REQUEST" attribute { @@ -91,8 +91,8 @@ } attribute { - name = "createdAt" - type = "N" + name = "messageID" + type = "S" } } diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock --- a/services/tunnelbroker/Cargo.lock +++ b/services/tunnelbroker/Cargo.lock @@ -59,6 +59,21 @@ "url", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.3.2" @@ -730,6 +745,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.48.0", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1248,6 +1277,29 @@ "tungstenite 0.20.0", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.3.0" @@ -2589,6 +2641,7 @@ "anyhow", "aws-config", "aws-sdk-dynamodb", + "chrono", "clap", "derive_more", "futures-util", @@ -2825,6 +2878,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.0", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -27,6 +27,7 @@ tunnelbroker_messages = { path = "../../shared/tunnelbroker_messages" } derive_more = "0.99.17" lapin = "2.2.1" +chrono = "0.4.31" [build-dependencies] tonic-build = "0.8" diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -15,14 +15,17 @@ // - (primary key) = (deviceID: Partition Key, createdAt: Sort Key) // - deviceID: The public key of a device's olm identity key // - payload: Message to be delivered. See shared/tunnelbroker_messages. - // - createdAt: UNIX timestamp of when the item was inserted. - // Timestamp is needed to order the messages correctly to the device. + // - messageID = [createdAt]#[clientMessageID] + // - createdAd: UNIX timestamp of when the item was inserted. + // Timestamp is needed to order the messages correctly to the device. + // Timestamp format is ISO 8601 to handle lexicographical sorting. + // - clientMessageID: Message ID generated on client using UUID Version 4. pub mod undelivered_messages { pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages"; pub const PARTITION_KEY: &str = "deviceID"; pub const DEVICE_ID: &str = "deviceID"; pub const PAYLOAD: &str = "payload"; - pub const CREATED_AT: &str = "createdAt"; - pub const SORT_KEY: &str = "createdAt"; + pub const MESSAGE_ID: &str = "messageID"; + pub const SORT_KEY: &str = "messageID"; } } diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs --- a/services/tunnelbroker/src/database/message.rs +++ b/services/tunnelbroker/src/database/message.rs @@ -3,13 +3,13 @@ use aws_sdk_dynamodb::types::AttributeValue; use crate::constants::dynamodb::undelivered_messages::{ - CREATED_AT, DEVICE_ID, PAYLOAD, + DEVICE_ID, MESSAGE_ID, PAYLOAD, }; #[derive(Debug)] pub struct DeviceMessage { pub device_id: String, - pub created_at: String, + pub message_id: String, pub payload: String, } @@ -28,10 +28,10 @@ .as_s() .map_err(|_| MessageErrors::SerializationError)? .to_string(); - let created_at: String = hashmap - .get(CREATED_AT) + let message_id: String = hashmap + .get(MESSAGE_ID) .ok_or(MessageErrors::SerializationError)? - .as_n() + .as_s() .map_err(|_| MessageErrors::SerializationError)? .to_string(); let payload: String = hashmap @@ -43,7 +43,7 @@ Ok(DeviceMessage { device_id, - created_at, + message_id, payload, }) } diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs --- a/services/tunnelbroker/src/database/mod.rs +++ b/services/tunnelbroker/src/database/mod.rs @@ -3,12 +3,12 @@ use aws_sdk_dynamodb::operation::delete_item::{ DeleteItemError, DeleteItemOutput, }; -use aws_sdk_dynamodb::operation::put_item::{PutItemError, PutItemOutput}; +use aws_sdk_dynamodb::operation::put_item::PutItemError; use aws_sdk_dynamodb::operation::query::QueryError; use aws_sdk_dynamodb::{types::AttributeValue, Client}; +use chrono::Utc; use std::collections::HashMap; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, error}; use crate::constants::dynamodb::undelivered_messages::{ @@ -23,13 +23,6 @@ client: Arc, } -pub fn unix_timestamp() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("System time is misconfigured") - .as_secs() -} - pub fn handle_ddb_error(db_error: SdkError) -> tonic::Status { match db_error { SdkError::TimeoutError(_) | SdkError::ServiceError(_) => { @@ -55,22 +48,27 @@ &self, device_id: &str, payload: &str, - ) -> Result> { + client_message_id: &str, + ) -> Result> { + let created_at = Utc::now().to_rfc3339(); + let message_id = format!("{}#{}", created_at, client_message_id); + let device_av = AttributeValue::S(device_id.to_string()); let payload_av = AttributeValue::S(payload.to_string()); - let created_av = AttributeValue::N(unix_timestamp().to_string()); + let message_id_av = AttributeValue::S(message_id.clone()); let request = self .client .put_item() .table_name(TABLE_NAME) .item(PARTITION_KEY, device_av) - .item(SORT_KEY, created_av) + .item(SORT_KEY, message_id_av) .item(PAYLOAD, payload_av); debug!("Persisting message to device: {}", &device_id); - request.send().await + request.send().await?; + Ok(message_id) } pub async fn retrieve_messages( @@ -102,7 +100,7 @@ pub async fn delete_message( &self, device_id: &str, - created_at: &str, + message_id: &str, ) -> Result> { debug!("Deleting message for device: {}", device_id); @@ -113,7 +111,7 @@ ), ( SORT_KEY.to_string(), - AttributeValue::N(created_at.to_string()), + AttributeValue::S(message_id.to_string()), ), ]); diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -39,7 +39,7 @@ self .client - .persist_message(&message.device_id, &message.payload) + .persist_message(&message.device_id, &message.payload, "message_id") .await .map_err(handle_ddb_error)?; diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -175,6 +175,7 @@ .persist_message( message_to_device.device_id.as_str(), message_to_device.payload.as_str(), + "message_id", ) .await?; @@ -222,7 +223,7 @@ self.send_message_to_device(device_message.payload).await; if let Err(e) = self .db_client - .delete_message(&self.device_info.device_id, &device_message.created_at) + .delete_message(&self.device_info.device_id, &device_message.message_id) .await { error!("Failed to delete message: {}:", e);