diff --git a/services/search-index-lambda/Cargo.lock b/services/search-index-lambda/Cargo.lock --- a/services/search-index-lambda/Cargo.lock +++ b/services/search-index-lambda/Cargo.lock @@ -41,12 +41,6 @@ "libc", ] -[[package]] -name = "anyhow" -version = "1.0.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" - [[package]] name = "async-stream" version = "0.3.5" @@ -66,7 +60,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -217,6 +211,12 @@ "windows-targets", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.3" @@ -263,7 +263,7 @@ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.39", ] [[package]] @@ -274,7 +274,7 @@ dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -287,6 +287,19 @@ "serde", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "either" version = "1.9.0" @@ -420,7 +433,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -861,7 +874,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -927,7 +940,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -1080,6 +1093,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.25" @@ -1112,9 +1134,9 @@ name = "search-index-lambda" version = "0.1.0" dependencies = [ - "anyhow", "aws-smithy-types", "aws_lambda_events", + "derive_more", "lambda_runtime", "openssl", "reqwest", @@ -1149,6 +1171,12 @@ "libc", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + [[package]] name = "serde" version = "1.0.193" @@ -1166,7 +1194,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -1238,7 +1266,7 @@ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -1291,6 +1319,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.39" @@ -1415,7 +1454,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -1500,7 +1539,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] @@ -1634,7 +1673,7 @@ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.39", "wasm-bindgen-shared", ] @@ -1668,7 +1707,7 @@ dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/services/search-index-lambda/Cargo.toml b/services/search-index-lambda/Cargo.toml --- a/services/search-index-lambda/Cargo.toml +++ b/services/search-index-lambda/Cargo.toml @@ -16,4 +16,4 @@ serde_derive = "1.0.193" serde = "1.0.193" aws-smithy-types = "1.1.1" -anyhow = "1.0.75" +derive_more = "0.99.17" diff --git a/services/search-index-lambda/src/error.rs b/services/search-index-lambda/src/error.rs new file mode 100644 --- /dev/null +++ b/services/search-index-lambda/src/error.rs @@ -0,0 +1,28 @@ +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +pub enum Error { + #[display(fmt = "Payload Error: {:?}", _0)] + PayloadError(RecordError), + #[display(fmt = "Missing OPENSEARCH_ENDPOINT: {:?}", _0)] + MissingOpenSearchEndpoint(std::env::VarError), + #[display(fmt = "Serialization Error: {:?}", _0)] + SerializationError(serde_json::Error), + #[display(fmt = "Tracing Error {:?}", _0)] + TracingError(tracing::subscriber::SetGlobalDefaultError), + #[display(fmt = "Reqwest Error {:?}", _0)] + ReqwestError(reqwest::Error), + #[display(fmt = "Update Index Error: Status Code: {:?}", _0)] + UpdateIndexError(#[error(ignore)] reqwest::StatusCode), +} + +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum RecordError { + InvalidAttributeType, + MissingEventName, + MissingDynamoDBStreamRecord, + MissingNewImage, + MissingOldImage, + MissingUserId, + MissingUsername, +} diff --git a/services/search-index-lambda/src/main.rs b/services/search-index-lambda/src/main.rs --- a/services/search-index-lambda/src/main.rs +++ b/services/search-index-lambda/src/main.rs @@ -1,14 +1,15 @@ -use anyhow::{anyhow, Result}; -use lambda_runtime::{service_fn, Error, LambdaEvent}; +use lambda_runtime::{service_fn, LambdaEvent}; use reqwest::Response; use serde::Serialize; use tracing::{self, Level}; use tracing_subscriber::EnvFilter; mod constants; +mod error; mod payload; mod query; +use error::{Error, RecordError}; use payload::{AttributeValue, EventPayload, OperationType, StreamRecord}; use query::{Match, Query, Script, Term, UpdateByQuery}; @@ -20,7 +21,7 @@ } #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), lambda_runtime::Error> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) @@ -33,7 +34,7 @@ .finish(); tracing::subscriber::set_global_default(subscriber) - .expect("Unable to configure tracing"); + .map_err(|e| Error::TracingError(e))?; let func = service_fn(func); lambda_runtime::run(func).await?; @@ -41,11 +42,11 @@ Ok(()) } -async fn func(event: LambdaEvent) -> Result<()> { +async fn func(event: LambdaEvent) -> Result<(), error::Error> { tracing::debug!("Running in debug mode"); - let endpoint = std::env::var("OPENSEARCH_ENDPOINT") - .expect("An OPENSEARCH_ENDPOINT must be set in this app's Lambda environment variables."); + let endpoint = std::env::var("OPENSEARCH_ENDPOINT") + .map_err(|e| Error::MissingOpenSearchEndpoint(e))?; tracing::info!("endpoint: {:?}", endpoint); @@ -53,16 +54,18 @@ println!("records: {}", &payload.records.len()); for record in payload.records { - let event_name = record.event_name.expect("failed to get event name"); - let dynamodb = record - .dynamodb - .expect("failed to get dynamodb StreamRecord"); + let event_name = record + .event_name + .ok_or(Error::PayloadError(RecordError::MissingEventName))?; + let dynamodb = record.dynamodb.ok_or(Error::PayloadError( + RecordError::MissingDynamoDBStreamRecord, + ))?; let res = match event_name { - OperationType::Insert => handle_insert(&dynamodb, &endpoint).await?, - OperationType::Modify => handle_modify(&dynamodb, &endpoint).await?, - OperationType::Remove => handle_remove(&dynamodb, &endpoint).await?, - }; + OperationType::Insert => handle_insert(&dynamodb, &endpoint).await, + OperationType::Modify => handle_modify(&dynamodb, &endpoint).await, + OperationType::Remove => handle_remove(&dynamodb, &endpoint).await, + }?; match res.status() { reqwest::StatusCode::OK | reqwest::StatusCode::CREATED => { @@ -74,10 +77,7 @@ res.status() ); - return Err(anyhow!( - "failed to update identity-search index, status: {}", - res.status() - )); + return Err(Error::UpdateIndexError(res.status())); } } } @@ -85,10 +85,10 @@ Ok(()) } -async fn send_request( +async fn update_index( url: String, json_body: String, -) -> Result { +) -> Result { let client = reqwest::Client::new(); client @@ -97,37 +97,35 @@ .body(json_body) .send() .await - .map_err(|err| { - anyhow!("failed to update identity-search index, err: {}", err) + .map_err(|e| { + tracing::error!("Reqwest Error: {:?}", e); + Error::ReqwestError(e) }) } async fn handle_insert( dynamodb: &StreamRecord, endpoint: &str, -) -> Result { +) -> Result { tracing::info!("Handle INSERT event"); let new_image = dynamodb .new_image .as_ref() - .expect("failed to get new image"); + .ok_or(Error::PayloadError(RecordError::MissingNewImage))?; let user_id_attribute = new_image .get(constants::DYNAMODB_USER_ID_KEY) - .expect("failed to get userid"); + .ok_or(Error::PayloadError(RecordError::MissingUserId))?; + let username_attribute = new_image .get(constants::DYNAMODB_USERNAME_KEY) - .expect("failed to get username"); + .ok_or(Error::PayloadError(RecordError::MissingUsername))?; let (user_id, username) = match (user_id_attribute, username_attribute) { (AttributeValue::S(user_id), AttributeValue::S(username)) => { (user_id, username) } - _ => { - return Err(anyhow!( - "failed to get user_id and username from AttributeValue" - )) - } + _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)), }; let user_body = User { @@ -135,40 +133,39 @@ username: username.clone(), }; - let json_body = - serde_json::to_string(&user_body).expect("failed to serialize user body"); + let json_body = serde_json::to_string(&user_body).map_err(|e| { + tracing::error!("Serialization Error: {:?}", e); + Error::SerializationError(e) + })?; let url = format!("https://{}/users/_doc/{}", endpoint, user_body.user_id); - send_request(url, json_body).await + update_index(url, json_body).await } async fn handle_modify( dynamodb: &StreamRecord, endpoint: &str, -) -> Result { +) -> Result { tracing::info!("Handle MODIFY event"); let new_image = dynamodb .new_image .as_ref() - .expect("failed to get new image"); + .ok_or(Error::PayloadError(RecordError::MissingNewImage))?; let user_id_attribute = new_image .get(constants::DYNAMODB_USER_ID_KEY) - .expect("failed to get userid"); + .ok_or(Error::PayloadError(RecordError::MissingUserId))?; + let username_attribute = new_image .get(constants::DYNAMODB_USERNAME_KEY) - .expect("failed to get username"); + .ok_or(Error::PayloadError(RecordError::MissingUsername))?; let (user_id, username) = match (user_id_attribute, username_attribute) { (AttributeValue::S(user_id), AttributeValue::S(username)) => { (user_id, username) } - _ => { - return Err(anyhow!( - "failed to get user_id and username from AttributeValue", - )) - } + _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)), }; let update_by_query = UpdateByQuery { @@ -184,29 +181,32 @@ }), }; - let json_body = serde_json::to_string(&update_by_query) - .expect("failed to serialize user body"); + let json_body = serde_json::to_string(&update_by_query).map_err(|e| { + tracing::error!("Serialization Error: {:?}", e); + Error::SerializationError(e) + })?; + let url = format!("https://{}/users/_update_by_query/", endpoint); - send_request(url, json_body).await + update_index(url, json_body).await } async fn handle_remove( dynamodb: &StreamRecord, endpoint: &str, -) -> Result { +) -> Result { tracing::info!("Handle REMOVE event"); let old_image = dynamodb .old_image .as_ref() - .expect("failed to get old image"); + .ok_or(Error::PayloadError(RecordError::MissingOldImage))?; let user_id_attribute = old_image .get(constants::DYNAMODB_USER_ID_KEY) - .expect("failed to get userid"); + .ok_or(Error::PayloadError(RecordError::MissingUserId))?; let user_id = match user_id_attribute { AttributeValue::S(user_id) => user_id, - _ => return Err(anyhow!("failed to get user_id from AttributeValue")), + _ => return Err(Error::PayloadError(RecordError::InvalidAttributeType)), }; let update_by_query = UpdateByQuery { @@ -219,9 +219,11 @@ script: None, }; - let json_body = serde_json::to_string(&update_by_query) - .expect("failed to serialize user body"); + let json_body = serde_json::to_string(&update_by_query).map_err(|e| { + tracing::error!("Serialization Error: {:?}", e); + Error::SerializationError(e) + })?; let url = format!("https://{}/users/_delete_by_query/", endpoint); - send_request(url, json_body).await + update_index(url, json_body).await }