diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -8,13 +8,14 @@
 };
 use chrono::Utc;
 use comm_lib::database::{
-  self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
+  self, batch_operations::ExponentialBackoffConfig, AttributeExtractor,
+  TryFromAttribute,
 };
 use std::collections::HashMap;
 use tracing::{debug, error, trace};
 
-use crate::constants::db::*;
 use crate::constants::error_types;
+use crate::{constants::db::*, types::BlobHashAndHolder};
 
 use super::errors::{BlobDBError, Error as DBError};
 use super::types::*;
@@ -255,6 +256,60 @@
       .collect::<Result<Vec<_>, _>>()
   }
 
+  /// Queries the holder-tag secondary index and returns the blob hash and
+  /// holder of every item whose indexed tag attribute equals `prefix`.
+  ///
+  /// DynamoDB returns at most 1 MB of data per `Query` call, so this keeps
+  /// following `last_evaluated_key` until all pages have been read.
+  ///
+  /// # Errors
+  /// Fails if any DynamoDB request fails, or if a returned item is missing
+  /// the blob hash or holder attribute.
+  pub async fn query_indexed_holders(
+    &self,
+    prefix: String,
+  ) -> DBResult<Vec<BlobHashAndHolder>> {
+    let mut results = Vec::new();
+    // `None` on the first request; afterwards the key DynamoDB told us to
+    // resume from.
+    let mut exclusive_start_key = None;
+    loop {
+      let output = self
+        .ddb
+        .query()
+        .table_name(BLOB_TABLE_NAME)
+        .index_name(HOLDER_TAG_INDEX_NAME)
+        .key_condition_expression("#indexed_tag = :prefix")
+        .expression_attribute_names("#indexed_tag", HOLDER_TAG_INDEX_KEY_ATTR)
+        .expression_attribute_values(
+          ":prefix",
+          AttributeValue::S(prefix.clone()),
+        )
+        .set_exclusive_start_key(exclusive_start_key)
+        .send()
+        .await
+        .map_err(|err| {
+          error!(
+            errorType = error_types::DDB_ERROR,
+            "DynamoDB client failed to query indexed holders: {:?}", err
+          );
+          DBError::AwsSdk(Box::new(err.into()))
+        })?;
+
+      for mut item in output.items.unwrap_or_default() {
+        let blob_hash = item.take_attr(ATTR_BLOB_HASH)?;
+        let holder = item.take_attr(ATTR_HOLDER)?;
+        results.push(BlobHashAndHolder { blob_hash, holder });
+      }
+
+      exclusive_start_key = output.last_evaluated_key;
+      if exclusive_start_key.is_none() {
+        break;
+      }
+    }
+    Ok(results)
+  }
+
   /// Returns a list of primary keys for "unchecked" items (blob / holder)
   /// that were last modified at least `min_age` ago.
   /// We need to specify if we want to get blob or holder items.
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -24,6 +24,7 @@
 use crate::database::DBError;
 use crate::s3::{Error as S3Error, S3Client, S3Path};
 use crate::tools::MemOps;
+use crate::types::BlobHashAndHolder;
 use crate::{
   constants::error_types, constants::BLOB_DOWNLOAD_CHUNK_SIZE,
   database::DatabaseClient,
@@ -298,6 +299,18 @@
     Ok(existing_items)
   }
 
+  /// Returns the blob hash and holder of every item whose indexed holder
+  /// tag equals `prefix`. Thin wrapper over
+  /// [`DatabaseClient::query_indexed_holders`]; any database error is
+  /// propagated with `?` into the service's result type.
+  pub async fn query_indexed_holders(
+    &self,
+    prefix: String,
+  ) -> BlobServiceResult<Vec<BlobHashAndHolder>> {
+    let results = self.db.query_indexed_holders(prefix).await?;
+    Ok(results)
+  }
+
   pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
     info!("Starting cleanup...");
     // 1. Fetch blobs and holders marked as "unchecked"