Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F33272548
D9348.1768649985.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
5 KB
Referenced Files
None
Subscribers
None
D9348.1768649985.diff
View Options
diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -406,15 +406,20 @@
pub mod batch_operations {
use aws_sdk_dynamodb::{
- error::SdkError, operation::batch_write_item::BatchWriteItemError,
- types::WriteRequest,
+ error::SdkError,
+ operation::batch_write_item::BatchWriteItemError,
+ types::{KeysAndAttributes, WriteRequest},
+ Error as DynamoDBError,
};
use rand::Rng;
use std::time::Duration;
use tracing::{debug, trace};
+ use super::AttributeMap;
+
/// DynamoDB hard limit for single BatchWriteItem request
- const SINGLE_BATCH_ITEM_LIMIT: usize = 25;
+ const SINGLE_BATCH_WRITE_ITEM_LIMIT: usize = 25;
+ const SINGLE_BATCH_GET_ITEM_LIMIT: usize = 100;
/// Exponential backoff configuration for batch write operation
#[derive(derive_more::Constructor, Debug)]
@@ -461,6 +466,117 @@
}
}
+ #[tracing::instrument(name = "batch_get", skip(ddb, primary_keys, config))]
+ pub async fn batch_get<K>(
+ ddb: &aws_sdk_dynamodb::Client,
+ table_name: &str,
+ primary_keys: K,
+ projection_expression: Option<String>,
+ config: ExponentialBackoffConfig,
+ ) -> Result<Vec<AttributeMap>, super::Error>
+ where
+ K: IntoIterator,
+ K::Item: Into<AttributeMap>,
+ {
+ let mut primary_keys: Vec<_> =
+ primary_keys.into_iter().map(Into::into).collect();
+ let mut results = Vec::with_capacity(primary_keys.len());
+ tracing::debug!(
+ ?config,
+ "Starting batch read operation of {} items...",
+ primary_keys.len()
+ );
+
+ let mut exponential_backoff = config.new_counter();
+ let mut backup = Vec::with_capacity(SINGLE_BATCH_GET_ITEM_LIMIT);
+
+ loop {
+ let items_to_drain =
+ std::cmp::min(primary_keys.len(), SINGLE_BATCH_GET_ITEM_LIMIT);
+ let chunk = primary_keys.drain(..items_to_drain).collect::<Vec<_>>();
+ if chunk.is_empty() {
+ // No more items
+ tracing::trace!("No more items to process. Exiting");
+ break;
+ }
+
+ // we don't need the backup when we don't retry
+ if config.should_retry_on_capacity_exceeded() {
+ chunk.clone_into(&mut backup);
+ }
+
+ tracing::trace!("Attempting to get chunk of {} items...", chunk.len());
+ let result = ddb
+ .batch_get_item()
+ .request_items(
+ table_name,
+ KeysAndAttributes::builder()
+ .set_keys(Some(chunk))
+ .consistent_read(true)
+ .set_projection_expression(projection_expression.clone())
+ .build(),
+ )
+ .send()
+ .await;
+
+ match result {
+ Ok(output) => {
+ if let Some(mut responses) = output.responses {
+ if let Some(items) = responses.remove(table_name) {
+ tracing::trace!("Successfully read {} items", items.len());
+ results.extend(items);
+ }
+ } else {
+ tracing::warn!("Responses was None");
+ }
+
+ if let Some(mut unprocessed) = output.unprocessed_keys {
+ let keys_to_retry = match unprocessed.remove(table_name) {
+ Some(KeysAndAttributes {
+ keys: Some(keys), ..
+ }) if !keys.is_empty() => keys,
+ _ => {
+ tracing::trace!("Chunk read successfully. Continuing.");
+ exponential_backoff.reset();
+ continue;
+ }
+ };
+
+ exponential_backoff.sleep_and_retry().await?;
+ tracing::debug!(
+ "Some items failed. Retrying {} requests",
+ keys_to_retry.len()
+ );
+ primary_keys.extend(keys_to_retry);
+ } else {
+ tracing::trace!("Unprocessed items was None");
+ }
+ }
+ Err(error) => {
+ let error: DynamoDBError = error.into();
+ if !matches!(
+ error,
+ DynamoDBError::ProvisionedThroughputExceededException(_)
+ ) {
+ tracing::error!("BatchGetItem failed: {0:?} - {0}", error);
+ return Err(error.into());
+ }
+
+ tracing::warn!("Provisioned capacity exceeded!");
+ if !config.retry_on_provisioned_capacity_exceeded {
+ return Err(error.into());
+ }
+ exponential_backoff.sleep_and_retry().await?;
+ primary_keys.append(&mut backup);
+ trace!("Retrying now...");
+ }
+ };
+ }
+
+ debug!("Batch read completed.");
+ Ok(results)
+ }
+
/// Performs a single DynamoDB table batch write operation. If the batch
/// contains more than 25 items, it is split into chunks.
///
@@ -480,11 +596,11 @@
);
let mut exponential_backoff = config.new_counter();
- let mut backup = Vec::with_capacity(SINGLE_BATCH_ITEM_LIMIT);
+ let mut backup = Vec::with_capacity(SINGLE_BATCH_WRITE_ITEM_LIMIT);
loop {
let items_to_drain =
- std::cmp::min(requests.len(), SINGLE_BATCH_ITEM_LIMIT);
+ std::cmp::min(requests.len(), SINGLE_BATCH_WRITE_ITEM_LIMIT);
let chunk = requests.drain(..items_to_drain).collect::<Vec<_>>();
if chunk.is_empty() {
// No more items
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Jan 17, 11:39 AM (5 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5947768
Default Alt Text
D9348.1768649985.diff (5 KB)
Attached To
Mode
D9348: [services-lib] Add function to batch get items
Attached
Detach File
Event Timeline
Log In to Comment