Page MenuHomePhabricator

D11122.diff
No OneTemporary

D11122.diff

diff --git a/services/identity/src/config.rs b/services/identity/src/config.rs
--- a/services/identity/src/config.rs
+++ b/services/identity/src/config.rs
@@ -61,6 +61,8 @@
#[arg(default_value = SECRETS_DIRECTORY)]
dir: String,
},
+ /// Syncs DynamoDB users with identity-search search index
+ SyncIdentitySearch,
}
#[derive(Clone)]
@@ -75,8 +77,8 @@
impl ServerConfig {
fn from_cli(cli: &Cli) -> Result<Self, Error> {
- if !matches!(cli.command, Command::Server) {
- panic!("ServerConfig is only available for the `server` command");
+ if !matches!(cli.command, Command::Server | Command::SyncIdentitySearch) {
+ panic!("ServerConfig is only available for the `server` or `sync-identity-search` command");
}
info!("Tunnelbroker endpoint: {}", &cli.tunnelbroker_endpoint);
diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs
--- a/services/identity/src/database.rs
+++ b/services/identity/src/database.rs
@@ -984,6 +984,61 @@
Ok(result)
}
+ pub async fn get_all_user_details(&self) -> Result<Vec<UserDetail>, Error> {
+ let scan_output = self
+ .client
+ .scan()
+ .table_name(USERS_TABLE)
+ .projection_expression(format!(
+ "{USERS_TABLE_USERNAME_ATTRIBUTE}, {USERS_TABLE_PARTITION_KEY}"
+ ))
+ .send()
+ .await
+ .map_err(|e| Error::AwsSdk(e.into()))?;
+
+ let mut result = Vec::new();
+ if let Some(attributes) = scan_output.items {
+ for mut attribute in attributes {
+ if let (Ok(username), Ok(user_id)) = (
+ attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE),
+ attribute.take_attr(USERS_TABLE_PARTITION_KEY),
+ ) {
+ result.push(UserDetail { username, user_id });
+ }
+ }
+ }
+ Ok(result)
+ }
+
+ pub async fn get_all_reserved_user_details(
+ &self,
+ ) -> Result<Vec<UserDetail>, Error> {
+ let scan_output = self
+ .client
+ .scan()
+ .table_name(RESERVED_USERNAMES_TABLE)
+ .projection_expression(format!(
+ "{RESERVED_USERNAMES_TABLE_PARTITION_KEY},\
+ {RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE}"
+ ))
+ .send()
+ .await
+ .map_err(|e| Error::AwsSdk(e.into()))?;
+
+ let mut result = Vec::new();
+ if let Some(attributes) = scan_output.items {
+ for mut attribute in attributes {
+ if let (Ok(username), Ok(user_id)) = (
+ attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE),
+ attribute.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE),
+ ) {
+ result.push(UserDetail { username, user_id });
+ }
+ }
+ }
+ Ok(result)
+ }
+
pub async fn add_nonce_to_nonces_table(
&self,
nonce_data: NonceData,
diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs
--- a/services/identity/src/main.rs
+++ b/services/identity/src/main.rs
@@ -21,6 +21,7 @@
mod nonce;
mod reserved_users;
mod siwe;
+mod sync;
mod token;
mod tunnelbroker;
mod websockets;
@@ -28,6 +29,7 @@
use constants::IDENTITY_SERVICE_SOCKET_ADDR;
use cors::cors_layer;
use keygen::generate_and_persist_keypair;
+use sync::sync_identity_search;
use tracing::{self, info, Level};
use tracing_subscriber::EnvFilter;
@@ -89,6 +91,13 @@
grpc_result = grpc_server => { grpc_result.map_err(|e| e.into()) },
};
}
+ Command::SyncIdentitySearch => {
+ let aws_config = aws::config::from_env().region("us-east-2").load().await;
+ let database_client = DatabaseClient::new(&aws_config);
+ let sync_result = sync_identity_search(&database_client).await;
+
+ error::consume_error(sync_result);
+ }
}
Ok(())
diff --git a/services/identity/src/sync.rs b/services/identity/src/sync.rs
new file mode 100644
--- /dev/null
+++ b/services/identity/src/sync.rs
@@ -0,0 +1,102 @@
+use crate::config::CONFIG;
+use crate::constants::IDENTITY_SEARCH_INDEX;
+use crate::database::DatabaseClient;
+use crate::error;
+use identity_search_messages::IdentitySearchUser;
+use serde_json::json;
+use tracing::error;
+
+pub async fn sync_identity_search(
+ database_client: &DatabaseClient,
+) -> Result<(), error::Error> {
+ let identity_reserved_users =
+ database_client.get_all_reserved_user_details().await?;
+ let identity_users = database_client.get_all_user_details().await?;
+
+ let mut identity_search_users: Vec<IdentitySearchUser> =
+ identity_reserved_users
+ .into_iter()
+ .map(|user| IdentitySearchUser {
+ username: user.username,
+ user_id: user.user_id,
+ })
+ .collect();
+
+ for user in identity_users {
+ identity_search_users.push(IdentitySearchUser {
+ username: user.username,
+ user_id: user.user_id,
+ });
+ }
+
+ let client = reqwest::Client::new();
+
+ clear_index(&client).await?;
+ restore_index(&client, &identity_search_users).await?;
+
+ Ok(())
+}
+
+pub async fn clear_index(
+ reqwest_client: &reqwest::Client,
+) -> Result<(), error::Error> {
+ let url = format!(
+ "https://{}/{}/_delete_by_query",
+ &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX
+ );
+
+ let query = serde_json::json!({
+ "query": {
+ "match_all": {}
+ }
+ });
+
+ let response = reqwest_client
+ .post(&url)
+ .header("Content-Type", "application/json")
+ .json(&query)
+ .send()
+ .await
+ .unwrap();
+
+ if !response.status().is_success() {
+ error!("Sync Error: Failed to clear index");
+ }
+
+ Ok(())
+}
+
+pub async fn restore_index(
+ reqwest_client: &reqwest::Client,
+ identity_search_users: &Vec<IdentitySearchUser>,
+) -> Result<(), error::Error> {
+ let mut bulk_data = String::new();
+
+ for user in identity_search_users {
+ let action = json!({ "index": { "_index": IDENTITY_SEARCH_INDEX, "_id": user.user_id } });
+ bulk_data.push_str(&action.to_string());
+ bulk_data.push('\n');
+
+ bulk_data.push_str(&serde_json::to_string(&user).unwrap());
+ bulk_data.push('\n');
+ }
+
+ let url = format!(
+ "https://{}/{}/_bulk/",
+ &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX
+ );
+
+ let response = reqwest_client
+ .post(&url)
+ .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson")
+ .body(bulk_data)
+ .send()
+ .await
+ .unwrap();
+
+ if !response.status().is_success() {
+ error!("Sync Error: Failed to restore index");
+ }
+
+ Ok(())
+}

File Metadata

Mime Type
text/plain
Expires
Mon, Sep 30, 11:33 PM (20 h, 5 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2208394
Default Alt Text
D11122.diff (6 KB)

Event Timeline