Page MenuHomePhabricator

D8452.id28508.diff
No OneTemporary

D8452.id28508.diff

diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -212,6 +212,103 @@
.collect::<Result<Vec<_>, _>>()
.map_err(|err| DBError::Attribute(err))
}
+
+ /// Returns a list of primary keys for "unchecked" items (blob / holder)
+ /// that were last modified at least `min_age` ago.
+ /// We need to specify if we want to get blob or holder items.
+ pub async fn find_unchecked_items(
+ &self,
+ kind: UncheckedKind,
+ min_age: chrono::Duration,
+ ) -> DBResult<Vec<PrimaryKey>> {
+ let created_until = Utc::now() - min_age;
+ let timestamp = created_until.timestamp_millis();
+
+ let response = self
+ .ddb
+ .query()
+ .table_name(BLOB_TABLE_NAME)
+ .index_name(UNCHECKED_INDEX_NAME)
+ .key_condition_expression(
+ "#unchecked = :kind AND #last_modified < :timestamp",
+ )
+ .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
+ .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
+ .expression_attribute_values(":kind", kind.into())
+ .expression_attribute_values(
+ ":timestamp",
+ AttributeValue::N(timestamp.to_string()),
+ )
+ .send()
+ .await
+ .map_err(|err| {
+ error!("DynamoDB client failed to query unchecked items: {:?}", err);
+ DBError::AwsSdk(err.into())
+ })?;
+
+ let Some(items) = response.items else { return Ok(vec![]); };
+ items
+ .into_iter()
+ .map(PrimaryKey::try_from)
+ .collect::<Result<Vec<_>, _>>()
+ }
+
+ /// For all rows in specified set of primary keys, removes
+ /// the "unchecked" attribute using PutItem operation in batch.
+ pub async fn batch_mark_checked(
+ &self,
+ keys: Vec<PrimaryKey>,
+ ) -> DBResult<()> {
+ let write_requests = self
+ .get_raw_items(keys)
+ .await?
+ .into_iter()
+ .filter_map(|mut row| {
+ // filter out rows that are already checked
+ // to save some write capacity
+ if row.remove(ATTR_UNCHECKED).is_none() {
+ return None;
+ }
+ let put_request = PutRequest::builder().set_item(Some(row)).build();
+ let request = WriteRequest::builder().put_request(put_request).build();
+ Some(request)
+ })
+ .collect();
+
+ self
+ .ddb
+ .batch_write_item()
+ .request_items(BLOB_TABLE_NAME, write_requests)
+ .send()
+ .await
+ .map_err(|err| {
+ error!("DynamoDB client failed to batch update rows");
+ DBError::AwsSdk(err.into())
+ })?;
+ Ok(())
+ }
+
+ /// Performs multiple DeleteItem operations in batch
+ pub async fn batch_delete_rows(&self, keys: Vec<PrimaryKey>) -> DBResult<()> {
+ let write_requests = keys
+ .into_iter()
+ .map(|key| DeleteRequest::builder().set_key(Some(key.into())).build())
+ .map(|request| WriteRequest::builder().delete_request(request).build())
+ .collect::<Vec<_>>();
+
+ self
+ .ddb
+ .batch_write_item()
+ .request_items(BLOB_TABLE_NAME, write_requests)
+ .send()
+ .await
+ .map_err(|err| {
+ error!("DynamoDB client failed to batch delete rows: {:?}", err);
+ DBError::AwsSdk(err.into())
+ })?;
+
+ Ok(())
+ }
}
// private helpers

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 12:50 PM (14 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2584761
Default Alt Text
D8452.id28508.diff (3 KB)

Event Timeline