Page MenuHomePhorge

D7801.1768575366.diff
No OneTemporary

Size
3 KB
Referenced Files
None
Subscribers
None

D7801.1768575366.diff

diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/database/message.rs
@@ -0,0 +1,50 @@
+use std::collections::HashMap;
+
+use aws_sdk_dynamodb::types::AttributeValue;
+
+use crate::constants::dynamodb::undelivered_messages::{
+ CREATED_AT, DEVICE_ID, PAYLOAD,
+};
+
+#[derive(Debug)]
+pub struct DeviceMessage {
+ pub device_id: String,
+ pub created_at: String,
+ pub payload: String,
+}
+
+#[derive(Debug, derive_more::Display)]
+pub enum MessageErrors {
+ SerializationError,
+}
+
+impl DeviceMessage {
+ pub fn from_hashmap(
+ hashmap: HashMap<String, AttributeValue>,
+ ) -> Result<DeviceMessage, MessageErrors> {
+ let device_id: String = hashmap
+ .get(DEVICE_ID)
+ .ok_or(MessageErrors::SerializationError)?
+ .as_s()
+ .map_err(|_| MessageErrors::SerializationError)?
+ .to_string();
+ let created_at: String = hashmap
+ .get(CREATED_AT)
+ .ok_or(MessageErrors::SerializationError)?
+ .as_n()
+ .map_err(|_| MessageErrors::SerializationError)?
+ .to_string();
+ let payload: String = hashmap
+ .get(PAYLOAD)
+ .ok_or(MessageErrors::SerializationError)?
+ .as_s()
+ .map_err(|_| MessageErrors::SerializationError)?
+ .to_string();
+
+ Ok(DeviceMessage {
+ device_id,
+ created_at,
+ payload,
+ })
+ }
+}
diff --git a/services/tunnelbroker/src/database.rs b/services/tunnelbroker/src/database/mod.rs
rename from services/tunnelbroker/src/database.rs
rename to services/tunnelbroker/src/database/mod.rs
--- a/services/tunnelbroker/src/database.rs
+++ b/services/tunnelbroker/src/database/mod.rs
@@ -15,6 +15,9 @@
CREATED_AT, PARTITION_KEY, PAYLOAD, SORT_KEY, TABLE_NAME,
};
+pub mod message;
+pub use message::*;
+
#[derive(Clone)]
pub struct DatabaseClient {
client: Arc<Client>,
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -7,8 +7,8 @@
use tunnelbroker_messages::{session::DeviceTypes, Messages};
use crate::{
- constants::dynamodb::undelivered_messages::CREATED_AT,
- database::DatabaseClient, ACTIVE_CONNECTIONS,
+ database::{self, DatabaseClient, DeviceMessage},
+ ACTIVE_CONNECTIONS,
};
pub struct DeviceInfo {
@@ -29,6 +29,7 @@
pub enum SessionError {
InvalidMessage,
SerializationError(serde_json::Error),
+ MessageError(database::MessageErrors),
}
fn consume_error<T>(result: Result<T, SessionError>) {
@@ -104,16 +105,15 @@
ACTIVE_CONNECTIONS.insert(device_info.device_id.clone(), tx.clone());
for message in messages {
- let payload =
- message.get("payload").unwrap().as_s().unwrap().to_string();
- let created_at =
- message.get(CREATED_AT).unwrap().as_n().unwrap().to_string();
- self.send_message_to_device(payload).await;
- self
+ let device_message = DeviceMessage::from_hashmap(message)?;
+ self.send_message_to_device(device_message.payload).await;
+ if let Err(e) = self
.db_client
- .delete_message(&device_info.device_id, &created_at)
+ .delete_message(&device_info.device_id, &device_message.created_at)
.await
- .expect("Failed to delete messages");
+ {
+ error!("Failed to delete message: {}:", e);
+ }
}
debug!("Flushed messages for device: {}", &session_info.device_id);

File Metadata

Mime Type
text/plain
Expires
Fri, Jan 16, 2:56 PM (16 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5944840
Default Alt Text
D7801.1768575366.diff (3 KB)

Event Timeline