diff --git a/services/backup/src/error.rs b/services/backup/src/error.rs index b8ebdfa5f..466a50647 100644 --- a/services/backup/src/error.rs +++ b/services/backup/src/error.rs @@ -1,83 +1,88 @@ use actix_web::{ error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, HttpError, }, HttpResponse, ResponseError, }; pub use aws_sdk_dynamodb::Error as DynamoDBError; -use comm_lib::blob::client::BlobServiceError; use comm_lib::database::Error as DBError; +use comm_lib::{auth::AuthServiceError, blob::client::BlobServiceError}; use reqwest::StatusCode; use tracing::{error, trace, warn}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BackupError { NoBackup, BlobError(BlobServiceError), + AuthError(AuthServiceError), DB(comm_lib::database::Error), } impl From<&BackupError> for actix_web::Error { fn from(value: &BackupError) -> Self { trace!("Handling backup service error: {value}"); match value { BackupError::NoBackup => ErrorNotFound("not found"), BackupError::BlobError( err @ (BlobServiceError::ClientError(_) | BlobServiceError::UnexpectedHttpStatus(_) | BlobServiceError::ServerError | BlobServiceError::UnexpectedError), ) => { warn!("Transient blob error occurred: {err}"); ErrorServiceUnavailable("please retry") } BackupError::BlobError(BlobServiceError::AlreadyExists) => { ErrorConflict("blob already exists") } BackupError::BlobError(BlobServiceError::InvalidArguments) => { ErrorBadRequest("bad request") } BackupError::BlobError( err @ (BlobServiceError::URLError(_) | BlobServiceError::NotFound), ) => { error!("Unexpected blob error: {err}"); ErrorInternalServerError("server error") } + BackupError::AuthError(err) => { + error!("Unexpected auth error: {err}"); + ErrorInternalServerError("server error") + } BackupError::DB(err) => match err { DBError::AwsSdk( err @ (DynamoDBError::InternalServerError(_) | DynamoDBError::ProvisionedThroughputExceededException(_) | 
DynamoDBError::RequestLimitExceeded(_)), ) => { warn!("AWS transient error occurred: {err}"); ErrorServiceUnavailable("please retry") } unexpected => { error!("Received an unexpected DB error: {0:?} - {0}", unexpected); ErrorInternalServerError("server error") } }, } } } impl From<BackupError> for HttpError { fn from(value: BackupError) -> Self { value.into() } } impl ResponseError for BackupError { fn error_response(&self) -> HttpResponse { actix_web::Error::from(self).error_response() } fn status_code(&self) -> StatusCode { actix_web::Error::from(self) .as_response_error() .status_code() } } diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index bd5dcc861..1b8d5ad69 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,328 +1,345 @@ use actix_web::{ - error::ErrorBadRequest, + error::{ErrorBadRequest, ErrorInternalServerError}, web::{self, Bytes}, - HttpResponse, Responder, + HttpRequest, HttpResponse, Responder, }; use comm_lib::{ - auth::UserIdentity, + auth::{AuthService, UserIdentity}, backup::LatestBackupIDResponse, blob::{client::BlobServiceClient, types::BlobInfo}, http::multipart::{get_named_text_field, get_text_field}, tools::Defer, }; use std::convert::Infallible; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, blob_client: web::Data<BlobServiceClient>, db_client: web::Data<DatabaseClient>, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result<HttpResponse> { let backup_id = get_named_text_field("backup_id", &mut multipart).await?; + let blob_client = blob_client.with_user_identity(user.clone()); tracing::Span::current().record("backup_id", &backup_id); info!("Backup data upload started"); let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob( &mut
multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let (user_data_blob_info, user_data_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; let attachments_hashes: Vec<String> = match get_text_field(&mut multipart).await? { Some((name, attachments)) => { if name != "attachments" { warn!( name, "Malformed request: 'attachments' text field expected." ); return Err(ErrorBadRequest("Bad request")); } attachments.lines().map(ToString::to_string).collect() } None => Vec::new(), }; let mut attachments = Vec::new(); let mut attachments_revokes = Vec::new(); for attachment_hash in attachments_hashes { let (holder, revoke) = create_attachment_holder(&attachment_hash, &blob_client).await?; attachments.push(BlobInfo { blob_hash: attachment_hash, holder, }); attachments_revokes.push(revoke); } let siwe_backup_msg_option: Option<String> = match get_text_field(&mut multipart).await? { Some((name, siwe_backup_msg)) => { if name == "siwe_backup_msg" { Some(siwe_backup_msg) } else { None } } _ => None, }; let item = BackupItem::new( user.user_id.clone(), backup_id, user_keys_blob_info, user_data_blob_info, attachments, siwe_backup_msg_option, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; user_keys_revoke.cancel(); user_data_revoke.cancel(); for attachment_revoke in attachments_revokes { attachment_revoke.cancel(); } db_client .remove_old_backups(&user.user_id, &blob_client) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument(skip_all, fields(hash_field_name, data_field_name))] async fn forward_field_to_blob<'revoke, 'blob: 'revoke>( multipart: &mut actix_multipart::Multipart, - blob_client: &'blob web::Data<BlobServiceClient>, + blob_client: &'blob BlobServiceClient, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name,
multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::<Bytes, Infallible>::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error.
break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; let revoke_info = blob_info.clone(); let revoke_holder = Defer::new(|| { blob_client .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder) }); Ok((blob_info, revoke_holder)) } #[instrument(skip_all)] async fn create_attachment_holder<'revoke, 'blob: 'revoke>( attachment: &str, - blob_client: &'blob web::Data<BlobServiceClient>, + blob_client: &'blob BlobServiceClient, ) -> Result<(String, Defer<'revoke>), BackupError> { let holder = uuid::Uuid::new_v4().to_string(); if !blob_client .assign_holder(attachment, &holder) .await .map_err(BackupError::from)? { warn!("Blob attachment with hash {attachment:?} doesn't exist"); } let revoke_hash = attachment.to_string(); let revoke_holder = holder.clone(); let revoke_holder = Defer::new(|| { blob_client.schedule_revoke_holder(revoke_hash, revoke_holder) }); Ok((holder, revoke_holder)) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path<String>, blob_client: web::Data<BlobServiceClient>, db_client: web::Data<DatabaseClient>, ) -> actix_web::Result<HttpResponse> { + let blob_client = blob_client.with_user_identity(user.clone()); info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, blob_client, db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path<String>, blob_client: web::Data<BlobServiceClient>, db_client: web::Data<DatabaseClient>, ) -> actix_web::Result<HttpResponse> { info!("Download user data request"); + let blob_client = blob_client.with_user_identity(user.clone()); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id,
&backup_id, blob_client, db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, - blob_client: web::Data<BlobServiceClient>, + blob_client: BlobServiceClient, db_client: web::Data<DatabaseClient>, ) -> actix_web::Result<HttpResponse> { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_id( path: web::Path<String>, db_client: web::Data<DatabaseClient>, ) -> actix_web::Result<impl Responder> { let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupIDResponse { backup_id: backup_item.backup_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path<String>, db_client: web::Data<DatabaseClient>, blob_client: web::Data<BlobServiceClient>, + req: HttpRequest, ) -> actix_web::Result<HttpResponse> { + let auth_service = req.app_data::<AuthService>().ok_or_else(|| { + tracing::error!( + "Failed to get AuthService from request. Check HTTP server config."
+ ); + ErrorInternalServerError("internal error") + })?; + let services_token = auth_service + .get_services_token() + .await + .map_err(BackupError::from)?; + let blob_client = blob_client.with_authentication( + comm_lib::auth::AuthorizationCredential::ServicesToken(services_token), + ); let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } diff --git a/services/backup/src/http/handlers/log.rs b/services/backup/src/http/handlers/log.rs index 71d19cdfb..9b9743996 100644 --- a/services/backup/src/http/handlers/log.rs +++ b/services/backup/src/http/handlers/log.rs @@ -1,342 +1,342 @@ use crate::constants::WS_FRAME_SIZE; use crate::database::{log_item::LogItem, DatabaseClient}; use actix::fut::ready; use actix::{Actor, ActorContext, ActorFutureExt, AsyncContext, StreamHandler}; use actix_http::ws::{CloseCode, Item}; use actix_web::{ web::{self, Bytes, BytesMut}, Error, HttpRequest, HttpResponse, }; use actix_web_actors::ws::{self, WebsocketContext}; use comm_lib::auth::UserIdentity; use comm_lib::{ backup::{ DownloadLogsRequest, LogWSRequest, LogWSResponse, UploadLogRequest, }, blob::{ client::{BlobServiceClient, BlobServiceError}, types::BlobInfo, }, database::{self, blob::BlobOrDBContent}, }; use std::future::Future; use std::time::{Duration, Instant}; use tracing::{error, info, instrument, warn}; pub async fn handle_ws( req: HttpRequest, stream: web::Payload, blob_client: web::Data<BlobServiceClient>, db_client: web::Data<DatabaseClient>, ) -> Result<HttpResponse, Error> { ws::WsResponseBuilder::new( LogWSActor { user: None, blob_client: blob_client.as_ref().clone(), db_client: db_client.as_ref().clone(), last_msg_time:
Instant::now(), buffer: BytesMut::new(), }, &req, stream, ) .frame_size(WS_FRAME_SIZE) .start() } #[derive( Debug, derive_more::From, derive_more::Display, derive_more::Error, )] enum LogWSError { Bincode(bincode::Error), Blob(BlobServiceError), DB(database::Error), } struct LogWSActor { user: Option<UserIdentity>, blob_client: BlobServiceClient, db_client: DatabaseClient, last_msg_time: Instant, buffer: BytesMut, } impl LogWSActor { const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); fn handle_msg_sync( &mut self, ctx: &mut WebsocketContext<Self>, bytes: Bytes, ) { match bincode::deserialize(&bytes) { Ok(request) => { if let LogWSRequest::Authenticate(user) = request { self.user.replace(user); return; } let Some(user) = &self.user else { Self::spawn_response_future( ctx, ready(Ok(vec![LogWSResponse::Unauthenticated])), ); return; }; Self::spawn_response_future( ctx, Self::handle_msg( user.user_id.clone(), - self.blob_client.clone(), + self.blob_client.clone().with_user_identity(user.clone()), self.db_client.clone(), request, ), ); } Err(err) => { error!("Error: {err:?}"); Self::spawn_response_future( ctx, ready(Ok(vec![LogWSResponse::ServerError])), ); } }; } fn spawn_response_future( ctx: &mut WebsocketContext<Self>, future: impl Future<Output = Result<Vec<LogWSResponse>, LogWSError>> + 'static, ) { let fut = actix::fut::wrap_future(future).map( |responses, _: &mut LogWSActor, ctx: &mut WebsocketContext<LogWSActor>| { let responses = match responses { Ok(responses) => responses, Err(err) => { error!("Error: {err:?}"); vec![LogWSResponse::ServerError] } }; for response in responses { match bincode::serialize(&response) { Ok(bytes) => ctx.binary(bytes), Err(error) => { error!( "Error serializing a response: {response:?}.
Error: {error}" ); } }; } }, ); ctx.spawn(fut); } async fn handle_msg( user_id: String, blob_client: BlobServiceClient, db_client: DatabaseClient, request: LogWSRequest, ) -> Result<Vec<LogWSResponse>, LogWSError> { match request { LogWSRequest::UploadLog(UploadLogRequest { backup_id, log_id, content, attachments, }) => { let mut attachment_blob_infos = Vec::new(); for attachment in attachments.unwrap_or_default() { let blob_info = Self::create_attachment(&blob_client, attachment).await?; attachment_blob_infos.push(blob_info); } let mut log_item = LogItem { user_id, backup_id: backup_id.clone(), log_id, content: BlobOrDBContent::new(content), attachments: attachment_blob_infos, }; log_item.ensure_size_constraints(&blob_client).await?; db_client.put_log_item(log_item, &blob_client).await?; Ok(vec![LogWSResponse::LogUploaded { backup_id, log_id }]) } LogWSRequest::DownloadLogs(DownloadLogsRequest { backup_id, from_id, }) => { let (log_items, last_id) = db_client .fetch_log_items(&user_id, &backup_id, from_id) .await?; let mut messages = vec![]; for LogItem { log_id, content, attachments, .. } in log_items { let content = content.fetch_bytes(&blob_client).await?; let attachments: Vec<String> = attachments.into_iter().map(|att| att.blob_hash).collect(); let attachments = if attachments.is_empty() { None } else { Some(attachments) }; messages.push(LogWSResponse::LogDownload { log_id, content, attachments, }) } messages.push(LogWSResponse::LogDownloadFinished { last_log_id: last_id, }); Ok(messages) } LogWSRequest::Authenticate(_) => { warn!("LogWSRequest::Authenticate should have been handled earlier."); Ok(Vec::new()) } } } async fn create_attachment( blob_client: &BlobServiceClient, attachment: String, ) -> Result<BlobInfo, LogWSError> { let blob_info = BlobInfo { blob_hash: attachment, holder: uuid::Uuid::new_v4().to_string(), }; if !blob_client .assign_holder(&blob_info.blob_hash, &blob_info.holder) .await?
{ warn!( "Blob attachment with hash {:?} doesn't exist", blob_info.blob_hash ); } Ok(blob_info) } } impl Actor for LogWSActor { type Context = ws::WebsocketContext<Self>; #[instrument(skip_all)] fn started(&mut self, ctx: &mut Self::Context) { info!("Socket opened"); ctx.run_interval(Self::HEARTBEAT_INTERVAL, |actor, ctx| { if Instant::now().duration_since(actor.last_msg_time) > Self::CONNECTION_TIMEOUT { warn!("Socket timeout, closing connection"); ctx.stop(); return; } ctx.ping(&[]); }); } #[instrument(skip_all)] fn stopped(&mut self, _: &mut Self::Context) { info!("Socket closed"); } } impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for LogWSActor { #[instrument(skip_all)] fn handle( &mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context, ) { let msg = match msg { Ok(msg) => msg, Err(err) => { warn!("Error during socket message handling: {err}"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } }; self.last_msg_time = Instant::now(); match msg { ws::Message::Binary(bytes) => self.handle_msg_sync(ctx, bytes), // Continuations - this is mostly boilerplate code.
Some websocket // clients may split a message into these ones ws::Message::Continuation(Item::FirstBinary(bytes)) => { if !self.buffer.is_empty() { warn!("Socket received continuation before previous was completed"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); } ws::Message::Continuation(Item::Continue(bytes)) => { if self.buffer.is_empty() { warn!("Socket received continuation message before it was started"); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); } ws::Message::Continuation(Item::Last(bytes)) => { if self.buffer.is_empty() { warn!( "Socket received last continuation message before it was started" ); ctx.close(Some(CloseCode::Error.into())); ctx.stop(); return; } self.buffer.extend_from_slice(&bytes); let bytes = self.buffer.split(); self.handle_msg_sync(ctx, bytes.into()); } // Heartbeat ws::Message::Ping(message) => ctx.pong(&message), ws::Message::Pong(_) => (), // Other ws::Message::Text(_) | ws::Message::Continuation(Item::FirstText(_)) => { warn!("Socket received unsupported message"); ctx.close(Some(CloseCode::Unsupported.into())); ctx.stop(); } ws::Message::Close(reason) => { info!("Socket was closed"); ctx.close(reason); ctx.stop(); } ws::Message::Nop => (), } } }