diff --git a/services/tunnelbroker/src/grpc/mod.rs b/services/tunnelbroker/src/grpc/mod.rs --- a/services/tunnelbroker/src/grpc/mod.rs +++ b/services/tunnelbroker/src/grpc/mod.rs @@ -18,13 +18,6 @@ amqp_channel: lapin::Channel, } -// By setting mandatory to true, we don't wait for a confirmation for an eventual -// delivery, instead we get an immediate undelivered error -const PUBLISH_OPTIONS: BasicPublishOptions = BasicPublishOptions { - immediate: false, - mandatory: true, -}; - pub fn handle_amqp_error(error: lapin::Error) -> tonic::Status { match error { lapin::Error::SerialisationError(_) | lapin::Error::ParsingError(_) => { @@ -43,26 +36,24 @@ let message = request.into_inner(); debug!("Received message for {}", &message.device_id); - if let Ok(confirmation) = self + + self + .client + .persist_message(&message.device_id, &message.payload) + .await + .map_err(handle_ddb_error)?; + + self .amqp_channel .basic_publish( "", &message.device_id, - PUBLISH_OPTIONS, + BasicPublishOptions::default(), &message.payload.as_bytes(), BasicProperties::default(), ) .await - { - debug!("Forwarded message: {:?}", &message); - confirmation.await.map_err(handle_amqp_error)?; - } else { - self - .client - .persist_message(&message.device_id, &message.payload) - .await - .map_err(handle_ddb_error)?; - } + .map_err(handle_amqp_error)?; let response = tonic::Response::new(Empty {}); Ok(response)