diff --git a/services/identity/src/config.rs b/services/identity/src/config.rs --- a/services/identity/src/config.rs +++ b/services/identity/src/config.rs @@ -61,6 +61,8 @@ #[arg(default_value = SECRETS_DIRECTORY)] dir: String, }, + /// Syncs DynamoDB users with identity-search search index + SyncIdentitySearch, } #[derive(Clone)] @@ -75,8 +77,8 @@ impl ServerConfig { fn from_cli(cli: &Cli) -> Result { - if !matches!(cli.command, Command::Server) { - panic!("ServerConfig is only available for the `server` command"); + if !matches!(cli.command, Command::Server | Command::SyncIdentitySearch) { + panic!("ServerConfig is only available for the `server` or `sync-identity-search` command"); } info!("Tunnelbroker endpoint: {}", &cli.tunnelbroker_endpoint); diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -984,6 +984,61 @@ Ok(result) } + pub async fn get_all_user_details(&self) -> Result, Error> { + let scan_output = self + .client + .scan() + .table_name(USERS_TABLE) + .projection_expression(format!( + "{USERS_TABLE_USERNAME_ATTRIBUTE}, {USERS_TABLE_PARTITION_KEY}" + )) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))?; + + let mut result = Vec::new(); + if let Some(attributes) = scan_output.items { + for mut attribute in attributes { + if let (Ok(username), Ok(user_id)) = ( + attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), + attribute.take_attr(USERS_TABLE_PARTITION_KEY), + ) { + result.push(UserDetail { username, user_id }); + } + } + } + Ok(result) + } + + pub async fn get_all_reserved_user_details( + &self, + ) -> Result, Error> { + let scan_output = self + .client + .scan() + .table_name(RESERVED_USERNAMES_TABLE) + .projection_expression(format!( + "{RESERVED_USERNAMES_TABLE_PARTITION_KEY},\ + {RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE}" + )) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))?; + + let mut result = Vec::new(); + if let Some(attributes) = scan_output.items { + for mut attribute in attributes { + if let (Ok(username), Ok(user_id)) = ( + attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), + attribute.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE), + ) { + result.push(UserDetail { username, user_id }); + } + } + } + Ok(result) + } + pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -21,6 +21,7 @@ mod nonce; mod reserved_users; mod siwe; +mod sync_identity_search; mod token; mod tunnelbroker; mod websockets; @@ -28,6 +29,7 @@ use constants::IDENTITY_SERVICE_SOCKET_ADDR; use cors::cors_layer; use keygen::generate_and_persist_keypair; +use sync_identity_search::sync_index; use tracing::{self, info, Level}; use tracing_subscriber::EnvFilter; @@ -89,6 +91,13 @@ grpc_result = grpc_server => { grpc_result.map_err(|e| e.into()) }, }; } + Command::SyncIdentitySearch => { + let aws_config = aws::config::from_env().region("us-east-2").load().await; + let database_client = DatabaseClient::new(&aws_config); + let sync_result = sync_index(&database_client).await; + + error::consume_error(sync_result); + } } Ok(()) diff --git a/services/identity/src/sync_identity_search.rs b/services/identity/src/sync_identity_search.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/sync_identity_search.rs @@ -0,0 +1,102 @@ +use crate::config::CONFIG; +use crate::constants::IDENTITY_SEARCH_INDEX; +use crate::database::DatabaseClient; +use crate::error; +use identity_search_messages::IdentitySearchUser; +use serde_json::json; +use tracing::error; + +pub async fn sync_index( + database_client: &DatabaseClient, +) -> Result<(), error::Error> { + let identity_reserved_users = + database_client.get_all_reserved_user_details().await?; + let identity_users = database_client.get_all_user_details().await?; + + let mut identity_search_users: Vec = + identity_reserved_users + .into_iter() + .map(|user| IdentitySearchUser { + username: user.username, + user_id: user.user_id, + }) + .collect(); + + for user in identity_users { + identity_search_users.push(IdentitySearchUser { + username: user.username, + user_id: user.user_id, + }); + } + + let client = reqwest::Client::new(); + + clear_index(&client).await?; + restore_index(&client, &identity_search_users).await?; + + Ok(()) +} + +pub async fn clear_index( + reqwest_client: &reqwest::Client, +) -> Result<(), error::Error> { + let url = format!( + "https://{}/{}/_delete_by_query", + &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX + ); + + let query = serde_json::json!({ + "query": { + "match_all": {} + } + }); + + let response = reqwest_client + .post(&url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .json(&query) + .send() + .await + .unwrap(); + + if !response.status().is_success() { + error!("Sync Error: Failed to clear index"); + } + + Ok(()) +} + +pub async fn restore_index( + reqwest_client: &reqwest::Client, + identity_search_users: &Vec, +) -> Result<(), error::Error> { + let mut bulk_data = String::new(); + + for user in identity_search_users { + let action = json!({ "index": { "_index": IDENTITY_SEARCH_INDEX, "_id": user.user_id } }); + bulk_data.push_str(&action.to_string()); + bulk_data.push('\n'); + + bulk_data.push_str(&serde_json::to_string(&user).unwrap()); + bulk_data.push('\n'); + } + + let url = format!( + "https://{}/{}/_bulk/", + &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX + ); + + let response = reqwest_client + .post(&url) + .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") + .body(bulk_data) + .send() + .await + .unwrap(); + + if !response.status().is_success() { + error!("Sync Error: Failed to restore index"); + } + + Ok(()) +}