diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
index 160687708..7b37222f3 100644
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,289 +1,289 @@
use aws_sdk_dynamodb::{
  operation::put_item::PutItemOutput,
  types::{AttributeValue, Delete, TransactWriteItem, Update},
  Error as DynamoDBError,
};
use chrono::Utc;
-use comm_services_lib::database::parse_string_attribute;
+use comm_services_lib::database::TryFromAttribute;
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, error, trace};

use crate::constants::db::*;

use super::errors::{BlobDBError, Error as DBError};
use super::types::*;

#[derive(Clone)]
pub struct DatabaseClient {
  ddb: Arc<aws_sdk_dynamodb::Client>,
}

/// public interface implementation
impl DatabaseClient {
  pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
    DatabaseClient {
      ddb: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
    }
  }

  /// Gets a blob item row from the database by its blob hash.
  /// Returns None if the blob item is not found.
  pub async fn get_blob_item(
    &self,
    blob_hash: impl Into<String>,
  ) -> DBResult<Option<BlobItemRow>> {
    let key = PrimaryKey::for_blob_item(blob_hash);
    self
      .get_raw_item(key.clone())
      .await?
      .map(BlobItemRow::try_from)
      .transpose()
  }

  /// Inserts a new blob item row into the database. Returns Error
  /// if the item already exists.
  pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> {
    let item = HashMap::from([
      (
        ATTR_BLOB_HASH.to_string(),
        AttributeValue::S(blob_item.blob_hash),
      ),
      (
        ATTR_HOLDER.to_string(),
        AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()),
      ),
      (
        ATTR_S3_PATH.to_string(),
        AttributeValue::S(blob_item.s3_path.to_full_path()),
      ),
      (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()),
    ]);

    self.insert_item(item).await?;
    Ok(())
  }

  /// Deletes a blob item row. Doesn't delete its holders.
  pub async fn delete_blob_item(
    &self,
    blob_hash: impl Into<String>,
  ) -> DBResult<()> {
    let key = PrimaryKey::for_blob_item(blob_hash);
    self
      .ddb
      .delete_item()
      .table_name(BLOB_TABLE_NAME)
      .set_key(Some(key.into()))
      .send()
      .await
      .map_err(|err| {
        debug!("DynamoDB client failed to delete blob item: {:?}", err);
        DBError::AwsSdk(err.into())
      })?;
    Ok(())
  }

  /// Inserts a new holder assignment row into the database. Returns Error
  /// if the item already exists or the holder format is invalid.
  pub async fn put_holder_assignment(
    &self,
    blob_hash: impl Into<String>,
    holder: impl Into<String>,
  ) -> DBResult<()> {
    let blob_hash: String = blob_hash.into();
    let holder: String = holder.into();
    validate_holder(&holder)?;

    let item = HashMap::from([
      (ATTR_BLOB_HASH.to_string(), AttributeValue::S(blob_hash)),
      (ATTR_HOLDER.to_string(), AttributeValue::S(holder)),
      (ATTR_UNCHECKED.to_string(), UncheckedKind::Holder.into()),
    ]);

    self.insert_item(item).await?;
    Ok(())
  }

  /// Deletes a holder assignment row from the table.
  /// If the blob item for the given holder assignment exists, it will be
  /// marked as unchecked.
  ///
  /// Returns Error if the holder format is invalid or a race condition
  /// occurred. Doesn't fail if the holder assignment didn't exist before.
  pub async fn delete_holder_assignment(
    &self,
    blob_hash: impl Into<String>,
    holder: impl Into<String>,
  ) -> DBResult<()> {
    let blob_hash: String = blob_hash.into();
    let holder: String = holder.into();
    validate_holder(&holder)?;
    let mut transaction = Vec::new();

    // delete the holder row
    let assignment_key = PrimaryKey {
      blob_hash: blob_hash.clone(),
      holder: holder.into(),
    };
    let delete_request = Delete::builder()
      .table_name(BLOB_TABLE_NAME)
      .set_key(Some(assignment_key.into()))
      .build();
    transaction
      .push(TransactWriteItem::builder().delete(delete_request).build());

    // mark the blob item as unchecked if it exists
    let blob_primary_key = PrimaryKey::for_blob_item(blob_hash);
    if self.get_raw_item(blob_primary_key.clone()).await?.is_some() {
      let update_request = Update::builder()
        .table_name(BLOB_TABLE_NAME)
        .set_key(Some(blob_primary_key.into()))
        // even though we checked that the blob item exists, we still need
        // to check it again using DDB built-in conditions in case it was
        // deleted in the meantime
        .condition_expression(
          "attribute_exists(#blob_hash) AND attribute_exists(#holder)",
        )
        .update_expression("SET #unchecked = :unchecked, #last_modified = :now")
        .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
        .expression_attribute_names("#holder", ATTR_HOLDER)
        .expression_attribute_names("#unchecked", ATTR_UNCHECKED)
        .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED)
        .expression_attribute_values(":unchecked", UncheckedKind::Blob.into())
        .expression_attribute_values(
          ":now",
          AttributeValue::N(Utc::now().timestamp_millis().to_string()),
        )
        .build();
      transaction
        .push(TransactWriteItem::builder().update(update_request).build());
    }

    self
      .ddb
      .transact_write_items()
      .set_transact_items(Some(transaction))
      .send()
      .await
      .map_err(|err| {
        debug!("DynamoDB client failed to run transaction: {:?}", err);
        DBError::AwsSdk(err.into())
      })?;
    Ok(())
  }

  /// Queries the table for a list of holders for the given blob hash.
  /// Optionally limits the number of results.
  pub async fn list_blob_holders(
    &self,
    blob_hash: impl Into<String>,
    limit: Option<i32>,
  ) -> DBResult<Vec<String>> {
    let response = self
      .ddb
      .query()
      .table_name(BLOB_TABLE_NAME)
      .projection_expression("#holder")
      .key_condition_expression("#blob_hash = :blob_hash")
      .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
      .expression_attribute_names("#holder", ATTR_HOLDER)
      .expression_attribute_values(
        ":blob_hash",
        AttributeValue::S(blob_hash.into()),
      )
      .consistent_read(true)
      .set_limit(limit)
      .send()
      .await
      .map_err(|err| {
        error!("DynamoDB client failed to query holders: {:?}", err);
        DBError::AwsSdk(err.into())
      })?;

    let Some(items) = response.items else {
      return Ok(vec![]);
    };
    items
      .into_iter()
      .filter_map(|mut row| {
        // filter out rows that are blob items
        // we cannot do it in the key condition expression - it doesn't
        // support the <> operator
        // a filter expression doesn't work either - it doesn't support
        // filtering by sort key
-        match parse_string_attribute(ATTR_HOLDER, row.remove(ATTR_HOLDER)) {
+        match String::try_from_attr(ATTR_HOLDER, row.remove(ATTR_HOLDER)) {
          Ok(value) if value.as_str() == BLOB_ITEM_ROW_HOLDER_VALUE => None,
          holder => Some(holder),
        }
      })
      .collect::<Result<Vec<_>, _>>()
      .map_err(|err| DBError::Attribute(err))
  }
}

// private helpers
impl DatabaseClient {
  /// Inserts a new item into the table using PutItem. Returns an
  /// error if the item already exists
  async fn insert_item(
    &self,
    mut item: RawAttributes,
  ) -> DBResult<PutItemOutput> {
    // add metadata attributes common for all types of rows
    let now = Utc::now().timestamp_millis();
    item.insert(
      ATTR_CREATED_AT.to_string(),
      AttributeValue::N(now.to_string()),
    );
    item.insert(
      ATTR_LAST_MODIFIED.to_string(),
      AttributeValue::N(now.to_string()),
    );

    self
      .ddb
      .put_item()
      .table_name(BLOB_TABLE_NAME)
      .set_item(Some(item))
      // make sure we don't accidentally overwrite an existing row
      .condition_expression(
        "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)",
      )
      .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
      .expression_attribute_names("#holder", ATTR_HOLDER)
      .send()
      .await
      .map_err(|err| match DynamoDBError::from(err) {
        DynamoDBError::ConditionalCheckFailedException(e) => {
          debug!("DynamoDB client failed to insert: item already exists");
          trace!("Conditional check failed with error: {}", e);
          DBError::ItemAlreadyExists
        }
        err => {
          debug!("DynamoDB client failed to insert: {:?}", err);
          DBError::AwsSdk(err)
        }
      })
  }

  /// Gets a single row from the table using GetItem, without parsing it
  async fn get_raw_item(
    &self,
    key: PrimaryKey,
  ) -> DBResult<Option<RawAttributes>> {
    self
      .ddb
      .get_item()
      .table_name(BLOB_TABLE_NAME)
      .set_key(Some(key.into()))
      .send()
      .await
      .map_err(|err| {
        debug!("DynamoDB client failed to get item: {:?}", err);
        DBError::AwsSdk(err.into())
      })
      .map(|response| response.item)
  }
}

fn validate_holder(holder: &str) -> DBResult<()> {
  if holder == BLOB_ITEM_ROW_HOLDER_VALUE {
    debug!("Invalid holder: {}", holder);
    return Err(DBError::Blob(BlobDBError::InvalidInput(holder.to_string())));
  }
  Ok(())
}
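
For context, a minimal in-module usage sketch of the new single-table client API above (the `example` function and the `"deadbeef"`/`"my-holder"` values are hypothetical; assumes a tokio runtime):

```rust
// Hypothetical usage sketch; `DBResult`, `DatabaseClient` and
// `BlobItemInput` come from this module and its siblings.
async fn example(aws_config: &aws_types::SdkConfig) -> DBResult<()> {
  let client = DatabaseClient::new(aws_config);

  // blob rows and holder rows live in the same table, distinguished
  // by the sort key (holder) value
  client.put_blob_item(BlobItemInput::new("deadbeef")).await?;
  client.put_holder_assignment("deadbeef", "my-holder").await?;

  // listing holders never returns the special blob-item row
  let holders = client.list_blob_holders("deadbeef", None).await?;
  assert_eq!(holders, vec!["my-holder".to_string()]);
  Ok(())
}
```
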
diff --git a/services/blob/src/database/old.rs b/services/blob/src/database/old.rs
index 4d5c743a9..452f99440 100644
--- a/services/blob/src/database/old.rs
+++ b/services/blob/src/database/old.rs
@@ -1,306 +1,308 @@
+#![allow(deprecated)]
+
use aws_sdk_dynamodb::{
  operation::get_item::GetItemOutput, types::AttributeValue,
};
use chrono::{DateTime, Utc};
use comm_services_lib::database;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use crate::{
  config::CONFIG,
  constants::{
    BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
    BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME,
    BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME,
    BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME,
    BLOB_TABLE_S3_PATH_FIELD,
  },
  s3::S3Path,
};

use super::errors::{BlobDBError, Error};

#[derive(Clone, Debug)]
pub struct BlobItem {
  pub blob_hash: String,
  pub s3_path: S3Path,
  pub created: DateTime<Utc>,
}

impl BlobItem {
  pub fn new(blob_hash: impl Into<String>) -> Self {
    let hash_str = blob_hash.into();
    BlobItem {
      blob_hash: hash_str.clone(),
      s3_path: S3Path {
        bucket_name: CONFIG.s3_bucket_name.clone(),
        object_name: hash_str,
      },
      created: Utc::now(),
    }
  }
}

#[derive(Clone, Debug)]
pub struct ReverseIndexItem {
  pub holder: String,
  pub blob_hash: String,
}

#[derive(Clone)]
pub struct DatabaseClient {
  client: Arc<aws_sdk_dynamodb::Client>,
}

impl DatabaseClient {
  pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
    DatabaseClient {
      client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
    }
  }

  // Blob item

  pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> {
    let item = HashMap::from([
      (
        BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
        AttributeValue::S(blob_item.blob_hash),
      ),
      (
        BLOB_TABLE_S3_PATH_FIELD.to_string(),
        AttributeValue::S(blob_item.s3_path.to_full_path()),
      ),
      (
        BLOB_TABLE_CREATED_FIELD.to_string(),
        AttributeValue::S(blob_item.created.to_rfc3339()),
      ),
    ]);

    self
      .client
      .put_item()
      .table_name(BLOB_TABLE_NAME)
      .set_item(Some(item))
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to put blob item");
        Error::AwsSdk(e.into())
      })?;

    Ok(())
  }

  pub async fn find_blob_item(
    &self,
    blob_hash: &str,
  ) -> Result<Option<BlobItem>, Error> {
    let item_key = HashMap::from([(
      BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
      AttributeValue::S(blob_hash.to_string()),
    )]);
    match self
      .client
      .get_item()
      .table_name(BLOB_TABLE_NAME)
      .set_key(Some(item_key))
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to find blob item");
        Error::AwsSdk(e.into())
      })? {
      GetItemOutput {
        item: Some(mut item),
        ..
      } => {
        let blob_hash = database::parse_string_attribute(
          BLOB_TABLE_BLOB_HASH_FIELD,
          item.remove(BLOB_TABLE_BLOB_HASH_FIELD),
        )?;
        let s3_path = database::parse_string_attribute(
          BLOB_TABLE_S3_PATH_FIELD,
          item.remove(BLOB_TABLE_S3_PATH_FIELD),
        )?;
        let created = database::parse_datetime_attribute(
          BLOB_TABLE_CREATED_FIELD,
          item.remove(BLOB_TABLE_CREATED_FIELD),
        )?;
        Ok(Some(BlobItem {
          blob_hash,
          s3_path: S3Path::from_full_path(&s3_path)
            .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?,
          created,
        }))
      }
      _ => Ok(None),
    }
  }

  pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> {
    self
      .client
      .delete_item()
      .table_name(BLOB_TABLE_NAME)
      .key(
        BLOB_TABLE_BLOB_HASH_FIELD,
        AttributeValue::S(blob_hash.to_string()),
      )
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to remove blob item");
        Error::AwsSdk(e.into())
      })?;

    Ok(())
  }

  // Reverse index item

  pub async fn put_reverse_index_item(
    &self,
    reverse_index_item: ReverseIndexItem,
  ) -> Result<(), Error> {
    let holder = &reverse_index_item.holder;
    if self.find_reverse_index_by_holder(holder).await?.is_some() {
      error!("Failed to put reverse index. Holder already exists.");
      return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
        holder.to_string(),
      )));
    }

    let item = HashMap::from([
      (
        BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
        AttributeValue::S(reverse_index_item.holder),
      ),
      (
        BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(),
        AttributeValue::S(reverse_index_item.blob_hash),
      ),
    ]);
    self
      .client
      .put_item()
      .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
      .set_item(Some(item))
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to put reverse index");
        Error::AwsSdk(e.into())
      })?;

    Ok(())
  }

  pub async fn find_reverse_index_by_holder(
    &self,
    holder: &str,
  ) -> Result<Option<ReverseIndexItem>, Error> {
    let item_key = HashMap::from([(
      BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
      AttributeValue::S(holder.to_string()),
    )]);
    match self
      .client
      .get_item()
      .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
      .set_key(Some(item_key))
      .consistent_read(true)
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to find reverse index by holder");
        Error::AwsSdk(e.into())
      })? {
      GetItemOutput {
        item: Some(mut item),
        ..
      } => {
        let holder = database::parse_string_attribute(
          BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
          item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
        )?;
        let blob_hash = database::parse_string_attribute(
          BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
          item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
        )?;
        Ok(Some(ReverseIndexItem { holder, blob_hash }))
      }
      _ => Ok(None),
    }
  }

  pub async fn find_reverse_index_by_hash(
    &self,
    blob_hash: &str,
  ) -> Result<Vec<ReverseIndexItem>, Error> {
    let response = self
      .client
      .query()
      .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
      .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME)
      .key_condition_expression("#blobHash = :valueToMatch")
      .expression_attribute_names(
        "#blobHash",
        BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
      )
      .expression_attribute_values(
        ":valueToMatch",
        AttributeValue::S(blob_hash.to_string()),
      )
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to find reverse indexes by hash");
        Error::AwsSdk(e.into())
      })?;

    if response.count == 0 {
      return Ok(vec![]);
    }

    let mut results: Vec<ReverseIndexItem> =
      Vec::with_capacity(response.count() as usize);
    for mut item in response.items.unwrap_or_default() {
      let holder = database::parse_string_attribute(
        BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
        item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
      )?;
      let blob_hash = database::parse_string_attribute(
        BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
        item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
      )?;
      results.push(ReverseIndexItem { holder, blob_hash });
    }

    Ok(results)
  }

  pub async fn remove_reverse_index_item(
    &self,
    holder: &str,
  ) -> Result<(), Error> {
    self
      .client
      .delete_item()
      .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
      .key(
        BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
        AttributeValue::S(holder.to_string()),
      )
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to remove reverse index");
        Error::AwsSdk(e.into())
      })?;

    Ok(())
  }
}
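
For contrast with the new single-table client, a sketch of the legacy lookup flow this module implemented (`legacy_lookup` is a hypothetical helper; both methods it calls exist above). Holders lived in a separate reverse-index table, so resolving a holder to blob data took two round trips:

```rust
// Sketch only; this module is deprecated in favor of the new client.
async fn legacy_lookup(
  db: &DatabaseClient,
  holder: &str,
) -> Result<Option<BlobItem>, Error> {
  match db.find_reverse_index_by_holder(holder).await? {
    // first trip: holder -> blob hash, second trip: blob hash -> item
    Some(index) => db.find_blob_item(&index.blob_hash).await,
    None => Ok(None),
  }
}
```
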
diff --git a/services/blob/src/database/types.rs b/services/blob/src/database/types.rs
index 5639b5786..501132313 100644
--- a/services/blob/src/database/types.rs
+++ b/services/blob/src/database/types.rs
@@ -1,241 +1,237 @@
use aws_sdk_dynamodb::types::AttributeValue;
use chrono::{DateTime, Utc};
use comm_services_lib::database::{
-  parse_string_attribute, parse_timestamp_attribute, DBItemError, Value,
+  parse_timestamp_attribute, AttributeTryInto, DBItemError, Value,
};
use derive_more::Constructor;
use std::collections::HashMap;

use crate::{config::CONFIG, constants::db::*, s3::S3Path};

use super::errors::Error as DBError;

/// Represents a database row in the DynamoDB SDK format
pub(super) type RawAttributes = HashMap<String, AttributeValue>;
/// A convenience Result type for database operations
pub(super) type DBResult<T> = Result<T, DBError>;

/// Represents a type-safe version of a DynamoDB blob table item.
/// Each row can be either a blob item or a holder assignment.
///
/// It implements the `TryFrom` trait to convert from raw DynamoDB
/// `AttributeValue`s to the type-safe version.
pub enum DBRow {
  BlobItem(BlobItemRow),
  HolderAssignment(HolderAssignmentRow),
}

impl TryFrom<RawAttributes> for DBRow {
  type Error = DBError;

  fn try_from(attributes: RawAttributes) -> Result<Self, Self::Error> {
-    let holder = parse_string_attribute(
-      ATTR_HOLDER,
-      attributes.get(ATTR_HOLDER).cloned(),
-    )?;
+    let holder: String = attributes
+      .get(ATTR_HOLDER)
+      .cloned()
+      .attr_try_into(ATTR_HOLDER)?;
    let row = match holder.as_str() {
      BLOB_ITEM_ROW_HOLDER_VALUE => DBRow::BlobItem(attributes.try_into()?),
      _ => DBRow::HolderAssignment(attributes.try_into()?),
    };
    Ok(row)
  }
}

/// Represents an input payload for inserting a blob item into the database.
/// This contains only the business-logic-related attributes
#[derive(Debug)]
pub struct BlobItemInput {
  pub blob_hash: String,
  pub s3_path: S3Path,
}

impl BlobItemInput {
  pub fn new(blob_hash: impl Into<String>) -> Self {
    let blob_hash: String = blob_hash.into();
    BlobItemInput {
      blob_hash: blob_hash.clone(),
      s3_path: S3Path {
        bucket_name: CONFIG.s3_bucket_name.clone(),
        object_name: blob_hash,
      },
    }
  }
}

/// A struct representing a blob item row in the table in a type-safe way
///
/// It implements the `TryFrom` trait to convert from raw DynamoDB
/// `AttributeValue`s to the type-safe version.
#[derive(Debug)]
pub struct BlobItemRow {
  pub blob_hash: String,
  pub s3_path: S3Path,
  pub unchecked: bool,
  pub created_at: DateTime<Utc>,
  pub last_modified: DateTime<Utc>,
}

impl TryFrom<RawAttributes> for BlobItemRow {
  type Error = DBError;

  fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
-    let blob_hash = parse_string_attribute(
-      ATTR_BLOB_HASH,
-      attributes.remove(ATTR_BLOB_HASH),
-    )?;
-    let s3_path =
-      parse_string_attribute(ATTR_S3_PATH, attributes.remove(ATTR_S3_PATH))?;
+    let blob_hash = attributes
+      .remove(ATTR_BLOB_HASH)
+      .attr_try_into(ATTR_BLOB_HASH)?;
+    let s3_path: String = attributes
+      .remove(ATTR_S3_PATH)
+      .attr_try_into(ATTR_S3_PATH)?;
    let created_at = parse_timestamp_attribute(
      ATTR_CREATED_AT,
      attributes.remove(ATTR_CREATED_AT),
    )?;
    let last_modified = parse_timestamp_attribute(
      ATTR_LAST_MODIFIED,
      attributes.remove(ATTR_LAST_MODIFIED),
    )?;
    let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Blob)?;
    let s3_path = S3Path::from_full_path(&s3_path).map_err(DBError::from)?;

    Ok(BlobItemRow {
      blob_hash,
      s3_path,
      unchecked,
      created_at,
      last_modified,
    })
  }
}

/// A struct representing a holder assignment table row in a type-safe way
///
/// It implements the `TryFrom` trait to convert from raw DynamoDB
/// `AttributeValue`s to the type-safe version.
#[derive(Debug)]
pub struct HolderAssignmentRow {
  pub blob_hash: String,
  pub holder: String,
  pub unchecked: bool,
  pub created_at: DateTime<Utc>,
  pub last_modified: DateTime<Utc>,
}

impl TryFrom<RawAttributes> for HolderAssignmentRow {
  type Error = DBError;

  fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
-    let holder =
-      parse_string_attribute(ATTR_HOLDER, attributes.remove(ATTR_HOLDER))?;
-    let blob_hash = parse_string_attribute(
-      ATTR_BLOB_HASH,
-      attributes.remove(ATTR_BLOB_HASH),
-    )?;
+    let holder = attributes.remove(ATTR_HOLDER).attr_try_into(ATTR_HOLDER)?;
+    let blob_hash = attributes
+      .remove(ATTR_BLOB_HASH)
+      .attr_try_into(ATTR_BLOB_HASH)?;
    let created_at = parse_timestamp_attribute(
      ATTR_CREATED_AT,
      attributes.remove(ATTR_CREATED_AT),
    )?;
    let last_modified = parse_timestamp_attribute(
      ATTR_LAST_MODIFIED,
      attributes.remove(ATTR_LAST_MODIFIED),
    )?;
    let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Holder)?;

    Ok(HolderAssignmentRow {
      blob_hash,
      holder,
      unchecked,
      created_at,
      last_modified,
    })
  }
}

/// Represents a composite primary key for a DynamoDB table row
///
/// It implements `TryFrom` and `Into` traits to conveniently use it
/// in DynamoDB queries
#[derive(Clone, Constructor, Debug)]
pub struct PrimaryKey {
  pub blob_hash: String,
  pub holder: String,
}

impl PrimaryKey {
  /// Creates a primary key for a row containing blob item data.
  /// Rows queried by primary keys created by this function will
  /// be of type `BlobItemRow`
  pub fn for_blob_item(blob_hash: impl Into<String>) -> Self {
    PrimaryKey {
      blob_hash: blob_hash.into(),
      holder: BLOB_ITEM_ROW_HOLDER_VALUE.to_string(),
    }
  }
}

impl TryFrom<RawAttributes> for PrimaryKey {
  type Error = DBError;

  fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
-    let blob_hash = parse_string_attribute(
-      ATTR_BLOB_HASH,
-      attributes.remove(ATTR_BLOB_HASH),
-    )?;
-    let holder =
-      parse_string_attribute(ATTR_HOLDER, attributes.remove(ATTR_HOLDER))?;
+    let blob_hash = attributes
+      .remove(ATTR_BLOB_HASH)
+      .attr_try_into(ATTR_BLOB_HASH)?;
+    let holder = attributes.remove(ATTR_HOLDER).attr_try_into(ATTR_HOLDER)?;
    Ok(PrimaryKey { blob_hash, holder })
  }
}

// useful for convenient calls:
// ddb.get_item().set_key(Some(partition_key.into()))
impl Into<RawAttributes> for PrimaryKey {
  fn into(self) -> RawAttributes {
    HashMap::from([
      (
        ATTR_BLOB_HASH.to_string(),
        AttributeValue::S(self.blob_hash),
      ),
      (ATTR_HOLDER.to_string(), AttributeValue::S(self.holder)),
    ])
  }
}

/// Represents possible values for the `unchecked` attribute value
pub enum UncheckedKind {
  Blob,
  Holder,
}

impl UncheckedKind {
  pub fn str_value(&self) -> &'static str {
    match self {
      UncheckedKind::Blob => "blob",
      UncheckedKind::Holder => "holder",
    }
  }
}

impl Into<AttributeValue> for UncheckedKind {
  fn into(self) -> AttributeValue {
    AttributeValue::S(self.str_value().to_string())
  }
}

fn is_raw_row_unchecked(
  row: &RawAttributes,
  kind: UncheckedKind,
) -> DBResult<bool> {
  let Some(AttributeValue::S(value)) = row.get(ATTR_UNCHECKED) else {
    // the `unchecked` attribute does not exist
    return Ok(false);
  };

  if value != kind.str_value() {
    // the `unchecked` attribute exists but has an incorrect value
    return Err(DBError::Attribute(DBItemError::new(
      ATTR_UNCHECKED.to_string(),
      Value::String(value.to_string()),
      comm_services_lib::database::DBItemAttributeError::IncorrectType,
    )));
  }

  Ok(true)
}
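
A brief in-module sketch of how these conversions compose (`describe_row` and `blob_item_key` are hypothetical helpers; `raw` stands in for a row returned by the DynamoDB SDK):

```rust
// Raw SDK attributes -> type-safe row, dispatching on the sort key.
fn describe_row(raw: RawAttributes) -> DBResult<String> {
  let description = match DBRow::try_from(raw)? {
    DBRow::BlobItem(item) => format!("blob item at {:?}", item.s3_path),
    DBRow::HolderAssignment(row) => {
      format!("holder {} for blob {}", row.holder, row.blob_hash)
    }
  };
  Ok(description)
}

// ...and the opposite direction, used when building SDK calls:
fn blob_item_key(blob_hash: &str) -> RawAttributes {
  PrimaryKey::for_blob_item(blob_hash).into()
}
```
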
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
index c9f8bc1f7..26de0a746 100644
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,286 +1,286 @@
#![allow(unused)]
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
use std::sync::Arc;

use async_stream::try_stream;
use chrono::Duration;
use tokio_stream::StreamExt;
use tonic::codegen::futures_core::Stream;
use tracing::{debug, error, trace, warn};

use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
use crate::database::types::{
  BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
};
use crate::database::DBError;
use crate::s3::{Error as S3Error, S3Client, S3Path};
use crate::tools::{BoxedError, ByteStream, MemOps};
use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient};

#[derive(
  Debug, derive_more::Display, derive_more::From, derive_more::Error,
)]
pub enum BlobServiceError {
  BlobNotFound,
  BlobAlreadyExists,
  InvalidState,
  DB(DBError),
  S3(S3Error),
  InputError(#[error(ignore)] BoxedError),
}

type BlobServiceResult<T> = Result<T, BlobServiceError>;

#[derive(Clone, Debug)]
pub struct BlobServiceConfig {
  /// Blob data is streamed from S3 in chunks of this size.
  pub download_chunk_size: usize,
  /// If enabled, orphaned blobs will be deleted immediately after
  /// the last holder is removed. This option should be enabled
  /// if maintenance garbage collection tasks are not run.
  pub instant_delete_orphaned_blobs: bool,
  /// Minimum age that an orphan must stay unmodified
  /// before it can be deleted by a garbage collection task.
  /// This option is ignored if `instant_delete_orphaned_blobs` is `true`
  pub orphan_protection_period: chrono::Duration,
}

impl Default for BlobServiceConfig {
  fn default() -> Self {
    BlobServiceConfig {
      download_chunk_size: BLOB_DOWNLOAD_CHUNK_SIZE as usize,
      instant_delete_orphaned_blobs: false,
      orphan_protection_period: Duration::hours(1),
    }
  }
}

#[derive(Clone)]
pub struct BlobService {
  db: Arc<DatabaseClient>,
  s3: S3Client,
  config: BlobServiceConfig,
}

impl BlobService {
  pub fn new(
    db: DatabaseClient,
    s3: S3Client,
    config: BlobServiceConfig,
  ) -> Self {
    Self {
      db: Arc::new(db),
      s3,
      config,
    }
  }

  /// Retrieves blob object metadata and returns a download object
  /// that can be used to download the blob data.
  pub async fn create_download(
    &self,
    blob_hash: impl Into<String>,
  ) -> BlobServiceResult<BlobDownloadObject> {
    // 1. Get S3 path
    let s3_path = match self.db.get_blob_item(blob_hash.into()).await {
      Ok(Some(BlobItemRow { s3_path, .. })) => Ok(s3_path),
      Ok(None) => {
        debug!("Blob not found");
        Err(BlobServiceError::BlobNotFound)
      }
      Err(err) => Err(BlobServiceError::DB(err)),
    }?;
    debug!("S3 path: {:?}", s3_path);

    // 2. Get S3 object metadata
    trace!("Getting S3 object metadata...");
    let object_metadata = self.s3.get_object_metadata(&s3_path).await?;
    let blob_size: u64 =
      object_metadata.content_length().try_into().map_err(|err| {
        error!("Failed to parse S3 object content length: {:?}", err);
        BlobServiceError::InvalidState
      })?;
    debug!("S3 object size: {} bytes", blob_size);

    // 3. Create download session
    let session = BlobDownloadObject {
      s3_path,
      blob_size,
      byte_range: 0..blob_size,
      chunk_size: self.config.download_chunk_size as u64,
      s3_client: self.s3.clone(),
    };
    Ok(session)
  }

  pub async fn put_blob(
    &self,
    blob_hash: impl Into<String>,
    mut blob_data_stream: impl ByteStream,
  ) -> Result<(), BlobServiceError> {
    let blob_hash: String = blob_hash.into();
    let blob_item = BlobItemInput::new(&blob_hash);

    if self.db.get_blob_item(blob_hash).await?.is_some() {
      debug!("Blob already exists");
      return Err(BlobServiceError::BlobAlreadyExists);
    }

    let mut upload_session =
      self.s3.start_upload_session(&blob_item.s3_path).await?;
    trace!(?blob_item, "Started S3 upload session");

    tokio::pin!(blob_data_stream);
    let mut s3_chunk: Vec<u8> = Vec::new();
    while let Some(mut chunk) =
      blob_data_stream.try_next().await.map_err(|err| {
        warn!("Failed to get data chunk: {:?}", err);
        BlobServiceError::InputError(err)
      })?
    {
      s3_chunk.append(&mut chunk);

      // New parts should be uploaded to AWS only once they exceed the
      // minimum part size; otherwise AWS returns an error
      if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
        trace!(
          chunk_size = s3_chunk.len(),
          "Chunk size exceeded, adding new S3 part"
        );
        upload_session
          .add_part(s3_chunk.take_out())
          .await
          .map_err(BlobServiceError::from)?;
      }
    }
    trace!("Upload stream drained");
    // add the remaining data as the last S3 part
    if !s3_chunk.is_empty() {
      trace!("Uploading remaining {} bytes", s3_chunk.len());
      upload_session.add_part(s3_chunk).await?;
    }
    // Complete the upload session
    upload_session.finish_upload().await?;

    trace!("S3 upload complete, putting item to db");
    self.db.put_blob_item(blob_item).await?;
    Ok(())
  }

  pub async fn assign_holder(
    &self,
    blob_hash: impl Into<String>,
    holder: impl Into<String>,
  ) -> BlobServiceResult<bool> {
    let blob_hash: String = blob_hash.into();
    trace!(blob_hash, "Attempting to assign holder");
    self
      .db
      .put_holder_assignment(blob_hash.clone(), holder.into())
      .await?;

    trace!("Holder assigned. Checking if data exists");
    let data_exists = self.db.get_blob_item(blob_hash).await?.is_some();
    Ok(data_exists)
  }

  pub async fn revoke_holder(
    &self,
    blob_hash: impl Into<String>,
    holder: impl Into<String>,
  ) -> BlobServiceResult<()> {
    let blob_hash: String = blob_hash.into();
    let holder: String = holder.into();

    trace!(blob_hash, holder, "Attempting to revoke holder");
    self.db.delete_holder_assignment(&blob_hash, holder).await?;

    if self.config.instant_delete_orphaned_blobs {
      trace!("Instant orphan deletion enabled. Looking for holders");
      let is_orphan = self
        .db
        .list_blob_holders(&blob_hash, Some(1))
        .await?
        .is_empty();
      if !is_orphan {
        trace!("Found holders, nothing to do");
        return Ok(());
      }

      debug!("No holders left, deleting blob if exists");
      trace!("Getting blob item");
      let Some(blob_item) = self.db.get_blob_item(&blob_hash).await? else {
        trace!("Blob item not found, nothing to do");
-        return Ok(())
+        return Ok(());
      };

      trace!("Deleting S3 object");
      self.s3.delete_object(&blob_item.s3_path).await?;
      trace!("Deleting blob item entry from DB");
      self.db.delete_blob_item(blob_hash).await?;
    }
    Ok(())
  }
}

pub struct BlobDownloadObject {
  /// Size of the whole blob object in bytes.
  pub blob_size: u64,
  /// Range of bytes to be downloaded (exclusive end).
  byte_range: Range<u64>,
  chunk_size: u64,
  s3_client: S3Client,
  s3_path: S3Path,
}

impl BlobDownloadObject {
  pub fn set_byte_range(&mut self, range: impl RangeBounds<u64>) {
    let range_start = match range.start_bound() {
      Bound::Included(&start) => start,
      Bound::Excluded(&start) => start + 1,
      Bound::Unbounded => 0,
    };
    let range_end = match range.end_bound() {
      Bound::Included(&end) => end + 1,
      Bound::Excluded(&end) => end,
      Bound::Unbounded => self.blob_size,
    };
    // Clamp range to blob size
    let start = std::cmp::max(range_start, 0);
    let end_exclusive = std::cmp::min(range_end, self.blob_size);

    self.byte_range = start..end_exclusive;
    debug!("Requested byte range: {}..{}", start, end_exclusive);
  }

  /// Size of the data to be downloaded in bytes.
  pub fn download_size(&self) -> u64 {
    self.byte_range.end - self.byte_range.start
  }

  pub fn into_stream(self) -> impl Stream<Item = BlobServiceResult<Vec<u8>>> {
    let BlobDownloadObject {
      byte_range,
      chunk_size,
      s3_path,
      s3_client,
      ..
    } = self;

    try_stream! {
      trace!("Starting download stream");
      let mut offset: u64 = byte_range.start;
      while offset < byte_range.end {
        let next_size = std::cmp::min(chunk_size, byte_range.end - offset);
        let range = offset..(offset + next_size);
        trace!(?range, "Getting {} bytes of data", next_size);

        yield s3_client.get_object_bytes(&s3_path, range).await?;

        offset += next_size;
      }
    }
  }
}
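
A hedged in-module sketch of consuming the download object above (`download_all` is a hypothetical helper; assumes a tokio runtime):

```rust
use tokio_stream::StreamExt;

async fn download_all(
  service: &BlobService,
  blob_hash: &str,
) -> BlobServiceResult<Vec<u8>> {
  let mut download = service.create_download(blob_hash).await?;
  // optionally narrow the byte range before streaming
  download.set_byte_range(0..download.blob_size);

  let mut data = Vec::with_capacity(download.download_size() as usize);
  let stream = download.into_stream();
  // the stream returned by try_stream! is not Unpin, so pin it first
  tokio::pin!(stream);
  while let Some(chunk) = stream.next().await {
    data.extend(chunk?);
  }
  Ok(data)
}
```
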
diff --git a/services/comm-services-lib/src/database.rs b/services/comm-services-lib/src/database.rs
index d12180445..85aeccabe 100644
--- a/services/comm-services-lib/src/database.rs
+++ b/services/comm-services-lib/src/database.rs
@@ -1,350 +1,366 @@
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Error as DynamoDBError;
use chrono::{DateTime, Utc};
-use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::num::ParseIntError;
use std::str::FromStr;

+// # Useful type aliases
+
+// Rust exports `pub type` only into the so-called "type namespace", but in
+// order to use aliases e.g. with the `TryFromAttribute` trait, they also
+// need to be exported into the "value namespace", which is what `pub use`
+// does.
+//
+// To overcome that, a dummy module is created and the aliases are
+// re-exported with the `pub use` construct
+mod aliases {
+  use aws_sdk_dynamodb::types::AttributeValue;
+  use std::collections::HashMap;
+  pub type AttributeMap = HashMap<String, AttributeValue>;
+}
+pub use self::aliases::AttributeMap;
+
+// # Error handling
+
#[derive(
  Debug, derive_more::Display, derive_more::From, derive_more::Error,
)]
pub enum Error {
  #[display(...)]
  AwsSdk(DynamoDBError),
  #[display(...)]
  Attribute(DBItemError),
}

#[derive(Debug)]
pub enum Value {
  AttributeValue(Option<AttributeValue>),
  String(String),
}

#[derive(Debug, derive_more::Error, derive_more::Constructor)]
pub struct DBItemError {
  attribute_name: String,
  attribute_value: Value,
  attribute_error: DBItemAttributeError,
}

impl Display for DBItemError {
  fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
    match &self.attribute_error {
      DBItemAttributeError::Missing => {
        write!(f, "Attribute {} is missing", self.attribute_name)
      }
      DBItemAttributeError::IncorrectType => write!(
        f,
        "Value for attribute {} has incorrect type: {:?}",
        self.attribute_name, self.attribute_value
      ),
      error => write!(
        f,
        "Error regarding attribute {} with value {:?}: {}",
        self.attribute_name, self.attribute_value, error
      ),
    }
  }
}

#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum DBItemAttributeError {
  #[display(...)]
  Missing,
  #[display(...)]
  IncorrectType,
  #[display(...)]
  TimestampOutOfRange,
  #[display(...)]
  InvalidTimestamp(chrono::ParseError),
  #[display(...)]
  InvalidNumberFormat(ParseIntError),
}

/// Conversion trait for [`AttributeValue`]
///
/// Types implementing this trait are able to do the following:
/// ```rust
/// use comm_services_lib::database::{TryFromAttribute, AttributeTryInto};
///
/// let foo = SomeType::try_from_attr("MyAttribute", Some(attribute));
///
/// // if `AttributeTryInto` is imported, also:
/// let bar = Some(attribute).attr_try_into("MyAttribute");
/// ```
pub trait TryFromAttribute: Sized {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute: Option<AttributeValue>,
  ) -> Result<Self, DBItemError>;
}

/// Do NOT implement this trait directly. Implement [`TryFromAttribute`]
/// instead
pub trait AttributeTryInto<T> {
  fn attr_try_into(
    self,
    attribute_name: impl Into<String>,
  ) -> Result<T, DBItemError>;
}

// Automatic attr_try_into() for all attribute values
// that have TryFromAttribute implemented
impl<T: TryFromAttribute> AttributeTryInto<T> for Option<AttributeValue> {
  fn attr_try_into(
    self,
    attribute_name: impl Into<String>,
  ) -> Result<T, DBItemError> {
    T::try_from_attr(attribute_name, self)
  }
}

impl TryFromAttribute for String {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute_value: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    match attribute_value {
      Some(AttributeValue::S(value)) => Ok(value),
      Some(_) => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::IncorrectType,
      )),
      None => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::Missing,
      )),
    }
  }
}

impl TryFromAttribute for bool {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute_value: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    match attribute_value {
      Some(AttributeValue::Bool(value)) => Ok(value),
      Some(_) => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::IncorrectType,
      )),
      None => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::Missing,
      )),
    }
  }
}

impl TryFromAttribute for DateTime<Utc> {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    match &attribute {
      Some(AttributeValue::S(datetime)) => datetime.parse().map_err(|e| {
        DBItemError::new(
          attribute_name.into(),
          Value::AttributeValue(attribute),
          DBItemAttributeError::InvalidTimestamp(e),
        )
      }),
      Some(_) => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute),
        DBItemAttributeError::IncorrectType,
      )),
      None => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute),
        DBItemAttributeError::Missing,
      )),
    }
  }
}

-impl TryFromAttribute for HashMap<String, AttributeValue> {
+impl TryFromAttribute for AttributeMap {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute_value: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    match attribute_value {
      Some(AttributeValue::M(map)) => Ok(map),
      Some(_) => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::IncorrectType,
      )),
      None => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::Missing,
      )),
    }
  }
}

impl TryFromAttribute for Vec<u8> {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute_value: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    match attribute_value {
      Some(AttributeValue::B(data)) => Ok(data.into_inner()),
      Some(_) => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::IncorrectType,
      )),
      None => Err(DBItemError::new(
        attribute_name.into(),
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::Missing,
      )),
    }
  }
}

#[deprecated = "Use `String::try_from_attr()` instead"]
pub fn parse_string_attribute(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
) -> Result<String, DBItemError> {
  String::try_from_attr(attribute_name, attribute_value)
}

#[deprecated = "Use `bool::try_from_attr()` instead"]
pub fn parse_bool_attribute(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
) -> Result<bool, DBItemError> {
  bool::try_from_attr(attribute_name, attribute_value)
}

#[deprecated = "Use `DateTime::<Utc>::try_from_attr()` instead"]
pub fn parse_datetime_attribute(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
) -> Result<DateTime<Utc>, DBItemError> {
  DateTime::<Utc>::try_from_attr(attribute_name, attribute_value)
}

-#[deprecated = "Use `HashMap::<String, AttributeValue>::try_from_attr()` instead"]
+#[deprecated = "Use `AttributeMap::try_from_attr()` instead"]
pub fn parse_map_attribute(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
-) -> Result<HashMap<String, AttributeValue>, DBItemError> {
+) -> Result<AttributeMap, DBItemError> {
  attribute_value.attr_try_into(attribute_name)
}

pub fn parse_int_attribute<T>(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
) -> Result<T, DBItemError>
where
  T: FromStr<Err = ParseIntError>,
{
  match &attribute_value {
    Some(AttributeValue::N(numeric_str)) => {
      parse_integer(attribute_name, numeric_str)
    }
    Some(_) => Err(DBItemError::new(
      attribute_name.into(),
      Value::AttributeValue(attribute_value),
      DBItemAttributeError::IncorrectType,
    )),
    None => Err(DBItemError::new(
      attribute_name.into(),
      Value::AttributeValue(attribute_value),
      DBItemAttributeError::Missing,
    )),
  }
}

/// Parses the UTC timestamp in milliseconds from a DynamoDB numeric attribute
pub fn parse_timestamp_attribute(
  attribute_name: impl Into<String>,
  attribute_value: Option<AttributeValue>,
) -> Result<DateTime<Utc>, DBItemError> {
  let attribute_name: String = attribute_name.into();
  let timestamp = parse_int_attribute::<i64>(
    attribute_name.clone(),
    attribute_value.clone(),
  )?;
  let naive_datetime = chrono::NaiveDateTime::from_timestamp_millis(timestamp)
    .ok_or_else(|| {
      DBItemError::new(
        attribute_name,
        Value::AttributeValue(attribute_value),
        DBItemAttributeError::TimestampOutOfRange,
      )
    })?;
  Ok(DateTime::from_utc(naive_datetime, Utc))
}

pub fn parse_integer<T>(
  attribute_name: impl Into<String>,
  attribute_value: &str,
) -> Result<T, DBItemError>
where
  T: FromStr<Err = ParseIntError>,
{
  attribute_value.parse::<T>().map_err(|e| {
    DBItemError::new(
      attribute_name.into(),
      Value::String(attribute_value.into()),
      DBItemAttributeError::InvalidNumberFormat(e),
    )
  })
}

#[cfg(test)]
mod tests {
  use super::*;

  #[test]
  fn test_parse_integer() {
    assert!(parse_integer::<i32>("some_attr", "123").is_ok());
    assert!(parse_integer::<i32>("negative", "-123").is_ok());
    assert!(parse_integer::<i32>("float", "3.14").is_err());
    assert!(parse_integer::<i32>("NaN", "foo").is_err());
    assert!(parse_integer::<u32>("negative_uint", "-123").is_err());
    assert!(parse_integer::<u16>("too_large", "65536").is_err());
  }

  #[test]
  fn test_parse_timestamp() {
    let timestamp = Utc::now().timestamp_millis();
    let attr = AttributeValue::N(timestamp.to_string());

    let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));

    assert!(parsed_timestamp.is_ok());
    assert_eq!(parsed_timestamp.unwrap().timestamp_millis(), timestamp);
  }

  #[test]
  fn test_parse_invalid_timestamp() {
    let attr = AttributeValue::N("foo".to_string());

    let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));

    assert!(parsed_timestamp.is_err());
  }

  #[test]
  fn test_parse_timestamp_out_of_range() {
    let attr = AttributeValue::N(i64::MAX.to_string());

    let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr));

    assert!(parsed_timestamp.is_err());
    assert!(matches!(
      parsed_timestamp.unwrap_err().attribute_error,
      DBItemAttributeError::TimestampOutOfRange
    ));
  }
}
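
To illustrate the extension point the new trait provides, a short sketch of implementing `TryFromAttribute` for a custom type (`BlobHash` and `read_hash` are hypothetical, shown only for illustration):

```rust
use aws_sdk_dynamodb::types::AttributeValue;
use comm_services_lib::database::{
  AttributeMap, AttributeTryInto, DBItemError, TryFromAttribute,
};

// Hypothetical newtype around a blob hash string.
struct BlobHash(String);

impl TryFromAttribute for BlobHash {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    // delegate to the existing String impl, then wrap
    String::try_from_attr(attribute_name, attribute).map(BlobHash)
  }
}

fn read_hash(mut row: AttributeMap) -> Result<BlobHash, DBItemError> {
  // equivalent to BlobHash::try_from_attr("blob_hash", row.remove("blob_hash"));
  // the blanket AttributeTryInto impl picks the right TryFromAttribute
  row.remove("blob_hash").attr_try_into("blob_hash")
}
```
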
diff --git a/services/feature-flags/src/database.rs b/services/feature-flags/src/database.rs
index 5b73bd3db..b033c2fe1 100644
--- a/services/feature-flags/src/database.rs
+++ b/services/feature-flags/src/database.rs
@@ -1,120 +1,122 @@
use crate::constants::{
  FEATURE_FLAGS_CONFIG_FIELD, FEATURE_FLAGS_FEATURE_FIELD,
  FEATURE_FLAGS_NON_STAFF_FIELD, FEATURE_FLAGS_PLATFORM_FIELD,
  FEATURE_FLAGS_STAFF_FIELD, FEATURE_FLAGS_TABLE_NAME, PLATFORM_ANDROID,
  PLATFORM_IOS,
};
use aws_sdk_dynamodb::types::{AttributeValue, Select};
-use comm_services_lib::database::{self, DBItemError, Error};
+use comm_services_lib::database::{
+  self, AttributeMap, DBItemError, Error, TryFromAttribute,
+};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::error;

#[derive(Debug)]
pub struct CodeVersionSpecificFeatureConfig {
  pub staff: bool,
  pub non_staff: bool,
}

fn parse_code_version_specific_feature_config(
  value: Option<AttributeValue>,
) -> Result<CodeVersionSpecificFeatureConfig, DBItemError> {
  let mut code_version_config_map =
-    database::parse_map_attribute(FEATURE_FLAGS_CONFIG_FIELD, value)?;
-  let staff = database::parse_bool_attribute(
+    AttributeMap::try_from_attr(FEATURE_FLAGS_CONFIG_FIELD, value)?;
+  let staff = bool::try_from_attr(
    FEATURE_FLAGS_STAFF_FIELD,
    code_version_config_map.remove(FEATURE_FLAGS_STAFF_FIELD),
  )?;
-  let non_staff = database::parse_bool_attribute(
+  let non_staff = bool::try_from_attr(
    FEATURE_FLAGS_NON_STAFF_FIELD,
    code_version_config_map.remove(FEATURE_FLAGS_NON_STAFF_FIELD),
  )?;
  Ok(CodeVersionSpecificFeatureConfig { staff, non_staff })
}

#[derive(Debug)]
pub struct FeatureConfig {
  pub name: String,
  pub config: HashMap<i32, CodeVersionSpecificFeatureConfig>,
}

fn parse_feature_config(
-  mut attribute_value: HashMap<String, AttributeValue>,
+  mut attribute_value: AttributeMap,
) -> Result<FeatureConfig, DBItemError> {
-  let feature_name = database::parse_string_attribute(
+  let feature_name = String::try_from_attr(
    FEATURE_FLAGS_FEATURE_FIELD,
    attribute_value.remove(FEATURE_FLAGS_FEATURE_FIELD),
  )?;
-  let config_map = database::parse_map_attribute(
+  let config_map = AttributeMap::try_from_attr(
    FEATURE_FLAGS_CONFIG_FIELD,
    attribute_value.remove(FEATURE_FLAGS_CONFIG_FIELD),
  )?;
  let mut config = HashMap::new();
  for (code_version_string, code_version_config) in config_map {
    let code_version: i32 =
      database::parse_integer("code_version", code_version_string.as_str())?;
    let version_config =
      parse_code_version_specific_feature_config(Some(code_version_config))?;
    config.insert(code_version, version_config);
  }
  Ok(FeatureConfig {
    name: feature_name,
    config,
  })
}

pub enum Platform {
  IOS,
  ANDROID,
}

#[derive(Clone)]
pub struct DatabaseClient {
  client: Arc<aws_sdk_dynamodb::Client>,
}

impl DatabaseClient {
  pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
    DatabaseClient {
      client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
    }
  }

  pub async fn get_features_configuration(
    &self,
    platform: Platform,
  ) -> Result<HashMap<String, FeatureConfig>, Error> {
    let platform_value = match platform {
      Platform::IOS => PLATFORM_IOS,
      Platform::ANDROID => PLATFORM_ANDROID,
    };
    let result = self
      .client
      .query()
      .select(Select::AllAttributes)
      .table_name(FEATURE_FLAGS_TABLE_NAME)
      .consistent_read(true)
      .key_condition_expression("#platform = :platform")
      .expression_attribute_names("#platform", FEATURE_FLAGS_PLATFORM_FIELD)
      .expression_attribute_values(
        ":platform",
        AttributeValue::S(platform_value.to_string()),
      )
      .send()
      .await
      .map_err(|e| {
        error!("DynamoDB client failed to find feature flags configuration");
        Error::AwsSdk(e.into())
      })?;

    if let Some(items) = result.items {
      let mut config = HashMap::new();
      for item in items {
        let feature_config = parse_feature_config(item)?;
        config.insert(feature_config.name.clone(), feature_config);
      }
      Ok(config)
    } else {
      Ok(HashMap::new())
    }
  }
}
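
Finally, an in-module sketch of the row shape `parse_feature_config` expects (the `demo` function and the `"login_v2"`/`"256"` values are hypothetical; the `FEATURE_FLAGS_*` constants come from this crate):

```rust
fn demo() -> Result<(), DBItemError> {
  // per-code-version config: { staff: true, non_staff: false }
  let version_config = AttributeValue::M(HashMap::from([
    (FEATURE_FLAGS_STAFF_FIELD.to_string(), AttributeValue::Bool(true)),
    (FEATURE_FLAGS_NON_STAFF_FIELD.to_string(), AttributeValue::Bool(false)),
  ]));
  // a full table row: feature name + map keyed by code version
  let item: AttributeMap = HashMap::from([
    (
      FEATURE_FLAGS_FEATURE_FIELD.to_string(),
      AttributeValue::S("login_v2".into()),
    ),
    (
      FEATURE_FLAGS_CONFIG_FIELD.to_string(),
      AttributeValue::M(HashMap::from([("256".to_string(), version_config)])),
    ),
  ]);

  let parsed = parse_feature_config(item)?;
  assert_eq!(parsed.name, "login_v2");
  assert!(parsed.config[&256].staff);
  Ok(())
}
```
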