diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs new file mode 100644 index 000000000..4f24fe47e --- /dev/null +++ b/services/blob/src/database/client.rs @@ -0,0 +1,156 @@ +// TODO: Remove this when possible +#![allow(unused)] + +use aws_sdk_dynamodb::{ + operation::put_item::PutItemOutput, + types::{ + AttributeValue, Delete, DeleteRequest, KeysAndAttributes, PutRequest, + TransactWriteItem, Update, WriteRequest, + }, + Error as DynamoDBError, +}; +use chrono::Utc; +use comm_services_lib::database::parse_string_attribute; +use std::{collections::HashMap, sync::Arc}; +use tracing::{debug, error, trace}; + +use crate::constants::db::*; + +use super::errors::{BlobDBError, Error as DBError}; +use super::types::*; + +#[derive(Clone)] +pub struct DatabaseClient { + ddb: Arc, +} + +/// public interface implementation +impl DatabaseClient { + pub fn new(aws_config: &aws_types::SdkConfig) -> Self { + DatabaseClient { + ddb: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), + } + } + + /// Gets a blob item row from the database by its blob hash + /// Returns None if the blob item is not found + pub async fn get_blob_item( + &self, + blob_hash: impl Into, + ) -> DBResult> { + let key = PrimaryKey::for_blob_item(blob_hash); + self + .get_raw_item(key.clone()) + .await? + .map(BlobItemRow::try_from) + .transpose() + } + + /// Inserts a new blob item row into the database. Returns Error + /// if the item already exists. + pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> { + let item = HashMap::from([ + ( + ATTR_BLOB_HASH.to_string(), + AttributeValue::S(blob_item.blob_hash), + ), + ( + ATTR_HOLDER.to_string(), + AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()), + ), + ( + ATTR_S3_PATH.to_string(), + AttributeValue::S(blob_item.s3_path.to_full_path()), + ), + (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()), + ]); + + self.insert_item(item).await?; + Ok(()) + } + + /// Deletes blob item row. Doesn't delete its holders. + pub async fn delete_blob_item( + &self, + blob_hash: impl Into, + ) -> DBResult<()> { + let key = PrimaryKey::for_blob_item(blob_hash); + self + .ddb + .delete_item() + .table_name(BLOB_TABLE_NAME) + .set_key(Some(key.into())) + .send() + .await + .map_err(|err| { + debug!("DynamoDB client failed to delete blob item: {:?}", err); + DBError::AwsSdk(err.into()) + })?; + Ok(()) + } +} + +// private helpers +impl DatabaseClient { + /// inserts a new item into the table using PutItem. Returns + /// error if the item already exists + async fn insert_item( + &self, + mut item: RawAttributes, + ) -> DBResult { + // add metadata attributes common for all types of rows + let now = Utc::now().timestamp_millis(); + item.insert( + ATTR_CREATED_AT.to_string(), + AttributeValue::N(now.to_string()), + ); + item.insert( + ATTR_LAST_MODIFIED.to_string(), + AttributeValue::N(now.to_string()), + ); + + self + .ddb + .put_item() + .table_name(BLOB_TABLE_NAME) + .set_item(Some(item)) + // make sure we don't accidentaly overwrite existing row + .condition_expression( + "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)", + ) + .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH) + .expression_attribute_names("#holder", ATTR_HOLDER) + .send() + .await + .map_err(|err| match DynamoDBError::from(err) { + DynamoDBError::ConditionalCheckFailedException(e) => { + debug!("DynamoDB client failed to insert: item already exists"); + trace!("Conditional check failed with error: {}", e); + DBError::ItemAlreadyExists + } + err => { + debug!("DynamoDB client failed to insert: {:?}", err); + DBError::AwsSdk(err) + } + }) + } + + /// Gets a single row from the table using GetItem, without parsing it + async fn get_raw_item( + &self, + key: PrimaryKey, + ) -> DBResult> { + self + .ddb + .get_item() + .table_name(BLOB_TABLE_NAME) + .set_key(Some(key.into())) + .send() + .await + .map_err(|err| { + debug!("DynamoDB client failed to get item: {:?}", err); + DBError::AwsSdk(err.into()) + }) + .map(|response| response.item) + } +} diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs index 875d5f400..dd0a742a1 100644 --- a/services/blob/src/database/mod.rs +++ b/services/blob/src/database/mod.rs @@ -1,3 +1,7 @@ +pub mod client; pub mod errors; pub mod old; pub mod types; + +pub use client::DatabaseClient; +pub use errors::Error as DBError;