diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs index 7af5fabc4..7ea495d94 100644 --- a/services/blob/src/http/handlers/holders.rs +++ b/services/blob/src/http/handlers/holders.rs @@ -1,113 +1,116 @@ -use actix_web::error::ErrorBadRequest; +use actix_web::error::{ErrorBadRequest, ErrorNotImplemented}; use actix_web::{web, HttpResponse}; use comm_lib::blob::types::http::{ AssignHoldersRequest, AssignHoldersResponse, BlobInfo, HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse, }; use tracing::{info, instrument, trace, warn}; use crate::service::BlobService; #[instrument(name = "assign_multiple_holders", skip_all)] pub async fn assign_holders_handler( service: web::Data, payload: web::Json, ) -> actix_web::Result { use crate::database::DBError; use crate::service::BlobServiceError; let AssignHoldersRequest { requests } = payload.into_inner(); info!("Assign holder request for {} holders", requests.len()); validate_request(&requests)?; let blob_hashes = requests.iter().map(|it| &it.blob_hash).collect(); let existing_blobs = service.find_existing_blobs(blob_hashes).await?; let mut results = Vec::with_capacity(requests.len()); for item in requests { let BlobInfo { blob_hash, holder } = &item; let data_exists = existing_blobs.contains(blob_hash); let result = match service.assign_holder(blob_hash, holder).await { Ok(()) => HolderAssignmentResult { request: item, success: true, data_exists, holder_already_exists: false, }, Err(BlobServiceError::DB(DBError::ItemAlreadyExists)) => { HolderAssignmentResult { request: item, success: true, data_exists, holder_already_exists: true, } } Err(err) => { warn!("Holder assignment error: {:?}", err); HolderAssignmentResult { request: item, success: false, data_exists, holder_already_exists: false, } } }; results.push(result); } let response = AssignHoldersResponse { results }; Ok(HttpResponse::Ok().json(web::Json(response))) } #[instrument(name = "remove_multiple_holders", skip_all)] pub async fn remove_holders_handler( service: web::Data, payload: web::Json, ) -> actix_web::Result { - let RemoveHoldersRequest { + let RemoveHoldersRequest::Items { requests, instant_delete, - } = payload.into_inner(); + } = payload.into_inner() + else { + return Err(ErrorNotImplemented("not implemented")); + }; info!( instant_delete, "Remove request for {} holders.", requests.len() ); validate_request(&requests)?; let mut failed_requests = Vec::new(); // This has to be done sequentially because `service.revoke_holder()` // performs a DDB transaction and these transactions could conflict // with each other, e.g. if two holders were removed for the same blob hash. for item in requests { trace!("Removing item: {:?}", &item); let BlobInfo { holder, blob_hash } = &item; if let Err(err) = service .revoke_holder(blob_hash, holder, instant_delete) .await { warn!("Holder removal failed: {:?}", err); failed_requests.push(item); } } let response = RemoveHoldersResponse { failed_requests }; Ok(HttpResponse::Ok().json(web::Json(response))) } /** * Returns `HTTP 400 Bad Request` if one or more blob hashes or holders * have invalid format. See [`comm_lib::tools::is_valid_identifier`] for * valid format conditions */ fn validate_request(items: &[BlobInfo]) -> actix_web::Result<()> { use comm_lib::tools::is_valid_identifier; let all_valid = items.iter().all(|BlobInfo { holder, blob_hash }| { is_valid_identifier(holder) && is_valid_identifier(blob_hash) }); if !all_valid { return Err(ErrorBadRequest("One or more requests have invalid format")); } Ok(()) } diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs index fa9f91526..24fac1442 100644 --- a/shared/comm-lib/src/blob/client.rs +++ b/shared/comm-lib/src/blob/client.rs @@ -1,465 +1,465 @@ use bytes::Bytes; use derive_more::{Display, Error, From}; use futures_core::Stream; use futures_util::StreamExt; use reqwest::{ multipart::{Form, Part}, Body, Method, RequestBuilder, }; use tracing::{debug, error, trace, warn}; // publicly re-export some reqwest types pub use reqwest::Error as ReqwestError; pub use reqwest::StatusCode; pub use reqwest::Url; use crate::{ auth::{AuthorizationCredential, UserIdentity}, blob::types::http::{ AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest, RemoveHoldersRequest, }, }; use super::types::{http::RemoveHoldersResponse, BlobInfo}; #[derive(From, Error, Debug, Display)] pub enum BlobServiceError { /// HTTP Client errors, this includes: /// - connection failures /// - request errors (e.g. upload input stream) #[display(...)] ClientError(ReqwestError), /// Invalid Blob service URL provided #[display(...)] URLError(#[error(ignore)] String), /// Blob service returned HTTP 404 /// - blob or holder not found #[display(...)] NotFound, /// Blob service returned HTTP 409 /// - blob or holder already exists #[display(...)] AlreadyExists, /// Blob service returned HTTP 400 /// - invalid holder or blob_hash format #[display(...)] InvalidArguments, /// Blob service returned HTTP 50x #[display(...)] ServerError, #[display(...)] UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode), #[display(...)] UnexpectedError, } /// A client interface to Blob service. // /// The `BlobServiceClient` holds a connection pool internally, so it is advised that /// you create one and **reuse** it, by **cloning**. /// /// A clone is recommended for each individual client identity, that has /// a different `UserIdentity`. /// /// # Example /// ```ignore /// // create client for user A /// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA); /// /// // reuse the client connection with different credentials /// let clientB = clientA.with_user_identity(userB); /// /// // clientA still uses userA credentials /// ``` /// /// A clone is recommended when the client concurrently handles multiple identities - /// e.g. a HTTP service handling requests from different users. The `with_user_identity()` /// method should be used for this purpose. /// /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it, /// because it already uses an `Arc` internally. #[derive(Clone)] pub struct BlobServiceClient { http_client: reqwest::Client, blob_service_url: reqwest::Url, auth_credential: Option, } impl BlobServiceClient { /// Creates a new Blob Service client instance. It is unauthenticated by default /// so you need to call `with_user_identity()` afterwards: /// ```ignore /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user); /// ``` /// /// **Note**: It is advised to create this client once and reuse by **cloning**. /// See [`BlobServiceClient`] docs for details pub fn new(blob_service_url: reqwest::Url) -> Self { debug!("Creating BlobServiceClient. URL: {}", blob_service_url); Self { http_client: reqwest::Client::new(), blob_service_url, auth_credential: None, } } /// Clones the client and sets the [`UserIdentity`] for the new instance. /// This allows the client to reuse the same connection pool for different users. /// /// This is the same as calling /// ```ignore /// client.with_authentication(AuthorizationCredential::UserToken(user_identity)) /// ```` pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self { self.with_authentication(AuthorizationCredential::UserToken(user_identity)) } /// Clones the client and sets the [`AuthorizationCredential`] for the new instance. /// This allows the client to reuse the same connection pool for different users. pub fn with_authentication( &self, auth_credential: AuthorizationCredential, ) -> Self { trace!("Set auth_credential: {:?}", &auth_credential); let mut this = self.clone(); this.auth_credential = Some(auth_credential); this } /// Downloads blob with given [`blob_hash`]. /// /// @returns a stream of blob bytes /// /// # Errors thrown /// - [BlobServiceError::NotFound] if blob with given hash does not exist /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format /// /// # Example /// ```ignore /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let mut stream = client.get("hello").await?; /// while let Some(data) = stream.try_next().await? { /// println!("Got data: {:?}", data); /// } /// ``` pub async fn get( &self, blob_hash: &str, ) -> BlobResult>> { debug!(?blob_hash, "Get blob request"); let url = self.get_blob_url(Some(blob_hash))?; let response = self .request(Method::GET, url)? .send() .await .map_err(BlobServiceError::ClientError)?; if !response.status().is_success() { return error_response_result(response).await; } let stream = response.bytes_stream().map(|result| match result { Ok(bytes) => Ok(bytes), Err(error) => { warn!("Error while streaming response: {}", error); Err(BlobServiceError::ClientError(error)) } }); Ok(stream) } /// Assigns a new holder to a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::AlreadyExists` if blob already has /// a holder with given [`holder`] name. pub async fn assign_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult { debug!("Assign holder request"); let url = self.get_blob_url(None)?; let payload = AssignHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), }; debug!("Request payload: {:?}", payload); let response = self .request(Method::POST, url)? .json(&payload) .send() .await?; if !response.status().is_success() { return error_response_result(response).await; } let AssignHolderResponse { data_exists } = response.json().await?; trace!("Data exists: {}", data_exists); Ok(data_exists) } /// Revokes given holder from a blob represented by [`blob_hash`]. /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist /// or it does not have such holder pub async fn revoke_holder( &self, blob_hash: &str, holder: &str, ) -> BlobResult<()> { debug!("Revoke holder request"); let url = self.get_blob_url(None)?; let payload = RemoveHolderRequest { holder: holder.to_string(), blob_hash: blob_hash.to_string(), instant_delete: false, }; debug!("Request payload: {:?}", payload); let response = self .request(Method::DELETE, url)? .json(&payload) .send() .await?; if response.status().is_success() { trace!("Revoke holder request successful"); return Ok(()); } error_response_result(response).await } /// Removes multiple holders. /// - Holders don't have to own the same blob item. For each item /// a (blob hash; holder) pair is specified. /// - Operation is idempotent. Not existing holders are treated as /// successfully removed. /// - If one or more removal failed server-side, these will be returned /// in the `failed_requests` response field. It has the same format /// as this function input and can be directly used to retry removal. /// /// For single holder removal, see [`BlobServiceClient::revoke_holder`]. pub async fn remove_multiple_holders( &self, blob_infos: Vec, ) -> BlobResult { let num_holders = blob_infos.len(); debug!(num_holders, "Revoke multiple holders request."); let url = self.get_holders_url()?; - let payload = RemoveHoldersRequest { + let payload = RemoveHoldersRequest::Items { requests: blob_infos, instant_delete: false, }; trace!("Request payload: {:?}", payload); let response = self .request(Method::DELETE, url)? .json(&payload) .send() .await?; if !response.status().is_success() { return error_response_result(response).await; } let result: RemoveHoldersResponse = response.json().await?; debug!( "Request successful. {} holders failed to be removed.", result.failed_requests.len() ); Ok(result) } /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash /// already exists. /// /// # Example /// ```ignore /// use std::io::{Error, ErrorKind}; /// /// let client = /// BlobServiceClient::new("http://localhost:50053".parse()?); /// /// let stream = async_stream::stream! { /// yield Ok(vec![1, 2, 3]); /// yield Ok(vec![4, 5, 6]); /// yield Err(Error::new(ErrorKind::Other, "Oops")); /// }; /// client.upload_blob(&blob_hash, stream).await?; /// ``` pub async fn upload_blob( &self, blob_hash: H, data_stream: S, ) -> BlobResult<()> where H: Into, S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Bytes: From, { debug!("Upload blob request"); let url = self.get_blob_url(None)?; let streaming_body = Body::wrap_stream(data_stream); let form = Form::new() .text("blob_hash", blob_hash.into()) .part("blob_data", Part::stream(streaming_body)); let response = self .request(Method::PUT, url)? .multipart(form) .send() .await?; if response.status().is_success() { trace!("Blob upload successful"); return Ok(()); } let error = handle_http_error(response.status()); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`]. /// /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist, /// uploads the data from [`data_stream`]. pub async fn simple_put( &self, blob_hash: &str, holder: &str, data_stream: S, ) -> BlobResult where S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Bytes: From, { trace!("Begin simple put. Assigning holder..."); let data_exists = self.assign_holder(blob_hash, holder).await?; if data_exists { trace!("Blob data already exists. Skipping upload."); return Ok(false); } trace!("Uploading blob data..."); let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await else { return Ok(true); }; trace!(%blob_hash, %holder, "Revoking holder due to upload failure"); self.schedule_revoke_holder(blob_hash, holder); Err(upload_error) } /// Revokes holder in a separate task. Useful to clean up after /// upload failure without blocking the current task. pub fn schedule_revoke_holder( &self, blob_hash: impl Into, holder: impl Into, ) { let this = self.clone(); let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); tokio::spawn(async move { if let Err(err) = this.revoke_holder(&blob_hash, &holder).await { warn!("Failed to revoke holder: {0:?} - {0}", err); } }); } } // private helper methods impl BlobServiceClient { fn get_blob_url( &self, blob_hash: Option<&str>, ) -> Result { let path = match blob_hash { Some(hash) => format!("/blob/{}", hash), None => "/blob".to_string(), }; let url = self .blob_service_url .join(&path) .map_err(|err| BlobServiceError::URLError(err.to_string()))?; trace!("Constructed request URL: {}", url); Ok(url) } fn get_holders_url(&self) -> Result { let url = self .blob_service_url .join("/holders") .map_err(|err| BlobServiceError::URLError(err.to_string()))?; trace!("Constructed request URL: {}", url); Ok(url) } fn request( &self, http_method: Method, url: Url, ) -> BlobResult { let request = self.http_client.request(http_method, url); match &self.auth_credential { Some(credential) => { let token = credential.as_authorization_token().map_err(|e| { error!("Failed to parse authorization token: {}", e); BlobServiceError::UnexpectedError })?; Ok(request.bearer_auth(token)) } None => Ok(request), } } } fn handle_http_error(status_code: StatusCode) -> BlobServiceError { match status_code { StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments, StatusCode::NOT_FOUND => BlobServiceError::NotFound, StatusCode::CONFLICT => BlobServiceError::AlreadyExists, code if code.is_server_error() => BlobServiceError::ServerError, code => BlobServiceError::UnexpectedHttpStatus(code), } } async fn error_response_result( response: reqwest::Response, ) -> BlobResult { let status = response.status(); debug!("Response status: {}", status); let error = handle_http_error(status); if let Ok(message) = response.text().await { trace!("Error response message: {}", message); } Err(error) } type BlobResult = Result; #[cfg(feature = "http")] impl crate::http::auth_service::HttpAuthenticatedService for BlobServiceClient { fn make_authenticated( self, auth_credential: AuthorizationCredential, ) -> Self { self.with_authentication(auth_credential) } fn accepts_services_token(&self, _req: &actix_web::HttpRequest) -> bool { true } } diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs index d62fbcf78..4a851f338 100644 --- a/shared/comm-lib/src/blob/types.rs +++ b/shared/comm-lib/src/blob/types.rs @@ -1,141 +1,217 @@ use derive_more::Constructor; use hex::ToHex; +use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; /// This module defines structures for HTTP requests and responses /// for the Blob Service. The definitions in this file should remain in sync /// with the types and validators defined in the corresponding /// JavaScript file at `lib/types/blob-service-types.js`. /// /// If you edit the definitions in one file, /// please make sure to update the corresponding definitions in the other. pub mod http { use serde::{Deserialize, Serialize}; pub use super::BlobInfo; // Assign multiple holders #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct AssignHoldersRequest { pub requests: Vec, } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct HolderAssignmentResult { #[serde(flatten)] pub request: BlobInfo, pub success: bool, pub data_exists: bool, pub holder_already_exists: bool, } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct AssignHoldersResponse { pub results: Vec, } // Remove multiple holders #[derive(Serialize, Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - pub struct RemoveHoldersRequest { - pub requests: Vec, - #[serde(default)] - pub instant_delete: bool, + #[serde(untagged)] + pub enum RemoveHoldersRequest { + // remove holders with given (hash, holder) pairs + #[serde(rename_all = "camelCase")] + Items { + requests: Vec, + /// If true, the blobs will be deleted instantly + /// after their last holders are revoked. + #[serde(default)] + instant_delete: bool, + }, + // remove all holders that are indexed by any of given tags + ByIndexedTags { + tags: Vec, + }, } + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct RemoveHoldersResponse { pub failed_requests: Vec, } // Single holder endpoint types #[derive(Serialize, Deserialize, Debug)] pub struct AssignHolderRequest { pub blob_hash: String, pub holder: String, } #[derive(Serialize, Deserialize, Debug)] pub struct AssignHolderResponse { pub data_exists: bool, } #[derive(Serialize, Deserialize, Debug)] pub struct RemoveHolderRequest { pub blob_hash: String, pub holder: String, - /// If true, the blob will be deleted intantly + /// If true, the blob will be deleted instantly /// after the last holder is revoked. #[serde(default)] pub instant_delete: bool, } } /// Blob owning information - stores both blob_hash and holder -#[derive(Clone, Debug, Constructor, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Constructor, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BlobInfo { pub blob_hash: String, pub holder: String, } impl BlobInfo { pub fn from_bytes(data: &[u8]) -> Self { Self { blob_hash: Sha256::digest(data).encode_hex(), holder: uuid::Uuid::new_v4().to_string(), } } } #[cfg(feature = "aws")] mod db_conversions { use super::*; use crate::database::{AttributeTryInto, DBItemError, TryFromAttribute}; use aws_sdk_dynamodb::types::AttributeValue; use std::collections::HashMap; const BLOB_HASH_DDB_MAP_KEY: &str = "blob_hash"; const HOLDER_DDB_MAP_KEY: &str = "holder"; impl From for AttributeValue { fn from(value: BlobInfo) -> Self { let map = HashMap::from([ ( BLOB_HASH_DDB_MAP_KEY.to_string(), AttributeValue::S(value.blob_hash), ), ( HOLDER_DDB_MAP_KEY.to_string(), AttributeValue::S(value.holder), ), ]); AttributeValue::M(map) } } impl From<&BlobInfo> for AttributeValue { fn from(value: &BlobInfo) -> Self { AttributeValue::from(value.to_owned()) } } impl TryFromAttribute for BlobInfo { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { let attr_name: String = attribute_name.into(); let mut inner_map: HashMap = attribute.attr_try_into(&attr_name)?; let blob_hash = inner_map .remove("blob_hash") .attr_try_into(format!("{attr_name}.blob_hash"))?; let holder = inner_map .remove("holder") .attr_try_into(format!("{attr_name}.holder"))?; Ok(BlobInfo { blob_hash, holder }) } } } + +#[cfg(test)] +mod serialization_tests { + use super::http::*; + mod remove_holders_request { + use super::*; + + #[test] + fn serialize_items() { + let req = RemoveHoldersRequest::Items { + requests: vec![BlobInfo::new("a".into(), "b".into())], + instant_delete: false, + }; + let expected = + r#"{"requests":[{"blobHash":"a","holder":"b"}],"instantDelete":false}"#; + assert_eq!(expected, serde_json::to_string(&req).unwrap()); + } + + #[test] + fn deserialize_items() { + let json = + r#"{"requests":[{"blobHash":"a","holder":"b"}],"instantDelete":false}"#; + let deserialized: RemoveHoldersRequest = + serde_json::from_str(json).expect("Request JSON payload invalid"); + + let expected_items = vec![BlobInfo::new("a".into(), "b".into())]; + + let is_matching = matches!( + deserialized, + RemoveHoldersRequest::Items { + requests: items, + instant_delete: false, + } if items == expected_items + ); + assert!(is_matching, "Deserialized request is incorrect"); + } + + #[test] + fn serialize_tags() { + let req = RemoveHoldersRequest::ByIndexedTags { + tags: vec!["foo".into(), "bar".into()], + }; + let expected = r#"{"tags":["foo","bar"]}"#; + assert_eq!(expected, serde_json::to_string(&req).unwrap()); + } + + #[test] + fn deserialize_tags() { + let json = r#"{"tags":["foo","bar"]}"#; + let deserialized: RemoveHoldersRequest = + serde_json::from_str(json).expect("Request JSON payload invalid"); + + let expected_tags: Vec = vec!["foo".into(), "bar".into()]; + + let is_matching = matches!( + deserialized, + RemoveHoldersRequest::ByIndexedTags { + tags: actual_tags + } if actual_tags == expected_tags + ); + assert!(is_matching, "Deserialized request is incorrect"); + } + } +}