diff --git a/services/backup/src/http/handlers/backup.rs b/services/backup/src/http/handlers/backup.rs index 410f906b7..bdfed2477 100644 --- a/services/backup/src/http/handlers/backup.rs +++ b/services/backup/src/http/handlers/backup.rs @@ -1,339 +1,332 @@ use actix_web::{ - error::{ErrorBadRequest, ErrorInternalServerError}, + error::ErrorBadRequest, web::{self, Bytes}, - HttpRequest, HttpResponse, Responder, + HttpResponse, Responder, }; use comm_lib::{ - auth::{AuthService, UserIdentity}, + auth::UserIdentity, backup::LatestBackupIDResponse, blob::{client::BlobServiceClient, types::BlobInfo}, - http::multipart::{get_named_text_field, get_text_field}, + http::{ + auth_service::Authenticated, + multipart::{get_named_text_field, get_text_field}, + }, tools::Defer, }; use std::convert::Infallible; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tracing::{info, instrument, trace, warn}; use crate::{ database::{backup_item::BackupItem, DatabaseClient}, error::BackupError, }; #[instrument(skip_all, fields(backup_id))] pub async fn upload( user: UserIdentity, - blob_client: web::Data, + blob_client: Authenticated, db_client: web::Data, mut multipart: actix_multipart::Multipart, ) -> actix_web::Result { let backup_id = get_named_text_field("backup_id", &mut multipart).await?; let blob_client = blob_client.with_user_identity(user.clone()); tracing::Span::current().record("backup_id", &backup_id); info!("Backup data upload started"); let (user_keys_blob_info, user_keys_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_keys_hash", "user_keys", ) .await?; let (user_data_blob_info, user_data_revoke) = forward_field_to_blob( &mut multipart, &blob_client, "user_data_hash", "user_data", ) .await?; let attachments_hashes: Vec = match get_text_field(&mut multipart).await? { Some((name, attachments)) => { if name != "attachments" { warn!( name, "Malformed request: 'attachments' text field expected." ); return Err(ErrorBadRequest("Bad request")); } attachments.lines().map(ToString::to_string).collect() } None => Vec::new(), }; let mut attachments = Vec::new(); let mut attachments_revokes = Vec::new(); for attachment_hash in attachments_hashes { let (holder, revoke) = create_attachment_holder(&attachment_hash, &blob_client).await?; attachments.push(BlobInfo { blob_hash: attachment_hash, holder, }); attachments_revokes.push(revoke); } let siwe_backup_msg_option: Option = match get_text_field(&mut multipart).await? { Some((name, siwe_backup_msg)) => { if name == "siwe_backup_msg" { Some(siwe_backup_msg) } else { None } } _ => None, }; let item = BackupItem::new( user.user_id.clone(), backup_id, user_keys_blob_info, user_data_blob_info, attachments, siwe_backup_msg_option, ); db_client .put_backup_item(item) .await .map_err(BackupError::from)?; user_keys_revoke.cancel(); user_data_revoke.cancel(); for attachment_revoke in attachments_revokes { attachment_revoke.cancel(); } db_client .remove_old_backups(&user.user_id, &blob_client) .await .map_err(BackupError::from)?; Ok(HttpResponse::Ok().finish()) } #[instrument(skip_all, fields(hash_field_name, data_field_name))] async fn forward_field_to_blob<'revoke, 'blob: 'revoke>( multipart: &mut actix_multipart::Multipart, blob_client: &'blob BlobServiceClient, hash_field_name: &str, data_field_name: &str, ) -> actix_web::Result<(BlobInfo, Defer<'revoke>)> { trace!("Reading blob fields: {hash_field_name:?}, {data_field_name:?}"); let blob_hash = get_named_text_field(hash_field_name, multipart).await?; let Some(mut field) = multipart.try_next().await? else { warn!("Malformed request: expected a field."); return Err(ErrorBadRequest("Bad request"))?; }; if field.name() != data_field_name { warn!( hash_field_name, "Malformed request: '{data_field_name}' data field expected." ); return Err(ErrorBadRequest("Bad request"))?; } let blob_info = BlobInfo { blob_hash, holder: uuid::Uuid::new_v4().to_string(), }; // [`actix_multipart::Multipart`] isn't [`std::marker::Send`], and so we cannot pass it to the blob client directly. // Instead we have to forward it to a channel and create stream from the receiver. let (tx, rx) = tokio::sync::mpsc::channel(1); let receive_promise = async move { trace!("Receiving blob data"); // [`actix_multipart::MultipartError`] isn't [`std::marker::Send`] so we return it here, and pass [`Infallible`] // as the error to the channel while let Some(chunk) = field.try_next().await? { if let Err(err) = tx.send(Result::::Ok(chunk)).await { warn!("Error when sending data through a channel: '{err}'"); // Error here means that the channel has been closed from the blob client side. We don't want to return an error // here, because `tokio::try_join!` only returns the first error it receives and we want to prioritize the backup // client error. break; } } trace!("Finished receiving blob data"); Result::<(), actix_web::Error>::Ok(()) }; let data_stream = ReceiverStream::new(rx); let send_promise = async { blob_client .simple_put(&blob_info.blob_hash, &blob_info.holder, data_stream) .await .map_err(BackupError::from)?; Ok(()) }; tokio::try_join!(receive_promise, send_promise)?; let revoke_info = blob_info.clone(); let revoke_holder = Defer::new(|| { blob_client .schedule_revoke_holder(revoke_info.blob_hash, revoke_info.holder) }); Ok((blob_info, revoke_holder)) } #[instrument(skip_all)] async fn create_attachment_holder<'revoke, 'blob: 'revoke>( attachment: &str, blob_client: &'blob BlobServiceClient, ) -> Result<(String, Defer<'revoke>), BackupError> { let holder = uuid::Uuid::new_v4().to_string(); if !blob_client .assign_holder(attachment, &holder) .await .map_err(BackupError::from)? { warn!("Blob attachment with hash {attachment:?} doesn't exist"); } let revoke_hash = attachment.to_string(); let revoke_holder = holder.clone(); let revoke_holder = Defer::new(|| { blob_client.schedule_revoke_holder(revoke_hash, revoke_holder) }); Ok((holder, revoke_holder)) } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_keys( user: UserIdentity, path: web::Path, - blob_client: web::Data, + blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { - let blob_client = blob_client.with_user_identity(user.clone()); info!("Download user keys request"); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_keys, &user.user_id, &backup_id, - blob_client, + blob_client.into_inner(), db_client, ) .await } #[instrument(skip_all, fields(backup_id = %path))] pub async fn download_user_data( user: UserIdentity, path: web::Path, - blob_client: web::Data, + blob_client: Authenticated, db_client: web::Data, ) -> actix_web::Result { info!("Download user data request"); - let blob_client = blob_client.with_user_identity(user.clone()); let backup_id = path.into_inner(); download_user_blob( |item| &item.user_data, &user.user_id, &backup_id, - blob_client, + blob_client.into_inner(), db_client, ) .await } pub async fn download_user_blob( data_extractor: impl FnOnce(&BackupItem) -> &BlobInfo, user_id: &str, backup_id: &str, blob_client: BlobServiceClient, db_client: web::Data, ) -> actix_web::Result { let backup_item = db_client .find_backup_item(user_id, backup_id) .await .map_err(BackupError::from)? .ok_or(BackupError::NoBackup)?; let stream = blob_client .get(&data_extractor(&backup_item).blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } #[instrument(skip_all, fields(username = %path))] pub async fn get_latest_backup_id( path: web::Path, db_client: web::Data, ) -> actix_web::Result { let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let response = LatestBackupIDResponse { backup_id: backup_item.backup_id, siwe_backup_msg: backup_item.siwe_backup_msg, }; Ok(web::Json(response)) } #[instrument(skip_all, fields(username = %path))] pub async fn download_latest_backup_keys( path: web::Path, db_client: web::Data, - blob_client: web::Data, - auth_service: AuthService, + blob_client: Authenticated, ) -> actix_web::Result { - let services_token = auth_service - .get_services_token() - .await - .map_err(BackupError::from)?; - let blob_client = blob_client.with_authentication( - comm_lib::auth::AuthorizationCredential::ServicesToken(services_token), - ); let username = path.into_inner(); // Treat username as user_id in the initial version let user_id = username; let Some(backup_item) = db_client .find_last_backup_item(&user_id) .await .map_err(BackupError::from)? else { return Err(BackupError::NoBackup.into()); }; let stream = blob_client .get(&backup_item.user_keys.blob_hash) .await .map_err(BackupError::from)?; Ok( HttpResponse::Ok() .content_type("application/octet-stream") .streaming(stream), ) } diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs index a972fc22e..3586484a9 100644 --- a/shared/comm-lib/src/blob/client.rs +++ b/shared/comm-lib/src/blob/client.rs @@ -1,408 +1,418 @@ use bytes::Bytes; use derive_more::{Display, Error, From}; use futures_core::Stream; use futures_util::StreamExt; use reqwest::{ multipart::{Form, Part}, Body, Method, RequestBuilder, }; use tracing::{debug, error, trace, warn}; // publicly re-export some reqwest types pub use reqwest::Error as ReqwestError; pub use reqwest::StatusCode; pub use reqwest::Url; use crate::auth::{AuthorizationCredential, UserIdentity}; #[derive(From, Error, Debug, Display)] pub enum BlobServiceError { /// HTTP Client errors, this includes: /// - connection failures /// - request errors (e.g. upload input stream) #[display(...)] ClientError(ReqwestError), /// Invalid Blob service URL provided #[display(...)] URLError(#[error(ignore)] String), /// Blob service returned HTTP 404 /// - blob or holder not found #[display(...)] NotFound, /// Blob service returned HTTP 409 /// - blob or holder already exists #[display(...)] AlreadyExists, /// Blob service returned HTTP 400 /// - invalid holder or blob_hash format #[display(...)] InvalidArguments, /// Blob service returned HTTP 50x #[display(...)] ServerError, #[display(...)] UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), #[display(...)] UnexpectedError, } /// A client interface to Blob service. // /// The `BlobServiceClient` holds a connection pool internally, so it is advised that /// you create one and **reuse** it, by **cloning**. /// /// A clone is recommended for each individual client identity, that has /// a different `UserIdentity`. /// /// # Example /// ```ignore /// // create client for user A /// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA); /// /// // reuse the client connection with different credentials /// let clientB = clientA.with_user_identity(userB); /// /// // clientA still uses userA credentials /// ``` /// /// A clone is recommended when the client concurrently handles multiple identities - /// e.g. a HTTP service handling requests from different users. The `with_user_identity()` /// method should be used for this purpose. /// /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it, /// because it already uses an `Arc` internally. #[derive(Clone)] pub struct BlobServiceClient { http_client: reqwest::Client, blob_service_url: reqwest::Url, auth_credential: Option, } impl BlobServiceClient { /// Creates a new Blob Service client instance. It is unauthenticated by default /// so you need to call `with_user_identity()` afterwards: /// ```ignore /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user); /// ``` /// /// **Note**: It is advised to create this client once and reuse by **cloning**. /// See [`BlobServiceClient`] docs for details pub fn new(blob_service_url: reqwest::Url) -> Self { debug!("Creating BlobServiceClient. URL: {}", blob_service_url); Self { http_client: reqwest::Client::new(), blob_service_url, auth_credential: None, } } /// Clones the client and sets the [`UserIdentity`] for the new instance. /// This allows the client to reuse the same connection pool for different users. /// /// This is the same as calling /// ```ignore /// client.with_authentication(AuthorizationCredential::UserToken(user_identity)) /// ```` pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self { self.with_authentication(AuthorizationCredential::UserToken(user_identity)) } /// Clones the client and sets the [`AuthorizationCredential`] for the new instance. /// This allows the client to reuse the same connection pool for different users. pub fn with_authentication( &self, auth_credential: AuthorizationCredential, ) -> Self { trace!("Set auth_credential: {:?}", &auth_credential); let mut this = self.clone(); this.auth_credential = Some(auth_credential); this } /// Downloads blob with given [`blob_hash`]. /// /// @returns a stream of blob bytes /// /// # Errors thrown /// - [BlobServiceError::NotFound] if blob with given hash does not exist /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format /// /// # Example /// ```ignore /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let mut stream = client.get("hello").await?; /// while let Some(data) = stream.try_next().await? { /// println!("Got data: {:?}", data); /// } /// ``` pub async fn get( &self, blob_hash: &str, ) -> BlobResult>> { debug!(?blob_hash, "Get blob request"); let url = self.get_blob_url(Some(blob_hash))?; let response = self .request(Method::GET, url)? .send() .await .map_err(BlobServiceError::ClientError)?; debug!("Response status: {}", response.status()); if response.status().is_success() { let stream = response.bytes_stream().map(|result| match result { Ok(bytes) => Ok(bytes), Err(error) => { warn!("Error while streaming response: {}", error); Err(BlobServiceError::ClientError(error)) } }); return Ok(stream); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Assigns a new holder to a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::AlreadyExists` if blob already has /// a holder with given [`holder`] name. pub async fn assign_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult { debug!("Assign holder request"); let url = self.get_blob_url(None)?; let payload = AssignHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); let response = self .request(Method::POST, url)? .json(&payload) .send() .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { let AssignHolderResponse { data_exists } = response.json().await?; trace!("Data exists: {}", data_exists); return Ok(data_exists); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Revokes given holder from a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist /// or it does not have such holder pub async fn revoke_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult<()> { debug!("Revoke holder request"); let url = self.get_blob_url(None)?; let payload = RevokeHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); let response = self .request(Method::DELETE, url)? .json(&payload) .send() .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { trace!("Revoke holder request successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash /// already exists. /// /// # Example /// ```ignore /// use std::io::{Error, ErrorKind}; /// /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let stream = async_stream::stream! { /// yield Ok(vec![1, 2, 3]); /// yield Ok(vec![4, 5, 6]); /// yield Err(Error::new(ErrorKind::Other, "Oops")); /// }; /// client.upload_blob(&blob_hash, stream).await?; /// ``` pub async fn upload_blob( &self, blob_hash: H, data_stream: S, ) -> BlobResult<()> where H: Into, S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Bytes: From, { debug!("Upload blob request"); let url = self.get_blob_url(None)?; let streaming_body = Body::wrap_stream(data_stream); let form = Form::new() .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); let response = self .request(Method::PUT, url)? .multipart(form) .send() .await?; debug!("Response status: {}", response.status()); if response.status().is_success() { trace!("Blob upload successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`]. /// /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist, /// uploads the data from [`data_stream`]. pub async fn simple_put( &self, blob_hash: &str, holder: &str, data_stream: S, ) -> BlobResult where S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Bytes: From, { trace!("Begin simple put. Assigning holder..."); let data_exists = self.assign_holder(blob_hash, holder).await?; if data_exists { trace!("Blob data already exists. Skipping upload."); return Ok(false); } trace!("Uploading blob data..."); let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await else { return Ok(true); }; trace!(%blob_hash, %holder, "Revoking holder due to upload failure"); self.schedule_revoke_holder(blob_hash, holder); Err(upload_error) } /// Revokes holder in a separate task. Useful to clean up after /// upload failure without blocking the current task. pub fn schedule_revoke_holder( &self, blob_hash: impl Into, holder: impl Into, ) { let this = self.clone(); let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); tokio::spawn(async move { if let Err(err) = this.revoke_holder(&blob_hash, &holder).await { warn!("Failed to revoke holder: {0:?} - {0}", err); } }); } } // private helper methods impl BlobServiceClient { fn get_blob_url( &self, blob_hash: Option<&str>, ) -> Result { let path = match blob_hash { Some(hash) => format!("/blob/{}", hash), None => "/blob".to_string(), }; let url = self .blob_service_url .join(&path) .map_err(|err| BlobServiceError::URLError(err.to_string()))?; trace!("Constructed request URL: {}", url); Ok(url) } fn request( &self, http_method: Method, url: Url, ) -> BlobResult { let request = self.http_client.request(http_method, url); match &self.auth_credential { Some(credential) => { let token = credential.as_authorization_token().map_err(|e| { error!("Failed to parse authorization token: {}", e); BlobServiceError::UnexpectedError })?; Ok(request.bearer_auth(token)) } None => Ok(request), } } } fn handle_http_error(status_code: StatusCode) -> BlobServiceError { match status_code { StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments, StatusCode::NOT_FOUND => BlobServiceError::NotFound, StatusCode::CONFLICT => BlobServiceError::AlreadyExists, code if code.is_server_error() => BlobServiceError::ServerError, code => BlobServiceError::UnexpectedHttpStatus(code), } } type BlobResult = Result; #[derive(serde::Deserialize)] struct AssignHolderResponse { data_exists: bool, } #[derive(Debug, serde::Serialize)] struct AssignHolderRequest { blob_hash: String, holder: String, } // they have the same layout so we can simply alias type RevokeHolderRequest = AssignHolderRequest; + +#[cfg(feature = "http")] +impl crate::http::auth_service::HttpAuthenticatedService for BlobServiceClient { + fn make_authenticated( + self, + auth_credential: AuthorizationCredential, + ) -> Self { + self.with_authentication(auth_credential) + } +}