Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33307315
D9467.1768797480.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
8 KB
Referenced Files
None
Subscribers
None
D9467.1768797480.diff
View Options
diff --git a/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs b/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs
new file mode 100644
--- /dev/null
+++ b/services/commtest/tests/tunnelbroker_sender_confirmation_tests.rs
@@ -0,0 +1,95 @@
+use commtest::identity::device::create_device;
+use commtest::identity::olm_account_infos::{
+ DEFAULT_CLIENT_KEYS, MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2,
+};
+use commtest::tunnelbroker::socket::create_socket;
+use futures_util::{SinkExt, StreamExt};
+use tokio_tungstenite::tungstenite::Message;
+use tunnelbroker_messages::{
+ MessageSentStatus, MessageToDeviceRequest, MessageToDeviceRequestStatus,
+};
+
+/// Tests of the responses Tunnelbroker sends to a client
+/// that tries to send a message to another device
+
+/// A well-formed request addressed to a receiver that is not connected is
+/// expected to be confirmed to the sender with a `Success` status carrying
+/// the client message id; the test fails if no confirmation arrives.
+#[tokio::test]
+async fn get_confirmation() {
+  let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await;
+  let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await;
+  let client_message_id = "mockID".to_string();
+
+  // Send message to not connected client
+  let payload = "persisted message";
+  let request = MessageToDeviceRequest {
+    client_message_id: client_message_id.clone(),
+    device_id: receiver.device_id.clone(),
+    payload: payload.to_string(),
+  };
+  let serialized_request = serde_json::to_string(&request)
+    .expect("Failed to serialize message to device");
+
+  let mut sender_socket = create_socket(&sender).await;
+  sender_socket
+    .send(Message::Text(serialized_request))
+    .await
+    .expect("Failed to send message");
+
+  let response = sender_socket.next().await.expect("No response received");
+  let response = response.expect("Failed to read response");
+  let expected_response = MessageToDeviceRequestStatus {
+    client_message_ids: vec![MessageSentStatus::Success(client_message_id)],
+  };
+  let expected_payload = serde_json::to_string(&expected_response).unwrap();
+  assert_eq!(response.to_text().unwrap(), expected_payload);
+
+  // Connect receiver to flush DDB and avoid polluting other tests
+  let mut receiver_socket = create_socket(&receiver).await;
+  if let Some(Ok(response)) = receiver_socket.next().await {
+    let received_payload = response.to_text().unwrap();
+    assert_eq!(payload, received_payload);
+  };
+}
+
+/// Text that is not valid JSON is expected to be answered with a
+/// `SerializationError` status echoing it; no response fails the test.
+#[tokio::test]
+async fn get_serialization_error() {
+  let sender = create_device(Some(&DEFAULT_CLIENT_KEYS)).await;
+  let message = "some bad json".to_string();
+  let mut sender_socket = create_socket(&sender).await;
+  sender_socket
+    .send(Message::Text(message.clone()))
+    .await
+    .expect("Failed to send message");
+
+  let response = sender_socket.next().await.expect("No response received");
+  let response = response.expect("Failed to read response");
+  let expected_response = MessageToDeviceRequestStatus {
+    client_message_ids: vec![MessageSentStatus::SerializationError(message)],
+  };
+  let expected_payload = serde_json::to_string(&expected_response).unwrap();
+  assert_eq!(response.to_text().unwrap(), expected_payload);
+}
+
+/// An empty binary frame is expected to be answered with an
+/// `InvalidRequest` status; a missing response fails the test.
+#[tokio::test]
+async fn get_invalid_request_error() {
+  let sender = create_device(Some(&DEFAULT_CLIENT_KEYS)).await;
+  let mut sender_socket = create_socket(&sender).await;
+  sender_socket
+    .send(Message::Binary(vec![]))
+    .await
+    .expect("Failed to send message");
+
+  let response = sender_socket.next().await.expect("No response received");
+  let response = response.expect("Failed to read response");
+  let expected_response = MessageToDeviceRequestStatus {
+    client_message_ids: vec![MessageSentStatus::InvalidRequest],
+  };
+  let expected_payload = serde_json::to_string(&expected_response).unwrap();
+  assert_eq!(response.to_text().unwrap(), expected_payload);
+}
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -16,6 +16,7 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tracing::{debug, error, info};
+use tunnelbroker_messages::{MessageSentStatus, MessageToDeviceRequestStatus};
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -194,10 +195,24 @@
session.send_message_to_device(Message::Pong(msg)).await;
}
Message::Text(msg) => {
- session::consume_error(session.handle_websocket_frame_from_device(msg).await);
+ let message_status = session.handle_websocket_frame_from_device(msg).await;
+ let request_status = MessageToDeviceRequestStatus {
+ client_message_ids: vec![message_status]
+ };
+ if let Ok(response) = serde_json::to_string(&request_status) {
+ session.send_message_to_device(Message::text(response)).await;
+ } else {
+ break;
+ }
}
_ => {
error!("Client sent invalid message type");
+ let confirmation = MessageToDeviceRequestStatus {client_message_ids: vec![MessageSentStatus::InvalidRequest]};
+ if let Ok(response) = serde_json::to_string(&confirmation) {
+ session.send_message_to_device(Message::text(response)).await;
+ } else {
+ break;
+ }
}
}
},
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -15,7 +15,10 @@
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tracing::{debug, error, info};
-use tunnelbroker_messages::{session::DeviceTypes, Messages};
+use tunnelbroker_messages::{
+ send_confirmation::Failure, send_confirmation::MessageSentStatus,
+ session::DeviceTypes, MessageToDeviceRequest, Messages,
+};
use crate::database::{self, DatabaseClient, DeviceMessage};
use crate::error::Error;
@@ -153,45 +156,64 @@
})
}
-  pub async fn handle_websocket_frame_from_device(
+  /// Persists the message for the target device, then publishes its payload
+  /// over AMQP. When publishing fails, the persisted copy is deleted again
+  /// and the publish error is returned as `SessionError::AmqpError`.
+  pub async fn handle_message_to_device(
     &self,
-    msg: String,
+    message_request: &MessageToDeviceRequest,
   ) -> Result<(), SessionError> {
-    let serialized_message = serde_json::from_str::<Messages>(&msg)?;
+    let message_id = self
+      .db_client
+      .persist_message(
+        &message_request.device_id,
+        &message_request.payload,
+        &message_request.client_message_id,
+      )
+      .await?;
+    let publish_result = self
+      .amqp_channel
+      .basic_publish(
+        "",
+        &message_request.device_id,
+        BasicPublishOptions::default(),
+        message_request.payload.as_bytes(),
+        BasicProperties::default(),
+      )
+      .await;
+    if let Err(publish_error) = publish_result {
+      // Delete under the target device id the message was persisted with.
+      let target = &message_request.device_id;
+      let deletion = self.db_client.delete_message(target, &message_id);
+      deletion.await.expect("Error deleting message");
+      return Err(SessionError::AmqpError(publish_error));
+    }
+    Ok(())
+  }
+
+  /// Handles a text frame from the device and returns the status to report
+  /// back: `SerializationError` if `msg` cannot be parsed as `Messages`.
+  pub async fn handle_websocket_frame_from_device(
+    &mut self,
+    msg: String,
+  ) -> MessageSentStatus {
+    let Ok(serialized_message) = serde_json::from_str::<Messages>(&msg) else {
+      return MessageSentStatus::SerializationError(msg);
+    };
     match serialized_message {
-      Messages::MessageToDeviceRequest(message_to_device_request) => {
-        debug!(
-          "Received message for {}",
-          message_to_device_request.device_id
-        );
-        self
-          .db_client
-          .persist_message(
-            &message_to_device_request.device_id,
-            &message_to_device_request.payload,
-            &message_to_device_request.client_message_id,
-          )
-          .await?;
+      Messages::MessageToDeviceRequest(message_request) => {
+        debug!("Received message for {}", message_request.device_id);
-        self
-          .amqp_channel
-          .basic_publish(
-            "",
-            &message_to_device_request.device_id,
-            BasicPublishOptions::default(),
-            message_to_device_request.payload.as_bytes(),
-            BasicProperties::default(),
-          )
-          .await?;
+        let outcome = self.handle_message_to_device(&message_request).await;
+        let client_message_id = &message_request.client_message_id;
+        self.get_message_to_device_status(client_message_id, outcome)
       }
       _ => {
         error!("Client sent invalid message type");
-        return Err(SessionError::InvalidMessage);
+        MessageSentStatus::InvalidRequest
       }
     }
-
-    Ok(())
   }
pub async fn next_amqp_message(
@@ -269,4 +291,18 @@
error!("Failed to delete queue: {}", e);
}
}
+
+  /// Maps `result` to a status tagged with `client_message_id`: `Success`
+  /// on `Ok`, otherwise `Error` carrying the error's string representation.
+  pub fn get_message_to_device_status(
+    &mut self,
+    client_message_id: &str,
+    result: Result<(), SessionError>,
+  ) -> MessageSentStatus {
+    let id = client_message_id.to_string();
+    match result.map_err(|err| err.to_string()) {
+      Ok(()) => MessageSentStatus::Success(id),
+      Err(error) => MessageSentStatus::Error(Failure { id, error }),
+    }
+  }
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Jan 19, 4:38 AM (17 h, 25 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5954380
Default Alt Text
D9467.1768797480.diff (8 KB)
Attached To
Mode
D9467: [Tunnelbroker] implement Tunnelbroker confirmation for sender
Attached
Detach File
Event Timeline
Log In to Comment