diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -12,7 +12,7 @@
   blob::types::BlobInfo,
   database::{
     self, batch_operations::ExponentialBackoffConfig, is_transaction_conflict,
-    AttributeExtractor, TryFromAttribute,
+    parse_int_attribute, AttributeExtractor, TryFromAttribute,
   },
 };
 use std::collections::HashMap;
@@ -261,6 +261,39 @@
       .map_err(DBError::Attribute)
   }
 
+  /// Gets blob sizes for given blob hashes. PrimaryKeys should be blob PKs
+  /// (holder PKs will be ignored). Non-existing blobs, or blobs with missing
+  /// size attribute will not be returned.
+  pub async fn get_blob_sizes(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<HashMap<String, u64>> {
+    let primary_keys: Vec<_> = keys.into_iter().collect();
+    let projection_expression = format!("{ATTR_BLOB_HASH}, {ATTR_BLOB_SIZE}");
+
+    let returned_items = comm_lib::database::batch_operations::batch_get(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      primary_keys,
+      Some(projection_expression),
+      Default::default(),
+    )
+    .await?;
+
+    let mut blob_sizes = HashMap::with_capacity(returned_items.len());
+    for mut attrs in returned_items {
+      let blob_hash: String = attrs.take_attr(ATTR_BLOB_HASH)?;
+      let size_attr = attrs.remove(ATTR_BLOB_SIZE);
+
+      if size_attr.is_some() {
+        let blob_size: u64 = parse_int_attribute(ATTR_BLOB_SIZE, size_attr)?;
+        blob_sizes.insert(blob_hash, blob_size);
+      }
+    }
+
+    Ok(blob_sizes)
+  }
+
   /// Returns a list of primary keys for rows that already exist in the table
   pub async fn list_existing_keys(
     &self,
diff --git a/services/blob/src/http/handlers/metadata.rs b/services/blob/src/http/handlers/metadata.rs
--- a/services/blob/src/http/handlers/metadata.rs
+++ b/services/blob/src/http/handlers/metadata.rs
@@ -1,7 +1,8 @@
 use crate::{http::utils::verify_caller_is_service, service::BlobService};
 use actix_web::{web, HttpResponse};
 use comm_lib::{
-  auth::AuthorizationCredential, blob::types::http::BlobSizesRequest,
+  auth::AuthorizationCredential,
+  blob::types::http::{BlobSizesRequest, BlobSizesResponse},
 };
 use tracing::instrument;
 
@@ -13,5 +14,9 @@
 ) -> actix_web::Result<HttpResponse> {
   verify_caller_is_service(&requesting_identity)?;
 
-  Ok(HttpResponse::NotImplemented().body("Not implemented yet"))
+  let request = payload.into_inner();
+  let blob_hashes = request.blob_hashes.into_iter().collect();
+  let blob_sizes = service.query_blob_sizes(blob_hashes).await?;
+  let response = BlobSizesResponse { blob_sizes };
+  Ok(HttpResponse::Ok().json(response))
 }
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -263,6 +263,29 @@
     Ok(())
   }
 
+  pub async fn query_blob_sizes(
+    &self,
+    blob_hashes: HashSet<String>,
+  ) -> BlobServiceResult<HashMap<String, u64>> {
+    let primary_keys = blob_hashes.iter().map(PrimaryKey::for_blob_item);
+    let ddb_results = self.db.get_blob_sizes(primary_keys).await?;
+
+    let mut results = HashMap::with_capacity(blob_hashes.len());
+    for blob_hash in blob_hashes {
+      let blob_size = match ddb_results.get(&blob_hash) {
+        Some(ddb_size) => *ddb_size,
+        None => {
+          let s3_path = BlobItemInput::new(&blob_hash).s3_path;
+          let s3_size = self.s3.get_object_size(&s3_path).await?;
+          s3_size
+        }
+      };
+      results.insert(blob_hash, blob_size);
+    }
+
+    Ok(results)
+  }
+
   pub async fn find_existing_blobs(
     &self,
     blob_hashes: HashSet<&String>,