Page MenuHomePhabricator

D9459.id32041.diff
No OneTemporary

D9459.id32041.diff

diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs
--- a/services/commtest/tests/tunnelbroker_integration_tests.rs
+++ b/services/commtest/tests/tunnelbroker_integration_tests.rs
@@ -1,12 +1,22 @@
mod proto {
tonic::include_proto!("tunnelbroker");
}
+
use commtest::identity::device::create_device;
+use commtest::identity::olm_account_infos::{
+ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2,
+};
use commtest::tunnelbroker::socket::create_socket;
-use futures_util::StreamExt;
+use futures_util::{SinkExt, StreamExt};
use proto::tunnelbroker_service_client::TunnelbrokerServiceClient;
use proto::MessageToDevice;
-use tunnelbroker_messages::RefreshKeyRequest;
+use std::time::Duration;
+use tokio::time::sleep;
+use tokio_tungstenite::tungstenite::Message;
+
+use tunnelbroker_messages::{
+ MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest,
+};
#[tokio::test]
async fn send_refresh_request() {
@@ -46,3 +56,56 @@
serde_json::from_str(&response.to_text().unwrap()).unwrap();
assert_eq!(serialized_response, refresh_request);
}
+
+#[tokio::test]
+async fn test_messages_order() {
+ let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await;
+ let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await;
+
+ let messages = vec![
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "first message".to_string(),
+ },
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "second message".to_string(),
+ },
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "third message".to_string(),
+ },
+ ];
+
+ let serialized_messages: Vec<_> = messages
+ .iter()
+ .map(|message| {
+ serde_json::to_string(message)
+ .expect("Failed to serialize message to device")
+ })
+ .map(Message::text)
+ .collect();
+
+ let (mut sender_socket, _) = create_socket(&sender).await.split();
+
+ for msg in serialized_messages.clone() {
+ sender_socket
+ .send(msg)
+ .await
+ .expect("Failed to send the message over WebSocket");
+ }
+
+ // Wait a specified duration to ensure that message had time to persist
+ sleep(Duration::from_millis(100)).await;
+
+ let mut receiver_socket = create_socket(&receiver).await;
+
+ for msg in messages {
+ if let Some(Ok(response)) = receiver_socket.next().await {
+ let received_payload = response.to_text().unwrap();
+ assert_eq!(msg.payload, received_payload);
+ } else {
+ panic!("Unable to receive message");
+ }
+ }
+}
diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf
--- a/services/terraform/modules/shared/dynamodb.tf
+++ b/services/terraform/modules/shared/dynamodb.tf
@@ -82,7 +82,7 @@
resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" {
name = "tunnelbroker-undelivered-messages"
hash_key = "deviceID"
- range_key = "createdAt"
+ range_key = "messageID"
billing_mode = "PAY_PER_REQUEST"
attribute {
@@ -91,8 +91,8 @@
}
attribute {
- name = "createdAt"
- type = "N"
+ name = "messageID"
+ type = "S"
}
}
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -15,14 +15,17 @@
// - (primary key) = (deviceID: Partition Key, createdAt: Sort Key)
// - deviceID: The public key of a device's olm identity key
// - payload: Message to be delivered. See shared/tunnelbroker_messages.
- // - createdAt: UNIX timestamp of when the item was inserted.
- // Timestamp is needed to order the messages correctly to the device.
+ // - messageID = [createdAt]#[clientMessageID]
+ // - createdAd: UNIX timestamp of when the item was inserted.
+ // Timestamp is needed to order the messages correctly to the device.
+ // Timestamp format is ISO 8601 to handle lexicographical sorting.
+ // - clientMessageID: Message ID generated on client using UUID Version 4.
pub mod undelivered_messages {
pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages";
pub const PARTITION_KEY: &str = "deviceID";
pub const DEVICE_ID: &str = "deviceID";
pub const PAYLOAD: &str = "payload";
- pub const CREATED_AT: &str = "createdAt";
- pub const SORT_KEY: &str = "createdAt";
+ pub const MESSAGE_ID: &str = "messageID";
+ pub const SORT_KEY: &str = "messageID";
}
}
diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs
--- a/services/tunnelbroker/src/database/message.rs
+++ b/services/tunnelbroker/src/database/message.rs
@@ -3,13 +3,13 @@
use aws_sdk_dynamodb::types::AttributeValue;
use crate::constants::dynamodb::undelivered_messages::{
- CREATED_AT, DEVICE_ID, PAYLOAD,
+ DEVICE_ID, MESSAGE_ID, PAYLOAD,
};
#[derive(Debug)]
pub struct DeviceMessage {
pub device_id: String,
- pub created_at: String,
+ pub message_id: String,
pub payload: String,
}
@@ -28,10 +28,10 @@
.as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
- let created_at: String = hashmap
- .get(CREATED_AT)
+ let message_id: String = hashmap
+ .get(MESSAGE_ID)
.ok_or(MessageErrors::SerializationError)?
- .as_n()
+ .as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
let payload: String = hashmap
@@ -43,7 +43,7 @@
Ok(DeviceMessage {
device_id,
- created_at,
+ message_id,
payload,
})
}
diff --git a/services/tunnelbroker/src/database/message_id.rs b/services/tunnelbroker/src/database/message_id.rs
--- a/services/tunnelbroker/src/database/message_id.rs
+++ b/services/tunnelbroker/src/database/message_id.rs
@@ -1,12 +1,12 @@
use chrono::{DateTime, Utc};
#[derive(Debug, derive_more::Display, derive_more::Error)]
-enum ParseMessageIdError {
+pub enum ParseMessageIdError {
InvalidTimestamp(chrono::ParseError),
InvalidFormat,
}
#[derive(Debug)]
-struct MessageID {
+pub struct MessageID {
timestamp: DateTime<Utc>,
client_message_id: String,
}
diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs
--- a/services/tunnelbroker/src/database/mod.rs
+++ b/services/tunnelbroker/src/database/mod.rs
@@ -3,12 +3,11 @@
use aws_sdk_dynamodb::operation::delete_item::{
DeleteItemError, DeleteItemOutput,
};
-use aws_sdk_dynamodb::operation::put_item::{PutItemError, PutItemOutput};
+use aws_sdk_dynamodb::operation::put_item::PutItemError;
use aws_sdk_dynamodb::operation::query::QueryError;
use aws_sdk_dynamodb::{types::AttributeValue, Client};
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error};
use crate::constants::dynamodb::undelivered_messages::{
@@ -18,6 +17,7 @@
pub mod message;
pub mod message_id;
+use crate::database::message_id::MessageID;
pub use message::*;
#[derive(Clone)]
@@ -25,13 +25,6 @@
client: Arc<Client>,
}
-pub fn unix_timestamp() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("System time is misconfigured")
- .as_secs()
-}
-
pub fn handle_ddb_error<E>(db_error: SdkError<E>) -> tonic::Status {
match db_error {
SdkError::TimeoutError(_) | SdkError::ServiceError(_) => {
@@ -57,22 +50,27 @@
&self,
device_id: &str,
payload: &str,
- ) -> Result<PutItemOutput, SdkError<PutItemError>> {
+ client_message_id: &str,
+ ) -> Result<String, SdkError<PutItemError>> {
+ let message_id: String =
+ MessageID::new(client_message_id.to_string()).into();
+
let device_av = AttributeValue::S(device_id.to_string());
let payload_av = AttributeValue::S(payload.to_string());
- let created_av = AttributeValue::N(unix_timestamp().to_string());
+ let message_id_av = AttributeValue::S(message_id.clone());
let request = self
.client
.put_item()
.table_name(TABLE_NAME)
.item(PARTITION_KEY, device_av)
- .item(SORT_KEY, created_av)
+ .item(SORT_KEY, message_id_av)
.item(PAYLOAD, payload_av);
debug!("Persisting message to device: {}", &device_id);
- request.send().await
+ request.send().await?;
+ Ok(message_id)
}
pub async fn retrieve_messages(
@@ -104,7 +102,7 @@
pub async fn delete_message(
&self,
device_id: &str,
- created_at: &str,
+ message_id: &str,
) -> Result<DeleteItemOutput, SdkError<DeleteItemError>> {
debug!("Deleting message for device: {}", device_id);
@@ -115,7 +113,7 @@
),
(
SORT_KEY.to_string(),
- AttributeValue::N(created_at.to_string()),
+ AttributeValue::S(message_id.to_string()),
),
]);
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -39,7 +39,7 @@
self
.client
- .persist_message(&message.device_id, &message.payload)
+ .persist_message(&message.device_id, &message.payload, "message_id")
.await
.map_err(handle_ddb_error)?;
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -175,6 +175,7 @@
.persist_message(
message_to_device.device_id.as_str(),
message_to_device.payload.as_str(),
+ "message_id",
)
.await?;
@@ -222,7 +223,7 @@
self.send_message_to_device(device_message.payload).await;
if let Err(e) = self
.db_client
- .delete_message(&self.device_info.device_id, &device_message.created_at)
+ .delete_message(&self.device_info.device_id, &device_message.message_id)
.await
{
error!("Failed to delete message: {}:", e);

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 24, 2:19 AM (19 h, 27 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2697160
Default Alt Text
D9459.id32041.diff (9 KB)

Event Timeline