Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3521087
D9459.id32041.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Referenced Files
None
Subscribers
None
D9459.id32041.diff
View Options
diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs
--- a/services/commtest/tests/tunnelbroker_integration_tests.rs
+++ b/services/commtest/tests/tunnelbroker_integration_tests.rs
@@ -1,12 +1,22 @@
mod proto {
tonic::include_proto!("tunnelbroker");
}
+
use commtest::identity::device::create_device;
+use commtest::identity::olm_account_infos::{
+ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2,
+};
use commtest::tunnelbroker::socket::create_socket;
-use futures_util::StreamExt;
+use futures_util::{SinkExt, StreamExt};
use proto::tunnelbroker_service_client::TunnelbrokerServiceClient;
use proto::MessageToDevice;
-use tunnelbroker_messages::RefreshKeyRequest;
+use std::time::Duration;
+use tokio::time::sleep;
+use tokio_tungstenite::tungstenite::Message;
+
+use tunnelbroker_messages::{
+ MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest,
+};
#[tokio::test]
async fn send_refresh_request() {
@@ -46,3 +56,56 @@
serde_json::from_str(&response.to_text().unwrap()).unwrap();
assert_eq!(serialized_response, refresh_request);
}
+
+#[tokio::test]
+async fn test_messages_order() {
+ let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await;
+ let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await;
+
+ let messages = vec![
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "first message".to_string(),
+ },
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "second message".to_string(),
+ },
+ WebsocketMessageToDevice {
+ device_id: receiver.device_id.clone(),
+ payload: "third message".to_string(),
+ },
+ ];
+
+ let serialized_messages: Vec<_> = messages
+ .iter()
+ .map(|message| {
+ serde_json::to_string(message)
+ .expect("Failed to serialize message to device")
+ })
+ .map(Message::text)
+ .collect();
+
+ let (mut sender_socket, _) = create_socket(&sender).await.split();
+
+ for msg in serialized_messages.clone() {
+ sender_socket
+ .send(msg)
+ .await
+ .expect("Failed to send the message over WebSocket");
+ }
+
+ // Wait a specified duration to ensure that message had time to persist
+ sleep(Duration::from_millis(100)).await;
+
+ let mut receiver_socket = create_socket(&receiver).await;
+
+ for msg in messages {
+ if let Some(Ok(response)) = receiver_socket.next().await {
+ let received_payload = response.to_text().unwrap();
+ assert_eq!(msg.payload, received_payload);
+ } else {
+ panic!("Unable to receive message");
+ }
+ }
+}
diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf
--- a/services/terraform/modules/shared/dynamodb.tf
+++ b/services/terraform/modules/shared/dynamodb.tf
@@ -82,7 +82,7 @@
resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" {
name = "tunnelbroker-undelivered-messages"
hash_key = "deviceID"
- range_key = "createdAt"
+ range_key = "messageID"
billing_mode = "PAY_PER_REQUEST"
attribute {
@@ -91,8 +91,8 @@
}
attribute {
- name = "createdAt"
- type = "N"
+ name = "messageID"
+ type = "S"
}
}
diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs
--- a/services/tunnelbroker/src/constants.rs
+++ b/services/tunnelbroker/src/constants.rs
@@ -15,14 +15,17 @@
// - (primary key) = (deviceID: Partition Key, createdAt: Sort Key)
// - deviceID: The public key of a device's olm identity key
// - payload: Message to be delivered. See shared/tunnelbroker_messages.
- // - createdAt: UNIX timestamp of when the item was inserted.
- // Timestamp is needed to order the messages correctly to the device.
+ // - messageID = [createdAt]#[clientMessageID]
+ // - createdAd: UNIX timestamp of when the item was inserted.
+ // Timestamp is needed to order the messages correctly to the device.
+ // Timestamp format is ISO 8601 to handle lexicographical sorting.
+ // - clientMessageID: Message ID generated on client using UUID Version 4.
pub mod undelivered_messages {
pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages";
pub const PARTITION_KEY: &str = "deviceID";
pub const DEVICE_ID: &str = "deviceID";
pub const PAYLOAD: &str = "payload";
- pub const CREATED_AT: &str = "createdAt";
- pub const SORT_KEY: &str = "createdAt";
+ pub const MESSAGE_ID: &str = "messageID";
+ pub const SORT_KEY: &str = "messageID";
}
}
diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs
--- a/services/tunnelbroker/src/database/message.rs
+++ b/services/tunnelbroker/src/database/message.rs
@@ -3,13 +3,13 @@
use aws_sdk_dynamodb::types::AttributeValue;
use crate::constants::dynamodb::undelivered_messages::{
- CREATED_AT, DEVICE_ID, PAYLOAD,
+ DEVICE_ID, MESSAGE_ID, PAYLOAD,
};
#[derive(Debug)]
pub struct DeviceMessage {
pub device_id: String,
- pub created_at: String,
+ pub message_id: String,
pub payload: String,
}
@@ -28,10 +28,10 @@
.as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
- let created_at: String = hashmap
- .get(CREATED_AT)
+ let message_id: String = hashmap
+ .get(MESSAGE_ID)
.ok_or(MessageErrors::SerializationError)?
- .as_n()
+ .as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
let payload: String = hashmap
@@ -43,7 +43,7 @@
Ok(DeviceMessage {
device_id,
- created_at,
+ message_id,
payload,
})
}
diff --git a/services/tunnelbroker/src/database/message_id.rs b/services/tunnelbroker/src/database/message_id.rs
--- a/services/tunnelbroker/src/database/message_id.rs
+++ b/services/tunnelbroker/src/database/message_id.rs
@@ -1,12 +1,12 @@
use chrono::{DateTime, Utc};
#[derive(Debug, derive_more::Display, derive_more::Error)]
-enum ParseMessageIdError {
+pub enum ParseMessageIdError {
InvalidTimestamp(chrono::ParseError),
InvalidFormat,
}
#[derive(Debug)]
-struct MessageID {
+pub struct MessageID {
timestamp: DateTime<Utc>,
client_message_id: String,
}
diff --git a/services/tunnelbroker/src/database/mod.rs b/services/tunnelbroker/src/database/mod.rs
--- a/services/tunnelbroker/src/database/mod.rs
+++ b/services/tunnelbroker/src/database/mod.rs
@@ -3,12 +3,11 @@
use aws_sdk_dynamodb::operation::delete_item::{
DeleteItemError, DeleteItemOutput,
};
-use aws_sdk_dynamodb::operation::put_item::{PutItemError, PutItemOutput};
+use aws_sdk_dynamodb::operation::put_item::PutItemError;
use aws_sdk_dynamodb::operation::query::QueryError;
use aws_sdk_dynamodb::{types::AttributeValue, Client};
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error};
use crate::constants::dynamodb::undelivered_messages::{
@@ -18,6 +17,7 @@
pub mod message;
pub mod message_id;
+use crate::database::message_id::MessageID;
pub use message::*;
#[derive(Clone)]
@@ -25,13 +25,6 @@
client: Arc<Client>,
}
-pub fn unix_timestamp() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("System time is misconfigured")
- .as_secs()
-}
-
pub fn handle_ddb_error<E>(db_error: SdkError<E>) -> tonic::Status {
match db_error {
SdkError::TimeoutError(_) | SdkError::ServiceError(_) => {
@@ -57,22 +50,27 @@
&self,
device_id: &str,
payload: &str,
- ) -> Result<PutItemOutput, SdkError<PutItemError>> {
+ client_message_id: &str,
+ ) -> Result<String, SdkError<PutItemError>> {
+ let message_id: String =
+ MessageID::new(client_message_id.to_string()).into();
+
let device_av = AttributeValue::S(device_id.to_string());
let payload_av = AttributeValue::S(payload.to_string());
- let created_av = AttributeValue::N(unix_timestamp().to_string());
+ let message_id_av = AttributeValue::S(message_id.clone());
let request = self
.client
.put_item()
.table_name(TABLE_NAME)
.item(PARTITION_KEY, device_av)
- .item(SORT_KEY, created_av)
+ .item(SORT_KEY, message_id_av)
.item(PAYLOAD, payload_av);
debug!("Persisting message to device: {}", &device_id);
- request.send().await
+ request.send().await?;
+ Ok(message_id)
}
pub async fn retrieve_messages(
@@ -104,7 +102,7 @@
pub async fn delete_message(
&self,
device_id: &str,
- created_at: &str,
+ message_id: &str,
) -> Result<DeleteItemOutput, SdkError<DeleteItemError>> {
debug!("Deleting message for device: {}", device_id);
@@ -115,7 +113,7 @@
),
(
SORT_KEY.to_string(),
- AttributeValue::N(created_at.to_string()),
+ AttributeValue::S(message_id.to_string()),
),
]);
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -39,7 +39,7 @@
self
.client
- .persist_message(&message.device_id, &message.payload)
+ .persist_message(&message.device_id, &message.payload, "message_id")
.await
.map_err(handle_ddb_error)?;
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -175,6 +175,7 @@
.persist_message(
message_to_device.device_id.as_str(),
message_to_device.payload.as_str(),
+ "message_id",
)
.await?;
@@ -222,7 +223,7 @@
self.send_message_to_device(device_message.payload).await;
if let Err(e) = self
.db_client
- .delete_message(&self.device_info.device_id, &device_message.created_at)
+ .delete_message(&self.device_info.device_id, &device_message.message_id)
.await
{
error!("Failed to delete message: {}:", e);
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 24, 2:19 AM (19 h, 27 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2697160
Default Alt Text
D9459.id32041.diff (9 KB)
Attached To
Mode
D9459: [Tunnelbroker] update DDB schema to handle client message ID
Attached
Detach File
Event Timeline
Log In to Comment