Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/database.rs
use aws_sdk_dynamodb::{ | use aws_sdk_dynamodb::{ | ||||
model::AttributeValue, output::GetItemOutput, Error as DynamoDBError, | model::AttributeValue, output::GetItemOutput, Error as DynamoDBError, | ||||
}; | }; | ||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
use comm_services_lib::database::{self, DBItemError}; | |||||
use std::{ | use std::{ | ||||
collections::HashMap, | collections::HashMap, | ||||
fmt::{Display, Formatter}, | fmt::{Display, Formatter}, | ||||
sync::Arc, | sync::Arc, | ||||
}; | }; | ||||
use tracing::error; | use tracing::error; | ||||
use crate::{ | use crate::{ | ||||
▲ Show 20 Lines • Show All 83 Lines • ▼ Show 20 Lines | match self | ||||
.map_err(|e| { | .map_err(|e| { | ||||
error!("DynamoDB client failed to find blob item"); | error!("DynamoDB client failed to find blob item"); | ||||
Error::AwsSdk(e.into()) | Error::AwsSdk(e.into()) | ||||
})? { | })? { | ||||
GetItemOutput { | GetItemOutput { | ||||
item: Some(mut item), | item: Some(mut item), | ||||
.. | .. | ||||
} => { | } => { | ||||
let blob_hash = parse_string_attribute( | let blob_hash = database::parse_string_attribute( | ||||
BLOB_TABLE_BLOB_HASH_FIELD, | BLOB_TABLE_BLOB_HASH_FIELD, | ||||
item.remove(BLOB_TABLE_BLOB_HASH_FIELD), | item.remove(BLOB_TABLE_BLOB_HASH_FIELD), | ||||
)?; | )?; | ||||
let s3_path = parse_string_attribute( | let s3_path = database::parse_string_attribute( | ||||
BLOB_TABLE_S3_PATH_FIELD, | BLOB_TABLE_S3_PATH_FIELD, | ||||
item.remove(BLOB_TABLE_S3_PATH_FIELD), | item.remove(BLOB_TABLE_S3_PATH_FIELD), | ||||
)?; | )?; | ||||
let created = parse_datetime_attribute( | let created = database::parse_datetime_attribute( | ||||
BLOB_TABLE_CREATED_FIELD, | BLOB_TABLE_CREATED_FIELD, | ||||
item.remove(BLOB_TABLE_CREATED_FIELD), | item.remove(BLOB_TABLE_CREATED_FIELD), | ||||
)?; | )?; | ||||
Ok(Some(BlobItem { | Ok(Some(BlobItem { | ||||
blob_hash, | blob_hash, | ||||
s3_path: S3Path::from_full_path(&s3_path) | s3_path: S3Path::from_full_path(&s3_path) | ||||
.map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?, | .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?, | ||||
created, | created, | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | match self | ||||
.map_err(|e| { | .map_err(|e| { | ||||
error!("DynamoDB client failed to find reverse index by holder"); | error!("DynamoDB client failed to find reverse index by holder"); | ||||
Error::AwsSdk(e.into()) | Error::AwsSdk(e.into()) | ||||
})? { | })? { | ||||
GetItemOutput { | GetItemOutput { | ||||
item: Some(mut item), | item: Some(mut item), | ||||
.. | .. | ||||
} => { | } => { | ||||
let holder = parse_string_attribute( | let holder = database::parse_string_attribute( | ||||
BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, | BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, | ||||
item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), | item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), | ||||
)?; | )?; | ||||
let blob_hash = parse_string_attribute( | let blob_hash = database::parse_string_attribute( | ||||
BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, | BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, | ||||
item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), | item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), | ||||
)?; | )?; | ||||
Ok(Some(ReverseIndexItem { holder, blob_hash })) | Ok(Some(ReverseIndexItem { holder, blob_hash })) | ||||
} | } | ||||
_ => Ok(None), | _ => Ok(None), | ||||
} | } | ||||
Show All 26 Lines | ) -> Result<Vec<ReverseIndexItem>, Error> { | ||||
if response.count == 0 { | if response.count == 0 { | ||||
return Ok(vec![]); | return Ok(vec![]); | ||||
} | } | ||||
let mut results: Vec<ReverseIndexItem> = | let mut results: Vec<ReverseIndexItem> = | ||||
Vec::with_capacity(response.count() as usize); | Vec::with_capacity(response.count() as usize); | ||||
for mut item in response.items.unwrap_or_default() { | for mut item in response.items.unwrap_or_default() { | ||||
let holder = parse_string_attribute( | let holder = database::parse_string_attribute( | ||||
BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, | BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, | ||||
item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), | item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), | ||||
)?; | )?; | ||||
let blob_hash = parse_string_attribute( | let blob_hash = database::parse_string_attribute( | ||||
BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, | BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, | ||||
item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), | item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), | ||||
)?; | )?; | ||||
results.push(ReverseIndexItem { holder, blob_hash }); | results.push(ReverseIndexItem { holder, blob_hash }); | ||||
} | } | ||||
return Ok(results); | return Ok(results); | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | match self { | ||||
write!(f, "Item for given holder [{}] already exists", holder) | write!(f, "Item for given holder [{}] already exists", holder) | ||||
} | } | ||||
BlobDBError::InvalidS3Path(err) => err.fmt(f), | BlobDBError::InvalidS3Path(err) => err.fmt(f), | ||||
} | } | ||||
} | } | ||||
} | } | ||||
impl std::error::Error for BlobDBError {} | impl std::error::Error for BlobDBError {} | ||||
#[derive(Debug, derive_more::Error, derive_more::Constructor)] | |||||
pub struct DBItemError { | |||||
attribute_name: &'static str, | |||||
attribute_value: Option<AttributeValue>, | |||||
attribute_error: DBItemAttributeError, | |||||
} | |||||
impl Display for DBItemError { | |||||
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { | |||||
match &self.attribute_error { | |||||
DBItemAttributeError::Missing => { | |||||
write!(f, "Attribute {} is missing", self.attribute_name) | |||||
} | |||||
DBItemAttributeError::IncorrectType => write!( | |||||
f, | |||||
"Value for attribute {} has incorrect type: {:?}", | |||||
self.attribute_name, self.attribute_value | |||||
), | |||||
error => write!( | |||||
f, | |||||
"Error regarding attribute {} with value {:?}: {}", | |||||
self.attribute_name, self.attribute_value, error | |||||
), | |||||
} | |||||
} | |||||
} | |||||
#[derive(Debug, derive_more::Display, derive_more::Error)] | |||||
pub enum DBItemAttributeError { | |||||
#[display(...)] | |||||
Missing, | |||||
#[display(...)] | |||||
IncorrectType, | |||||
#[display(...)] | |||||
InvalidTimestamp(chrono::ParseError), | |||||
} | |||||
fn parse_string_attribute( | |||||
attribute_name: &'static str, | |||||
attribute_value: Option<AttributeValue>, | |||||
) -> Result<String, DBItemError> { | |||||
match attribute_value { | |||||
Some(AttributeValue::S(value)) => Ok(value), | |||||
Some(_) => Err(DBItemError::new( | |||||
attribute_name, | |||||
attribute_value, | |||||
DBItemAttributeError::IncorrectType, | |||||
)), | |||||
None => Err(DBItemError::new( | |||||
attribute_name, | |||||
attribute_value, | |||||
DBItemAttributeError::Missing, | |||||
)), | |||||
} | |||||
} | |||||
fn parse_datetime_attribute( | |||||
attribute_name: &'static str, | |||||
attribute_value: Option<AttributeValue>, | |||||
) -> Result<DateTime<Utc>, DBItemError> { | |||||
if let Some(AttributeValue::S(datetime)) = &attribute_value { | |||||
// parse() accepts a relaxed RFC3339 string | |||||
datetime.parse().map_err(|e| { | |||||
DBItemError::new( | |||||
attribute_name, | |||||
attribute_value, | |||||
DBItemAttributeError::InvalidTimestamp(e), | |||||
) | |||||
}) | |||||
} else { | |||||
Err(DBItemError::new( | |||||
attribute_name, | |||||
attribute_value, | |||||
DBItemAttributeError::Missing, | |||||
)) | |||||
} | |||||
} |