diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs --- a/services/commtest/tests/tunnelbroker_integration_tests.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -14,9 +14,7 @@ use tokio::time::sleep; use tokio_tungstenite::tungstenite::Message; -use tunnelbroker_messages::{ - MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest, -}; +use tunnelbroker_messages::{MessageToDeviceRequest, RefreshKeyRequest}; #[tokio::test] async fn send_refresh_request() { @@ -63,15 +61,18 @@ let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await; let messages = vec![ - WebsocketMessageToDevice { + MessageToDeviceRequest { + client_message_id: "5".to_string(), device_id: receiver.device_id.clone(), payload: "first message".to_string(), }, - WebsocketMessageToDevice { + MessageToDeviceRequest { + client_message_id: "2".to_string(), device_id: receiver.device_id.clone(), payload: "second message".to_string(), }, - WebsocketMessageToDevice { + MessageToDeviceRequest { + client_message_id: "7".to_string(), device_id: receiver.device_id.clone(), payload: "third message".to_string(), }, diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs --- a/services/commtest/tests/tunnelbroker_persist_tests.rs +++ b/services/commtest/tests/tunnelbroker_persist_tests.rs @@ -13,9 +13,7 @@ use std::time::Duration; use tokio::time::sleep; use tokio_tungstenite::tungstenite::Message; -use tunnelbroker_messages::{ - MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest, -}; +use tunnelbroker_messages::{MessageToDeviceRequest, RefreshKeyRequest}; /// Tests that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects @@ -67,7 +65,8 @@ // Send message to not connected client let payload = "persisted message"; - let request = WebsocketMessageToDevice { + let request = MessageToDeviceRequest { + client_message_id: "mockID".to_string(), device_id: receiver.device_id.clone(), payload: payload.to_string(), }; diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs --- a/services/tunnelbroker/src/websockets/session.rs +++ b/services/tunnelbroker/src/websockets/session.rs @@ -160,14 +160,17 @@ let serialized_message = serde_json::from_str::(&msg)?; match serialized_message { - Messages::MessageToDevice(message_to_device) => { - debug!("Received message for {}", message_to_device.device_id); + Messages::MessageToDeviceRequest(message_to_device_request) => { + debug!( + "Received message for {}", + message_to_device_request.device_id + ); self .db_client .persist_message( - message_to_device.device_id.as_str(), - message_to_device.payload.as_str(), - "message_id", + &message_to_device_request.device_id, + &message_to_device_request.payload, + &message_to_device_request.client_message_id, ) .await?; @@ -175,9 +178,9 @@ .amqp_channel .basic_publish( "", - &message_to_device.device_id, + &message_to_device_request.device_id, BasicPublishOptions::default(), - message_to_device.payload.as_bytes(), + message_to_device_request.payload.as_bytes(), BasicProperties::default(), ) .await?; diff --git a/shared/tunnelbroker_messages/src/messages/message_to_device_request.rs b/shared/tunnelbroker_messages/src/messages/message_to_device_request.rs new file mode 100644 --- /dev/null +++ b/shared/tunnelbroker_messages/src/messages/message_to_device_request.rs @@ -0,0 +1,34 @@ +// Message sent from WebSocket clients to Tunnelbroker + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[serde(tag = "type", rename_all = "camelCase")] +pub struct MessageToDeviceRequest { + #[serde(rename = "clientMessageID")] + pub client_message_id: String, + #[serde(rename = "deviceID")] + pub device_id: String, + pub payload: String, +} + +#[cfg(test)] +mod message_to_device_request_tests { + use super::*; + + #[test] + fn test_message_to_device_request_deserialization() { + let example_payload = r#"{ + "type": "MessageToDeviceRequest", + "clientMessageID": "client123", + "deviceID": "alice", + "payload": "message from Bob" + }"#; + + let request = + serde_json::from_str::(example_payload).unwrap(); + assert_eq!(request.client_message_id, "client123"); + assert_eq!(request.device_id, "alice"); + assert_eq!(request.payload, "message from Bob"); + } +} diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -1,18 +1,24 @@ // Messages sent between Tunnelbroker and a device pub mod keys; pub mod message_to_device; +pub mod message_to_device_request; pub mod session; pub use keys::*; pub use message_to_device::*; +pub use message_to_device_request::*; pub use session::*; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] pub enum Messages { RefreshKeysRequest(RefreshKeyRequest), ConnectionInitializationMessage(ConnectionInitializationMessage), + // MessageToDeviceRequest must be placed before MessageToDevice. + // This is due to serde's pattern matching behavior where it prioritizes + // the first matching pattern it encounters. + MessageToDeviceRequest(MessageToDeviceRequest), MessageToDevice(MessageToDevice), } diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs --- a/shared/tunnelbroker_messages/src/messages/session.rs +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -29,7 +29,7 @@ /// Message sent by a client to Tunnelbroker to initiate a websocket /// session. Tunnelbroker will then validate the access token with identity /// service before continuing with the request. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] pub struct ConnectionInitializationMessage { #[serde(rename = "deviceID")]