diff --git a/services/backup/src/database/backup_item.rs b/services/backup/src/database/backup_item.rs
index 040567f5c..787a60ea5 100644
--- a/services/backup/src/database/backup_item.rs
+++ b/services/backup/src/database/backup_item.rs
@@ -1,223 +1,227 @@
 use crate::constants::backup_table;
 use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
 use comm_lib::{
   blob::{client::BlobServiceClient, types::BlobInfo},
   database::{
     AttributeExtractor, AttributeTryInto, DBItemError, TryFromAttribute,
   },
 };
 use std::collections::HashMap;
 
 #[derive(Clone, Debug)]
 pub struct BackupItem {
   pub user_id: String,
   pub backup_id: String,
   pub created: DateTime<Utc>,
   pub user_keys: BlobInfo,
-  pub user_data: BlobInfo,
+  pub user_data: Option<BlobInfo>,
   pub attachments: Vec<BlobInfo>,
   pub siwe_backup_msg: Option<String>,
 }
 
 impl BackupItem {
   pub fn new(
     user_id: String,
     backup_id: String,
     user_keys: BlobInfo,
-    user_data: BlobInfo,
+    user_data: Option<BlobInfo>,
     attachments: Vec<BlobInfo>,
     siwe_backup_msg: Option<String>,
   ) -> Self {
     BackupItem {
       user_id,
       backup_id,
       created: chrono::Utc::now(),
       user_keys,
       user_data,
       attachments,
       siwe_backup_msg,
     }
   }
 
-  pub fn revoke_holders(&self, blob_client: &BlobServiceClient) {
+  pub fn revoke_user_keys_holders(&self, blob_client: &BlobServiceClient) {
     blob_client.schedule_revoke_holder(
       &self.user_keys.blob_hash,
       &self.user_keys.holder,
     );
+  }
 
-    blob_client.schedule_revoke_holder(
-      &self.user_data.blob_hash,
-      &self.user_data.holder,
-    );
+  pub fn revoke_user_data_holders(&self, blob_client: &BlobServiceClient) {
+    if let Some(user_data) = &self.user_data {
+      blob_client
+        .schedule_revoke_holder(&user_data.blob_hash, &user_data.holder);
+    }
 
     for attachment_info in &self.attachments {
       blob_client.schedule_revoke_holder(
         &attachment_info.blob_hash,
         &attachment_info.holder,
       );
     }
   }
 
   pub fn item_key(
     user_id: &str,
     backup_id: &str,
   ) -> HashMap<String, AttributeValue> {
     HashMap::from([
       (
         backup_table::attr::USER_ID.to_string(),
         AttributeValue::S(user_id.to_string()),
       ),
       (
         backup_table::attr::BACKUP_ID.to_string(),
         AttributeValue::S(backup_id.to_string()),
       ),
     ])
   }
 }
 
 impl From<BackupItem> for HashMap<String, AttributeValue> {
   fn from(value: BackupItem) -> Self {
     let mut attrs = HashMap::from([
       (
         backup_table::attr::USER_ID.to_string(),
         AttributeValue::S(value.user_id),
       ),
       (
         backup_table::attr::BACKUP_ID.to_string(),
         AttributeValue::S(value.backup_id),
       ),
       (
         backup_table::attr::CREATED.to_string(),
         AttributeValue::S(value.created.to_rfc3339()),
       ),
       (
         backup_table::attr::USER_KEYS.to_string(),
         value.user_keys.into(),
       ),
-      (
-        backup_table::attr::USER_DATA.to_string(),
-        value.user_data.into(),
-      ),
     ]);
 
+    if let Some(user_data) = value.user_data {
+      attrs.insert(backup_table::attr::USER_DATA.to_string(), user_data.into());
+    }
+
     if !value.attachments.is_empty() {
       attrs.insert(
         backup_table::attr::ATTACHMENTS.to_string(),
         AttributeValue::L(
           value
             .attachments
             .into_iter()
             .map(AttributeValue::from)
             .collect(),
         ),
       );
     }
 
     if let Some(siwe_backup_msg_value) = value.siwe_backup_msg {
       attrs.insert(
         backup_table::attr::SIWE_BACKUP_MSG.to_string(),
         AttributeValue::S(siwe_backup_msg_value),
       );
     }
     attrs
   }
 }
 
 impl TryFrom<HashMap<String, AttributeValue>> for BackupItem {
   type Error = DBItemError;
 
   fn try_from(
     mut value: HashMap<String, AttributeValue>,
   ) -> Result<Self, Self::Error> {
     let user_id = String::try_from_attr(
       backup_table::attr::USER_ID,
       value.remove(backup_table::attr::USER_ID),
     )?;
     let backup_id = String::try_from_attr(
       backup_table::attr::BACKUP_ID,
       value.remove(backup_table::attr::BACKUP_ID),
     )?;
     let created = DateTime::<Utc>::try_from_attr(
       backup_table::attr::CREATED,
       value.remove(backup_table::attr::CREATED),
     )?;
     let user_keys = BlobInfo::try_from_attr(
       backup_table::attr::USER_KEYS,
       value.remove(backup_table::attr::USER_KEYS),
     )?;
 
-    let user_data = BlobInfo::try_from_attr(
-      backup_table::attr::USER_DATA,
-      value.remove(backup_table::attr::USER_DATA),
-    )?;
+    let user_data = value
+      .remove(backup_table::attr::USER_DATA)
+      .map(|attr| {
+        BlobInfo::try_from_attr(backup_table::attr::USER_DATA, Some(attr))
+      })
+      .transpose()?;
 
     let attachments = value.remove(backup_table::attr::ATTACHMENTS);
     let attachments = if attachments.is_some() {
       attachments.attr_try_into(backup_table::attr::ATTACHMENTS)?
     } else {
       Vec::new()
     };
 
     let siwe_backup_msg: Option<String> =
       value.take_attr(backup_table::attr::SIWE_BACKUP_MSG)?;
 
     Ok(BackupItem {
       user_id,
       backup_id,
       created,
       user_keys,
       user_data,
       attachments,
       siwe_backup_msg,
     })
   }
 }
 
 /// Corresponds to the items in the [`crate::constants::BACKUP_TABLE_INDEX_USERID_CREATED`]
 /// global index
 #[derive(Clone, Debug)]
 pub struct OrderedBackupItem {
   pub user_id: String,
   pub created: DateTime<Utc>,
   pub backup_id: String,
   pub user_keys: BlobInfo,
   pub siwe_backup_msg: Option<String>,
 }
 
 impl TryFrom<HashMap<String, AttributeValue>> for OrderedBackupItem {
   type Error = DBItemError;
 
   fn try_from(
     mut value: HashMap<String, AttributeValue>,
   ) -> Result<Self, Self::Error> {
     let user_id = String::try_from_attr(
       backup_table::attr::USER_ID,
       value.remove(backup_table::attr::USER_ID),
     )?;
     let created = DateTime::<Utc>::try_from_attr(
       backup_table::attr::CREATED,
       value.remove(backup_table::attr::CREATED),
     )?;
     let backup_id = String::try_from_attr(
       backup_table::attr::BACKUP_ID,
       value.remove(backup_table::attr::BACKUP_ID),
     )?;
 
     let user_keys = BlobInfo::try_from_attr(
       backup_table::attr::USER_KEYS,
       value.remove(backup_table::attr::USER_KEYS),
     )?;
 
     let siwe_backup_msg: Option<String> =
       value.take_attr(backup_table::attr::SIWE_BACKUP_MSG)?;
 
     Ok(OrderedBackupItem {
       user_id,
       created,
       backup_id,
       user_keys,
       siwe_backup_msg,
     })
   }
 }
diff --git a/services/backup/src/database/mod.rs b/services/backup/src/database/mod.rs
index 40a963eb7..9dab455ec 100644
--- a/services/backup/src/database/mod.rs
+++ b/services/backup/src/database/mod.rs
@@ -1,368 +1,369 @@
 pub mod backup_item;
 pub mod log_item;
 
 use self::{
   backup_item::{BackupItem, OrderedBackupItem},
   log_item::LogItem,
 };
 use crate::constants::{backup_table, log_table, LOG_DEFAULT_PAGE_SIZE};
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput,
   types::{AttributeValue, DeleteRequest, ReturnValue, WriteRequest},
 };
 use comm_lib::{
   blob::client::BlobServiceClient,
   database::{
     self, batch_operations::ExponentialBackoffConfig, parse_int_attribute,
     Error,
   },
 };
 use tracing::{error, trace, warn};
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   client: aws_sdk_dynamodb::Client,
 }
 
 impl DatabaseClient {
   pub fn new(aws_config: &aws_config::SdkConfig) -> Self {
     DatabaseClient {
       client: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 }
 
 /// Backup functions
 impl DatabaseClient {
   pub async fn put_backup_item(
     &self,
     backup_item: BackupItem,
   ) -> Result<(), Error> {
     let item = backup_item.into();
 
     self
       .client
       .put_item()
       .table_name(backup_table::TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put backup item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = BackupItem::item_key(user_id, backup_id);
 
     let output = self
       .client
       .get_item()
       .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find backup item");
         Error::AwsSdk(e.into())
       })?;
 
     let GetItemOutput {
       item: Some(item), ..
     } = output
     else {
       return Ok(None);
     };
 
     let backup_item = item.try_into()?;
 
     Ok(Some(backup_item))
   }
 
   pub async fn find_last_backup_item(
     &self,
     user_id: &str,
   ) -> Result<Option<OrderedBackupItem>, Error> {
     let mut found_backups =
       self.query_ordered_backups_index(user_id, Some(1)).await?;
     let latest_backup = found_backups.pop();
     Ok(latest_backup)
   }
 
   pub async fn remove_backup_item(
     &self,
     user_id: &str,
     backup_id: &str,
     blob_client: &BlobServiceClient,
   ) -> Result<Option<BackupItem>, Error> {
     let item_key = BackupItem::item_key(user_id, backup_id);
 
     let response = self
       .client
       .delete_item()
       .table_name(backup_table::TABLE_NAME)
       .set_key(Some(item_key))
       .return_values(ReturnValue::AllOld)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove backup item");
         Error::AwsSdk(e.into())
       })?;
 
     let result = response
       .attributes
       .map(BackupItem::try_from)
       .transpose()
       .map_err(Error::from)?;
 
     if let Some(backup_item) = &result {
-      backup_item.revoke_holders(blob_client);
+      backup_item.revoke_user_keys_holders(blob_client);
+      backup_item.revoke_user_data_holders(blob_client);
     }
 
     self
       .remove_log_items_for_backup(user_id, backup_id, blob_client)
       .await?;
 
     Ok(result)
   }
 
   /// For the purposes of the initial backup version this function
   /// removes all backups except for the latest one
   pub async fn remove_old_backups(
     &self,
     user_id: &str,
     blob_client: &BlobServiceClient,
   ) -> Result<Vec<BackupItem>, Error> {
     let items = self.query_ordered_backups_index(user_id, None).await?;
     let mut removed_backups = vec![];
 
     let Some(latest) = items.iter().map(|item| item.created).max() else {
       return Ok(removed_backups);
     };
 
     for item in items {
       if item.created == latest {
         trace!(
           "Skipping removal of the latest backup item: {}",
           item.backup_id
         );
         continue;
       }
 
       trace!("Removing backup item: {item:?}");
 
       if let Some(backup) = self
         .remove_backup_item(user_id, &item.backup_id, blob_client)
         .await?
       {
         removed_backups.push(backup);
       } else {
         warn!("Backup was found during query, but wasn't found when deleting")
       };
     }
 
     Ok(removed_backups)
   }
 }
 
 /// Backup log functions
 impl DatabaseClient {
   pub async fn put_log_item(
     &self,
     log_item: LogItem,
     blob_client: &BlobServiceClient,
   ) -> Result<(), Error> {
     let item = log_item.into();
 
     let result = self
       .client
       .put_item()
       .table_name(log_table::TABLE_NAME)
       .set_item(Some(item))
       .return_values(ReturnValue::AllOld)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put log item");
         Error::AwsSdk(e.into())
       })?;
 
     let Some(replaced_log_attrs) = result.attributes else {
       return Ok(());
     };
 
     let Ok(replaced_log) = LogItem::try_from(replaced_log_attrs) else {
       warn!("Couldn't parse replaced log item");
       return Ok(());
     };
 
     replaced_log.revoke_holders(blob_client);
     Ok(())
   }
 
   pub async fn fetch_log_items(
     &self,
     user_id: &str,
     backup_id: &str,
     from_id: Option<usize>,
   ) -> Result<(Vec<LogItem>, Option<usize>), Error> {
     let id = LogItem::partition_key(user_id, backup_id);
     let mut query = self
       .client
       .query()
       .table_name(log_table::TABLE_NAME)
       .key_condition_expression("#backupID = :valueToMatch")
       .expression_attribute_names("#backupID", log_table::attr::BACKUP_ID)
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(id.clone()),
       )
       .limit(LOG_DEFAULT_PAGE_SIZE);
 
     if let Some(from_id) = from_id {
       query = query
         .exclusive_start_key(log_table::attr::BACKUP_ID, AttributeValue::S(id))
         .exclusive_start_key(
           log_table::attr::LOG_ID,
           AttributeValue::N(from_id.to_string()),
         );
     }
 
     let response = query.send().await.map_err(|e| {
       error!("DynamoDB client failed to fetch logs");
       Error::AwsSdk(e.into())
     })?;
 
     let last_id = response
       .last_evaluated_key()
       .map(|key| {
         parse_int_attribute(
           log_table::attr::LOG_ID,
           key.get(log_table::attr::LOG_ID).cloned(),
         )
       })
       .transpose()?;
 
     let items = response
       .items
       .unwrap_or_default()
       .into_iter()
       .map(LogItem::try_from)
       .collect::<Result<Vec<_>, _>>()?;
 
     Ok((items, last_id))
   }
 
   pub async fn remove_log_items_for_backup(
     &self,
     user_id: &str,
     backup_id: &str,
     blob_client: &BlobServiceClient,
   ) -> Result<(), Error> {
     let (mut items, mut last_id) =
       self.fetch_log_items(user_id, backup_id, None).await?;
     while last_id.is_some() {
       let (mut new_items, new_last_id) =
         self.fetch_log_items(user_id, backup_id, last_id).await?;
 
       items.append(&mut new_items);
       last_id = new_last_id;
     }
 
     for log_item in &items {
       log_item.revoke_holders(blob_client);
     }
 
     let write_requests = items
       .into_iter()
       .map(|key| {
         DeleteRequest::builder()
           .set_key(Some(LogItem::item_key(user_id, key.backup_id, key.log_id)))
           .build()
           .expect("key not set in DeleteRequest builder")
       })
       .map(|request| WriteRequest::builder().delete_request(request).build())
       .collect::<Vec<_>>();
 
     database::batch_operations::batch_write(
       &self.client,
       log_table::TABLE_NAME,
       write_requests,
       ExponentialBackoffConfig::default(),
     )
     .await?;
 
     Ok(())
   }
 }
 
 // general functions
 impl DatabaseClient {
   pub async fn delete_user_data(
     &self,
     user_id: &str,
     blob_client: &BlobServiceClient,
   ) -> Result<(), Error> {
     // query the index to avoid unnecessarily querying backup data
     let items = self.query_ordered_backups_index(user_id, None).await?;
 
     for item in items {
       trace!("Removing backup item: {item:?}");
       self
         .remove_backup_item(user_id, &item.backup_id, blob_client)
         .await?;
     }
 
     Ok(())
   }
 
   async fn query_ordered_backups_index(
     &self,
     user_id: &str,
     limit: Option<i32>,
   ) -> Result<Vec<OrderedBackupItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(backup_table::TABLE_NAME)
       .index_name(backup_table::CREATED_INDEX)
       .key_condition_expression("#userID = :valueToMatch")
.expression_attribute_names("#userID", backup_table::attr::USER_ID) .expression_attribute_values( ":valueToMatch", AttributeValue::S(user_id.to_string()), ) .scan_index_forward(false) .set_limit(limit) .send() .await .map_err(|e| { error!("DynamoDB client failed to fetch backups"); Error::AwsSdk(e.into()) })?; if response.last_evaluated_key().is_some() { // In the intial version of the backup service this function will be run // for every new backup (each user only has one backup), so this shouldn't // happen warn!("Not all backups have been retrieved from the index"); } let items = response .items .unwrap_or_default() .into_iter() .map(OrderedBackupItem::try_from) .collect::, _>>()?; Ok(items) } } diff --git a/services/backup/src/error.rs b/services/backup/src/error.rs index 7839daac5..8783cb3a1 100644 --- a/services/backup/src/error.rs +++ b/services/backup/src/error.rs @@ -1,98 +1,100 @@ use actix_web::{ error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, HttpError, }, HttpResponse, ResponseError, }; pub use aws_sdk_dynamodb::Error as DynamoDBError; use comm_lib::database::Error as DBError; use comm_lib::{auth::AuthServiceError, blob::client::BlobServiceError}; use grpc_clients::error::Error as IdentityClientError; use reqwest::StatusCode; use tracing::{error, trace, warn}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupError { NoBackup, NoUserID, BlobError(BlobServiceError), AuthError(AuthServiceError), DB(comm_lib::database::Error), IdentityClientError(IdentityClientError), BadRequest, + NoUserData, } impl From<&BackupError> for actix_web::Error { fn from(value: &BackupError) -> Self { trace!("Handling backup service error: {value}"); match value { BackupError::NoBackup => ErrorNotFound("not found"), BackupError::BlobError( err @ (BlobServiceError::ClientError(_) | BlobServiceError::UnexpectedHttpStatus(_) | BlobServiceError::ServerError | BlobServiceError::UnexpectedError), ) => { warn!("Transient blob error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::BlobError(BlobServiceError::AlreadyExists) => { ErrorConflict("blob already exists") } BackupError::BlobError(BlobServiceError::InvalidArguments) => { ErrorBadRequest("bad request") } BackupError::BlobError( err @ (BlobServiceError::URLError(_) | BlobServiceError::NotFound), ) => { error!("Unexpected blob error: {err}"); ErrorInternalServerError("server error") } BackupError::AuthError(err) => { error!("Unexpected auth error: {err}"); ErrorInternalServerError("server error") } BackupError::DB(err) => match err { DBError::AwsSdk( err @ (DynamoDBError::InternalServerError(_) | DynamoDBError::ProvisionedThroughputExceededException(_) | DynamoDBError::RequestLimitExceeded(_)), ) => { warn!("AWS transient error occurred: {err}"); ErrorServiceUnavailable("please retry") } unexpected => { error!("Received an unexpected DB error: {0:?} - {0}", unexpected); ErrorInternalServerError("server error") } }, BackupError::IdentityClientError(err) => { warn!("Transient identity error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::NoUserID => ErrorBadRequest("bad request"), BackupError::BadRequest => ErrorBadRequest("bad request"), + BackupError::NoUserData => ErrorNotFound("not found"), } } } impl From for HttpError { fn from(value: BackupError) -> Self { value.into() } } impl ResponseError for BackupError { fn error_response(&self) -> HttpResponse { actix_web::Error::from(self).error_response() 
   }
 
   fn status_code(&self) -> StatusCode {
     actix_web::Error::from(self)
       .as_response_error()
       .status_code()
   }
 }
diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs
index d6965a7b0..9ac3de7a8 100644
--- a/services/backup/src/http/handlers/backup.rs
+++ b/services/backup/src/http/handlers/backup.rs
@@ -1,346 +1,348 @@
 use actix_web::{
   error::ErrorBadRequest,
   web::{self, Bytes},
   HttpResponse, Responder,
 };
 use comm_lib::{
   auth::UserIdentity,
   backup::LatestBackupInfoResponse,
   blob::{client::BlobServiceClient, types::BlobInfo},
   http::{
     auth_service::Authenticated,
     multipart::{get_named_text_field, get_text_field},
   },
   tools::Defer,
 };
 use std::convert::Infallible;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tracing::{info, instrument, trace, warn};
 
 use crate::identity::find_user_id;
 use crate::{
   database::{backup_item::BackupItem, DatabaseClient},
   error::BackupError,
 };
 
 #[instrument(skip_all, fields(backup_id))]
 pub async fn upload(
   user: UserIdentity,
   blob_client: Authenticated<BlobServiceClient>,
   db_client: web::Data<DatabaseClient>,
   mut multipart: actix_multipart::Multipart,
 ) -> actix_web::Result<HttpResponse> {
   let backup_id = get_named_text_field("backup_id", &mut multipart).await?;
   let blob_client = blob_client.with_user_identity(user.clone());
 
   tracing::Span::current().record("backup_id", &backup_id);
 
   info!("Backup data upload started");
 
   let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob(
     &mut multipart,
     &blob_client,
     "user_keys_hash",
     "user_keys",
   )
   .await?;
 
   let (user_data_blob_info, user_data_revoke) = forward_field_to_blob(
     &mut multipart,
     &blob_client,
     "user_data_hash",
     "user_data",
   )
   .await?;
 
   let (attachments, attachments_revokes) =
     process_attachments(&mut multipart, &blob_client).await?;
 
   let siwe_backup_msg = get_siwe_backup_msg(&mut multipart).await?;
 
   let item = BackupItem::new(
     user.user_id.clone(),
     backup_id,
     user_keys_blob_info,
-    user_data_blob_info,
+    Some(user_data_blob_info),
     attachments,
     siwe_backup_msg,
   );
 
   db_client
     .put_backup_item(item)
     .await
     .map_err(BackupError::from)?;
 
   user_keys_revoke.cancel();
   user_data_revoke.cancel();
   for attachment_revoke in attachments_revokes {
     attachment_revoke.cancel();
   }
 
   db_client
     .remove_old_backups(&user.user_id, &blob_client)
     .await
     .map_err(BackupError::from)?;
 
   Ok(HttpResponse::Ok().finish())
 }
 
 #[instrument(skip_all, fields(hash_field_name, data_field_name))]
 async fn forward_field_to_blob<'revoke, 'blob: 'revoke>(
   multipart: &mut actix_multipart::Multipart,
   blob_client: &'blob BlobServiceClient,
   hash_field_name: &str,
   data_field_name: &str,
 ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> {
   trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}");
 
   let blob_hash = get_named_text_field(hash_field_name, multipart).await?;
 
   let Some(mut field) = multipart.try_next().await? else {
     warn!("Malformed request: expected a field.");
     return Err(ErrorBadRequest("Bad request"))?;
   };
   if field.name() != data_field_name {
     warn!(
       hash_field_name,
       "Malformed request: '{data_field_name}' data field expected."
     );
     return Err(ErrorBadRequest("Bad request"))?;
   }
 
   let blob_info = BlobInfo {
     blob_hash,
     holder: uuid::Uuid::new_v4().to_string(),
   };
 
   // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly.
   // Instead we have to forward it to a channel and create a stream from the receiver.
   let (tx, rx) = tokio::sync::mpsc::channel(1);
   let receive_promise = async move {
     trace!("Receiving blob data");
     // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`]
     // as the error to the channel
     while let Some(chunk) = field.try_next().await? {
       if let Err(err) = tx.send(Result::<Bytes, Infallible>::Ok(chunk)).await {
         warn!("Error when sending data through a channel: '{err}'");
         // Error here means that the channel has been closed from the blob client side. We don't want to return an error
         // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup
         // client error.
         break;
       }
     }
     trace!("Finished receiving blob data");
     Result::<(), actix_web::Error>::Ok(())
   };
 
   let data_stream = ReceiverStream::new(rx);
   let send_promise = async {
     blob_client
       .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream)
       .await
       .map_err(BackupError::from)?;
 
     Ok(())
   };
 
   tokio::try_join!(receive_promise, send_promise)?;
 
   let revoke_info = blob_info.clone();
   let revoke_holder = Defer::new(|| {
     blob_client
       .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder)
   });
 
   Ok((blob_info, revoke_holder))
 }
 
 #[instrument(skip_all)]
 async fn create_attachment_holder<'revoke, 'blob: 'revoke>(
   attachment: &str,
   blob_client: &'blob BlobServiceClient,
 ) -> Result<(BlobInfo, Defer<'revoke>), BackupError> {
   let holder = uuid::Uuid::new_v4().to_string();
 
   if !blob_client
     .assign_holder(attachment, &holder)
     .await
     .map_err(BackupError::from)?
   {
     warn!("Blob attachment with hash {attachment:?} doesn't exist");
   }
 
   let revoke_hash = attachment.to_string();
   let revoke_holder = holder.clone();
   let revoke_holder = Defer::new(|| {
     blob_client.schedule_revoke_holder(revoke_hash, revoke_holder)
   });
 
   let blob_info = BlobInfo {
     blob_hash: attachment.to_string(),
     holder,
   };
 
   Ok((blob_info, revoke_holder))
 }
 
 #[instrument(skip_all)]
 async fn process_attachments<'revoke, 'blob: 'revoke>(
   multipart: &mut actix_multipart::Multipart,
   blob_client: &'blob BlobServiceClient,
 ) -> Result<(Vec<BlobInfo>, Vec<Defer<'revoke>>), BackupError> {
   let attachments_hashes: Vec<String> = match get_text_field(multipart).await {
     Ok(Some((name, attachments))) => {
       if name != "attachments" {
         warn!(
           name,
           "Malformed request: 'attachments' text field expected."
         );
         return Err(BackupError::BadRequest);
       }
 
       attachments.lines().map(ToString::to_string).collect()
     }
     Ok(None) => Vec::new(),
     Err(_) => return Err(BackupError::BadRequest),
   };
 
   let mut attachments = Vec::new();
   let mut attachments_revokes = Vec::new();
   for attachment_hash in attachments_hashes {
     let (attachment, revoke) =
       create_attachment_holder(&attachment_hash, blob_client).await?;
 
     attachments.push(attachment);
     attachments_revokes.push(revoke);
   }
 
   Ok((attachments, attachments_revokes))
 }
 
 #[instrument(skip_all)]
 pub async fn get_siwe_backup_msg(
   multipart: &mut actix_multipart::Multipart,
 ) -> actix_web::Result<Option<String>> {
   Ok(
     get_text_field(multipart)
       .await?
.filter(|(name, _)| name == "siwe_backup_msg") .map(|(_, siwe_backup_msg)| siwe_backup_msg), ) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( - |item| &item.user_keys, + |item| Ok(&item.user_keys), &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); let backup_id = path.into_inner(); download_user_blob( - |item| &item.user_data, + |item| item.user_data.as_ref().ok_or(BackupError::NoUserData), &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } pub async fn download_user_blob( - data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, + data_extractor: impl FnOnce(&BackupItem) -> Result<&BlobInfo, BackupError>, user_id: &str, backup_id: &str, blob_client: BlobServiceClient, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; + let blob_info = data_extractor(&backup_item)?; + let stream = blob_client - .get(&data_extractor(&backup_item).blob_hash) + .get(&blob_info.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_info( path: web::Path, db_client: web::Data, ) -> actix_web::Result { let user_identifier = path.into_inner(); let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupInfoResponse { backup_id: backup_item.backup_id, user_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path, db_client: web::Data, blob_client: Authenticated, ) -> actix_web::Result { let user_identifier = path.into_inner(); let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) }