diff --git a/services/backup/src/error.rs b/services/backup/src/error.rs index 7f072eee8..7839daac5 100644 --- a/services/backup/src/error.rs +++ b/services/backup/src/error.rs @@ -1,96 +1,98 @@ use actix_web::{ error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, HttpError, }, HttpResponse, ResponseError, }; pub use aws_sdk_dynamodb::Error as DynamoDBError; use comm_lib::database::Error as DBError; use comm_lib::{auth::AuthServiceError, blob::client::BlobServiceError}; use grpc_clients::error::Error as IdentityClientError; use reqwest::StatusCode; use tracing::{error, trace, warn}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupError { NoBackup, NoUserID, BlobError(BlobServiceError), AuthError(AuthServiceError), DB(comm_lib::database::Error), IdentityClientError(IdentityClientError), + BadRequest, } impl From<&BackupError> for actix_web::Error { fn from(value: &BackupError) -> Self { trace!("Handling backup service error: {value}"); match value { BackupError::NoBackup => ErrorNotFound("not found"), BackupError::BlobError( err @ (BlobServiceError::ClientError(_) | BlobServiceError::UnexpectedHttpStatus(_) | BlobServiceError::ServerError | BlobServiceError::UnexpectedError), ) => { warn!("Transient blob error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::BlobError(BlobServiceError::AlreadyExists) => { ErrorConflict("blob already exists") } BackupError::BlobError(BlobServiceError::InvalidArguments) => { ErrorBadRequest("bad request") } BackupError::BlobError( err @ (BlobServiceError::URLError(_) | BlobServiceError::NotFound), ) => { error!("Unexpected blob error: {err}"); ErrorInternalServerError("server error") } BackupError::AuthError(err) => { error!("Unexpected auth error: {err}"); ErrorInternalServerError("server error") } BackupError::DB(err) => match err { DBError::AwsSdk( err @ (DynamoDBError::InternalServerError(_) | DynamoDBError::ProvisionedThroughputExceededException(_) | DynamoDBError::RequestLimitExceeded(_)), ) => { warn!("AWS transient error occurred: {err}"); ErrorServiceUnavailable("please retry") } unexpected => { error!("Received an unexpected DB error: {0:?} - {0}", unexpected); ErrorInternalServerError("server error") } }, BackupError::IdentityClientError(err) => { warn!("Transient identity error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::NoUserID => ErrorBadRequest("bad request"), + BackupError::BadRequest => ErrorBadRequest("bad request"), } } } impl From for HttpError { fn from(value: BackupError) -> Self { value.into() } } impl ResponseError for BackupError { fn error_response(&self) -> HttpResponse { actix_web::Error::from(self).error_response() } fn status_code(&self) -> StatusCode { actix_web::Error::from(self) .as_response_error() .status_code() } } diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index f9316c0cb..d6965a7b0 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,332 +1,346 @@ use actix_web::{ error::ErrorBadRequest, web::{self, Bytes}, HttpResponse, Responder, }; use comm_lib::{ auth::UserIdentity, backup::LatestBackupInfoResponse, blob::{client::BlobServiceClient, types::BlobInfo}, http::{ auth_service::Authenticated, multipart::{get_named_text_field, get_text_field}, }, tools::Defer, }; use std::convert::Infallible; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::identity::find_user_id; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, blob_client: Authenticated, db_client: web::Data, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result { let backup_id = get_named_text_field("backup_id", &mut multipart).await?; let blob_client = blob_client.with_user_identity(user.clone()); tracing::Span::current().record("backup_id", &backup_id); info!("Backup data upload started"); let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let (user_data_blob_info, user_data_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; - let attachments_hashes: Vec = - match get_text_field(&mut multipart).await? { - Some((name, attachments)) => { - if name != "attachments" { - warn!( - name, - "Malformed request: 'attachments' text field expected." - ); - return Err(ErrorBadRequest("Bad request")); - } - - attachments.lines().map(ToString::to_string).collect() - } - None => Vec::new(), - }; - - let mut attachments = Vec::new(); - let mut attachments_revokes = Vec::new(); - for attachment_hash in attachments_hashes { - let (holder, revoke) = - create_attachment_holder(&attachment_hash, &blob_client).await?; + let (attachments, attachments_revokes) = + process_attachments(&mut multipart, &blob_client).await?; - attachments.push(BlobInfo { - blob_hash: attachment_hash, - holder, - }); - attachments_revokes.push(revoke); - } - - let siwe_backup_msg_option: Option = - match get_text_field(&mut multipart).await? { - Some((name, siwe_backup_msg)) => { - if name == "siwe_backup_msg" { - Some(siwe_backup_msg) - } else { - None - } - } - _ => None, - }; + let siwe_backup_msg = get_siwe_backup_msg(&mut multipart).await?; let item = BackupItem::new( user.user_id.clone(), backup_id, user_keys_blob_info, user_data_blob_info, attachments, - siwe_backup_msg_option, + siwe_backup_msg, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; user_keys_revoke.cancel(); user_data_revoke.cancel(); for attachment_revoke in attachments_revokes { attachment_revoke.cancel(); } db_client .remove_old_backups(&user.user_id, &blob_client) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument(skip_all, fields(hash_field_name, data_field_name))] async fn forward_field_to_blob<'revoke, 'blob: 'revoke>( multipart: &mut actix_multipart::Multipart, blob_client: &'blob BlobServiceClient, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name, multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error. break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; let revoke_info = blob_info.clone(); let revoke_holder = Defer::new(|| { blob_client .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder) }); Ok((blob_info, revoke_holder)) } #[instrument(skip_all)] async fn create_attachment_holder<'revoke, 'blob: 'revoke>( attachment: &str, blob_client: &'blob BlobServiceClient, -) -> Result<(String, Defer<'revoke>), BackupError> { +) -> Result<(BlobInfo, Defer<'revoke>), BackupError> { let holder = uuid::Uuid::new_v4().to_string(); if !blob_client .assign_holder(attachment, &holder) .await .map_err(BackupError::from)? { warn!("Blob attachment with hash {attachment:?} doesn't exist"); } let revoke_hash = attachment.to_string(); let revoke_holder = holder.clone(); let revoke_holder = Defer::new(|| { blob_client.schedule_revoke_holder(revoke_hash, revoke_holder) }); - Ok((holder, revoke_holder)) + let blob_info = BlobInfo { + blob_hash: attachment.to_string(), + holder, + }; + + Ok((blob_info, revoke_holder)) +} + +#[instrument(skip_all)] +async fn process_attachments<'revoke, 'blob: 'revoke>( + multipart: &mut actix_multipart::Multipart, + blob_client: &'blob BlobServiceClient, +) -> Result<(Vec, Vec>), BackupError> { + let attachments_hashes: Vec = match get_text_field(multipart).await { + Ok(Some((name, attachments))) => { + if name != "attachments" { + warn!( + name, + "Malformed request: 'attachments' text field expected." + ); + return Err(BackupError::BadRequest); + } + + attachments.lines().map(ToString::to_string).collect() + } + Ok(None) => Vec::new(), + Err(_) => return Err(BackupError::BadRequest), + }; + + let mut attachments = Vec::new(); + let mut attachments_revokes = Vec::new(); + for attachment_hash in attachments_hashes { + let (attachment, revoke) = + create_attachment_holder(&attachment_hash, blob_client).await?; + attachments.push(attachment); + attachments_revokes.push(revoke); + } + + Ok((attachments, attachments_revokes)) +} + +#[instrument(skip_all)] +pub async fn get_siwe_backup_msg( + multipart: &mut actix_multipart::Multipart, +) -> actix_web::Result> { + Ok( + get_text_field(multipart) + .await? + .filter(|(name, _)| name == "siwe_backup_msg") + .map(|(_, siwe_backup_msg)| siwe_backup_msg), + ) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path, blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id, &backup_id, blob_client.into_inner(), db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, blob_client: BlobServiceClient, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_info( path: web::Path, db_client: web::Data, ) -> actix_web::Result { let user_identifier = path.into_inner(); let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupInfoResponse { backup_id: backup_item.backup_id, user_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path, db_client: web::Data, blob_client: Authenticated, ) -> actix_web::Result { let user_identifier = path.into_inner(); let user_id = find_user_id(&user_identifier).await?; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } diff --git a/services/backup/src/http/mod.rs b/services/backup/src/http/mod.rs index 85a0f47e3..2db9052fd 100644 --- a/services/backup/src/http/mod.rs +++ b/services/backup/src/http/mod.rs @@ -1,85 +1,90 @@ use actix_web::{web, App, HttpResponse, HttpServer}; use anyhow::Result; use comm_lib::{ auth::AuthService, blob::client::BlobServiceClient, http::auth::get_comm_authentication_middleware, }; use tracing::info; use crate::{database::DatabaseClient, http::handlers::log::handle_ws, CONFIG}; mod handlers { pub(super) mod backup; pub(super) mod log; pub(super) mod user_data; } pub async fn run_http_server( db_client: DatabaseClient, blob_client: BlobServiceClient, auth_service: AuthService, ) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); let db = web::Data::new(db_client); let blob = web::Data::new(blob_client); HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .wrap(comm_lib::http::cors_config( CONFIG.localstack_endpoint.is_some(), )) .app_data(db.clone()) .app_data(blob.clone()) .app_data(auth_service.to_owned()) .route("/health", web::get().to(HttpResponse::Ok)) .service( - // Backup services that don't require authetication + // Backup services that don't require authentication web::scope("/backups/latest") .service( web::resource("{user_identifier}/backup_info") .route(web::get().to(handlers::backup::get_latest_backup_info)), ) .service(web::resource("{user_identifier}/user_keys").route( web::get().to(handlers::backup::download_latest_backup_keys), )), ) .service( - // Backup services requiring authetication + // Backup services requiring authentication web::scope("/backups") .wrap(get_comm_authentication_middleware()) + // Uploads backup data from multipart form data. + // This function requires both User Keys and User Data form fields + // in order to proceed with the upload. + // If either User Keys or User Data is not present in the form data, + // the upload will fail. .service( web::resource("").route(web::post().to(handlers::backup::upload)), ) .service( web::resource("{backup_id}/user_keys") .route(web::get().to(handlers::backup::download_user_keys)), ) .service( web::resource("{backup_id}/user_data") .route(web::get().to(handlers::backup::download_user_data)), ), ) .service( web::scope("/logs") .service(web::resource("").route(web::get().to(handle_ws))), ) .service( web::scope("/user_data") .wrap(get_comm_authentication_middleware()) .service( web::resource("{user_id}") .route(web::delete().to(handlers::user_data::delete_user_data)), ), ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) }