diff --git a/services/identity/src/database/farcaster.rs b/services/identity/src/database/farcaster.rs --- a/services/identity/src/database/farcaster.rs +++ b/services/identity/src/database/farcaster.rs @@ -30,35 +30,56 @@ &self, farcaster_ids: Vec, ) -> Result, Error> { + const BATCH_SIZE: usize = 10; let mut users: Vec = Vec::new(); - for id in farcaster_ids { - let query_response = self - .client - .query() - .table_name(USERS_TABLE) - .index_name(USERS_TABLE_FARCASTER_ID_INDEX) - .key_condition_expression(format!( - "{} = :val", - USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME - )) - .expression_attribute_values(":val", AttributeValue::S(id)) - .send() - .await - .map_err(|e| { - error!( - errorType = error_types::FARCASTER_DB_LOG, - "Failed to query users by farcasterID: {:?}", e - ); - Error::AwsSdk(e.into()) - })? - .items - .and_then(|mut items| items.pop()) - .map(FarcasterUserData::try_from) - .transpose() - .map_err(Error::from)?; - if let Some(data) = query_response { - users.push(data); + for chunk in farcaster_ids.chunks(BATCH_SIZE) { + let mut handles = Vec::new(); + + for id in chunk { + let client = self.client.clone(); + let id = id.clone(); + + let handle = tokio::spawn(async move { + client + .query() + .table_name(USERS_TABLE) + .index_name(USERS_TABLE_FARCASTER_ID_INDEX) + .key_condition_expression(format!( + "{} = :val", + USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME + )) + .expression_attribute_values(":val", AttributeValue::S(id)) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))? + .items + .and_then(|mut items| items.pop()) + .map(FarcasterUserData::try_from) + .transpose() + .map_err(Error::from) + }); + + handles.push(handle); + } + + for handle in handles { + match handle.await { + Ok(Ok(Some(user))) => users.push(user), + Ok(Ok(None)) => {} + Ok(Err(e)) => { + error!( + errorType = error_types::FARCASTER_DB_LOG, + "Failed to query users by farcasterID: {:?}", e + ); + } + Err(e) => { + error!( + errorType = error_types::FARCASTER_DB_LOG, + "Task failed: {:?}", e + ); + } + } } }