Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3400650
D10736.id35863.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Referenced Files
None
Subscribers
None
D10736.id35863.diff
View Options
diff --git a/services/search-index-lambda/Cargo.lock b/services/search-index-lambda/Cargo.lock
--- a/services/search-index-lambda/Cargo.lock
+++ b/services/search-index-lambda/Cargo.lock
@@ -41,12 +41,6 @@
"libc",
]
-[[package]]
-name = "anyhow"
-version = "1.0.75"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
-
[[package]]
name = "async-stream"
version = "0.3.5"
@@ -66,7 +60,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -217,6 +211,12 @@
"windows-targets",
]
+[[package]]
+name = "convert_case"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+
[[package]]
name = "core-foundation"
version = "0.9.3"
@@ -263,7 +263,7 @@
"proc-macro2",
"quote",
"strsim",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -274,7 +274,7 @@
dependencies = [
"darling_core",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -287,6 +287,19 @@
"serde",
]
+[[package]]
+name = "derive_more"
+version = "0.99.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
+dependencies = [
+ "convert_case",
+ "proc-macro2",
+ "quote",
+ "rustc_version",
+ "syn 1.0.109",
+]
+
[[package]]
name = "either"
version = "1.9.0"
@@ -420,7 +433,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -861,7 +874,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -927,7 +940,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1080,6 +1093,15 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
[[package]]
name = "rustix"
version = "0.38.25"
@@ -1112,9 +1134,9 @@
name = "search-index-lambda"
version = "0.1.0"
dependencies = [
- "anyhow",
"aws-smithy-types",
"aws_lambda_events",
+ "derive_more",
"lambda_runtime",
"openssl",
"reqwest",
@@ -1149,6 +1171,12 @@
"libc",
]
+[[package]]
+name = "semver"
+version = "1.0.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
+
[[package]]
name = "serde"
version = "1.0.193"
@@ -1166,7 +1194,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1238,7 +1266,7 @@
"darling",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1291,6 +1319,17 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+[[package]]
+name = "syn"
+version = "1.0.109"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
[[package]]
name = "syn"
version = "2.0.39"
@@ -1415,7 +1454,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1500,7 +1539,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1634,7 +1673,7 @@
"once_cell",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-shared",
]
@@ -1668,7 +1707,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml
--- a/services/search-index-lambda/Cargo.toml
+++ b/services/search-index-lambda/Cargo.toml
@@ -16,4 +16,4 @@
serde_derive = "1.0.193"
serde = "1.0.193"
aws-smithy-types = "1.1.1"
-anyhow = "1.0.75"
+derive_more = "0.99.17"
diff --git a/services/search-index-lambda/src/error.rs b/services/search-index-lambda/src/error.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/error.rs
@@ -0,0 +1,47 @@
+/// Errors that can abort the search index lambda.
+///
+/// `derive_more::From` supplies `From` conversions from the wrapped types and
+/// `derive_more::Error` supplies the `std::error::Error` implementation.
+#[derive(
+  Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum Error {
+  /// A stream record was missing required data or had an unexpected type.
+  #[display(fmt = "Payload Error: {:?}", _0)]
+  PayloadError(RecordError),
+  /// Reading the `OPENSEARCH_ENDPOINT` environment variable failed.
+  #[display(fmt = "Missing OPENSEARCH_ENDPOINT: {:?}", _0)]
+  MissingOpenSearchEndpoint(std::env::VarError),
+  /// A request body could not be serialized to JSON.
+  #[display(fmt = "Serialization Error: {:?}", _0)]
+  SerializationError(serde_json::Error),
+  /// Installing the global tracing subscriber failed.
+  #[display(fmt = "Tracing Error: {:?}", _0)]
+  TracingError(tracing::subscriber::SetGlobalDefaultError),
+  /// Sending the HTTP request failed.
+  #[display(fmt = "Reqwest Error: {:?}", _0)]
+  ReqwestError(reqwest::Error),
+  /// The index answered with a status other than `OK` or `CREATED`.
+  /// `#[error(ignore)]` keeps the status code from being used as a source.
+  #[display(fmt = "Update Index Error: Status Code: {:?}", _0)]
+  UpdateIndexError(#[error(ignore)] reqwest::StatusCode),
+}
+
+/// Ways a single stream record can be unusable.
+#[derive(Debug, derive_more::Display, derive_more::Error)]
+pub enum RecordError {
+  /// An attribute was present but was not a string (`S`) value.
+  InvalidAttributeType,
+  /// The record had no `eventName`.
+  MissingEventName,
+  /// The record had no `dynamodb` section.
+  MissingDynamoDBStreamRecord,
+  /// The stream record had no new image.
+  MissingNewImage,
+  /// The stream record had no old image.
+  MissingOldImage,
+  /// The image had no user ID attribute.
+  MissingUserId,
+  /// The image had no username attribute.
+  MissingUsername,
+}
diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs
--- a/services/search-index-lambda/src/main.rs
+++ b/services/search-index-lambda/src/main.rs
@@ -1,109 +1,27 @@
-use anyhow::{anyhow, Result};
-use lambda_runtime::{service_fn, Error, LambdaEvent};
+use lambda_runtime::{service_fn, LambdaEvent};
use reqwest::Response;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use serde_derive::Serialize;
use tracing::{self, Level};
use tracing_subscriber::EnvFilter;
mod constants;
+mod error;
+mod payload;
+mod query;
-#[derive(Deserialize, Serialize, Debug)]
-struct User {
- #[serde(rename = "userID")]
- user_id: String,
- username: String,
-}
-
-#[derive(Serialize, Deserialize)]
-struct UpdateByQuery {
- query: Query,
-
- #[serde(skip_serializing_if = "Option::is_none")]
- script: Option<Script>,
-}
-
-#[derive(Serialize, Deserialize)]
-struct Script {
- source: String,
- lang: String,
-}
-
-#[derive(Serialize, Deserialize)]
-struct Query {
- #[serde(skip_serializing_if = "Option::is_none")]
- r#match: Option<Match>,
+use error::{Error, RecordError};
+use payload::{AttributeValue, EventPayload, OperationType, StreamRecord};
+use query::{Match, Query, Script, Term, UpdateByQuery};
- #[serde(skip_serializing_if = "Option::is_none")]
- term: Option<Term>,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Match {
- #[serde(rename = "userID")]
- user_id: String,
-}
-
-#[derive(Deserialize, Serialize)]
-struct Term {
+#[derive(Serialize, Debug)]
+pub struct User {
#[serde(rename = "userID")]
- user_id: String,
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "UPPERCASE")]
-enum EventName {
- Insert,
- Modify,
- Remove,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "PascalCase")]
-struct StreamRecord {
- new_image: Option<HashMap<String, AttributeValue>>,
- old_image: Option<HashMap<String, AttributeValue>>,
-}
-
-#[derive(Deserialize)]
-enum AttributeValue {
- B(String),
- Bool(bool),
- BS(Vec<String>),
- L(Vec<AttributeValue>),
- M(HashMap<String, AttributeValue>),
- N(String),
- Ns(Vec<String>),
- Null(bool),
- S(String),
- Ss(Vec<String>),
- #[non_exhaustive]
- Unknown,
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "UPPERCASE")]
-enum OperationType {
- Insert,
- Modify,
- Remove,
-}
-
-#[derive(Deserialize)]
-struct Record {
- #[serde(rename = "eventName")]
- event_name: Option<OperationType>,
- dynamodb: Option<StreamRecord>,
-}
-
-#[derive(Deserialize)]
-struct EventPayload {
- #[serde(rename = "Records")]
- records: Vec<Record>,
+ pub user_id: String,
+ pub username: String,
}
#[tokio::main]
-async fn main() -> Result<(), Error> {
+async fn main() -> Result<(), lambda_runtime::Error> {
let filter = EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.with_env_var(constants::LOG_LEVEL_ENV_VAR)
@@ -116,7 +34,7 @@
.finish();
tracing::subscriber::set_global_default(subscriber)
- .expect("Unable to configure tracing");
+ .map_err(|e| Error::TracingError(e))?;
let func = service_fn(func);
lambda_runtime::run(func).await?;
@@ -124,11 +42,11 @@
Ok(())
}
-async fn func(event: LambdaEvent<EventPayload>) -> Result<()> {
+async fn func(event: LambdaEvent<EventPayload>) -> Result<(), error::Error> {
tracing::debug!("Running in debug mode");
- let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
- .expect("An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables.");
+ let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
+ .map_err(|e| Error::MissingOpenSearchEndpoint(e))?;
tracing::info!("endpoint: {:?}", endpoint);
@@ -136,16 +54,18 @@
println!("records: {}", &payload.records.len());
for record in payload.records {
- let event_name = record.event_name.expect("failed to get event name");
- let dynamodb = record
- .dynamodb
- .expect("failed to get dynamodb StreamRecord");
+ let event_name = record
+ .event_name
+ .ok_or(Error::PayloadError(RecordError::MissingEventName))?;
+ let dynamodb = record.dynamodb.ok_or(Error::PayloadError(
+ RecordError::MissingDynamoDBStreamRecord,
+ ))?;
let res = match event_name {
- OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?,
- OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?,
- OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?,
- };
+ OperationType::Insert => handle_insert(&dynamodb, &endpoint).await,
+ OperationType::Modify => handle_modify(&dynamodb, &endpoint).await,
+ OperationType::Remove => handle_remove(&dynamodb, &endpoint).await,
+ }?;
match res.status() {
reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => {
@@ -157,10 +77,7 @@
res.status()
);
- return Err(anyhow!(
- "failed to update identity-search index, status: {}",
- res.status()
- ));
+ return Err(Error::UpdateIndexError(res.status()));
}
}
}
@@ -168,10 +85,10 @@
Ok(())
}
-async fn send_request(
+async fn update_index(
url: String,
json_body: String,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
let client = reqwest::Client::new();
client
@@ -180,37 +97,35 @@
.body(json_body)
.send()
.await
- .map_err(|err| {
- anyhow!("failed to update identity-search index, err: {}", err)
+ .map_err(|e| {
+ tracing::error!("Reqwest Error: {:?}", e);
+ Error::ReqwestError(e)
})
}
async fn handle_insert(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle INSERT event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue"
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let user_body = User {
@@ -218,40 +133,39 @@
username: username.clone(),
};
- let json_body =
- serde_json::to_string(&user_body).expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&user_body).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_modify(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle MODIFY event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue",
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -267,29 +181,32 @@
}),
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
+
let url = format!("https://{}/users/_update_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_remove(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle REMOVE event");
let old_image = dynamodb
.old_image
.as_ref()
- .expect("failed to get old image");
+ .ok_or(Error::PayloadError(RecordError::MissingOldImage))?;
let user_id_attribute = old_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
let user_id = match user_id_attribute {
AttributeValue::S(user_id) => user_id,
- _ => return Err(anyhow!("failed to get user_id from AttributeValue")),
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -302,9 +219,11 @@
script: None,
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_delete_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
diff --git a/services/search-index-lambda/src/payload.rs b/services/search-index-lambda/src/payload.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/payload.rs
@@ -0,0 +1,77 @@
+use serde::Deserialize;
+use std::collections::HashMap;
+
+/// Stream event kinds, deserialized from `INSERT`, `MODIFY` and `REMOVE`.
+///
+/// NOTE(review): `Record::event_name` is typed as [`OperationType`], which
+/// has the same variants; this enum looks unused and may be removable.
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum EventName {
+  Insert,
+  Modify,
+  Remove,
+}
+
+/// The `dynamodb` section of a stream record.
+#[derive(Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct StreamRecord {
+  /// Item attributes from the `NewImage` key, when present.
+  pub new_image: Option<HashMap<String, AttributeValue>>,
+  /// Item attributes from the `OldImage` key, when present.
+  pub old_image: Option<HashMap<String, AttributeValue>>,
+}
+
+/// A single typed DynamoDB attribute value.
+///
+/// DynamoDB JSON tags every value with an upper-case type descriptor, so the
+/// variants whose Rust names are not already upper-case are renamed to the
+/// wire tag. The previous spelling is kept as an alias so that any input
+/// accepted before still deserializes.
+#[derive(Deserialize)]
+pub enum AttributeValue {
+  B(String),
+  #[serde(rename = "BOOL", alias = "Bool")]
+  Bool(bool),
+  BS(Vec<String>),
+  L(Vec<AttributeValue>),
+  M(HashMap<String, AttributeValue>),
+  N(String),
+  #[serde(rename = "NS", alias = "Ns")]
+  Ns(Vec<String>),
+  #[serde(rename = "NULL", alias = "Null")]
+  Null(bool),
+  S(String),
+  #[serde(rename = "SS", alias = "Ss")]
+  Ss(Vec<String>),
+  #[non_exhaustive]
+  Unknown,
+}
+
+/// Stream event kinds, deserialized from `INSERT`, `MODIFY` and `REMOVE`.
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum OperationType {
+  Insert,
+  Modify,
+  Remove,
+}
+
+/// One entry of the event's `Records` array.
+#[derive(Deserialize)]
+pub struct Record {
+  /// Operation kind from the `eventName` key, if it was present.
+  #[serde(rename = "eventName")]
+  pub event_name: Option<OperationType>,
+  /// Stream data from the `dynamodb` key, if it was present.
+  pub dynamodb: Option<StreamRecord>,
+}
+
+/// Top-level payload delivered to the lambda handler.
+#[derive(Deserialize)]
+pub struct EventPayload {
+  /// Stream records from the `Records` key.
+  #[serde(rename = "Records")]
+  pub records: Vec<Record>,
+}
diff --git a/services/search-index-lambda/src/query.rs b/services/search-index-lambda/src/query.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/query.rs
@@ -0,0 +1,49 @@
+use serde::Serialize;
+
+/// JSON body sent to the `_update_by_query` and `_delete_by_query` endpoints.
+#[derive(Serialize, Debug)]
+pub struct UpdateByQuery {
+  /// Selects the documents the request applies to.
+  pub query: Query,
+
+  /// Optional script; left out of the JSON entirely when `None`.
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub script: Option<Script>,
+}
+
+/// Script attached to an [`UpdateByQuery`] request.
+#[derive(Serialize, Debug)]
+pub struct Script {
+  /// Script text. NOTE(review): presumably Painless source; confirm.
+  pub source: String,
+  /// Script language identifier.
+  pub lang: String,
+}
+
+/// Query clause of an [`UpdateByQuery`]; unset clauses are not serialized.
+#[derive(Serialize, Debug)]
+pub struct Query {
+  /// `match` clause (raw identifier because `match` is a Rust keyword).
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub r#match: Option<Match>,
+
+  /// `term` clause.
+  #[serde(skip_serializing_if = "Option::is_none")]
+  pub term: Option<Term>,
+}
+
+/// `match` clause on the `userID` field.
+#[derive(Serialize, Debug)]
+pub struct Match {
+  /// Serialized under the key `userID`.
+  #[serde(rename = "userID")]
+  pub user_id: String,
+}
+
+/// `term` clause on the `userID` field.
+#[derive(Serialize, Debug)]
+pub struct Term {
+  /// Serialized under the key `userID`.
+  #[serde(rename = "userID")]
+  pub user_id: String,
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 3, 8:21 AM (19 h, 52 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2610986
Default Alt Text
D10736.id35863.diff (17 KB)
Attached To
Mode
D10736: [services] [2/n] Replace panic with custom errors on search index lambda
Attached
Detach File
Event Timeline
Log In to Comment