Page MenuHomePhabricator

D9936.id33999.diff
No OneTemporary

D9936.id33999.diff

diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,9 @@
services/tunnelbroker/target
services/tunnelbroker/src/libcpp/test/build
+services/search-index-lambda/Cargo.lock
+services/search-index-lambda/target
+
.eslintcache
.vscode
!.vscode/extensions.json
diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "search-index-lambda"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+aws_lambda_events = "0.12.0"
+lambda_runtime = "0.8.1"
+reqwest = "0.11.22"
+serde_json = "1.0.108"
+tokio = { version = "1", features = ["macros"] }
+openssl = { version = "0.10", features = ["vendored"] }
+tracing = "0.1"
+tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
+serde_derive = "1.0.193"
+serde = "1.0.193"
diff --git a/services/search-index-lambda/src/constants.rs b/services/search-index-lambda/src/constants.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/constants.rs
@@ -0,0 +1,4 @@
+pub const DYNAMODB_USER_ID_KEY: &str = "userID";
+pub const DYNAMODB_USERNAME_KEY: &str = "username";
+pub const LOG_LEVEL_ENV_VAR: &str =
+ tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/main.rs
@@ -0,0 +1,304 @@
+use lambda_runtime::{service_fn, Error, LambdaEvent};
+use reqwest::Response;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use tracing::{self, Level};
+use tracing_subscriber::EnvFilter;
+
+mod constants;
+
+#[derive(Deserialize, Serialize, Debug)]
+struct User {
+ #[serde(rename = "userID")]
+ user_id: String,
+ username: String,
+}
+
+#[derive(Serialize, Deserialize)]
+struct UpdateByQuery {
+ query: Query,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ script: Option<Script>,
+}
+
+#[derive(Serialize, Deserialize)]
+struct Script {
+ source: String,
+ lang: String,
+}
+
+#[derive(Serialize, Deserialize)]
+struct Query {
+ #[serde(skip_serializing_if = "Option::is_none")]
+ r#match: Option<Match>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ term: Option<Term>,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Match {
+ #[serde(rename = "userID")]
+ user_id: String,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Term {
+ #[serde(rename = "userID")]
+ user_id: String,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+enum EventName {
+ Insert,
+ Modify,
+ Remove,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct StreamRecord {
+ new_image: Option<HashMap<String, AttributeValue>>,
+ old_image: Option<HashMap<String, AttributeValue>>,
+}
+
+#[derive(Deserialize)]
+enum AttributeValue {
+ Bool(bool),
+ L(Vec<AttributeValue>),
+ M(HashMap<String, AttributeValue>),
+ N(String),
+ Ns(Vec<String>),
+ Null(bool),
+ S(String),
+ Ss(Vec<String>),
+ #[non_exhaustive]
+ Unknown,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+enum OperationType {
+ Insert,
+ Modify,
+ Remove,
+}
+
+#[derive(Deserialize)]
+struct Record {
+ #[serde(rename = "eventName")]
+ event_name: Option<OperationType>,
+ dynamodb: Option<StreamRecord>,
+}
+
+#[derive(Deserialize)]
+struct EventPayload {
+ #[serde(rename = "Records")]
+ records: Vec<Record>,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+ let filter = EnvFilter::builder()
+ .with_default_directive(Level::INFO.into())
+ .with_env_var(constants::LOG_LEVEL_ENV_VAR)
+ .from_env_lossy();
+
+ let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish();
+ tracing::subscriber::set_global_default(subscriber)
+ .expect("Unable to configure tracing");
+
+ let func = service_fn(func);
+ lambda_runtime::run(func).await?;
+
+ Ok(())
+}
+
+async fn func(event: LambdaEvent<EventPayload>) -> Result<(), Error> {
+ tracing::info!("Running in debug mode");
+
+ let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
+ .expect("An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables.");
+
+ tracing::info!("endpoint: {:?}", endpoint);
+
+ let (payload, _context) = event.into_parts();
+ println!("records: {}", &payload.records.len());
+
+ for record in payload.records {
+ let event_name = record.event_name.expect("failed to get event name");
+ let dynamodb = record
+ .dynamodb
+ .expect("failed to get dynamodb StreamRecord");
+
+ let res = match event_name {
+ OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?,
+ OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?,
+ OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?,
+ };
+
+ match res.status() {
+ reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => {
+ tracing::info!("Successful identity-search {:?} operation", event_name);
+ }
+ _ => {
+ tracing::error!(
+ "failed to update identity-search index, status: {}",
+ res.status()
+ );
+ return Err(
+ format!(
+ "failed to update identity-search index, status: {}",
+ res.status()
+ )
+ .into(),
+ );
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn send_request(
+ url: String,
+ json_body: String,
+) -> Result<Response, String> {
+ let client = reqwest::Client::new();
+
+ client
+ .post(url)
+ .header(reqwest::header::CONTENT_TYPE, "application/json")
+ .body(json_body)
+ .send()
+ .await
+ .map_err(|err| {
+ format!("failed to update identity-search index, err: {}", err)
+ })
+}
+
+async fn handle_insert(
+ dynamodb: &StreamRecord,
+ endpoint: &str,
+) -> Result<Response, String> {
+ tracing::info!("Handle INSERT event");
+ let new_image = dynamodb
+ .new_image
+ .as_ref()
+ .expect("failed to get new image");
+
+ let user_id_attribute = new_image
+ .get(constants::DYNAMODB_USER_ID_KEY)
+ .expect("failed to get userid");
+ let username_attribute = new_image
+ .get(constants::DYNAMODB_USERNAME_KEY)
+ .expect("failed to get username");
+
+ let (user_id, username) = match (user_id_attribute, username_attribute) {
+ (AttributeValue::S(user_id), AttributeValue::S(username)) => {
+ (user_id, username)
+ }
+ _ => {
+ return Err(
+ "failed to get user_id and username from AttributeValue".into(),
+ )
+ }
+ };
+
+ let user_body = User {
+ user_id: user_id.clone(),
+ username: username.clone(),
+ };
+
+ let json_body =
+ serde_json::to_string(&user_body).expect("failed to serialize user body");
+
+ let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id);
+
+ send_request(url, json_body).await
+}
+
+async fn handle_modify(
+ dynamodb: &StreamRecord,
+ endpoint: &str,
+) -> Result<Response, String> {
+ tracing::info!("Handle MODIFY event");
+ let new_image = dynamodb
+ .new_image
+ .as_ref()
+ .expect("failed to get new image");
+
+ let user_id_attribute = new_image
+ .get(constants::DYNAMODB_USER_ID_KEY)
+ .expect("failed to get userid");
+ let username_attribute = new_image
+ .get(constants::DYNAMODB_USERNAME_KEY)
+ .expect("failed to get username");
+
+ let (user_id, username) = match (user_id_attribute, username_attribute) {
+ (AttributeValue::S(user_id), AttributeValue::S(username)) => {
+ (user_id, username)
+ }
+ _ => {
+ return Err(
+ "failed to get user_id and username from AttributeValue".into(),
+ )
+ }
+ };
+
+ let update_by_query = UpdateByQuery {
+ query: Query {
+ r#match: None,
+ term: Some(Term {
+ user_id: user_id.clone(),
+ }),
+ },
+ script: Some(Script {
+ source: format!("ctx._source.username = \"{}\"", username),
+ lang: "painless".to_string(),
+ }),
+ };
+
+ let json_body = serde_json::to_string(&update_by_query)
+ .expect("failed to serialize update_by_query body");
+ let url = format!("https://{}/users/_update_by_query/", endpoint);
+
+ send_request(url, json_body).await
+}
+
+async fn handle_remove(
+ dynamodb: &StreamRecord,
+ endpoint: &str,
+) -> Result<Response, String> {
+ tracing::info!("Handle REMOVE event");
+ let old_image = dynamodb
+ .old_image
+ .as_ref()
+ .expect("failed to get new image");
+
+ let user_id_attribute = old_image
+ .get(constants::DYNAMODB_USER_ID_KEY)
+ .expect("failed to get userid");
+ let user_id = match user_id_attribute {
+ AttributeValue::S(user_id) => user_id,
+ _ => return Err("failed to get user_id from AttributeValue".into()),
+ };
+
+ let update_by_query = UpdateByQuery {
+ query: Query {
+ r#match: Some(Match {
+ user_id: user_id.clone(),
+ }),
+ term: None,
+ },
+ script: None,
+ };
+
+ let json_body = serde_json::to_string(&update_by_query)
+ .expect("failed to serialize update_by_query body");
+ let url = format!("https://{}/users/_delete_by_query/", endpoint);
+
+ send_request(url, json_body).await
+}

File Metadata

Mime Type
text/plain
Expires
Sun, Dec 1, 9:01 AM (20 h, 12 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2604492
Default Alt Text
D9936.id33999.diff (9 KB)

Event Timeline