Page MenuHomePhorge

D13618.1768322160.diff
No OneTemporary

Size
2 KB
Referenced Files
None
Subscribers
None

D13618.1768322160.diff

diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -8,13 +8,14 @@
};
use chrono::Utc;
use comm_lib::database::{
- self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
+ self, batch_operations::ExponentialBackoffConfig, AttributeExtractor,
+ TryFromAttribute,
};
use std::collections::HashMap;
use tracing::{debug, error, trace};
-use crate::constants::db::*;
use crate::constants::error_types;
+use crate::{constants::db::*, types::BlobHashAndHolder};
use super::errors::{BlobDBError, Error as DBError};
use super::types::*;
@@ -255,6 +256,38 @@
.collect::<Result<Vec<_>, _>>()
}
+ pub async fn query_indexed_holders( // Queries index HOLDER_TAG_INDEX_NAME of BLOB_TABLE_NAME and maps each returned item to a BlobHashAndHolder.
+ &self,
+ prefix: String, // Compared with `=` against HOLDER_TAG_INDEX_KEY_ATTR, so this is an exact-match lookup despite the name.
+ ) -> DBResult<Vec<BlobHashAndHolder>> { // Err if the request fails or if `take_attr` fails for any item.
+ self
+ .ddb
+ .query()
+ .table_name(BLOB_TABLE_NAME)
+ .index_name(HOLDER_TAG_INDEX_NAME)
+ .key_condition_expression("#indexed_tag = :prefix")
+ .expression_attribute_names("#indexed_tag", HOLDER_TAG_INDEX_KEY_ATTR)
+ .expression_attribute_values(":prefix", AttributeValue::S(prefix))
+ .send() // NOTE(review): a single request; only `.items` of this response is read, so further pages (if the result is paginated) are not fetched - confirm this is acceptable.
+ .await
+ .map_err(|err| { // Log the SDK failure, then wrap it as DBError::AwsSdk.
+ error!(
+ errorType = error_types::DDB_ERROR,
+ "DynamoDB client failed to query indexed holders: {:?}", err
+ );
+ DBError::AwsSdk(Box::new(err.into()))
+ })?
+ .items
+ .unwrap_or_default() // A response without `items` falls back to the default (empty) value.
+ .into_iter()
+ .map(|mut item| { // `mut` because `take_attr` is called on the item; presumably it moves the value out - verify against AttributeExtractor.
+ let blob_hash = item.take_attr(ATTR_BLOB_HASH)?;
+ let holder = item.take_attr(ATTR_HOLDER)?;
+ Ok(BlobHashAndHolder { blob_hash, holder })
+ })
+ .collect() // Collects into the Result return type; the first Err produced by the closure ends the collection.
+ }
+
/// Returns a list of primary keys for "unchecked" items (blob / holder)
/// that were last modified at least `min_age` ago.
/// We need to specify if we want to get blob or holder items.
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -24,6 +24,7 @@
use crate::database::DBError;
use crate::s3::{Error as S3Error, S3Client, S3Path};
use crate::tools::MemOps;
+use crate::types::BlobHashAndHolder;
use crate::{
constants::error_types, constants::BLOB_DOWNLOAD_CHUNK_SIZE,
database::DatabaseClient,
@@ -298,6 +299,14 @@
Ok(existing_items)
}
+ pub async fn query_indexed_holders( // Service-level pass-through to `self.db.query_indexed_holders`.
+ &self,
+ prefix: String, // Forwarded unchanged to the database client.
+ ) -> BlobServiceResult<Vec<BlobHashAndHolder>> {
+ let results = self.db.query_indexed_holders(prefix).await?; // `?` propagates the database error; presumably converted to the service error type by a `From` impl defined elsewhere - confirm.
+ Ok(results)
+ }
+
pub async fn perform_cleanup(&self) -> anyhow::Result<()> {
info!("Starting cleanup...");
// 1. Fetch blobs and holders marked as "unchecked"

File Metadata

Mime Type
text/plain
Expires
Tue, Jan 13, 4:36 PM (5 h, 47 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5928439
Default Alt Text
D13618.1768322160.diff (2 KB)

Event Timeline