diff --git a/services/backup/src/database/backup_item.rs b/services/backup/src/database/backup_item.rs index f2ec37270..746691558 100644 --- a/services/backup/src/database/backup_item.rs +++ b/services/backup/src/database/backup_item.rs @@ -1,127 +1,170 @@ use aws_sdk_dynamodb::types::AttributeValue; use chrono::{DateTime, Utc}; use comm_services_lib::{ blob::types::BlobInfo, database::{DBItemError, TryFromAttribute}, }; use std::collections::{HashMap, HashSet}; use crate::constants::{ BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID, BACKUP_TABLE_FIELD_CREATED, BACKUP_TABLE_FIELD_USER_DATA, BACKUP_TABLE_FIELD_USER_ID, BACKUP_TABLE_FIELD_USER_KEYS, }; #[derive(Clone, Debug)] pub struct BackupItem { pub user_id: String, pub backup_id: String, pub created: DateTime, pub user_keys: BlobInfo, pub user_data: BlobInfo, pub attachment_holders: HashSet, } impl BackupItem { pub fn new( user_id: String, backup_id: String, user_keys: BlobInfo, user_data: BlobInfo, attachment_holders: HashSet, ) -> Self { BackupItem { user_id, backup_id, created: chrono::Utc::now(), user_keys, user_data, attachment_holders, } } } impl From for HashMap { fn from(value: BackupItem) -> Self { let mut attrs = HashMap::from([ ( BACKUP_TABLE_FIELD_USER_ID.to_string(), AttributeValue::S(value.user_id), ), ( BACKUP_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(value.backup_id), ), ( BACKUP_TABLE_FIELD_CREATED.to_string(), AttributeValue::S(value.created.to_rfc3339()), ), ( BACKUP_TABLE_FIELD_USER_KEYS.to_string(), value.user_keys.into(), ), ( BACKUP_TABLE_FIELD_USER_DATA.to_string(), value.user_data.into(), ), ]); if !value.attachment_holders.is_empty() { attrs.insert( BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(), AttributeValue::Ss(value.attachment_holders.into_iter().collect()), ); } attrs } } impl TryFrom> for BackupItem { type Error = DBItemError; fn try_from( mut value: HashMap, ) -> Result { let user_id = String::try_from_attr( BACKUP_TABLE_FIELD_USER_ID, 
value.remove(BACKUP_TABLE_FIELD_USER_ID), )?; let backup_id = String::try_from_attr( BACKUP_TABLE_FIELD_BACKUP_ID, value.remove(BACKUP_TABLE_FIELD_BACKUP_ID), )?; let created = DateTime::::try_from_attr( BACKUP_TABLE_FIELD_CREATED, value.remove(BACKUP_TABLE_FIELD_CREATED), )?; let user_keys = BlobInfo::try_from_attr( BACKUP_TABLE_FIELD_USER_KEYS, value.remove(BACKUP_TABLE_FIELD_USER_KEYS), )?; let user_data = BlobInfo::try_from_attr( BACKUP_TABLE_FIELD_USER_DATA, value.remove(BACKUP_TABLE_FIELD_USER_DATA), )?; let attachments = value.remove(BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS); let attachment_holders = if attachments.is_some() { HashSet::::try_from_attr( BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, attachments, )? } else { HashSet::new() }; Ok(BackupItem { user_id, backup_id, created, user_keys, user_data, attachment_holders, }) } } + +/// Corresponds to the items in the [`crate::constants::BACKUP_TABLE_INDEX_USERID_CREATED`] +/// global index +#[derive(Clone, Debug)] +pub struct OrderedBackupItem { + pub user_id: String, + pub created: DateTime, + pub backup_id: String, + pub user_keys: BlobInfo, +} + +impl TryFrom> for OrderedBackupItem { + type Error = DBItemError; + + fn try_from( + mut value: HashMap, + ) -> Result { + let user_id = String::try_from_attr( + BACKUP_TABLE_FIELD_USER_ID, + value.remove(BACKUP_TABLE_FIELD_USER_ID), + )?; + let created = DateTime::::try_from_attr( + BACKUP_TABLE_FIELD_CREATED, + value.remove(BACKUP_TABLE_FIELD_CREATED), + )?; + let backup_id = String::try_from_attr( + BACKUP_TABLE_FIELD_BACKUP_ID, + value.remove(BACKUP_TABLE_FIELD_BACKUP_ID), + )?; + + let user_keys = BlobInfo::try_from_attr( + BACKUP_TABLE_FIELD_USER_KEYS, + value.remove(BACKUP_TABLE_FIELD_USER_KEYS), + )?; + + Ok(OrderedBackupItem { + user_id, + created, + backup_id, + user_keys, + }) + } +} diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs index f32b8963b..1483d1c9b 100644 --- a/services/backup/src/database/mod.rs +++ 
b/services/backup/src/database/mod.rs @@ -1,282 +1,282 @@ pub mod backup_item; pub mod log_item; use std::collections::HashMap; use aws_sdk_dynamodb::{ operation::get_item::GetItemOutput, types::AttributeValue, }; use comm_services_lib::database::Error; use tracing::error; use crate::constants::{ BACKUP_TABLE_FIELD_BACKUP_ID, BACKUP_TABLE_FIELD_USER_ID, BACKUP_TABLE_INDEX_USERID_CREATED, BACKUP_TABLE_NAME, LOG_TABLE_FIELD_ATTACHMENT_HOLDERS, LOG_TABLE_FIELD_BACKUP_ID, LOG_TABLE_FIELD_DATA_HASH, LOG_TABLE_FIELD_LOG_ID, LOG_TABLE_FIELD_PERSISTED_IN_BLOB, LOG_TABLE_FIELD_VALUE, LOG_TABLE_NAME, }; use self::{ - backup_item::BackupItem, + backup_item::{BackupItem, OrderedBackupItem}, log_item::{parse_log_item, LogItem}, }; #[derive(Clone)] pub struct DatabaseClient { client: aws_sdk_dynamodb::Client, } impl DatabaseClient { pub fn new(aws_config: &aws_types::SdkConfig) -> Self { DatabaseClient { client: aws_sdk_dynamodb::Client::new(aws_config), } } // backup item pub async fn put_backup_item( &self, backup_item: BackupItem, ) -> Result<(), Error> { let item = backup_item.into(); self .client .put_item() .table_name(BACKUP_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { error!("DynamoDB client failed to put backup item"); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_backup_item( &self, user_id: &str, backup_id: &str, ) -> Result, Error> { let item_key = HashMap::from([ ( BACKUP_TABLE_FIELD_USER_ID.to_string(), AttributeValue::S(user_id.to_string()), ), ( BACKUP_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(backup_id.to_string()), ), ]); let output = self .client .get_item() .table_name(BACKUP_TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { error!("DynamoDB client failed to find backup item"); Error::AwsSdk(e.into()) })?; let GetItemOutput { item: Some(item), .. 
} = output else { return Ok(None) }; let backup_item = item.try_into()?; Ok(Some(backup_item)) } pub async fn find_last_backup_item( &self, user_id: &str, - ) -> Result, Error> { + ) -> Result, Error> { let response = self .client .query() .table_name(BACKUP_TABLE_NAME) .index_name(BACKUP_TABLE_INDEX_USERID_CREATED) .key_condition_expression("#userID = :valueToMatch") .expression_attribute_names("#userID", BACKUP_TABLE_FIELD_USER_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(user_id.to_string()), ) .limit(1) .scan_index_forward(false) .send() .await .map_err(|e| { error!("DynamoDB client failed to find last backup"); Error::AwsSdk(e.into()) })?; match response.items.unwrap_or_default().pop() { Some(item) => { let backup_item = item.try_into()?; Ok(Some(backup_item)) } None => Ok(None), } } pub async fn remove_backup_item(&self, backup_id: &str) -> Result<(), Error> { self .client .delete_item() .table_name(BACKUP_TABLE_NAME) .key( BACKUP_TABLE_FIELD_BACKUP_ID, AttributeValue::S(backup_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove backup item"); Error::AwsSdk(e.into()) })?; Ok(()) } // log item pub async fn put_log_item(&self, log_item: LogItem) -> Result<(), Error> { let item = HashMap::from([ ( LOG_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(log_item.backup_id), ), ( LOG_TABLE_FIELD_LOG_ID.to_string(), AttributeValue::S(log_item.log_id), ), ( LOG_TABLE_FIELD_PERSISTED_IN_BLOB.to_string(), AttributeValue::Bool(log_item.persisted_in_blob), ), ( LOG_TABLE_FIELD_VALUE.to_string(), AttributeValue::S(log_item.value), ), ( LOG_TABLE_FIELD_DATA_HASH.to_string(), AttributeValue::S(log_item.data_hash), ), ( LOG_TABLE_FIELD_ATTACHMENT_HOLDERS.to_string(), AttributeValue::S(log_item.attachment_holders), ), ]); self .client .put_item() .table_name(LOG_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { error!("DynamoDB client failed to put log item"); Error::AwsSdk(e.into()) })?; Ok(()) } 
pub async fn find_log_item( &self, backup_id: &str, log_id: &str, ) -> Result, Error> { let item_key = HashMap::from([ ( LOG_TABLE_FIELD_BACKUP_ID.to_string(), AttributeValue::S(backup_id.to_string()), ), ( LOG_TABLE_FIELD_LOG_ID.to_string(), AttributeValue::S(log_id.to_string()), ), ]); match self .client .get_item() .table_name(LOG_TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { error!("DynamoDB client failed to find log item"); Error::AwsSdk(e.into()) })? { GetItemOutput { item: Some(item), .. } => { let log_item = parse_log_item(item)?; Ok(Some(log_item)) } _ => Ok(None), } } pub async fn find_log_items_for_backup( &self, backup_id: &str, ) -> Result, Error> { let response = self .client .query() .table_name(LOG_TABLE_NAME) .key_condition_expression("#backupID = :valueToMatch") .expression_attribute_names("#backupID", LOG_TABLE_FIELD_BACKUP_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(backup_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to find log items for backup"); Error::AwsSdk(e.into()) })?; if response.count == 0 { return Ok(Vec::new()); } let mut results: Vec = Vec::with_capacity(response.count() as usize); for item in response.items.unwrap_or_default() { let log_item = parse_log_item(item)?; results.push(log_item); } Ok(results) } pub async fn remove_log_item(&self, log_id: &str) -> Result<(), Error> { self .client .delete_item() .table_name(LOG_TABLE_NAME) .key( LOG_TABLE_FIELD_LOG_ID, AttributeValue::S(log_id.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove log item"); Error::AwsSdk(e.into()) })?; Ok(()) } } diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index 337f196e2..cd1228f44 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,209 +1,264 @@ use std::{collections::HashSet, convert::Infallible}; use actix_web::{ 
error::ErrorBadRequest, web::{self, Bytes}, - HttpResponse, + HttpResponse, Responder, }; use comm_services_lib::{ auth::UserIdentity, + backup::LatestBackupIDResponse, blob::{client::BlobServiceClient, types::BlobInfo}, http::multipart::{get_named_text_field, get_text_field}, }; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(name = "upload_backup", skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, blob_client: web::Data, db_client: web::Data, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result { info!("Upload backup request"); let backup_id = get_named_text_field("backup_id", &mut multipart).await?; tracing::Span::current().record("backup_id", &backup_id); let user_keys_blob_info = forward_field_to_blob( &mut multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let user_data_blob_info = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; let attachments_holders: HashSet = match get_text_field(&mut multipart).await? { Some((name, attachments)) => { if name != "attachments" { warn!( name, "Malformed request: 'attachments' text field expected." 
); return Err(ErrorBadRequest("Bad request")); } attachments.lines().map(ToString::to_string).collect() } None => HashSet::new(), }; let item = BackupItem::new( user.user_id, backup_id, user_keys_blob_info, user_data_blob_info, attachments_holders, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument( skip_all, name = "forward_to_blob", fields(hash_field_name, data_field_name) )] async fn forward_field_to_blob( multipart: &mut actix_multipart::Multipart, blob_client: &web::Data, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name, multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. 
We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error. break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; Ok(blob_info) } #[instrument(name = "download_user_keys", skip_all, fields(backup_id = %path.as_str()))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, blob_client: web::Data, db_client: web::Data, ) -> actix_web::Result { info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, blob_client, db_client, ) .await } #[instrument(name = "download_user_data", skip_all, fields(backup_id = %path.as_str()))] pub async fn download_user_data( user: UserIdentity, path: web::Path, blob_client: web::Data, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id, &backup_id, blob_client, db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, blob_client: web::Data, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? 
.ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } + +#[instrument(name = "get_latest_backup_id", skip_all, fields(username = %path.as_str()))] +pub async fn get_latest_backup_id( + path: web::Path, + db_client: web::Data, +) -> actix_web::Result { + let username = path.into_inner(); + // Treat username as user_id in the initial version + let user_id = username; + + let Some(backup_item) = db_client + .find_last_backup_item(&user_id) + .await + .map_err(BackupError::from)? else + { + return Err(BackupError::NoBackup.into()); + }; + + let response = LatestBackupIDResponse { + backup_id: backup_item.backup_id, + }; + + Ok(web::Json(response)) +} + +#[instrument(name = "download_latest_backup_keys", skip_all, fields(username = %path.as_str()))] +pub async fn download_latest_backup_keys( + path: web::Path, + db_client: web::Data, + blob_client: web::Data, +) -> actix_web::Result { + let username = path.into_inner(); + // Treat username as user_id in the initial version + let user_id = username; + + let Some(backup_item) = db_client + .find_last_backup_item(&user_id) + .await + .map_err(BackupError::from)? 
else + { + return Err(BackupError::NoBackup.into()); + }; + + let stream = blob_client + .get(&backup_item.user_keys.blob_hash) + .await + .map_err(BackupError::from)?; + + Ok( + HttpResponse::Ok() + .content_type("application/octet-stream") + .streaming(stream), + ) +} diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs index 7206975d9..da903022a 100644 --- a/services/backup/src/http/mod.rs +++ b/services/backup/src/http/mod.rs @@ -1,57 +1,68 @@ use actix_web::{web, App, HttpServer}; use anyhow::Result; use comm_services_lib::{ blob::client::BlobServiceClient, http::auth::get_comm_authentication_middleware, }; use tracing::info; use crate::{database::DatabaseClient, CONFIG}; mod handlers { pub(super) mod backup; } pub async fn run_http_server( db_client: DatabaseClient, blob_client: BlobServiceClient, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); let db = web::Data::new(db_client); let blob = web::Data::new(blob_client); HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .wrap(comm_services_lib::http::cors_config( CONFIG.localstack_endpoint.is_some(), )) .app_data(db.clone()) .app_data(blob.clone()) + .service( + // Services that don't require authentication + web::scope("/backups/latest") + .service( + web::resource("{username}/backup_id") + .route(web::get().to(handlers::backup::get_latest_backup_id)), + ) + .service(web::resource("{username}/user_keys").route( + web::get().to(handlers::backup::download_latest_backup_keys), + )), + ) .service( // Services requiring authentication web::scope("/backups") .wrap(get_comm_authentication_middleware()) .service( web::resource("").route(web::post().to(handlers::backup::upload)), ) .service( web::resource("{backup_id}/user_keys") .route(web::get().to(handlers::backup::download_user_keys)), ) .service( web::resource("{backup_id}/user_data") .route(web::get().to(handlers::backup::download_user_data)), ), ) }) 
.bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) } diff --git a/services/comm-services-lib/src/backup/mod.rs b/services/comm-services-lib/src/backup/mod.rs new file mode 100644 index 000000000..8f6b0f776 --- /dev/null +++ b/services/comm-services-lib/src/backup/mod.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LatestBackupIDResponse { + #[serde(rename = "backupID")] + pub backup_id: String, +} diff --git a/services/comm-services-lib/src/lib.rs b/services/comm-services-lib/src/lib.rs index ea82edd00..f5575a452 100644 --- a/services/comm-services-lib/src/lib.rs +++ b/services/comm-services-lib/src/lib.rs @@ -1,16 +1,17 @@ pub mod auth; +pub mod backup; pub mod blob; pub mod constants; pub mod database; #[cfg(feature = "http")] pub mod http; pub mod tools; mod reexports { #[cfg(feature = "blob-client")] pub use {bytes, reqwest}; #[cfg(feature = "http")] pub use {actix_web, http}; } pub use reexports::*;