diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
index d0bedeabb..632cc87c0 100644
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,243 +1,224 @@
 use std::collections::HashSet;
 
 use crate::http::errors::handle_blob_service_error;
 use crate::service::BlobService;
 use crate::validate_identifier;
 use actix_web::error::{ErrorBadRequest, ErrorRangeNotSatisfiable};
 use actix_web::web::Bytes;
 use actix_web::{
   http::header::{ByteRangeSpec, Range},
   web, HttpResponse,
 };
 use async_stream::try_stream;
 use base64::Engine;
+use comm_lib::blob::types::http::{
+  AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest,
+};
 use comm_lib::http::multipart;
-use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
 use tracing::{debug, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
 /// Returns a tuple of first and last byte number (inclusive) represented by given range header.
 fn parse_range_header(
   range_header: &Option<web::Header<Range>>,
   file_size: u64,
 ) -> actix_web::Result<(u64, u64)> {
   let (range_start, range_end): (u64, u64) = match range_header {
     Some(web::Header(Range::Bytes(ranges))) => {
       if ranges.len() > 1 {
         return Err(ErrorBadRequest("Multiple ranges not supported"));
       }
       match ranges[0] {
         ByteRangeSpec::FromTo(start, end) => {
           if end >= file_size || start > end {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, end)
         }
         ByteRangeSpec::From(start) => {
           if start >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, file_size - 1)
         }
         ByteRangeSpec::Last(length) => {
           if length >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (file_size - length, file_size - 1)
         }
       }
     }
     Some(web::Header(Range::Unregistered(..))) => {
       return Err(ErrorBadRequest("Use ranges registered at IANA"));
     }
     None => (0, file_size - 1),
   };
   Ok((range_start, range_end))
 }
 
 #[instrument(
   name = "get_blob",
   skip_all,
   fields(blob_hash = %params.as_ref().as_str(), s3_path))
 ]
 pub async fn get_blob_handler(
   service: web::Data<BlobService>,
   params: web::Path<String>,
   range_header: Option<web::Header<Range>>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Get blob request");
   let blob_hash = params.into_inner();
   validate_identifier!(blob_hash);
 
   trace!("Initializing download session");
   let mut download = service.create_download(blob_hash).await?;
 
   let total_size = download.blob_size;
   let (range_start, range_end): (u64, u64) =
     parse_range_header(&range_header, total_size)?;
   download.set_byte_range(range_start..=range_end);
   let content_length = download.download_size();
 
   let stream = download
     .into_stream()
     .map(|data| match data {
       Ok(bytes) => Ok(web::Bytes::from(bytes)),
       Err(err) => {
         warn!("Error during download stream: {:?}", err);
         Err(handle_blob_service_error(&err))
       }
     })
     .in_current_span();
 
   if range_header.is_some() {
     return Ok(
       HttpResponse::PartialContent()
         .content_type("application/octet-stream")
         .append_header(("Content-Length", content_length))
         .append_header((
           "Content-Range",
           format!("bytes {}-{}/{}", range_start, range_end, total_size),
         ))
         .streaming(Box::pin(stream)),
     );
   }
 
   Ok(
     HttpResponse::Ok()
       .content_type("application/octet-stream")
       .append_header(("Content-Length", content_length))
       .streaming(Box::pin(stream)),
   )
 }
 
-#[derive(Deserialize, Debug)]
-pub struct AssignHolderPayload {
-  holder: String,
-  blob_hash: String,
-}
-
-#[derive(Serialize)]
-struct AssignHolderResponnse {
-  data_exists: bool,
-}
-
 #[instrument(name = "assign_holder", skip(service))]
 pub async fn assign_holder_handler(
   service: web::Data<BlobService>,
-  payload: web::Json<AssignHolderPayload>,
+  payload: web::Json<AssignHolderRequest>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Assign holder request");
-  let AssignHolderPayload { holder, blob_hash } = payload.into_inner();
+  let AssignHolderRequest { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
   let data_exists = service
     .find_existing_blobs(HashSet::from([&blob_hash]))
     .await?
     .contains(&blob_hash);
   service.assign_holder(blob_hash, holder).await?;
 
-  let response = AssignHolderResponnse { data_exists };
+  let response = AssignHolderResponse { data_exists };
   Ok(HttpResponse::Ok().json(web::Json(response)))
 }
 
 #[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
 pub async fn upload_blob_handler(
   service: web::Data<BlobService>,
   mut payload: actix_multipart::Multipart,
 ) -> actix_web::Result<HttpResponse> {
   info!("Upload blob request");
 
   let Some((name, blob_hash)) = multipart::get_text_field(&mut payload).await?
   else {
     warn!("Malformed request: expected a field.");
     return Err(ErrorBadRequest("Bad request"));
   };
   if name != "blob_hash" {
     warn!(name, "Malformed request: 'blob_hash' text field expected.");
     return Err(ErrorBadRequest("Bad request"));
   }
   validate_identifier!(blob_hash);
   tracing::Span::current().record("blob_hash", &blob_hash);
 
   trace!("Receiving blob data");
   let stream = try_stream! {
     while let Some(mut field) = payload.try_next().await? {
       let field_name = field.name();
       if field_name == "base64_data" {
         trace!("Got base64_data");
         let mut buf = Vec::new();
         while let Some(chunk) = field.try_next().await? {
           buf.extend_from_slice(&chunk);
         }
         let base64_string = String::from_utf8(buf)
           .map_err(|err| actix_web::error::ParseError::Utf8(err.utf8_error()))?;
         let data = base64::engine::general_purpose::STANDARD
           .decode(&base64_string)
           .map_err(|err| {
             debug!("Invalid base64 payload: {err:?}");
             ErrorBadRequest("Invalid base64")
           })?;
         yield Bytes::from(data);
         return;
       }
       if field_name != "blob_data" {
         warn!(
           field_name,
           "Malformed request: 'blob_data' or 'base64_data' multipart field expected."
         );
         Err(ErrorBadRequest("Bad request"))?;
       }
       trace!("Got blob_data. Streaming...");
       while let Some(chunk) = field.try_next().await? {
         yield chunk;
       }
     }
     trace!("Stream done");
   };
 
   service.put_blob(blob_hash, stream).await?;
   Ok(HttpResponse::NoContent().finish())
 }
 
-#[derive(Deserialize, Debug)]
-pub struct RemoveHolderPayload {
-  holder: String,
-  blob_hash: String,
-  /// If true, the blob will be deleted intantly
-  /// after the last holder is revoked.
-  #[serde(default)]
-  instant_delete: bool,
-}
-
 #[instrument(name = "remove_holder", skip(service))]
 pub async fn remove_holder_handler(
   service: web::Data<BlobService>,
-  payload: web::Json<RemoveHolderPayload>,
+  payload: web::Json<RemoveHolderRequest>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Revoke holder request");
-  let RemoveHolderPayload {
+  let RemoveHolderRequest {
     holder,
     blob_hash,
     instant_delete,
   } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
   service
     .revoke_holder(blob_hash, holder, instant_delete)
     .await?;
   Ok(HttpResponse::NoContent().finish())
 }
diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs
index 6f4070273..7af5fabc4 100644
--- a/services/blob/src/http/handlers/holders.rs
+++ b/services/blob/src/http/handlers/holders.rs
@@ -1,154 +1,113 @@
 use actix_web::error::ErrorBadRequest;
 use actix_web::{web, HttpResponse};
-use serde::{Deserialize, Serialize};
+use comm_lib::blob::types::http::{
+  AssignHoldersRequest, AssignHoldersResponse, BlobInfo,
+  HolderAssignmentResult, RemoveHoldersRequest, RemoveHoldersResponse,
+};
 use tracing::{info, instrument, trace, warn};
 
 use crate::service::BlobService;
 
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct BlobHashAndHolder {
-  blob_hash: String,
-  holder: String,
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct AssignHoldersPayload {
-  requests: Vec<BlobHashAndHolder>,
-}
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-struct HolderAssignmentResult {
-  #[serde(flatten)]
-  request: BlobHashAndHolder,
-  success: bool,
-  data_exists: bool,
-  holder_already_exists: bool,
-}
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-struct AssignHoldersResponse {
-  results: Vec<HolderAssignmentResult>,
-}
-
 #[instrument(name = "assign_multiple_holders", skip_all)]
 pub async fn assign_holders_handler(
   service: web::Data<BlobService>,
-  payload: web::Json<AssignHoldersPayload>,
+  payload: web::Json<AssignHoldersRequest>,
 ) -> actix_web::Result<HttpResponse> {
   use crate::database::DBError;
   use crate::service::BlobServiceError;
 
-  let AssignHoldersPayload { requests } = payload.into_inner();
+  let AssignHoldersRequest { requests } = payload.into_inner();
   info!("Assign holder request for {} holders", requests.len());
   validate_request(&requests)?;
 
   let blob_hashes = requests.iter().map(|it| &it.blob_hash).collect();
   let existing_blobs = service.find_existing_blobs(blob_hashes).await?;
-
   let mut results = Vec::with_capacity(requests.len());
   for item in requests {
-    let BlobHashAndHolder { blob_hash, holder } = &item;
+    let BlobInfo { blob_hash, holder } = &item;
     let data_exists = existing_blobs.contains(blob_hash);
     let result = match service.assign_holder(blob_hash, holder).await {
       Ok(()) => HolderAssignmentResult {
         request: item,
         success: true,
         data_exists,
         holder_already_exists: false,
       },
       Err(BlobServiceError::DB(DBError::ItemAlreadyExists)) => {
         HolderAssignmentResult {
           request: item,
           success: true,
           data_exists,
           holder_already_exists: true,
         }
       }
       Err(err) => {
         warn!("Holder assignment error: {:?}", err);
         HolderAssignmentResult {
           request: item,
           success: false,
           data_exists,
           holder_already_exists: false,
         }
       }
     };
     results.push(result);
   }
 
   let response = AssignHoldersResponse { results };
   Ok(HttpResponse::Ok().json(web::Json(response)))
 }
 
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct RemoveHoldersPayload {
-  requests: Vec<BlobHashAndHolder>,
-  #[serde(default)]
-  instant_delete: bool,
-}
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct RemoveHoldersResponse {
-  failed_requests: Vec<BlobHashAndHolder>,
-}
-
 #[instrument(name = "remove_multiple_holders", skip_all)]
 pub async fn remove_holders_handler(
   service: web::Data<BlobService>,
-  payload: web::Json<RemoveHoldersPayload>,
+  payload: web::Json<RemoveHoldersRequest>,
 ) -> actix_web::Result<HttpResponse> {
-  let RemoveHoldersPayload {
+  let RemoveHoldersRequest {
     requests,
     instant_delete,
   } = payload.into_inner();
   info!(
     instant_delete,
     "Remove request for {} holders.",
     requests.len()
   );
   validate_request(&requests)?;
 
   let mut failed_requests = Vec::new();
   // This has to be done sequentially because `service.revoke_holder()`
   // performs a DDB transaction and these transactions could conflict
   // with each other, e.g. if two holders were removed for the same blob hash.
   for item in requests {
     trace!("Removing item: {:?}", &item);
-    let BlobHashAndHolder { holder, blob_hash } = &item;
+    let BlobInfo { holder, blob_hash } = &item;
     if let Err(err) = service
       .revoke_holder(blob_hash, holder, instant_delete)
       .await
     {
       warn!("Holder removal failed: {:?}", err);
       failed_requests.push(item);
     }
   }
 
   let response = RemoveHoldersResponse { failed_requests };
   Ok(HttpResponse::Ok().json(web::Json(response)))
 }
 
 /**
  * Returns `HTTP 400 Bad Request` if one or more blob hashes or holders
  * have invalid format. See [`comm_lib::tools::is_valid_identifier`] for
  * valid format conditions
  */
-fn validate_request(items: &[BlobHashAndHolder]) -> actix_web::Result<()> {
+fn validate_request(items: &[BlobInfo]) -> actix_web::Result<()> {
   use comm_lib::tools::is_valid_identifier;
-  let all_valid =
-    items.iter().all(|BlobHashAndHolder { holder, blob_hash }| {
-      is_valid_identifier(holder) && is_valid_identifier(blob_hash)
-    });
+  let all_valid = items.iter().all(|BlobInfo { holder, blob_hash }| {
+    is_valid_identifier(holder) && is_valid_identifier(blob_hash)
+  });
 
   if !all_valid {
     return Err(ErrorBadRequest("One or more requests have invalid format"));
   }
   Ok(())
 }
diff --git a/shared/comm-lib/src/blob/client.rs b/shared/comm-lib/src/blob/client.rs
index 472dfd2b8..b1d4de8cc 100644
--- a/shared/comm-lib/src/blob/client.rs
+++ b/shared/comm-lib/src/blob/client.rs
@@ -1,422 +1,416 @@
 use bytes::Bytes;
 use derive_more::{Display, Error, From};
 use futures_core::Stream;
 use futures_util::StreamExt;
 use reqwest::{
   multipart::{Form, Part},
   Body, Method, RequestBuilder,
 };
 use tracing::{debug, error, trace, warn};
 
 // publicly re-export some reqwest types
 pub use reqwest::Error as ReqwestError;
 pub use reqwest::StatusCode;
 pub use reqwest::Url;
 
-use crate::auth::{AuthorizationCredential, UserIdentity};
+use crate::{
+  auth::{AuthorizationCredential, UserIdentity},
+  blob::types::http::{
+    AssignHolderRequest, AssignHolderResponse, RemoveHolderRequest,
+  },
+};
 
 #[derive(From, Error, Debug, Display)]
 pub enum BlobServiceError {
   /// HTTP Client errors, this includes:
   /// - connection failures
   /// - request errors (e.g. upload input stream)
   #[display(...)]
   ClientError(ReqwestError),
   /// Invalid Blob service URL provided
   #[display(...)]
   URLError(#[error(ignore)] String),
   /// Blob service returned HTTP 404
   /// - blob or holder not found
   #[display(...)]
   NotFound,
   /// Blob service returned HTTP 409
   /// - blob or holder already exists
   #[display(...)]
   AlreadyExists,
   /// Blob service returned HTTP 400
   /// - invalid holder or blob_hash format
   #[display(...)]
   InvalidArguments,
   /// Blob service returned HTTP 50x
   #[display(...)]
   ServerError,
   #[display(...)]
   UnexpectedHttpStatus(#[error(ignore)] reqwest::StatusCode),
   #[display(...)]
   UnexpectedError,
 }
 
 /// A client interface to Blob service.
 ///
 /// The `BlobServiceClient` holds a connection pool internally, so it is advised that
 /// you create one and **reuse** it, by **cloning**.
 ///
 /// A clone is recommended for each individual client identity, that has
 /// a different `UserIdentity`.
 ///
 /// # Example
 /// ```ignore
 /// // create client for user A
 /// let clientA = BlobServiceClient::new(blob_endpoint).with_user_identity(userA);
 ///
 /// // reuse the client connection with different credentials
 /// let clientB = clientA.with_user_identity(userB);
 ///
 /// // clientA still uses userA credentials
 /// ```
 ///
 /// A clone is recommended when the client concurrently handles multiple identities -
 /// e.g. a HTTP service handling requests from different users. The `with_user_identity()`
 /// method should be used for this purpose.
 ///
 /// You should **not** wrap the `BlobServiceClient` in an `Rc` or `Arc` to **reuse** it,
 /// because it already uses an `Arc` internally.
 #[derive(Clone)]
 pub struct BlobServiceClient {
   http_client: reqwest::Client,
   blob_service_url: reqwest::Url,
   auth_credential: Option<AuthorizationCredential>,
 }
 
 impl BlobServiceClient {
   /// Creates a new Blob Service client instance. It is unauthenticated by default
   /// so you need to call `with_user_identity()` afterwards:
   /// ```ignore
   /// let client = BlobServiceClient::new(blob_endpoint).with_user_identity(user);
   /// ```
   ///
   /// **Note**: It is advised to create this client once and reuse by **cloning**.
   /// See [`BlobServiceClient`] docs for details
   pub fn new(blob_service_url: reqwest::Url) -> Self {
     debug!("Creating BlobServiceClient. URL: {}", blob_service_url);
     Self {
       http_client: reqwest::Client::new(),
       blob_service_url,
       auth_credential: None,
     }
   }
 
   /// Clones the client and sets the [`UserIdentity`] for the new instance.
   /// This allows the client to reuse the same connection pool for different users.
   ///
   /// This is the same as calling
   /// ```ignore
   /// client.with_authentication(AuthorizationCredential::UserToken(user_identity))
   /// ```
   pub fn with_user_identity(&self, user_identity: UserIdentity) -> Self {
     self.with_authentication(AuthorizationCredential::UserToken(user_identity))
   }
 
   /// Clones the client and sets the [`AuthorizationCredential`] for the new instance.
   /// This allows the client to reuse the same connection pool for different users.
   pub fn with_authentication(
     &self,
     auth_credential: AuthorizationCredential,
   ) -> Self {
     trace!("Set auth_credential: {:?}", &auth_credential);
     let mut this = self.clone();
     this.auth_credential = Some(auth_credential);
     this
   }
 
   /// Downloads blob with given [`blob_hash`].
   ///
   /// @returns a stream of blob bytes
   ///
   /// # Errors thrown
   /// - [BlobServiceError::NotFound] if blob with given hash does not exist
   /// - [BlobServiceError::InvalidArguments] if blob hash has incorrect format
   ///
   /// # Example
   /// ```ignore
   /// let client =
   ///   BlobServiceClient::new("http://localhost:50053".parse()?);
   ///
   /// let mut stream = client.get("hello").await?;
   /// while let Some(data) = stream.try_next().await? {
   ///   println!("Got data: {:?}", data);
   /// }
   /// ```
   pub async fn get(
     &self,
     blob_hash: &str,
   ) -> BlobResult<impl Stream<Item = BlobResult<Bytes>>> {
     debug!(?blob_hash, "Get blob request");
     let url = self.get_blob_url(Some(blob_hash))?;
 
     let response = self
       .request(Method::GET, url)?
       .send()
       .await
       .map_err(BlobServiceError::ClientError)?;
     debug!("Response status: {}", response.status());
 
     if response.status().is_success() {
       let stream = response.bytes_stream().map(|result| match result {
         Ok(bytes) => Ok(bytes),
         Err(error) => {
           warn!("Error while streaming response: {}", error);
           Err(BlobServiceError::ClientError(error))
         }
       });
       return Ok(stream);
     }
 
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
     }
     Err(error)
   }
 
   /// Assigns a new holder to a blob represented by [`blob_hash`].
   /// Returns `BlobServiceError::AlreadyExists` if blob already has
   /// a holder with given [`holder`] name.
   pub async fn assign_holder(
     &self,
     blob_hash: &str,
     holder: &str,
   ) -> BlobResult<bool> {
     debug!("Assign holder request");
     let url = self.get_blob_url(None)?;
 
     let payload = AssignHolderRequest {
       holder: holder.to_string(),
       blob_hash: blob_hash.to_string(),
     };
     debug!("Request payload: {:?}", payload);
 
     let response = self
       .request(Method::POST, url)?
       .json(&payload)
       .send()
       .await?;
 
     debug!("Response status: {}", response.status());
 
     if response.status().is_success() {
       let AssignHolderResponse { data_exists } = response.json().await?;
       trace!("Data exists: {}", data_exists);
       return Ok(data_exists);
     }
 
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
     }
     Err(error)
   }
 
   /// Revokes given holder from a blob represented by [`blob_hash`].
   /// Returns `BlobServiceError::NotFound` if blob with given hash does not exist
   /// or it does not have such holder
   pub async fn revoke_holder(
     &self,
     blob_hash: &str,
     holder: &str,
   ) -> BlobResult<()> {
     debug!("Revoke holder request");
     let url = self.get_blob_url(None)?;
 
-    let payload = RevokeHolderRequest {
+    let payload = RemoveHolderRequest {
       holder: holder.to_string(),
       blob_hash: blob_hash.to_string(),
+      instant_delete: false,
     };
     debug!("Request payload: {:?}", payload);
 
     let response = self
       .request(Method::DELETE, url)?
       .json(&payload)
       .send()
       .await?;
 
     debug!("Response status: {}", response.status());
 
     if response.status().is_success() {
       trace!("Revoke holder request successful");
       return Ok(());
     }
 
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
     }
     Err(error)
   }
 
   /// Uploads a blob. Returns `BlobServiceError::AlreadyExists` if blob with given hash
   /// already exists.
   ///
   /// # Example
   /// ```ignore
   /// use std::io::{Error, ErrorKind};
   ///
   /// let client =
   ///   BlobServiceClient::new("http://localhost:50053".parse()?);
   ///
   /// let stream = async_stream::stream! {
   ///   yield Ok(vec![1, 2, 3]);
   ///   yield Ok(vec![4, 5, 6]);
   ///   yield Err(Error::new(ErrorKind::Other, "Oops"));
   /// };
   /// client.upload_blob(&blob_hash, stream).await?;
   /// ```
   pub async fn upload_blob<H, S>(
     &self,
     blob_hash: H,
     data_stream: S,
   ) -> BlobResult<()>
   where
     H: Into<String>,
     S: futures_core::stream::TryStream + Send + Sync + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
     Bytes: From<S::Ok>,
   {
     debug!("Upload blob request");
     let url = self.get_blob_url(None)?;
 
     let streaming_body = Body::wrap_stream(data_stream);
     let form = Form::new()
       .text("blob_hash", blob_hash.into())
       .part("blob_data", Part::stream(streaming_body));
 
     let response = self
       .request(Method::PUT, url)?
       .multipart(form)
       .send()
       .await?;
 
     debug!("Response status: {}", response.status());
 
     if response.status().is_success() {
       trace!("Blob upload successful");
       return Ok(());
     }
 
     let error = handle_http_error(response.status());
     if let Ok(message) = response.text().await {
       trace!("Error response message: {}", message);
     }
     Err(error)
   }
 
   /// A wrapper around [`BlobServiceClient::assign_holder`] and [`BlobServiceClient::upload_blob`].
   ///
   /// Assigns a new holder to a blob represented by [`blob_hash`]. If the blob does not exist,
   /// uploads the data from [`data_stream`].
   pub async fn simple_put<S>(
     &self,
     blob_hash: &str,
     holder: &str,
     data_stream: S,
   ) -> BlobResult<bool>
   where
     S: futures_core::stream::TryStream + Send + Sync + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
     Bytes: From<S::Ok>,
   {
     trace!("Begin simple put. Assigning holder...");
     let data_exists = self.assign_holder(blob_hash, holder).await?;
     if data_exists {
       trace!("Blob data already exists. Skipping upload.");
       return Ok(false);
     }
 
     trace!("Uploading blob data...");
     let Err(upload_error) = self.upload_blob(blob_hash, data_stream).await
     else {
       return Ok(true);
     };
 
     trace!(%blob_hash, %holder, "Revoking holder due to upload failure");
     self.schedule_revoke_holder(blob_hash, holder);
     Err(upload_error)
   }
 
   /// Revokes holder in a separate task. Useful to clean up after
   /// upload failure without blocking the current task.
   pub fn schedule_revoke_holder(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) {
     let this = self.clone();
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     tokio::spawn(async move {
       if let Err(err) = this.revoke_holder(&blob_hash, &holder).await {
         warn!("Failed to revoke holder: {0:?} - {0}", err);
       }
     });
   }
 }
 
 // private helper methods
 impl BlobServiceClient {
   fn get_blob_url(
     &self,
     blob_hash: Option<&str>,
   ) -> Result<Url, BlobServiceError> {
     let path = match blob_hash {
       Some(hash) => format!("/blob/{}", hash),
       None => "/blob".to_string(),
     };
     let url = self
       .blob_service_url
       .join(&path)
       .map_err(|err| BlobServiceError::URLError(err.to_string()))?;
     trace!("Constructed request URL: {}", url);
     Ok(url)
   }
 
   fn request(
     &self,
     http_method: Method,
     url: Url,
   ) -> BlobResult<RequestBuilder> {
     let request = self.http_client.request(http_method, url);
     match &self.auth_credential {
       Some(credential) => {
         let token = credential.as_authorization_token().map_err(|e| {
           error!("Failed to parse authorization token: {}", e);
           BlobServiceError::UnexpectedError
         })?;
         Ok(request.bearer_auth(token))
       }
       None => Ok(request),
     }
   }
 }
 
 fn handle_http_error(status_code: StatusCode) -> BlobServiceError {
   match status_code {
     StatusCode::BAD_REQUEST => BlobServiceError::InvalidArguments,
     StatusCode::NOT_FOUND => BlobServiceError::NotFound,
     StatusCode::CONFLICT => BlobServiceError::AlreadyExists,
     code if code.is_server_error() => BlobServiceError::ServerError,
     code => BlobServiceError::UnexpectedHttpStatus(code),
   }
 }
 
 type BlobResult<T> = Result<T, BlobServiceError>;
 
-#[derive(serde::Deserialize)]
-struct AssignHolderResponse {
-  data_exists: bool,
-}
-#[derive(Debug, serde::Serialize)]
-struct AssignHolderRequest {
-  blob_hash: String,
-  holder: String,
-}
-// they have the same layout so we can simply alias
-type RevokeHolderRequest = AssignHolderRequest;
-
 #[cfg(feature = "http")]
 impl crate::http::auth_service::HttpAuthenticatedService for BlobServiceClient {
   fn make_authenticated(
     self,
     auth_credential: AuthorizationCredential,
   ) -> Self {
     self.with_authentication(auth_credential)
   }
 
   fn accepts_services_token(&self, _req: &actix_web::HttpRequest) -> bool {
     true
   }
 }
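Usage sketch (not part of the diff): a minimal program exercising the client methods touched above, namely `assign_holder`, `upload_blob`, and `revoke_holder` from `shared/comm-lib/src/blob/client.rs`. The endpoint URL, blob hash, and holder values are placeholders, and availability of `tokio` (with the `macros`/`rt` features) and `futures-util` is assumed.

```rust
use comm_lib::blob::client::{BlobServiceClient, BlobServiceError};

#[tokio::main]
async fn main() -> Result<(), BlobServiceError> {
  // Placeholder endpoint; create the client once and reuse it by cloning.
  let url = "http://localhost:50053".parse().expect("valid URL");
  let client = BlobServiceClient::new(url);

  // Placeholder identifiers.
  let blob_hash = "example-blob-hash";
  let holder = "example-holder";

  // Assign a holder first; the response says whether the data is already stored.
  let data_exists = client.assign_holder(blob_hash, holder).await?;

  if !data_exists {
    // Any `TryStream` of byte chunks satisfies the `upload_blob` bounds.
    let chunks = futures_util::stream::iter(vec![
      Ok::<Vec<u8>, std::io::Error>(b"hello ".to_vec()),
      Ok(b"world".to_vec()),
    ]);
    client.upload_blob(blob_hash, chunks).await?;
  }

  // Drop our reference to the blob once it is no longer needed.
  client.revoke_holder(blob_hash, holder).await?;
  Ok(())
}
```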
diff --git a/shared/comm-lib/src/blob/types.rs b/shared/comm-lib/src/blob/types.rs
index 3c4425353..0480543f3 100644
--- a/shared/comm-lib/src/blob/types.rs
+++ b/shared/comm-lib/src/blob/types.rs
@@ -1,69 +1,134 @@
 use derive_more::Constructor;
 use hex::ToHex;
 use sha2::{Digest, Sha256};
 
+pub mod http {
+  use serde::{Deserialize, Serialize};
+
+  pub use super::BlobInfo;
+
+  // Assign multiple holders
+  #[derive(Serialize, Deserialize, Debug)]
+  #[serde(rename_all = "camelCase")]
+  pub struct AssignHoldersRequest {
+    pub requests: Vec<BlobInfo>,
+  }
+
+  #[derive(Serialize, Deserialize, Debug)]
+  #[serde(rename_all = "camelCase")]
+  pub struct HolderAssignmentResult {
+    #[serde(flatten)]
+    pub request: BlobInfo,
+    pub success: bool,
+    pub data_exists: bool,
+    pub holder_already_exists: bool,
+  }
+  #[derive(Serialize, Deserialize, Debug)]
+  #[serde(rename_all = "camelCase")]
+  pub struct AssignHoldersResponse {
+    pub results: Vec<HolderAssignmentResult>,
+  }
+
+  // Remove multiple holders
+  #[derive(Deserialize, Debug)]
+  #[serde(rename_all = "camelCase")]
+  pub struct RemoveHoldersRequest {
+    pub requests: Vec<BlobInfo>,
+    #[serde(default)]
+    pub instant_delete: bool,
+  }
+  #[derive(Serialize, Debug)]
+  #[serde(rename_all = "camelCase")]
+  pub struct RemoveHoldersResponse {
+    pub failed_requests: Vec<BlobInfo>,
+  }
+
+  // Single holder endpoint types
+
+  #[derive(Serialize, Deserialize, Debug)]
+  pub struct AssignHolderRequest {
+    pub blob_hash: String,
+    pub holder: String,
+  }
+  #[derive(Serialize, Deserialize, Debug)]
+  pub struct AssignHolderResponse {
+    pub data_exists: bool,
+  }
+
+  #[derive(Serialize, Deserialize, Debug)]
+  pub struct RemoveHolderRequest {
+    pub blob_hash: String,
+    pub holder: String,
+    /// If true, the blob will be deleted instantly
+    /// after the last holder is revoked.
+    #[serde(default)]
+    pub instant_delete: bool,
+  }
+}
+
 /// Blob owning information - stores both blob_hash and holder
-#[derive(Clone, Debug, Constructor)]
+#[derive(Clone, Debug, Constructor, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
 pub struct BlobInfo {
   pub blob_hash: String,
   pub holder: String,
 }
 
 impl BlobInfo {
   pub fn from_bytes(data: &[u8]) -> Self {
     Self {
       blob_hash: Sha256::digest(data).encode_hex(),
       holder: uuid::Uuid::new_v4().to_string(),
     }
   }
 }
 
 #[cfg(feature = "aws")]
 mod db_conversions {
   use super::*;
   use crate::database::{AttributeTryInto, DBItemError, TryFromAttribute};
   use aws_sdk_dynamodb::types::AttributeValue;
   use std::collections::HashMap;
 
   const BLOB_HASH_DDB_MAP_KEY: &str = "blob_hash";
   const HOLDER_DDB_MAP_KEY: &str = "holder";
 
   impl From<BlobInfo> for AttributeValue {
     fn from(value: BlobInfo) -> Self {
       let map = HashMap::from([
         (
           BLOB_HASH_DDB_MAP_KEY.to_string(),
           AttributeValue::S(value.blob_hash),
         ),
         (
           HOLDER_DDB_MAP_KEY.to_string(),
           AttributeValue::S(value.holder),
         ),
       ]);
       AttributeValue::M(map)
     }
   }
 
   impl From<&BlobInfo> for AttributeValue {
     fn from(value: &BlobInfo) -> Self {
       AttributeValue::from(value.to_owned())
     }
   }
 
   impl TryFromAttribute for BlobInfo {
     fn try_from_attr(
       attribute_name: impl Into<String>,
       attribute: Option<AttributeValue>,
     ) -> Result<Self, DBItemError> {
       let attr_name: String = attribute_name.into();
       let mut inner_map: HashMap<String, AttributeValue> =
         attribute.attr_try_into(&attr_name)?;
       let blob_hash = inner_map
         .remove("blob_hash")
         .attr_try_into(format!("{attr_name}.blob_hash"))?;
       let holder = inner_map
         .remove("holder")
         .attr_try_into(format!("{attr_name}.holder"))?;
       Ok(BlobInfo { blob_hash, holder })
     }
   }
 }
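Wire-format sketch (not part of the diff): a small serde round-trip of the new shared types from `comm_lib::blob::types::http`, assuming `serde_json` is available; the hash and holder strings are placeholders.

```rust
use comm_lib::blob::types::http::{BlobInfo, RemoveHoldersRequest};

fn main() -> serde_json::Result<()> {
  // `rename_all = "camelCase"` puts `blobHash` on the wire, and
  // `#[serde(default)]` lets callers omit `instantDelete` (it defaults to false).
  let json = r#"{
    "requests": [
      { "blobHash": "placeholder-hash", "holder": "placeholder-holder" }
    ]
  }"#;
  let request: RemoveHoldersRequest = serde_json::from_str(json)?;
  assert_eq!(request.requests.len(), 1);
  assert!(!request.instant_delete);

  // `BlobInfo` now serializes with the same camelCase keys, so failed items
  // can be echoed back verbatim in `RemoveHoldersResponse::failed_requests`.
  // `BlobInfo::new` comes from the `Constructor` derive (blob_hash, then holder).
  let info = BlobInfo::new(
    "placeholder-hash".to_string(),
    "placeholder-holder".to_string(),
  );
  println!("{}", serde_json::to_string(&info)?);
  Ok(())
}
```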