diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -4,6 +4,8 @@
 pub const ID_SEPARATOR: &str = ":";
 pub const ATTACHMENT_HOLDER_SEPARATOR: &str = ";";
 pub const WS_FRAME_SIZE: usize = 1_048_576; // 1MiB
+/// Item limit applied to a single log-table query (one page of logs).
+pub const LOG_DEFAULT_PAGE_SIZE: i32 = 20;
 
 // Configuration defaults
 pub const DEFAULT_HTTP_PORT: u16 = 50052;
diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -5,12 +5,12 @@
   backup_item::{BackupItem, OrderedBackupItem},
   log_item::LogItem,
 };
-use crate::constants::{backup_table, log_table};
+use crate::constants::{backup_table, log_table, LOG_DEFAULT_PAGE_SIZE};
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput,
   types::{AttributeValue, ReturnValue},
 };
-use comm_lib::database::Error;
+use comm_lib::database::{parse_int_attribute, Error};
 use tracing::{error, trace, warn};
 
 #[derive(Clone)]
@@ -226,4 +226,74 @@
 
     Ok(())
   }
+
+  /// Fetches one page of log items for the given backup.
+  ///
+  /// Queries the log table for items whose backup ID equals `backup_id`,
+  /// limiting the query to `LOG_DEFAULT_PAGE_SIZE` items. When `from_id` is
+  /// `Some`, the query's exclusive start key is set to
+  /// (`backup_id`, `from_id`), so results begin after that log item.
+  ///
+  /// Returns the parsed items together with the log ID taken from the
+  /// response's last evaluated key, or `None` if the response has no such
+  /// key. That ID can be passed back as `from_id` to fetch the next page.
+  ///
+  /// # Errors
+  ///
+  /// Fails if the DynamoDB request fails, if the last evaluated key's log ID
+  /// cannot be parsed, or if any returned item cannot be converted into a
+  /// `LogItem`.
+  pub async fn fetch_log_items(
+    &self,
+    backup_id: &str,
+    from_id: Option<usize>,
+  ) -> Result<(Vec<LogItem>, Option<usize>), Error> {
+    let mut query = self
+      .client
+      .query()
+      .table_name(log_table::TABLE_NAME)
+      .key_condition_expression("#backupID = :valueToMatch")
+      .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID)
+      .expression_attribute_values(
+        ":valueToMatch",
+        AttributeValue::S(backup_id.to_string()),
+      )
+      .limit(LOG_DEFAULT_PAGE_SIZE);
+
+    if let Some(from_id) = from_id {
+      query = query
+        .exclusive_start_key(
+          log_table::attr::BACKUP_ID,
+          AttributeValue::S(backup_id.to_string()),
+        )
+        .exclusive_start_key(
+          log_table::attr::LOG_ID,
+          AttributeValue::N(from_id.to_string()),
+        );
+    }
+
+    let response = query.send().await.map_err(|e| {
+      error!("DynamoDB client failed to fetch logs");
+      Error::AwsSdk(e.into())
+    })?;
+
+    let last_id = response
+      .last_evaluated_key()
+      .map(|key| {
+        parse_int_attribute(
+          log_table::attr::LOG_ID,
+          key.get(log_table::attr::LOG_ID).cloned(),
+        )
+      })
+      .transpose()?;
+
+    let items = response
+      .items
+      .unwrap_or_default()
+      .into_iter()
+      .map(LogItem::try_from)
+      .collect::<Result<Vec<_>, _>>()?;
+
+    Ok((items, last_id))
+  }
 }
diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs
--- a/services/backup/src/http/handlers/log.rs
+++ b/services/backup/src/http/handlers/log.rs
@@ -8,7 +8,9 @@
 };
 use actix_web_actors::ws::{self, WebsocketContext};
 use comm_lib::{
-  backup::{LogWSRequest, LogWSResponse, UploadLogRequest},
+  backup::{
+    DownloadLogsRequest, LogWSRequest, LogWSResponse, UploadLogRequest,
+  },
   blob::{
     client::{BlobServiceClient, BlobServiceError},
     types::BlobInfo,
@@ -145,6 +147,45 @@
 
         Ok(vec![LogWSResponse::LogUploaded { log_id }])
       }
+      LogWSRequest::DownloadLogs(DownloadLogsRequest { from_id }) => {
+        // Fetch a single page of log items, optionally starting after
+        // `from_id`.
+        let (log_items, last_id) =
+          db_client.fetch_log_items(&info.backup_id, from_id).await?;
+
+        let mut messages = vec![];
+
+        for LogItem {
+          log_id,
+          content,
+          attachments,
+          ..
+        } in log_items
+        {
+          let content = content.fetch_bytes(&blob_client).await?;
+          let attachments: Vec<String> =
+            attachments.into_iter().map(|att| att.blob_hash).collect();
+          // An empty attachment list is reported as `None`.
+          let attachments = if attachments.is_empty() {
+            None
+          } else {
+            Some(attachments)
+          };
+          messages.push(LogWSResponse::LogDownload {
+            log_id,
+            content,
+            attachments,
+          })
+        }
+
+        // Sent after all the items of this page; carries the log ID of the
+        // query's last evaluated key (if any).
+        messages.push(LogWSResponse::LogDownloadFinished {
+          last_log_id: last_id,
+        });
+
+        Ok(messages)
+      }
     }
   }
 
diff --git a/shared/comm-lib/src/backup/mod.rs b/shared/comm-lib/src/backup/mod.rs
--- a/shared/comm-lib/src/backup/mod.rs
+++ b/shared/comm-lib/src/backup/mod.rs
@@ -13,13 +13,34 @@
   pub attachments: Option<Vec<String>>,
 }
 
+/// Request to download the logs of a backup.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DownloadLogsRequest {
+  /// Log ID to continue after; `None` requests logs from the beginning.
+  pub from_id: Option<usize>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum LogWSRequest {
   UploadLog(UploadLogRequest),
+  DownloadLogs(DownloadLogsRequest),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum LogWSResponse {
-  LogUploaded { log_id: usize },
+  LogUploaded {
+    log_id: usize,
+  },
+  /// A single downloaded log item.
+  LogDownload {
+    log_id: usize,
+    content: Vec<u8>,
+    attachments: Option<Vec<String>>,
+  },
+  /// Marks the end of a batch of `LogDownload` messages.
+  LogDownloadFinished {
+    /// Log ID to pass as `from_id` for the next batch, if any.
+    last_log_id: Option<usize>,
+  },
   ServerError,
 }