diff --git a/services/blob/src/http/handlers/holders.rs b/services/blob/src/http/handlers/holders.rs --- a/services/blob/src/http/handlers/holders.rs +++ b/services/blob/src/http/handlers/holders.rs @@ -45,8 +45,10 @@ info!("Assign holder request for {} holders", requests.len()); validate_request(&requests)?; - let mut results = Vec::with_capacity(requests.len()); + let blob_hashes = requests.iter().map(|it| &it.blob_hash).collect(); + let existing_blobs = service.find_existing_blobs(blob_hashes).await?; + let mut results = Vec::with_capacity(requests.len()); for item in requests { let BlobHashAndHolder { blob_hash, holder } = &item; let result = match service.assign_holder(blob_hash, holder).await { @@ -57,8 +59,7 @@ holder_already_exists: false, }, Err(BlobServiceError::DB(DBError::ItemAlreadyExists)) => { - let data_exists = - service.blob_hash_exists(blob_hash).await.unwrap_or(false); + let data_exists = existing_blobs.contains(blob_hash); HolderAssignmentResult { request: item, success: true, @@ -68,8 +69,7 @@ } Err(err) => { warn!("Holder assignment error: {:?}", err); - let data_exists = - service.blob_hash_exists(blob_hash).await.unwrap_or(false); + let data_exists = existing_blobs.contains(blob_hash); HolderAssignmentResult { request: item, success: false, diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,6 +1,6 @@ #![allow(unused)] use regex::RegexSet; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::{Bound, Range, RangeBounds, RangeInclusive}; use std::sync::Arc; @@ -282,17 +282,21 @@ Ok(()) } - pub async fn blob_hash_exists( + pub async fn find_existing_blobs( &self, - blob_hash: impl Into, - ) -> BlobServiceResult { - match self.db.get_blob_item(blob_hash).await { - Ok(item) => Ok(item.is_some()), - Err(err) => { - warn!("Failed to check if blob exists: {err:?}"); - Err(err.into()) - } - } + blob_hashes: HashSet<&String>, + ) -> BlobServiceResult> { + let primary_keys = blob_hashes.into_iter().map(PrimaryKey::for_blob_item); + + let existing_items: HashSet = self + .db + .list_existing_keys(primary_keys) + .await? + .into_iter() + .map(|pk| pk.blob_hash) + .collect(); + + Ok(existing_items) } pub async fn perform_cleanup(&self) -> anyhow::Result<()> {