Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3400146
D10736.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Referenced Files
None
Subscribers
None
D10736.diff
View Options
diff --git a/services/search-index-lambda/Cargo.lock b/services/search-index-lambda/Cargo.lock
--- a/services/search-index-lambda/Cargo.lock
+++ b/services/search-index-lambda/Cargo.lock
@@ -41,12 +41,6 @@
"libc",
]
-[[package]]
-name = "anyhow"
-version = "1.0.79"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
-
[[package]]
name = "async-stream"
version = "0.3.5"
@@ -66,7 +60,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -217,6 +211,12 @@
"windows-targets",
]
+[[package]]
+name = "convert_case"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+
[[package]]
name = "core-foundation"
version = "0.9.3"
@@ -263,7 +263,7 @@
"proc-macro2",
"quote",
"strsim",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -274,7 +274,7 @@
dependencies = [
"darling_core",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -287,6 +287,19 @@
"serde",
]
+[[package]]
+name = "derive_more"
+version = "0.99.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
+dependencies = [
+ "convert_case",
+ "proc-macro2",
+ "quote",
+ "rustc_version",
+ "syn 1.0.109",
+]
+
[[package]]
name = "either"
version = "1.9.0"
@@ -420,7 +433,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -861,7 +874,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -927,7 +940,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1080,6 +1093,15 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
[[package]]
name = "rustix"
version = "0.38.25"
@@ -1112,9 +1134,9 @@
name = "search-index-lambda"
version = "0.1.0"
dependencies = [
- "anyhow",
"aws-smithy-types",
"aws_lambda_events",
+ "derive_more",
"lambda_runtime",
"openssl",
"reqwest",
@@ -1149,6 +1171,12 @@
"libc",
]
+[[package]]
+name = "semver"
+version = "1.0.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
+
[[package]]
name = "serde"
version = "1.0.193"
@@ -1166,7 +1194,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1238,7 +1266,7 @@
"darling",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1291,6 +1319,17 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+[[package]]
+name = "syn"
+version = "1.0.109"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
[[package]]
name = "syn"
version = "2.0.39"
@@ -1415,7 +1454,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1500,7 +1539,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
]
[[package]]
@@ -1634,7 +1673,7 @@
"once_cell",
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-shared",
]
@@ -1668,7 +1707,7 @@
dependencies = [
"proc-macro2",
"quote",
- "syn",
+ "syn 2.0.39",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml
--- a/services/search-index-lambda/Cargo.toml
+++ b/services/search-index-lambda/Cargo.toml
@@ -16,4 +16,4 @@
serde_derive = "1.0.193"
serde = "1.0.193"
aws-smithy-types = "1.1.1"
-anyhow = "1.0.75"
+derive_more = "0.99.17"
diff --git a/services/search-index-lambda/src/error.rs b/services/search-index-lambda/src/error.rs
new file mode 100644
--- /dev/null
+++ b/services/search-index-lambda/src/error.rs
@@ -0,0 +1,28 @@
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum Error {
+ #[display(fmt = "Payload Error: {:?}", _0)]
+ PayloadError(RecordError),
+ #[display(fmt = "Missing OPENSEARCH_ENDPOINT: {:?}", _0)]
+ MissingOpenSearchEndpoint(std::env::VarError),
+ #[display(fmt = "Serialization Error: {:?}", _0)]
+ SerializationError(serde_json::Error),
+ #[display(fmt = "Tracing Error {:?}", _0)]
+ TracingError(tracing::subscriber::SetGlobalDefaultError),
+ #[display(fmt = "Reqwest Error {:?}", _0)]
+ ReqwestError(reqwest::Error),
+ #[display(fmt = "Update Index Error: Status Code: {:?}", _0)]
+ UpdateIndexError(#[error(ignore)] reqwest::StatusCode),
+}
+
+#[derive(Debug, derive_more::Display, derive_more::Error)]
+pub enum RecordError {
+ InvalidAttributeType,
+ MissingEventName,
+ MissingDynamoDBStreamRecord,
+ MissingNewImage,
+ MissingOldImage,
+ MissingUserId,
+ MissingUsername,
+}
diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs
--- a/services/search-index-lambda/src/main.rs
+++ b/services/search-index-lambda/src/main.rs
@@ -1,14 +1,15 @@
-use anyhow::{anyhow, Result};
-use lambda_runtime::{service_fn, Error, LambdaEvent};
+use lambda_runtime::{service_fn, LambdaEvent};
use reqwest::Response;
use serde::Serialize;
use tracing::{self, Level};
use tracing_subscriber::EnvFilter;
mod constants;
+mod error;
mod payload;
mod query;
+use error::{Error, RecordError};
use payload::{AttributeValue, EventPayload, OperationType, StreamRecord};
use query::{Match, Query, Script, Term, UpdateByQuery};
@@ -20,7 +21,7 @@
}
#[tokio::main]
-async fn main() -> Result<(), Error> {
+async fn main() -> Result<(), lambda_runtime::Error> {
let filter = EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.with_env_var(constants::LOG_LEVEL_ENV_VAR)
@@ -33,7 +34,7 @@
.finish();
tracing::subscriber::set_global_default(subscriber)
- .expect("Unable to configure tracing");
+ .map_err(|e| Error::TracingError(e))?;
let func = service_fn(func);
lambda_runtime::run(func).await?;
@@ -41,11 +42,11 @@
Ok(())
}
-async fn func(event: LambdaEvent<EventPayload>) -> Result<()> {
+async fn func(event: LambdaEvent<EventPayload>) -> Result<(), error::Error> {
tracing::debug!("Running in debug mode");
- let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
- .expect("An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables.");
+ let endpoint = std::env::var("OPENSEARCH_ENDPOINT")
+ .map_err(|e| Error::MissingOpenSearchEndpoint(e))?;
tracing::info!("endpoint: {:?}", endpoint);
@@ -53,16 +54,18 @@
println!("records: {}", &payload.records.len());
for record in payload.records {
- let event_name = record.event_name.expect("failed to get event name");
- let dynamodb = record
- .dynamodb
- .expect("failed to get dynamodb StreamRecord");
+ let event_name = record
+ .event_name
+ .ok_or(Error::PayloadError(RecordError::MissingEventName))?;
+ let dynamodb = record.dynamodb.ok_or(Error::PayloadError(
+ RecordError::MissingDynamoDBStreamRecord,
+ ))?;
let res = match event_name {
- OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?,
- OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?,
- OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?,
- };
+ OperationType::Insert => handle_insert(&dynamodb, &endpoint).await,
+ OperationType::Modify => handle_modify(&dynamodb, &endpoint).await,
+ OperationType::Remove => handle_remove(&dynamodb, &endpoint).await,
+ }?;
match res.status() {
reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => {
@@ -74,10 +77,7 @@
res.status()
);
- return Err(anyhow!(
- "failed to update identity-search index, status: {}",
- res.status()
- ));
+ return Err(Error::UpdateIndexError(res.status()));
}
}
}
@@ -85,10 +85,10 @@
Ok(())
}
-async fn send_request(
+async fn update_index(
url: String,
json_body: String,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
let client = reqwest::Client::new();
client
@@ -97,37 +97,35 @@
.body(json_body)
.send()
.await
- .map_err(|err| {
- anyhow!("failed to update identity-search index, err: {}", err)
+ .map_err(|e| {
+ tracing::error!("Reqwest Error: {:?}", e);
+ Error::ReqwestError(e)
})
}
async fn handle_insert(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle INSERT event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue"
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let user_body = User {
@@ -135,40 +133,39 @@
username: username.clone(),
};
- let json_body =
- serde_json::to_string(&user_body).expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&user_body).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_modify(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle MODIFY event");
let new_image = dynamodb
.new_image
.as_ref()
- .expect("failed to get new image");
+ .ok_or(Error::PayloadError(RecordError::MissingNewImage))?;
let user_id_attribute = new_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
+
let username_attribute = new_image
.get(constants::DYNAMODB_USERNAME_KEY)
- .expect("failed to get username");
+ .ok_or(Error::PayloadError(RecordError::MissingUsername))?;
let (user_id, username) = match (user_id_attribute, username_attribute) {
(AttributeValue::S(user_id), AttributeValue::S(username)) => {
(user_id, username)
}
- _ => {
- return Err(anyhow!(
- "failed to get user_id and username from AttributeValue",
- ))
- }
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -184,29 +181,32 @@
}),
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
+
let url = format!("https://{}/users/_update_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
async fn handle_remove(
dynamodb: &StreamRecord,
endpoint: &str,
-) -> Result<Response, anyhow::Error> {
+) -> Result<Response, error::Error> {
tracing::info!("Handle REMOVE event");
let old_image = dynamodb
.old_image
.as_ref()
- .expect("failed to get old image");
+ .ok_or(Error::PayloadError(RecordError::MissingOldImage))?;
let user_id_attribute = old_image
.get(constants::DYNAMODB_USER_ID_KEY)
- .expect("failed to get userid");
+ .ok_or(Error::PayloadError(RecordError::MissingUserId))?;
let user_id = match user_id_attribute {
AttributeValue::S(user_id) => user_id,
- _ => return Err(anyhow!("failed to get user_id from AttributeValue")),
+ _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)),
};
let update_by_query = UpdateByQuery {
@@ -219,9 +219,11 @@
script: None,
};
- let json_body = serde_json::to_string(&update_by_query)
- .expect("failed to serialize user body");
+ let json_body = serde_json::to_string(&update_by_query).map_err(|e| {
+ tracing::error!("Serialization Error: {:?}", e);
+ Error::SerializationError(e)
+ })?;
let url = format!("https://{}/users/_delete_by_query/", endpoint);
- send_request(url, json_body).await
+ update_index(url, json_body).await
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 3, 6:23 AM (17 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2610573
Default Alt Text
D10736.diff (13 KB)
Attached To
Mode
D10736: [services] [2/n] Replace panic with custom errors on search index lambda
Attached
Detach File
Event Timeline
Log In to Comment