diff --git a/services/blob/src/database/types.rs b/services/blob/src/database/types.rs
--- a/services/blob/src/database/types.rs
+++ b/services/blob/src/database/types.rs
@@ -150,7 +150,7 @@
 ///
 /// It implements `TryFrom` and `Into` traits to conveniently use it
 /// in DynamoDB queries
-#[derive(Clone, Constructor, Debug)]
+#[derive(Clone, Constructor, Debug, Hash, Eq, PartialEq)]
 pub struct PrimaryKey {
   pub blob_hash: String,
   pub holder: String,
@@ -166,6 +166,11 @@
       holder: BLOB_ITEM_ROW_HOLDER_VALUE.to_string(),
     }
   }
+
+  /// Returns `true` if this key's `holder` is `BLOB_ITEM_ROW_HOLDER_VALUE`.
+  pub fn is_blob_item(&self) -> bool {
+    self.holder == BLOB_ITEM_ROW_HOLDER_VALUE
+  }
 }
 
 impl TryFrom for PrimaryKey {
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -303,6 +303,93 @@
   fn into_primary_keys(self) -> Vec;
 }
 
+impl CleanupOperations for UncheckedCollection {
+  /// Retains only items that should remain unchecked
+  /// (missing blob hash or holders).
+  ///
+  /// Returns the primary keys of the removed items - these items are checked
+  /// (contain both blob hash and at least one holder).
+  fn filter_out_checked(&mut self) -> Vec<PrimaryKey> {
+    let mut checked = Vec::new();
+
+    self.retain(|blob_hash, item| {
+      if !item.has_blob_hash() || !item.has_holders() {
+        // blob hash or holder missing, leave unchecked
+        return true;
+      }
+
+      // fully checked: hand its keys to the caller and drop the entry
+      checked.extend(item.as_primary_keys(blob_hash));
+      false
+    });
+    checked
+  }
+
+  /// Returns list of blob hashes for which we need to query if they contain
+  /// at least one holder.
+  ///
+  /// These are the items that have a blob hash but no holders yet.
+  fn blobs_to_find_holders(&self) -> Vec<String> {
+    self
+      .iter()
+      .filter_map(|(blob_hash, item)| {
+        if item.has_blob_hash() && !item.has_holders() {
+          Some(blob_hash.clone())
+        } else {
+          None
+        }
+      })
+      .collect()
+  }
+
+  /// Returns primary keys for blob items that need to be checked if they exist
+  ///
+  /// Technically, this returns all blob items that have holders but no hash.
+  fn blobs_to_check_existence(&self) -> Vec<PrimaryKey> {
+    self
+      .iter()
+      .filter_map(|(blob_hash, item)| {
+        if item.has_holders() && !item.has_blob_hash() {
+          Some(PrimaryKey::for_blob_item(blob_hash))
+        } else {
+          None
+        }
+      })
+      .collect()
+  }
+
+  /// Updates the structure after fetching additional data from database.
+  ///
+  /// A fetched blob item key sets the matching item's `blob_hash`; any other
+  /// key adds its `holder` to the matching item's `holders`. Keys whose
+  /// `blob_hash` is not in this collection are logged and skipped.
+  fn feed_with_query_results(
+    &mut self,
+    fetched_items: impl IntoIterator<Item = PrimaryKey>,
+  ) {
+    for pk in fetched_items {
+      let Some(item) = self.get_mut(&pk.blob_hash) else {
+        warn!("Got fetched item that was not requested: {:?}", pk);
+        continue;
+      };
+
+      if pk.is_blob_item() {
+        item.blob_hash = Some(pk.blob_hash);
+      } else {
+        item.holders.push(pk.holder);
+      }
+    }
+  }
+
+  /// Consumes the collection and returns the primary keys of all its items.
+  fn into_primary_keys(self) -> Vec<PrimaryKey> {
+    self
+      .into_iter()
+      .flat_map(|(blob_hash, item)| item.as_primary_keys(&blob_hash))
+      .collect()
+  }
+}
+
 pub struct BlobDownloadObject {
   /// Size of the whole blob object in bytes.
   pub blob_size: u64,