D9348.1768649985.diff

diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -406,15 +406,20 @@
 pub mod batch_operations {
   use aws_sdk_dynamodb::{
-    error::SdkError, operation::batch_write_item::BatchWriteItemError,
-    types::WriteRequest,
+    error::SdkError,
+    operation::batch_write_item::BatchWriteItemError,
+    types::{KeysAndAttributes, WriteRequest},
+    Error as DynamoDBError,
   };
   use rand::Rng;
   use std::time::Duration;
   use tracing::{debug, trace};
 
+  use super::AttributeMap;
+
   /// DynamoDB hard limit for single BatchWriteItem request
-  const SINGLE_BATCH_ITEM_LIMIT: usize = 25;
+  const SINGLE_BATCH_WRITE_ITEM_LIMIT: usize = 25;
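+  /// DynamoDB hard limit for single BatchGetItem request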
+  const SINGLE_BATCH_GET_ITEM_LIMIT: usize = 100;
 
   /// Exponential backoff configuration for batch write operation
   #[derive(derive_more::Constructor, Debug)]
@@ -461,6 +466,117 @@
     }
   }
 
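+  /// Performs a single DynamoDB table batch read operation. If the batch
+  /// contains more than 100 keys, it is split into chunks. Unprocessed
+  /// keys and throttled requests are retried with exponential backoff.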
+  #[tracing::instrument(name = "batch_get", skip(ddb, primary_keys, config))]
+  pub async fn batch_get<K>(
+    ddb: &aws_sdk_dynamodb::Client,
+    table_name: &str,
+    primary_keys: K,
+    projection_expression: Option<String>,
+    config: ExponentialBackoffConfig,
+  ) -> Result<Vec<AttributeMap>, super::Error>
+  where
+    K: IntoIterator,
+    K::Item: Into<AttributeMap>,
+  {
+    let mut primary_keys: Vec<_> =
+      primary_keys.into_iter().map(Into::into).collect();
+    let mut results = Vec::with_capacity(primary_keys.len());
+    tracing::debug!(
+      ?config,
+      "Starting batch read operation of {} items...",
+      primary_keys.len()
+    );
+
+    let mut exponential_backoff = config.new_counter();
+    let mut backup = Vec::with_capacity(SINGLE_BATCH_GET_ITEM_LIMIT);
+
+    loop {
+      let items_to_drain =
+        std::cmp::min(primary_keys.len(), SINGLE_BATCH_GET_ITEM_LIMIT);
+      let chunk = primary_keys.drain(..items_to_drain).collect::<Vec<_>>();
+      if chunk.is_empty() {
+        // No more items
+        tracing::trace!("No more items to process. Exiting");
+        break;
+      }
+
+      // we don't need the backup when we don't retry
+      if config.should_retry_on_capacity_exceeded() {
+        chunk.clone_into(&mut backup);
+      }
+
+      tracing::trace!("Attempting to get chunk of {} items...", chunk.len());
+      let result = ddb
+        .batch_get_item()
+        .request_items(
+          table_name,
+          KeysAndAttributes::builder()
+            .set_keys(Some(chunk))
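+            // Strongly consistent reads return the latest data,
+            // at twice the read-capacity cost of eventual consistency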
+            .consistent_read(true)
+            .set_projection_expression(projection_expression.clone())
+            .build(),
+        )
+        .send()
+        .await;
+
+      match result {
+        Ok(output) => {
+          if let Some(mut responses) = output.responses {
+            if let Some(items) = responses.remove(table_name) {
+              tracing::trace!("Successfully read {} items", items.len());
+              results.extend(items);
+            }
+          } else {
+            tracing::warn!("Responses was None");
+          }
+
+          if let Some(mut unprocessed) = output.unprocessed_keys {
+            let keys_to_retry = match unprocessed.remove(table_name) {
+              Some(KeysAndAttributes {
+                keys: Some(keys), ..
+              }) if !keys.is_empty() => keys,
+              _ => {
+                tracing::trace!("Chunk read successfully. Continuing.");
+                exponential_backoff.reset();
+                continue;
+              }
+            };
+
+            exponential_backoff.sleep_and_retry().await?;
+            tracing::debug!(
+              "Some items failed. Retrying {} requests",
+              keys_to_retry.len()
+            );
+            primary_keys.extend(keys_to_retry);
+          } else {
+            tracing::trace!("Unprocessed items was None");
+          }
+        }
+        Err(error) => {
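+          // Unwrap the generic SDK error into a typed DynamoDB error
+          // so specific variants can be matched on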
+          let error: DynamoDBError = error.into();
+          if !matches!(
+            error,
+            DynamoDBError::ProvisionedThroughputExceededException(_)
+          ) {
+            tracing::error!("BatchGetItem failed: {0:?} - {0}", error);
+            return Err(error.into());
+          }
+
+          tracing::warn!("Provisioned capacity exceeded!");
+          if !config.retry_on_provisioned_capacity_exceeded {
+            return Err(error.into());
+          }
+          exponential_backoff.sleep_and_retry().await?;
+          primary_keys.append(&mut backup);
+          trace!("Retrying now...");
+        }
+      };
+    }
+
+    debug!("Batch read completed.");
+    Ok(results)
+  }
+
   /// Performs a single DynamoDB table batch write operation. If the batch
   /// contains more than 25 items, it is split into chunks.
   ///
@@ -480,11 +596,11 @@
     );
 
     let mut exponential_backoff = config.new_counter();
-    let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
+    let mut backup = Vec::with_capacity(SINGLE_BATCH_WRITE_ITEM_LIMIT);
 
     loop {
       let items_to_drain =
-        std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
+        std::cmp::min(requests.len(), SINGLE_BATCH_WRITE_ITEM_LIMIT);
       let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
       if chunk.is_empty() {
         // No more items
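
For reference, a minimal, hypothetical call site for the new helper (not part of this diff). It assumes AttributeMap is an alias for HashMap<String, AttributeValue>, that ExponentialBackoffConfig provides a Default impl, and uses illustrative table/key names ("devices", "deviceID") and import paths:

  use std::collections::HashMap;
  use aws_sdk_dynamodb::types::AttributeValue;
  use comm_services_lib::database::batch_operations::{
    batch_get, ExponentialBackoffConfig,
  };

  async fn fetch_devices(
    ddb: &aws_sdk_dynamodb::Client,
    device_ids: &[String],
  ) -> Result<(), Box<dyn std::error::Error>> {
    // Build one primary-key map per requested item
    let keys: Vec<HashMap<String, AttributeValue>> = device_ids
      .iter()
      .map(|id| {
        HashMap::from([("deviceID".to_string(), AttributeValue::S(id.clone()))])
      })
      .collect();

    // Chunking into batches of 100 and retrying unprocessed keys
    // is handled inside batch_get
    let items = batch_get(
      ddb,
      "devices",
      keys,
      None, // no projection expression: read all attributes
      ExponentialBackoffConfig::default(), // assuming a Default impl
    )
    .await?;

    println!("Fetched {} items", items.len());
    Ok(())
  }

Because any type satisfying Into<AttributeMap> is accepted, a plain Vec of key maps can be passed directly, as above.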
