diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -7,8 +7,12 @@
   Error as DynamoDBError,
 };
 use chrono::Utc;
-use comm_lib::database::{
-  self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
+use comm_lib::{
+  blob::types::BlobInfo,
+  database::{
+    self, batch_operations::ExponentialBackoffConfig, AttributeExtractor,
+    TryFromAttribute,
+  },
 };
 use std::collections::HashMap;
 use tracing::{debug, error, trace};
@@ -255,6 +259,59 @@
       .collect::<Result<Vec<_>, _>>()
   }
 
+  /// Queries the holder-tag index (`HOLDER_TAG_INDEX_NAME`) and returns every
+  /// blob hash / holder pair whose indexed tag attribute equals `tag`.
+  ///
+  /// DynamoDB returns at most 1 MB of data per `Query` call, so this follows
+  /// `LastEvaluatedKey` until every page has been read.
+  ///
+  /// # Errors
+  /// Returns `DBError::AwsSdk` if a query request fails, or the converted
+  /// `take_attr` error if a returned item's blob hash or holder can't be read.
+  pub async fn query_indexed_holders(
+    &self,
+    tag: String,
+  ) -> DBResult<Vec<BlobInfo>> {
+    let mut holders = Vec::new();
+    let mut exclusive_start_key = None;
+
+    loop {
+      let output = self
+        .ddb
+        .query()
+        .table_name(BLOB_TABLE_NAME)
+        .index_name(HOLDER_TAG_INDEX_NAME)
+        .key_condition_expression("#indexed_tag = :tag")
+        .expression_attribute_names("#indexed_tag", HOLDER_TAG_INDEX_KEY_ATTR)
+        .expression_attribute_values(":tag", AttributeValue::S(tag.clone()))
+        .set_exclusive_start_key(exclusive_start_key)
+        .send()
+        .await
+        .map_err(|err| {
+          error!(
+            errorType = error_types::DDB_ERROR,
+            "DynamoDB client failed to query indexed holders: {:?}", err
+          );
+          DBError::AwsSdk(Box::new(err.into()))
+        })?;
+
+      for mut item in output.items.unwrap_or_default() {
+        let blob_hash = item.take_attr(ATTR_BLOB_HASH)?;
+        let holder = item.take_attr(ATTR_HOLDER)?;
+        holders.push(BlobInfo { blob_hash, holder });
+      }
+
+      // A present `LastEvaluatedKey` means DynamoDB stopped at its page
+      // size limit; resume the query from that key.
+      exclusive_start_key = output.last_evaluated_key;
+      if exclusive_start_key.is_none() {
+        break;
+      }
+    }
+
+    Ok(holders)
+  }
+
   /// Returns a list of primary keys for "unchecked" items (blob / holder)
   /// that were last modified at least `min_age` ago.
   /// We need to specify if we want to get blob or holder items.
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,4 +1,5 @@
 #![allow(unused)]
+use comm_lib::blob::types::BlobInfo;
 use regex::RegexSet;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
@@ -298,6 +299,18 @@
     Ok(existing_items)
   }
 
+  /// Returns every blob hash / holder pair whose indexed tag equals `tag`,
+  /// delegating to the database client's `query_indexed_holders`.
+  ///
+  /// Database errors are converted into the service error type via `?`.
+  pub async fn query_indexed_holders(
+    &self,
+    tag: String,
+  ) -> BlobServiceResult<Vec<BlobInfo>> {
+    let holders = self.db.query_indexed_holders(tag).await?;
+    Ok(holders)
+  }
+
   pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
     info!("Starting cleanup...");
     // 1. Fetch blobs and holders marked as "unchecked"