Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3392529
D9936.id33999.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Referenced Files
None
Subscribers
None
D9936.id33999.diff
View Options
diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,9 @@
services/tunnelbroker/target
services/tunnelbroker/src/libcpp/test/build
+services/search-index-lambda/Cargo.lock
+services/search-index-lambda/target
+
.eslintcache
.vscode
!.vscode/extensions.json
diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "search-index-lambda"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+aws_lambda_events = "0.12.0"
+lambda_runtime = "0.8.1"
+reqwest = "0.11.22"
+serde_json = "1.0.108"
+# `#[tokio::main]` needs a runtime feature in addition to `macros`; declare it
+# here instead of relying on another crate in the graph to enable it.
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+openssl = { version = "0.10", features = ["vendored"] }
+tracing = "0.1"
+tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
+serde_derive = "1.0.193"
+# main.rs derives through `use serde::{Deserialize, Serialize}`, which only
+# exposes the derive macros when the `derive` feature is enabled.
+serde = { version = "1.0.193", features = ["derive"] }
diff --git a/services/search-index-lambda/src/constants.rs b/services/search-index-lambda/src/constants.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/constants.rs
@@ -0,0 +1,4 @@
+/// Attribute name under which a stream image stores the user's ID.
+pub const DYNAMODB_USER_ID_KEY: &str = "userID";
+
+/// Attribute name under which a stream image stores the username.
+pub const DYNAMODB_USERNAME_KEY: &str = "username";
+
+/// Name of the environment variable the log filter is read from; this is
+/// `EnvFilter`'s own default variable name.
+pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::EnvFilter::DEFAULT_ENV;
diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/main.rs
@@ -0,0 +1,304 @@
+use lambda_runtime::{service_fn, Error, LambdaEvent};
+use reqwest::Response;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use tracing::{self, Level};
+use tracing_subscriber::EnvFilter;
+
+mod constants;
+
+/// Document body sent to the `users` index on INSERT events.
+///
+/// Serialized with the keys `userID` and `username`, in that order.
+#[derive(Debug, Serialize, Deserialize)]
+struct User {
+  #[serde(rename = "userID")]
+  user_id: String,
+
+  username: String,
+}
+
+/// Request body shared by the `_update_by_query` and `_delete_by_query` calls.
+///
+/// `script` is left out of the serialized JSON when it is `None` (the delete
+/// handler sends a query only).
+#[derive(Deserialize, Serialize)]
+struct UpdateByQuery {
+  /// Selects the documents the request applies to.
+  query: Query,
+
+  /// Optional script run against each matched document.
+  #[serde(skip_serializing_if = "Option::is_none")]
+  script: Option<Script>,
+}
+
+/// Script section of an [`UpdateByQuery`] body.
+#[derive(Deserialize, Serialize)]
+struct Script {
+  /// Script text to execute.
+  source: String,
+
+  /// Scripting language identifier (the modify handler passes `"painless"`).
+  lang: String,
+}
+
+/// Query section of an [`UpdateByQuery`] body.
+///
+/// Each clause is optional and is omitted from the JSON when `None`; the
+/// handlers in this file always populate exactly one of them.
+#[derive(Deserialize, Serialize)]
+struct Query {
+  /// `match` clause (raw identifier because `match` is a Rust keyword).
+  #[serde(skip_serializing_if = "Option::is_none")]
+  r#match: Option<Match>,
+
+  /// `term` clause.
+  #[serde(skip_serializing_if = "Option::is_none")]
+  term: Option<Term>,
+}
+
+/// `match` clause keyed on the `userID` field.
+#[derive(Serialize, Deserialize)]
+struct Match {
+  /// Value compared against the document's `userID`.
+  #[serde(rename = "userID")]
+  user_id: String,
+}
+
+/// `term` clause keyed on the `userID` field.
+#[derive(Serialize, Deserialize)]
+struct Term {
+  /// Value compared against the document's `userID`.
+  #[serde(rename = "userID")]
+  user_id: String,
+}
+
+/// Stream event kinds, deserialized from `INSERT` / `MODIFY` / `REMOVE`.
+///
+/// NOTE(review): this mirrors `OperationType` variant-for-variant and is not
+/// referenced elsewhere in the visible part of this file — confirm whether
+/// one of the two can be dropped.
+#[derive(Debug, Deserialize)]
+enum EventName {
+  #[serde(rename = "INSERT")]
+  Insert,
+  #[serde(rename = "MODIFY")]
+  Modify,
+  #[serde(rename = "REMOVE")]
+  Remove,
+}
+
+/// The `dynamodb` section of a stream record: the item images attached to
+/// the event. Either image may be absent.
+#[derive(Deserialize)]
+struct StreamRecord {
+  /// Item attributes read by the INSERT and MODIFY handlers.
+  #[serde(rename = "NewImage")]
+  new_image: Option<HashMap<String, AttributeValue>>,
+
+  /// Item attributes read by the REMOVE handler.
+  #[serde(rename = "OldImage")]
+  old_image: Option<HashMap<String, AttributeValue>>,
+}
+
+/// A single DynamoDB attribute value as it appears in stream images, e.g.
+/// `{"S": "alice"}` or `{"BOOL": true}`.
+///
+/// DynamoDB's JSON uses upper-case type tags (`BOOL`, `NULL`, `SS`, `NS`,
+/// `BS`, ...). Without `rename_all` serde would expect the Rust spellings
+/// (`Bool`, `Null`, `Ss`, ...) and reject any image containing such an
+/// attribute, failing deserialization of the whole event.
+#[derive(Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+enum AttributeValue {
+  /// Binary value (base64 text in the JSON representation).
+  B(String),
+  /// Boolean value.
+  Bool(bool),
+  /// Set of binary values (base64 text in the JSON representation).
+  Bs(Vec<String>),
+  /// List of nested attribute values.
+  L(Vec<AttributeValue>),
+  /// Map of nested attribute values.
+  M(HashMap<String, AttributeValue>),
+  /// Number, transported as a string.
+  N(String),
+  /// Set of numbers, each transported as a string.
+  Ns(Vec<String>),
+  /// Null marker.
+  Null(bool),
+  /// String value.
+  S(String),
+  /// Set of strings.
+  Ss(Vec<String>),
+  #[non_exhaustive]
+  Unknown,
+}
+
+/// Kind of change a stream record describes; selects which handler runs.
+/// Deserialized from the upper-case strings `INSERT`, `MODIFY` and `REMOVE`.
+#[derive(Debug, Deserialize)]
+enum OperationType {
+  #[serde(rename = "INSERT")]
+  Insert,
+  #[serde(rename = "MODIFY")]
+  Modify,
+  #[serde(rename = "REMOVE")]
+  Remove,
+}
+
+/// One entry of the event's `Records` array.
+///
+/// Both fields are optional at the deserialization level; the handler treats
+/// a missing value as a failure.
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct Record {
+  /// Read from the `eventName` key.
+  event_name: Option<OperationType>,
+
+  /// Read from the `dynamodb` key.
+  dynamodb: Option<StreamRecord>,
+}
+
+/// Top-level Lambda event payload: the batch of stream records, read from
+/// the `Records` key.
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct EventPayload {
+  records: Vec<Record>,
+}
+
+/// Binary entry point: installs the global tracing subscriber, then hands
+/// `func` to the Lambda runtime and returns whatever the runtime returns.
+///
+/// # Panics
+///
+/// Panics if the global tracing subscriber cannot be installed.
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+  // Filter comes from the env var named by LOG_LEVEL_ENV_VAR, with INFO as
+  // the default directive.
+  let env_filter = EnvFilter::builder()
+    .with_default_directive(Level::INFO.into())
+    .with_env_var(constants::LOG_LEVEL_ENV_VAR)
+    .from_env_lossy();
+
+  tracing::subscriber::set_global_default(
+    tracing_subscriber::fmt().with_env_filter(env_filter).finish(),
+  )
+  .expect("Unable to configure tracing");
+
+  lambda_runtime::run(service_fn(func)).await
+}
+
+/// Lambda handler: applies every stream record in the event to the
+/// identity-search index.
+///
+/// Records are processed in order. Each one is dispatched to the handler for
+/// its operation type, and the HTTP status of the resulting request is
+/// checked; only `200 OK` and `201 Created` count as success.
+///
+/// # Errors
+///
+/// Returns an error (instead of panicking) when:
+/// - the `OPENSEARCH_ENDPOINT` environment variable is not set,
+/// - a record lacks `eventName` or `dynamodb`,
+/// - a handler fails to build or send its request,
+/// - the index responds with any status other than 200/201.
+///
+/// Processing stops at the first failing record.
+async fn func(event: LambdaEvent<EventPayload>) -> Result<(), Error> {
+  tracing::info!("Running in debug mode");
+
+  let endpoint = std::env::var("OPENSEARCH_ENDPOINT").map_err(|_| {
+    "An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables."
+  })?;
+
+  tracing::info!("endpoint: {:?}", endpoint);
+
+  let (payload, _context) = event.into_parts();
+  // Logged through tracing (rather than println!) so it obeys the configured
+  // log filter like every other message here.
+  tracing::info!("records: {}", payload.records.len());
+
+  for record in payload.records {
+    let event_name = record.event_name.ok_or("failed to get event name")?;
+    let dynamodb = record
+      .dynamodb
+      .ok_or("failed to get dynamodb StreamRecord")?;
+
+    let res = match event_name {
+      OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?,
+      OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?,
+      OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?,
+    };
+
+    let status = res.status();
+    match status {
+      reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => {
+        tracing::info!("Successful identity-search {:?} operation", event_name);
+      }
+      _ => {
+        // Build the message once; it is both logged and returned.
+        let message =
+          format!("failed to update identity-search index, status: {}", status);
+        tracing::error!("{}", message);
+        return Err(message.into());
+      }
+    }
+  }
+
+  Ok(())
+}
+
+/// POSTs `json_body` to `url` with a JSON content type and returns the raw
+/// response; the caller is responsible for inspecting the status code.
+///
+/// # Errors
+///
+/// Returns a descriptive `String` if the request could not be sent.
+async fn send_request(
+  url: String,
+  json_body: String,
+) -> Result<Response, String> {
+  // NOTE(review): a fresh client is built on every call; reqwest's docs
+  // recommend reusing one client — consider sharing it across requests.
+  let request = reqwest::Client::new()
+    .post(url)
+    .header(reqwest::header::CONTENT_TYPE, "application/json")
+    .body(json_body);
+
+  match request.send().await {
+    Ok(response) => Ok(response),
+    Err(err) => Err(format!(
+      "failed to update identity-search index, err: {}",
+      err
+    )),
+  }
+}
+
+/// Handles an INSERT record by indexing the new user document.
+///
+/// Reads `userID` and `username` (both must be string attributes) from the
+/// record's new image and POSTs them as a `User` JSON body to
+/// `https://{endpoint}/users/_doc/{userID}`.
+///
+/// # Errors
+///
+/// Returns an error string (rather than panicking) if the new image or
+/// either attribute is missing, if an attribute is not a string, if the body
+/// cannot be serialized, or if the request cannot be sent.
+async fn handle_insert(
+  dynamodb: &StreamRecord,
+  endpoint: &str,
+) -> Result<Response, String> {
+  tracing::info!("Handle INSERT event");
+  let new_image = dynamodb
+    .new_image
+    .as_ref()
+    .ok_or("failed to get new image")?;
+
+  let user_id_attribute = new_image
+    .get(constants::DYNAMODB_USER_ID_KEY)
+    .ok_or("failed to get userid")?;
+  let username_attribute = new_image
+    .get(constants::DYNAMODB_USERNAME_KEY)
+    .ok_or("failed to get username")?;
+
+  let (user_id, username) = match (user_id_attribute, username_attribute) {
+    (AttributeValue::S(user_id), AttributeValue::S(username)) => {
+      (user_id, username)
+    }
+    _ => {
+      return Err(
+        "failed to get user_id and username from AttributeValue".into(),
+      )
+    }
+  };
+
+  // The image is only borrowed, so the document needs its own copies.
+  let user_body = User {
+    user_id: user_id.clone(),
+    username: username.clone(),
+  };
+
+  let json_body = serde_json::to_string(&user_body)
+    .map_err(|err| format!("failed to serialize user body: {}", err))?;
+
+  let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id);
+
+  send_request(url, json_body).await
+}
+
+/// Handles a MODIFY record by rewriting the username of the matching
+/// document(s).
+///
+/// Reads `userID` and `username` (both must be string attributes) from the
+/// record's new image and POSTs an update-by-query to
+/// `https://{endpoint}/users/_update_by_query/`: a `term` query on `userID`
+/// plus a Painless script assigning the new username.
+///
+/// The username is escaped before being embedded in the script source, so a
+/// name containing `"` or `\` can neither break the script nor inject code.
+///
+/// # Errors
+///
+/// Returns an error string (rather than panicking) if the new image or
+/// either attribute is missing, if an attribute is not a string, if the body
+/// cannot be serialized, or if the request cannot be sent.
+async fn handle_modify(
+  dynamodb: &StreamRecord,
+  endpoint: &str,
+) -> Result<Response, String> {
+  tracing::info!("Handle MODIFY event");
+  let new_image = dynamodb
+    .new_image
+    .as_ref()
+    .ok_or("failed to get new image")?;
+
+  let user_id_attribute = new_image
+    .get(constants::DYNAMODB_USER_ID_KEY)
+    .ok_or("failed to get userid")?;
+  let username_attribute = new_image
+    .get(constants::DYNAMODB_USERNAME_KEY)
+    .ok_or("failed to get username")?;
+
+  let (user_id, username) = match (user_id_attribute, username_attribute) {
+    (AttributeValue::S(user_id), AttributeValue::S(username)) => {
+      (user_id, username)
+    }
+    _ => {
+      return Err(
+        "failed to get user_id and username from AttributeValue".into(),
+      )
+    }
+  };
+
+  let update_by_query = UpdateByQuery {
+    query: Query {
+      r#match: None,
+      term: Some(Term {
+        user_id: user_id.clone(),
+      }),
+    },
+    script: Some(Script {
+      // NOTE(review): passing the value through script `params` would avoid
+      // string-building entirely, but needs a new field on `Script`.
+      source: format!(
+        "ctx._source.username = \"{}\"",
+        escape_painless_string(username)
+      ),
+      lang: "painless".to_string(),
+    }),
+  };
+
+  let json_body = serde_json::to_string(&update_by_query).map_err(|err| {
+    format!("failed to serialize update_by_query body: {}", err)
+  })?;
+  let url = format!("https://{}/users/_update_by_query/", endpoint);
+
+  send_request(url, json_body).await
+}
+
+/// Escapes `raw` for use inside a double-quoted Painless string literal by
+/// prefixing every backslash and double quote with a backslash.
+fn escape_painless_string(raw: &str) -> String {
+  let mut escaped = String::with_capacity(raw.len());
+  for ch in raw.chars() {
+    if matches!(ch, '\\' | '"') {
+      escaped.push('\\');
+    }
+    escaped.push(ch);
+  }
+  escaped
+}
+
+/// Handles a REMOVE record by deleting the matching document(s).
+///
+/// Reads `userID` (must be a string attribute) from the record's old image
+/// and POSTs a `match` query on it to
+/// `https://{endpoint}/users/_delete_by_query/`.
+///
+/// # Errors
+///
+/// Returns an error string (rather than panicking) if the old image or the
+/// `userID` attribute is missing, if the attribute is not a string, if the
+/// body cannot be serialized, or if the request cannot be sent.
+async fn handle_remove(
+  dynamodb: &StreamRecord,
+  endpoint: &str,
+) -> Result<Response, String> {
+  tracing::info!("Handle REMOVE event");
+  // REMOVE reads the *old* image; the message now says so (it previously
+  // reported a missing "new image").
+  let old_image = dynamodb
+    .old_image
+    .as_ref()
+    .ok_or("failed to get old image")?;
+
+  let user_id_attribute = old_image
+    .get(constants::DYNAMODB_USER_ID_KEY)
+    .ok_or("failed to get userid")?;
+  let user_id = match user_id_attribute {
+    AttributeValue::S(user_id) => user_id,
+    _ => return Err("failed to get user_id from AttributeValue".into()),
+  };
+
+  let update_by_query = UpdateByQuery {
+    query: Query {
+      r#match: Some(Match {
+        user_id: user_id.clone(),
+      }),
+      term: None,
+    },
+    script: None,
+  };
+
+  let json_body = serde_json::to_string(&update_by_query).map_err(|err| {
+    format!("failed to serialize update_by_query body: {}", err)
+  })?;
+  let url = format!("https://{}/users/_delete_by_query/", endpoint);
+
+  send_request(url, json_body).await
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Dec 1, 9:01 AM (20 h, 12 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2604492
Default Alt Text
D9936.id33999.diff (9 KB)
Attached To
Mode
D9936: [services] [3/n] Rust opensearch indexing lambda
Attached
Detach File
Event Timeline
Log In to Comment