Page MenuHomePhorge

D9316.1768403544.diff
No OneTemporary

Size
8 KB
Referenced Files
None
Subscribers
None

D9316.1768403544.diff

diff --git a/services/commtest/tests/tunnelbroker_integration_tests.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs
--- a/services/commtest/tests/tunnelbroker_integration_tests.rs
+++ b/services/commtest/tests/tunnelbroker_integration_tests.rs
@@ -46,47 +46,3 @@
serde_json::from_str(&response.to_text().unwrap()).unwrap();
assert_eq!(serialized_response, refresh_request);
}
-
-/// Test that a message to an offline device gets pushed to dynamodb
-/// then recalled once a device connects
-#[tokio::test]
-async fn persist_messages() {
- let device_info = create_device(None).await;
-
- // Send request for keyserver to refresh keys (identity service)
- let mut tunnelbroker_client =
- TunnelbrokerServiceClient::connect("http://localhost:50051")
- .await
- .unwrap();
-
- let refresh_request = RefreshKeyRequest {
- device_id: device_info.device_id.to_string(),
- number_of_keys: 5,
- };
-
- let payload = serde_json::to_string(&refresh_request).unwrap();
- let request = MessageToDevice {
- device_id: device_info.device_id.to_string(),
- payload,
- };
- let grpc_message = tonic::Request::new(request);
- tunnelbroker_client
- .send_message_to_device(grpc_message)
- .await
- .unwrap();
-
- // Wait one second to ensure that message had time to persist
- use std::{thread, time};
- let ten_millis = time::Duration::from_millis(50);
- thread::sleep(ten_millis);
-
- let mut socket = create_socket(&device_info).await;
- // Have keyserver receive any websocket messages
- if let Some(Ok(response)) = socket.next().await {
- // Check that message received by keyserver matches what identity server
- // issued
- let serialized_response: RefreshKeyRequest =
- serde_json::from_str(&response.to_text().unwrap()).unwrap();
- assert_eq!(serialized_response, refresh_request);
- };
-}
diff --git a/services/commtest/tests/tunnelbroker_persist_tests.rs b/services/commtest/tests/tunnelbroker_persist_tests.rs
new file mode 100644
--- /dev/null
+++ b/services/commtest/tests/tunnelbroker_persist_tests.rs
@@ -0,0 +1,95 @@
+mod proto {
+ tonic::include_proto!("tunnelbroker");
+}
+
+use commtest::identity::device::create_device;
+use commtest::identity::olm_account_infos::{
+ MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2,
+};
+use commtest::tunnelbroker::socket::create_socket;
+use futures_util::{SinkExt, StreamExt};
+use proto::tunnelbroker_service_client::TunnelbrokerServiceClient;
+use proto::MessageToDevice;
+use std::time::Duration;
+use tokio::time::sleep;
+use tokio_tungstenite::tungstenite::Message;
+use tunnelbroker_messages::{
+ MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest,
+};
+
+/// Tests that a gRPC message to an offline device gets pushed to dynamodb
+/// then recalled once a device connects. Fails if the socket yields no
+/// message, rather than passing without asserting anything.
+#[tokio::test]
+async fn persist_grpc_messages() {
+  let device_info = create_device(None).await;
+
+  // Send request for keyserver to refresh keys (identity service)
+  let mut tunnelbroker_client =
+    TunnelbrokerServiceClient::connect("http://localhost:50051")
+      .await
+      .unwrap();
+
+  let refresh_request = RefreshKeyRequest {
+    device_id: device_info.device_id.to_string(),
+    number_of_keys: 5,
+  };
+
+  let payload = serde_json::to_string(&refresh_request).unwrap();
+  let request = MessageToDevice {
+    device_id: device_info.device_id.to_string(),
+    payload,
+  };
+  let grpc_message = tonic::Request::new(request);
+  tunnelbroker_client
+    .send_message_to_device(grpc_message)
+    .await
+    .unwrap();
+
+  // Wait a specified duration to ensure that message had time to persist
+  sleep(Duration::from_millis(100)).await;
+
+  let mut socket = create_socket(&device_info).await;
+  // A closed socket or a read error must fail the test, not skip the check
+  let response = socket.next().await.expect("Socket closed early");
+  let response = response.expect("Failed to read websocket message");
+  // Check that message received by keyserver matches what identity server
+  // issued
+  let serialized_response: RefreshKeyRequest =
+    serde_json::from_str(response.to_text().unwrap()).unwrap();
+  assert_eq!(serialized_response, refresh_request);
+}
+
+/// Tests that a websocket message sent to a device that is not connected
+/// is delivered once that device connects; fails if nothing is received
+#[tokio::test]
+async fn persist_websocket_messages() {
+  let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await;
+  let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await;
+
+  // Build a message addressed to the receiver, which is not connected yet
+  let payload = "persisted message";
+  let request = WebsocketMessageToDevice {
+    device_id: receiver.device_id.clone(),
+    payload: payload.to_string(),
+  };
+  let serialized_request = serde_json::to_string(&request)
+    .expect("Failed to serialize message to device");
+
+  let mut sender_socket = create_socket(&sender).await;
+  sender_socket
+    .send(Message::Text(serialized_request))
+    .await
+    .expect("Failed to send message");
+
+  // Wait a specified duration to ensure that message had time to persist
+  sleep(Duration::from_millis(100)).await;
+
+  // Connect receiver
+  let mut receiver_socket = create_socket(&receiver).await;
+
+  // A closed socket or a read error must fail the test, not skip the check
+  let response = receiver_socket.next().await.expect("Socket closed early");
+  let response = response.expect("Failed to read websocket message");
+  assert_eq!(payload, response.to_text().unwrap());
+}
diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs
--- a/services/tunnelbroker/src/grpc/mod.rs
+++ b/services/tunnelbroker/src/grpc/mod.rs
@@ -49,7 +49,7 @@
"",
&message.device_id,
BasicPublishOptions::default(),
- &message.payload.as_bytes(),
+ message.payload.as_bytes(),
BasicProperties::default(),
)
.await
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -1,11 +1,16 @@
+use aws_sdk_dynamodb::error::SdkError;
+use aws_sdk_dynamodb::operation::put_item::PutItemError;
use derive_more;
use futures_util::stream::SplitSink;
use futures_util::SinkExt;
use futures_util::StreamExt;
use hyper_tungstenite::{tungstenite::Message, WebSocketStream};
use lapin::message::Delivery;
-use lapin::options::{BasicConsumeOptions, QueueDeclareOptions};
+use lapin::options::{
+ BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
+};
use lapin::types::FieldTable;
+use lapin::BasicProperties;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tracing::{debug, error, info};
@@ -27,6 +32,7 @@
tx: SplitSink<WebSocketStream<S>, Message>,
db_client: DatabaseClient,
pub device_info: DeviceInfo,
+ amqp_channel: lapin::Channel,
// Stream of messages from AMQP endpoint
amqp_consumer: lapin::Consumer,
}
@@ -41,6 +47,7 @@
AmqpError(lapin::Error),
InternalError,
UnauthorizedDevice,
+ PersistenceError(SdkError<PutItemError>),
}
pub fn consume_error<T>(result: Result<T, SessionError>) {
@@ -140,6 +147,7 @@
tx,
db_client,
device_info,
+ amqp_channel: amqp_channel.clone(),
amqp_consumer,
})
}
@@ -148,7 +156,43 @@
&self,
msg: Message,
) -> Result<(), SessionError> {
- debug!("Received frame: {:?}", msg);
+ let text_msg = match msg {
+ Message::Text(payload) => payload,
+ _ => {
+ error!("Client sent invalid message type");
+ return Err(SessionError::InvalidMessage);
+ }
+ };
+
+ let serialized_message = serde_json::from_str::<Messages>(&text_msg)?;
+
+ match serialized_message {
+ Messages::MessageToDevice(message_to_device) => {
+ debug!("Received message for {}", message_to_device.device_id);
+ self
+ .db_client
+ .persist_message(
+ message_to_device.device_id.as_str(),
+ message_to_device.payload.as_str(),
+ )
+ .await?;
+
+ self
+ .amqp_channel
+ .basic_publish(
+ "",
+ &message_to_device.device_id,
+ BasicPublishOptions::default(),
+ message_to_device.payload.as_bytes(),
+ BasicProperties::default(),
+ )
+ .await?;
+ }
+ _ => {
+ error!("Client sent invalid message type");
+ return Err(SessionError::InvalidMessage);
+ }
+ }
Ok(())
}

File Metadata

Mime Type
text/plain
Expires
Wed, Jan 14, 3:12 PM (7 m, 28 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5932055
Default Alt Text
D9316.1768403544.diff (8 KB)

Event Timeline