diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs --- a/services/backup/src/constants.rs +++ b/services/backup/src/constants.rs @@ -5,6 +5,7 @@ pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";"; pub const WS_FRAME_SIZE: usize = 1_048_576; // 1MiB pub const LOG_DEFAULT_PAGE_SIZE: i32 = 20; +pub const LOG_BACKUP_ID_SEPARATOR: &str = "#"; // Configuration defaults pub const DEFAULT_HTTP_PORT: u16 = 50052; diff --git a/services/backup/src/database/log_item.rs b/services/backup/src/database/log_item.rs --- a/services/backup/src/database/log_item.rs +++ b/services/backup/src/database/log_item.rs @@ -1,4 +1,4 @@ -use crate::constants::log_table::attr; +use crate::constants::{log_table::attr, LOG_BACKUP_ID_SEPARATOR}; use aws_sdk_dynamodb::types::AttributeValue; use comm_lib::{ blob::{ @@ -8,7 +8,8 @@ constants::DDB_ITEM_SIZE_LIMIT, database::{ blob::BlobOrDBContent, calculate_size_in_db, parse_int_attribute, - AttributeExtractor, AttributeTryInto, DBItemError, + AttributeExtractor, AttributeTryInto, DBItemAttributeError, DBItemError, + Value, }, }; use std::collections::HashMap; @@ -16,6 +17,7 @@ #[derive(Clone, Debug)] pub struct LogItem { + pub user_id: String, pub backup_id: String, pub log_id: usize, pub content: BlobOrDBContent, @@ -40,14 +42,27 @@ self.content.move_to_blob(blob_client).await } + pub fn partition_key( + user_id: impl Into<String>, + backup_id: impl Into<String>, + ) -> String { + format!( + "{}{}{}", + user_id.into(), + LOG_BACKUP_ID_SEPARATOR, + backup_id.into(), + ) + } + pub fn item_key( + user_id: impl Into<String>, backup_id: impl Into<String>, log_id: usize, ) -> HashMap<String, AttributeValue> { HashMap::from([ ( attr::BACKUP_ID.to_string(), - AttributeValue::S(backup_id.into()), + AttributeValue::S(Self::partition_key(user_id, backup_id)), ), ( attr::LOG_ID.to_string(), @@ -59,7 +74,8 @@ impl From<LogItem> for HashMap<String, AttributeValue> { fn from(value: LogItem) -> Self { - let mut attrs = LogItem::item_key(value.backup_id, value.log_id); + let mut attrs = + LogItem::item_key(value.user_id, 
value.backup_id, value.log_id); let (content_attr_name, content_attr) = value .content @@ -89,7 +105,18 @@ fn try_from( mut value: HashMap<String, AttributeValue>, ) -> Result<Self, Self::Error> { - let backup_id = value.take_attr(attr::BACKUP_ID)?; + let id: String = value.take_attr(attr::BACKUP_ID)?; + let (user_id, backup_id) = + match &id.split(LOG_BACKUP_ID_SEPARATOR).collect::<Vec<_>>()[..] { + &[user_id, backup_id] => (user_id.to_string(), backup_id.to_string()), + _ => { + return Err(DBItemError::new( + attr::BACKUP_ID.to_string(), + Value::String(id), + DBItemAttributeError::InvalidValue, + )) + } + }; let log_id = parse_int_attribute(attr::LOG_ID, value.remove(attr::LOG_ID))?; let content = BlobOrDBContent::parse_from_attrs( &mut value, @@ -105,6 +132,7 @@ }; Ok(LogItem { + user_id, backup_id, log_id, content, diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs --- a/services/backup/src/database/mod.rs +++ b/services/backup/src/database/mod.rs @@ -140,7 +140,7 @@ .transpose() .map_err(Error::from)?; - self.remove_log_items_for_backup(backup_id).await?; + self.remove_log_items_for_backup(user_id, backup_id).await?; Ok(result) } @@ -236,9 +236,11 @@ pub async fn fetch_log_items( &self, + user_id: &str, backup_id: &str, from_id: Option<usize>, ) -> Result<(Vec<LogItem>, Option<usize>), Error> { + let id = LogItem::partition_key(user_id, backup_id); let mut query = self .client .query() @@ -247,16 +249,13 @@ .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID) .expression_attribute_values( ":valueToMatch", - AttributeValue::S(backup_id.to_string()), + AttributeValue::S(id.clone()), ) .limit(LOG_DEFAULT_PAGE_SIZE); if let Some(from_id) = from_id { query = query - .exclusive_start_key( - log_table::attr::BACKUP_ID, - AttributeValue::S(backup_id.to_string()), - ) + .exclusive_start_key(log_table::attr::BACKUP_ID, AttributeValue::S(id)) .exclusive_start_key( log_table::attr::LOG_ID, AttributeValue::N(from_id.to_string()), @@ -290,13 +289,14 @@ pub async fn remove_log_items_for_backup( 
&self, + user_id: &str, backup_id: &str, ) -> Result<(), Error> { let (mut items, mut last_id) = - self.fetch_log_items(backup_id, None).await?; + self.fetch_log_items(user_id, backup_id, None).await?; while last_id.is_some() { let (mut new_items, new_last_id) = - self.fetch_log_items(backup_id, last_id).await?; + self.fetch_log_items(user_id, backup_id, last_id).await?; items.append(&mut new_items); last_id = new_last_id; @@ -306,7 +306,7 @@ .into_iter() .map(|key| { DeleteRequest::builder() - .set_key(Some(LogItem::item_key(key.backup_id, key.log_id))) + .set_key(Some(LogItem::item_key(user_id, key.backup_id, key.log_id))) .build() }) .map(|request| WriteRequest::builder().delete_request(request).build()) diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs --- a/services/backup/src/http/handlers/log.rs +++ b/services/backup/src/http/handlers/log.rs @@ -7,6 +7,7 @@ Error, HttpRequest, HttpResponse, }; use actix_web_actors::ws::{self, WebsocketContext}; +use comm_lib::auth::UserIdentity; use comm_lib::{ backup::{ DownloadLogsRequest, LogWSRequest, LogWSResponse, UploadLogRequest, @@ -22,12 +23,14 @@ pub async fn handle_ws( req: HttpRequest, + user: UserIdentity, stream: web::Payload, blob_client: web::Data<BlobServiceClient>, db_client: web::Data<DatabaseClient>, ) -> Result<HttpResponse, Error> { ws::WsResponseBuilder::new( LogWSActor { + user, blob_client: blob_client.as_ref().clone(), db_client: db_client.as_ref().clone(), last_msg_time: Instant::now(), @@ -50,6 +53,7 @@ } struct LogWSActor { + user: UserIdentity, blob_client: BlobServiceClient, db_client: DatabaseClient, @@ -66,8 +70,12 @@ ctx: &mut WebsocketContext<Self>, bytes: Bytes, ) { - let fut = - Self::handle_msg(self.blob_client.clone(), self.db_client.clone(), bytes); + let fut = Self::handle_msg( + self.user.user_id.clone(), + self.blob_client.clone(), + self.db_client.clone(), + bytes, + ); let fut = actix::fut::wrap_future(fut).map( |responses, @@ -98,6 +106,7 @@ } async fn handle_msg( + user_id: String, 
blob_client: BlobServiceClient, db_client: DatabaseClient, bytes: Bytes, @@ -121,6 +130,7 @@ } let mut log_item = LogItem { + user_id, backup_id: backup_id.clone(), log_id, content: BlobOrDBContent::new(content), @@ -136,8 +146,9 @@ backup_id, from_id, }) => { - let (log_items, last_id) = - db_client.fetch_log_items(&backup_id, from_id).await?; + let (log_items, last_id) = db_client + .fetch_log_items(&user_id, &backup_id, from_id) + .await?; let mut messages = vec![];