diff --git a/services/tunnelbroker/src/server/mod.rs b/services/tunnelbroker/src/server/mod.rs --- a/services/tunnelbroker/src/server/mod.rs +++ b/services/tunnelbroker/src/server/mod.rs @@ -210,42 +210,48 @@ // When a client connects to the bidirectional messages stream, first we check // if there are undelivered messages in the database - let messages_from_database = - match getMessagesFromDatabase(&session_item.deviceID) { - Ok(messages) => messages, - Err(err) => return Err(Status::internal(err.what())), - }; - if messages_from_database.len() > 0 { - if let Err(err) = eraseMessagesFromAMQP(&session_item.deviceID) { - return Err(Status::internal(err.what())); - }; - let mut messages_to_response = vec![]; - for message in &messages_from_database { - messages_to_response.push(tunnelbroker::MessageToClientStruct { - message_id: message.messageID.clone(), - from_device_id: message.fromDeviceID.clone(), - payload: message.payload.clone(), - blob_hashes: vec![message.blobHashes.clone()], - }); - } - let result_from_writer = tx_writer( - &session_id, - &tx, - Ok(tunnelbroker::MessageToClient { - data: Some(tunnelbroker::message_to_client::Data::MessagesToDeliver( - tunnelbroker::MessagesToDeliver { - messages: messages_to_response, - }, - )), - }), - ); - if let Err(err) = result_from_writer.await { - debug!( + if !isConfigParameterSet("messages.skip_persistence").expect( + "Error while checking the `messages.skip_persistence` config file parameter", + ) { + let messages_from_database = + match getMessagesFromDatabase(&session_item.deviceID) { + Ok(messages) => messages, + Err(err) => return Err(Status::internal(err.what())), + }; + if messages_from_database.len() > 0 { + if let Err(err) = eraseMessagesFromAMQP(&session_item.deviceID) { + return Err(Status::internal(err.what())); + }; + let mut messages_to_response = vec![]; + for message in &messages_from_database { + messages_to_response.push(tunnelbroker::MessageToClientStruct { + message_id: message.messageID.clone(), + from_device_id: message.fromDeviceID.clone(), + payload: message.payload.clone(), + blob_hashes: vec![message.blobHashes.clone()], + }); + } + let result_from_writer = tx_writer( + &session_id, + &tx, + Ok(tunnelbroker::MessageToClient { + data: Some( + tunnelbroker::message_to_client::Data::MessagesToDeliver( + tunnelbroker::MessagesToDeliver { + messages: messages_to_response, + }, + ), + ), + }), + ); + if let Err(err) = result_from_writer.await { + debug!( "Error while sending undelivered messages from database to the client: {}", err ); - return Err(Status::aborted(err)); - }; + return Err(Status::aborted(err)); + }; + } } // Spawning asynchronous Tokio task to deliver new messages