diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs --- a/services/blob/src/database/client.rs +++ b/services/blob/src/database/client.rs @@ -1,10 +1,15 @@ use aws_sdk_dynamodb::{ operation::put_item::PutItemOutput, - types::{AttributeValue, Delete, TransactWriteItem, Update}, + types::{ + AttributeValue, Delete, DeleteRequest, PutRequest, TransactWriteItem, + Update, WriteRequest, + }, Error as DynamoDBError, }; use chrono::Utc; -use comm_services_lib::database::TryFromAttribute; +use comm_services_lib::database::{ + self, batch_operations::ExponentialBackoffConfig, TryFromAttribute, +}; use std::collections::HashMap; use tracing::{debug, error, trace}; @@ -190,7 +195,9 @@ AttributeValue::S(blob_hash.into()), ) .consistent_read(true) - .set_limit(limit) + // The limit is increased by 1 because the blob item itself can be fetched too; + // that extra item is filtered out later. + .set_limit(limit.map(|it| it + 1)) .send() .await .map_err(|err| { @@ -213,6 +220,123 @@ .collect::, _>>() .map_err(DBError::Attribute) } + + /// Returns the primary keys, among the given `keys`, of rows that already exist in the table (fetched via `batch_get` with the default `ExponentialBackoffConfig`). + pub async fn list_existing_keys( + &self, + keys: impl IntoIterator, + ) -> DBResult> { + database::batch_operations::batch_get( + &self.ddb, + BLOB_TABLE_NAME, + keys, + Some(format!("{}, {}", ATTR_BLOB_HASH, ATTR_HOLDER)), + ExponentialBackoffConfig::default(), + ) + .await? + .into_iter() + .map(PrimaryKey::try_from) + .collect::, _>>() + } + + /// Returns a list of primary keys for "unchecked" items (blob / holder) + /// whose last-modified timestamp is strictly earlier than `now - min_age` (compared in epoch milliseconds). + /// `kind` specifies whether blob or holder items are queried. 
+ pub async fn find_unchecked_items( + &self, + kind: UncheckedKind, + min_age: chrono::Duration, + ) -> DBResult> { + let created_until = Utc::now() - min_age; + let timestamp = created_until.timestamp_millis(); + + let response = self + .ddb + .query() + .table_name(BLOB_TABLE_NAME) + .index_name(UNCHECKED_INDEX_NAME) + .key_condition_expression( + "#unchecked = :kind AND #last_modified < :timestamp", + ) + .expression_attribute_names("#unchecked", ATTR_UNCHECKED) + .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED) + .expression_attribute_values(":kind", kind.into()) + .expression_attribute_values( + ":timestamp", + AttributeValue::N(timestamp.to_string()), + ) + .send() + .await + .map_err(|err| { + error!("DynamoDB client failed to query unchecked items: {:?}", err); + DBError::AwsSdk(err.into()) + })?; + + let Some(items) = response.items else { return Ok(vec![]); }; + items + .into_iter() + .map(PrimaryKey::try_from) + .collect::, _>>() + } + + /// For every row in the specified set of primary keys, removes the "unchecked" attribute: + /// rows are first read back with a batch get, then rewritten without the attribute using batched PutItem requests; rows that no longer carry the attribute are skipped. 
+ pub async fn batch_mark_checked( + &self, + keys: impl IntoIterator, + ) -> DBResult<()> { + let items_to_mark = database::batch_operations::batch_get( + &self.ddb, + BLOB_TABLE_NAME, + keys, + None, + ExponentialBackoffConfig::default(), + ) + .await?; + + let write_requests = items_to_mark + .into_iter() + .filter_map(|mut row| { + // `remove` yields nothing when the row has no ATTR_UNCHECKED attribute, so the `?` below skips rows that are already checked + // (this saves some write capacity); the remaining rows are written back without the attribute + row.remove(ATTR_UNCHECKED)?; + let put_request = PutRequest::builder().set_item(Some(row)).build(); + let request = WriteRequest::builder().put_request(put_request).build(); + Some(request) + }) + .collect(); + + database::batch_operations::batch_write( + &self.ddb, + BLOB_TABLE_NAME, + write_requests, + ExponentialBackoffConfig::default(), + ) + .await?; + Ok(()) + } + + /// Deletes the rows identified by `keys`: builds one DeleteItem write request per key and submits them through `batch_write` with the default `ExponentialBackoffConfig`. + pub async fn batch_delete_rows( + &self, + keys: impl IntoIterator, + ) -> DBResult<()> { + let write_requests = keys + .into_iter() + .map(|key| DeleteRequest::builder().set_key(Some(key.into())).build()) + .map(|request| WriteRequest::builder().delete_request(request).build()) + .collect::>(); + + database::batch_operations::batch_write( + &self.ddb, + BLOB_TABLE_NAME, + write_requests, + ExponentialBackoffConfig::default(), + ) + .await?; + + Ok(()) + } } // private helpers diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs --- a/services/blob/src/database/errors.rs +++ b/services/blob/src/database/errors.rs @@ -18,6 +18,18 @@ Blob(BlobDBError), #[display(...)] ItemAlreadyExists, + MaxRetriesExceeded, +} + +impl From for Error { + fn from(value: comm_services_lib::database::Error) -> Self { + use comm_services_lib::database::Error as E; + match value { + E::AwsSdk(err) => Self::AwsSdk(err), + E::Attribute(err) => Self::Attribute(err), + E::MaxRetriesExceeded => Self::MaxRetriesExceeded, + } + } } #[derive(Debug)]