diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -13,6 +13,7 @@ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; +use comm_lib::tools::IntoChunks; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; @@ -20,10 +21,13 @@ pub use crate::database::device_list::DeviceIDAttribute; pub use crate::database::one_time_keys::OTKRow; use crate::{ - constants::RESERVED_USERNAMES_TABLE_USER_ID_INDEX, - ddb_utils::EthereumIdentity, device_list::SignedDeviceList, - grpc_services::shared::PlatformMetadata, log::redact_sensitive_data, - reserved_users::UserDetail, siwe::SocialProof, + constants::{tonic_status_messages, RESERVED_USERNAMES_TABLE_USER_ID_INDEX}, + ddb_utils::EthereumIdentity, + device_list::SignedDeviceList, + grpc_services::shared::PlatformMetadata, + log::redact_sensitive_data, + reserved_users::UserDetail, + siwe::SocialProof, }; use crate::{ ddb_utils::{DBIdentity, OlmAccountType}, @@ -32,7 +36,7 @@ use crate::{error::Error, grpc_utils::DeviceKeysInfo}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, warn, Instrument}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; @@ -1252,32 +1256,66 @@ .await } + #[tracing::instrument(skip_all)] pub async fn query_reserved_usernames_by_user_ids( &self, user_ids: Vec, ) -> Result, Error> { - if user_ids.len() > 50 { - tracing::warn!( - num_queries = user_ids.len(), - "Querying more than 50 reserved usernames by user ID! {}", - "This is inefficient and might lead to performance issues." - ); - } + debug!("Querying for {} reserved usernames", user_ids.len()); + + const NUM_CONCURRENT_TASKS: usize = 16; + + let mut tasks = tokio::task::JoinSet::new(); let mut results = HashMap::with_capacity(user_ids.len()); - for user_id in user_ids { - let query_result = self - .query_reserved_usernames_table_index( - &user_id, - ( - RESERVED_USERNAMES_TABLE_USER_ID_INDEX, - RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, - ), - RESERVED_USERNAMES_TABLE_PARTITION_KEY, - ) - .await?; + for local_user_ids in user_ids.into_n_chunks(NUM_CONCURRENT_TASKS) { + let db = self.clone(); + let task = async move { + let mut local_results = HashMap::new(); + for user_id in local_user_ids { + let query_result = db + .query_reserved_usernames_table_index( + &user_id, + ( + RESERVED_USERNAMES_TABLE_USER_ID_INDEX, + RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, + ), + RESERVED_USERNAMES_TABLE_PARTITION_KEY, + ) + .await?; - if let Some(username) = query_result { - results.insert(user_id, username); + if let Some(username) = query_result { + local_results.insert(user_id, username); + } + } + + Ok::<_, Error>(local_results) + }; + tasks.spawn(task.in_current_span()); + } + + while let Some(result) = tasks.join_next().await { + match result { + Ok(Ok(task_result)) => { + results.extend(task_result); + } + Ok(Err(query_error)) => { + error!( + errorType = error_types::GENERIC_DB_LOG, + "Failed to query reserved usernames by userID: {:?}", query_error + ); + tasks.abort_all(); + return Err(query_error); + } + Err(join_error) => { + error!( + errorType = error_types::GENERIC_DB_LOG, + "Failed to join task: {:?}", join_error + ); + tasks.abort_all(); + return Err(Error::Status(tonic::Status::aborted( + tonic_status_messages::UNEXPECTED_ERROR, + ))); + } } }