Page MenuHomePhabricator

D10736.id35863.diff
No OneTemporary

D10736.id35863.diff

diff --git a/services/search-index-lambda/Cargo.lock b/services/search-index-lambda/Cargo.lock
--- a/services/search-index-lambda/Cargo.lock
+++ b/services/search-index-lambda/Cargo.lock
@@ -41,12 +41,6 @@
"libc",
]
-[[package]]
-name = "anyhow"
-version = "1.0.75"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
-
[[package]]
name = "async-stream"
version = "0.3.5"
@@ -66,7 +60,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -217,6 +211,12 @@
"windows-targets",
]
+[[package]]
+name = "convert_case"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+
[[package]]
name = "core-foundation"
version = "0.9.3"
@@ -263,7 +263,7 @@
"proc-macro2",
"quote",
"strsim",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -274,7 +274,7 @@
dependencies = [
"darling_core",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -287,6 +287,19 @@
"serde",
]
+[[package]]
+name = "derive_more"
+version = "0.99.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
+dependencies = [
+ "convert_case",
+ "proc-macro2",
+ "quote",
+ "rustc_version",
+ "syn 1.0.109",
+]
+
[[package]]
name = "either"
version = "1.9.0"
@@ -420,7 +433,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -861,7 +874,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -927,7 +940,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1080,6 +1093,15 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
[[package]]
name = "rustix"
version = "0.38.25"
@@ -1112,9 +1134,9 @@
name = "search-index-lambda"
version = "0.1.0"
dependencies = [
- "anyhow",
"aws-smithy-types",
"aws_lambda_events",
+ "derive_more",
"lambda_runtime",
"openssl",
"reqwest",
@@ -1149,6 +1171,12 @@
"libc",
]
+[[package]]
+name = "semver"
+version = "1.0.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
+
[[package]]
name = "serde"
version = "1.0.193"
@@ -1166,7 +1194,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1238,7 +1266,7 @@
"darling",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1291,6 +1319,17 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+[[package]]
+name = "syn"
+version = "1.0.109"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
[[package]]
name = "syn"
version = "2.0.39"
@@ -1415,7 +1454,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1500,7 +1539,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1634,7 +1673,7 @@
"once_cell",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-shared",
]
@@ -1668,7 +1707,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml
--- a/services/search-index-lambda/Cargo.toml
+++ b/services/search-index-lambda/Cargo.toml
@@ -16,4 +16,4 @@
serde_derive = "1.0.193"
serde = "1.0.193"
aws-smithy-types = "1.1.1"
-anyhow = "1.0.75"
+derive_more = "0.99.17"
diff --git a/services/search-index-lambda/src/error.rs b/services/search-index-lambda/src/error.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/error.rs
@@ -0,0 +1,28 @@
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum Error {
+ #[display(fmt = "Payload Error: {:?}", _0)]
+ PayloadError(RecordError),
+ #[display(fmt = "Missing OPENSEARCH_ENDPOINT: {:?}", _0)]
+ MissingOpenSearchEndpoint(std::env::VarError),
+ #[display(fmt = "Serialization Error: {:?}", _0)]
+ SerializationError(serde_json::Error),
+ #[display(fmt = "Tracing Error {:?}", _0)]
+ TracingError(tracing::subscriber::SetGlobalDefaultError),
+ #[display(fmt = "Reqwest Error {:?}", _0)]
+ ReqwestError(reqwest::Error),
+ #[display(fmt = "Update Index Error: Status Code: {:?}", _0)]
+ UpdateIndexError(#[error(ignore)] reqwest::StatusCode),
+}
+
+#[derive(Debug, derive_more::Display, derive_more::Error)]
+pub enum RecordError {
+ InvalidAttributeType,
+ MissingEventName,
+ MissingDynamoDBStreamRecord,
+ MissingNewImage,
+ MissingOldImage,
+ MissingUserId,
+ MissingUsername,
+}
diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs
--- a/services/search-index-lambda/src/main.rs
+++ b/services/search-index-lambda/src/main.rs
@@ -1,109 +1,27 @@
-use anyhow::{anyhow, Result};
-use lambda_runtime::{service_fn, Error, LambdaEvent};
+use lambda_runtime::{service_fn, LambdaEvent};
use reqwest::Response;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use serde_derive::Serialize;
use tracing::{self, Level};
use tracing_subscriber::EnvFilter;
mod constants;
+mod error;
+mod payload;
+mod query;
-#[derive(Deserialize, Serialize, Debug)]
-struct User {
- #[serde(rename = "userID")]
- user_id: String,
- username: String,
-}
-
-#[derive(Serialize, Deserialize)]
-struct UpdateByQuery {
- query: Query,
-
- #[serde(skip_serializing_if = "Option::is_none")]
- script: Option<Script>,
-}
-
-#[derive(Serialize, Deserialize)]
-struct Script {
- source: String,
- lang: String,
-}
-
-#[derive(Serialize, Deserialize)]
-struct Query {
- #[serde(skip_serializing_if = "Option::is_none")]
- r#match: Option<Match>,
+use error::{Error, RecordError};
+use payload::{AttributeValue, EventPayload, OperationType, StreamRecord};
+use query::{Match, Query, Script, Term, UpdateByQuery};
- #[serde(skip_serializing_if = "Option::is_none")]
- term: Option<Term>,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Match {
- #[serde(rename = "userID")]
- user_id: String,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Term {
+#[derive(Serialize, Debug)]
+pub struct User {
#[serde(rename = "userID")]
- user_id: String,
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "UPPERCASE")]
-enum EventName {
- Insert,
- Modify,
- Remove,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "PascalCase")]
-struct StreamRecord {
- new_image: Option<HashMap<String, AttributeValue>>,
- old_image: Option<HashMap<String, AttributeValue>>,
-}
-
-#[derive(Deserialize)]
-enum AttributeValue {
- B(String),
- Bool(bool),
- BS(Vec<String>),
- L(Vec<AttributeValue>),
- M(HashMap<String, AttributeValue>),
- N(String),
- Ns(Vec<String>),
- Null(bool),
- S(String),
- Ss(Vec<String>),
- #[non_exhaustive]
- Unknown,
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "UPPERCASE")]
-enum OperationType {
- Insert,
- Modify,
- Remove,
-}
-
-#[derive(Deserialize)]
-struct Record {
- #[serde(rename = "eventName")]
- event_name: Option<OperationType>,
- dynamodb: Option<StreamRecord>,
-}
-
-#[derive(Deserialize)]
-struct EventPayload {
- #[serde(rename = "Records")]
- records: Vec<Record>,
+ pub user_id: String,
+ pub username: String,
}
#[tokio::main]
-async fn main() -> Result<(), Error> {
+async fn main() -> Result<(), lambda_runtime::Error> {
let filter = EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.with_env_var(constants::LOG_LEVEL_ENV_VAR)
@@ -116,7 +34,7 @@
.finish();
tracing::subscriber::set_global_default(subscriber)
- .expect("Unable to configure tracing");
+ .map_err(|e| Error::TracingError(e))?;
let func = service_fn(func);
lambda_runtime::run(func).await?;
@@ -124,11 +42,11 @@
Ok(())
}
-async fn func(event: LambdaEvent<EventPayload>) -> Result<()> {
+async fn func(event: LambdaEvent<EventPayload>) -> Result<(), error::Error> {
tracing::debug!("Running in debug mode");
- let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
- .expect("An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables.");
+ let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
+ .map_err(|e| Error::MissingOpenSearchEndpoint(e))?;
tracing::info!("endpoint: {:?}", endpoint);
@@ -136,16 +54,18 @@
println!("records: {}", &payload.records.len());
for record in payload.records {
- let event_name = record.event_name.expect("failed to get event name");
- let dynamodb = record
- .dynamodb
- .expect("failed to get dynamodb StreamRecord");
+ let event_name = record
+ .event_name
+ .ok_or(Error::PayloadError(RecordError::MissingEventName))?;
+ let dynamodb = record.dynamodb.ok_or(Error::PayloadError(
+ RecordError::MissingDynamoDBStreamRecord,
+ ))?;
let res = match event_name {
- OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?,
- OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?,
- OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?,
- };
+ OperationType::Insert => handle_insert(&dynamodb, &endpoint).await,
+ OperationType::Modify => handle_modify(&dynamodb, &endpoint).await,
+ OperationType::Remove => handle_remove(&dynamodb, &endpoint).await,
+ }?;
match res.status() {
reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => {
@@ -157,10 +77,7 @@
res.status()
);
- return Err(anyhow!(
- "failed to update identity-search index, status: {}",
- res.status()
- ));
+ return Err(Error::UpdateIndexError(res.status()));
}
}
}
@@ -168,10 +85,10 @@
Ok(())
}
-async fn send_request(
+async fn update_index(
url: String,
json_body: String,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
let client = reqwest::Client::new();
client
@@ -180,37 +97,35 @@
.body(json_body)
.send()
.await
- .map_err(|err| {
- anyhow!("failed to update identity-search index, err: {}", err)
+ .map_err(|e| {
+ tracing::error!("Reqwest Error: {:?}", e);
+ Error::ReqwestError(e)
})
}
async fn handle_insert(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle INSERT event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue"
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let user_body = User {
@@ -218,40 +133,39 @@
username: username.clone(),
};
- let json_body =
- serde_json::to_string(&user_body).expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&user_body).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_modify(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle MODIFY event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue",
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -267,29 +181,32 @@
}),
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
+
let url = format!("https://{}/users/_update_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_remove(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle REMOVE event");
let old_image = dynamodb
.old_image
.as_ref()
- .expect("failed to get old image");
+ .ok_or(Error::PayloadError(RecordError::MissingOldImage))?;
let user_id_attribute = old_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
let user_id = match user_id_attribute {
AttributeValue::S(user_id) => user_id,
- _ => return Err(anyhow!("failed to get user_id from AttributeValue")),
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -302,9 +219,11 @@
script: None,
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_delete_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
diff --git a/services/search-index-lambda/src/payload.rs b/services/search-index-lambda/src/payload.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/payload.rs
@@ -0,0 +1,54 @@
+use serde::Deserialize;
+use std::collections::HashMap;
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum EventName {
+ Insert,
+ Modify,
+ Remove,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct StreamRecord {
+ pub new_image: Option<HashMap<String, AttributeValue>>,
+ pub old_image: Option<HashMap<String, AttributeValue>>,
+}
+
+#[derive(Deserialize)]
+pub enum AttributeValue {
+ B(String),
+ Bool(bool),
+ BS(Vec<String>),
+ L(Vec<AttributeValue>),
+ M(HashMap<String, AttributeValue>),
+ N(String),
+ Ns(Vec<String>),
+ Null(bool),
+ S(String),
+ Ss(Vec<String>),
+ #[non_exhaustive]
+ Unknown,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum OperationType {
+ Insert,
+ Modify,
+ Remove,
+}
+
+#[derive(Deserialize)]
+pub struct Record {
+ #[serde(rename = "eventName")]
+ pub event_name: Option<OperationType>,
+ pub dynamodb: Option<StreamRecord>,
+}
+
+#[derive(Deserialize)]
+pub struct EventPayload {
+ #[serde(rename = "Records")]
+ pub records: Vec<Record>,
+}
diff --git a/services/search-index-lambda/src/query.rs b/services/search-index-lambda/src/query.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/query.rs
@@ -0,0 +1,36 @@
+use serde::Serialize;
+
+#[derive(Serialize)]
+pub struct UpdateByQuery {
+ pub query: Query,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub script: Option<Script>,
+}
+
+#[derive(Serialize)]
+pub struct Script {
+ pub source: String,
+ pub lang: String,
+}
+
+#[derive(Serialize)]
+pub struct Query {
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub r#match: Option<Match>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub term: Option<Term>,
+}
+
+#[derive(Serialize)]
+pub struct Match {
+ #[serde(rename = "userID")]
+ pub user_id: String,
+}
+
+#[derive(Serialize)]
+pub struct Term {
+ #[serde(rename = "userID")]
+ pub user_id: String,
+}

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 3, 8:21 AM (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2610986
Default Alt Text
D10736.id35863.diff (17 KB)

Event Timeline