Page MenuHomePhabricator

D9349.diff
No OneTemporary

D9349.diff

diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,10 +1,15 @@
use aws_sdk_dynamodb::{
operation::put_item::PutItemOutput,
- types::{AttributeValue, Delete, TransactWriteItem, Update},
+ types::{
+ AttributeValue, Delete, DeleteRequest, PutRequest, TransactWriteItem,
+ Update, WriteRequest,
+ },
Error as DynamoDBError,
};
use chrono::Utc;
-use comm_services_lib::database::TryFromAttribute;
+use comm_services_lib::database::{
+ self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
+};
use std::collections::HashMap;
use tracing::{debug, error, trace};
@@ -190,7 +195,9 @@
AttributeValue::S(blob_hash.into()),
)
.consistent_read(true)
- .set_limit(limit)
+ // we need to increase limit by 1 because the blob item itself can be fetched too
+ // it is filtered-out later
+ .set_limit(limit.map(|it| it + 1))
.send()
.await
.map_err(|err| {
@@ -213,6 +220,123 @@
.collect::<Result<Vec<_>, _>>()
.map_err(DBError::Attribute)
}
+
+  /// Checks which of `keys` are present in the blob table.
+  ///
+  /// Performs a batch get projected to the blob-hash and holder attributes
+  /// and converts every row that comes back into its [`PrimaryKey`].
+  /// Conversion stops at the first row that fails to parse.
+  pub async fn list_existing_keys(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<Vec<PrimaryKey>> {
+    let projection = format!("{}, {}", ATTR_BLOB_HASH, ATTR_HOLDER);
+    let found_rows = database::batch_operations::batch_get(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      keys,
+      Some(projection),
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    found_rows.into_iter().map(PrimaryKey::try_from).collect()
+  }
+
+  /// Returns the primary keys of "unchecked" items (blob or holder rows,
+  /// selected by `kind`) whose last-modified timestamp is more than
+  /// `min_age` in the past.
+  ///
+  /// Queries `UNCHECKED_INDEX_NAME` and follows `last_evaluated_key` until
+  /// every result page has been read. A single DynamoDB Query call returns
+  /// at most 1 MB of data, so stopping after the first response would
+  /// silently drop matching items.
+  ///
+  /// # Errors
+  /// Returns `DBError::AwsSdk` if any page request fails. Any error from
+  /// `PrimaryKey::try_from` is propagated for the first row that fails to
+  /// convert.
+  pub async fn find_unchecked_items(
+    &self,
+    kind: UncheckedKind,
+    min_age: chrono::Duration,
+  ) -> DBResult<Vec<PrimaryKey>> {
+    let created_until = Utc::now() - min_age;
+    let timestamp = created_until.timestamp_millis();
+    let kind_value: AttributeValue = kind.into();
+
+    let mut keys = Vec::new();
+    // `None` on the first request; afterwards the key where the previous
+    // page stopped.
+    let mut exclusive_start_key = None;
+    loop {
+      let response = self
+        .ddb
+        .query()
+        .table_name(BLOB_TABLE_NAME)
+        .index_name(UNCHECKED_INDEX_NAME)
+        .key_condition_expression(
+          "#unchecked = :kind AND #last_modified < :timestamp",
+        )
+        .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
+        .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
+        .expression_attribute_values(":kind", kind_value.clone())
+        .expression_attribute_values(
+          ":timestamp",
+          AttributeValue::N(timestamp.to_string()),
+        )
+        .set_exclusive_start_key(exclusive_start_key.take())
+        .send()
+        .await
+        .map_err(|err| {
+          error!("DynamoDB client failed to query unchecked items: {:?}", err);
+          DBError::AwsSdk(err.into())
+        })?;
+
+      if let Some(items) = response.items {
+        for item in items {
+          keys.push(PrimaryKey::try_from(item)?);
+        }
+      }
+
+      // An absent (or empty) last evaluated key means this was the last page.
+      match response.last_evaluated_key {
+        Some(last_key) if !last_key.is_empty() => {
+          exclusive_start_key = Some(last_key);
+        }
+        _ => break,
+      }
+    }
+
+    Ok(keys)
+  }
+
+  /// Clears the "unchecked" attribute on the rows identified by `keys`.
+  ///
+  /// The rows are fetched by a batch get with no projection expression. The
+  /// `ATTR_UNCHECKED` attribute is removed locally, and each modified row is
+  /// written back as a `PutRequest` through a batch write (BatchWriteItem,
+  /// not individual PutItem calls). Rows that do not carry the attribute are
+  /// skipped, which saves write capacity.
+  ///
+  /// NOTE(review): this is a non-atomic read-modify-write. A row deleted or
+  /// updated by another writer between the batch get and the batch write
+  /// would be re-created or overwritten with the stale copy. Confirm that
+  /// callers tolerate this.
+  pub async fn batch_mark_checked(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<()> {
+    let fetched_rows = database::batch_operations::batch_get(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      keys,
+      None,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    let mut put_requests = Vec::new();
+    for mut row in fetched_rows {
+      // Already-checked rows need no write.
+      if row.remove(ATTR_UNCHECKED).is_none() {
+        continue;
+      }
+      let put = PutRequest::builder().set_item(Some(row)).build();
+      put_requests.push(WriteRequest::builder().put_request(put).build());
+    }
+
+    database::batch_operations::batch_write(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      put_requests,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+    Ok(())
+  }
+
+  /// Issues one `DeleteRequest` per entry in `keys` and sends them all
+  /// through the shared batch-write helper (with exponential backoff).
+  pub async fn batch_delete_rows(
+    &self,
+    keys: impl IntoIterator<Item = PrimaryKey>,
+  ) -> DBResult<()> {
+    let delete_requests: Vec<WriteRequest> = keys
+      .into_iter()
+      .map(|key| {
+        let delete = DeleteRequest::builder().set_key(Some(key.into())).build();
+        WriteRequest::builder().delete_request(delete).build()
+      })
+      .collect();
+
+    database::batch_operations::batch_write(
+      &self.ddb,
+      BLOB_TABLE_NAME,
+      delete_requests,
+      ExponentialBackoffConfig::default(),
+    )
+    .await?;
+
+    Ok(())
+  }
}
// private helpers
diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs
--- a/services/blob/src/database/errors.rs
+++ b/services/blob/src/database/errors.rs
@@ -18,6 +18,18 @@
Blob(BlobDBError),
#[display(...)]
ItemAlreadyExists,
+ MaxRetriesExceeded,
+}
+
+impl From<comm_services_lib::database::Error> for Error {
+  /// Converts an error from the shared `comm_services_lib::database`
+  /// helpers into the blob service's variant of the same name.
+  fn from(source: comm_services_lib::database::Error) -> Self {
+    use comm_services_lib::database::Error as LibError;
+    match source {
+      LibError::MaxRetriesExceeded => Error::MaxRetriesExceeded,
+      LibError::Attribute(inner) => Error::Attribute(inner),
+      LibError::AwsSdk(inner) => Error::AwsSdk(inner),
+    }
+  }
}
#[derive(Debug)]

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 12:26 PM (19 h, 51 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2584701
Default Alt Text
D9349.diff (5 KB)

Event Timeline