Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3346820
D8450.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
D8450.diff
View Options
diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
new file mode 100644
--- /dev/null
+++ b/services/blob/src/database/client.rs
@@ -0,0 +1,156 @@
+// TODO: Remove this when possible
+#![allow(unused)]
+
+use aws_sdk_dynamodb::{
+ operation::put_item::PutItemOutput,
+ types::{
+ AttributeValue, Delete, DeleteRequest, KeysAndAttributes, PutRequest,
+ TransactWriteItem, Update, WriteRequest,
+ },
+ Error as DynamoDBError,
+};
+use chrono::Utc;
+use comm_services_lib::database::parse_string_attribute;
+use std::{collections::HashMap, sync::Arc};
+use tracing::{debug, error, trace};
+
+use crate::constants::db::*;
+
+use super::errors::{BlobDBError, Error as DBError};
+use super::types::*;
+
+#[derive(Clone)]
+pub struct DatabaseClient {
+ ddb: Arc<aws_sdk_dynamodb::Client>,
+}
+
+/// public interface implementation
+impl DatabaseClient {
+ pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
+ DatabaseClient {
+ ddb: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
+ }
+ }
+
+ /// Gets a blob item row from the database by its blob hash
+ /// Returns None if the blob item is not found
+ pub async fn get_blob_item(
+ &self,
+ blob_hash: impl Into<String>,
+ ) -> DBResult<Option<BlobItemRow>> {
+ let key = PrimaryKey::for_blob_item(blob_hash);
+ self
+ .get_raw_item(key.clone())
+ .await?
+ .map(BlobItemRow::try_from)
+ .transpose()
+ }
+
+ /// Inserts a new blob item row into the database. Returns Error
+ /// if the item already exists.
+ pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> {
+ let item = HashMap::from([
+ (
+ ATTR_BLOB_HASH.to_string(),
+ AttributeValue::S(blob_item.blob_hash),
+ ),
+ (
+ ATTR_HOLDER.to_string(),
+ AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()),
+ ),
+ (
+ ATTR_S3_PATH.to_string(),
+ AttributeValue::S(blob_item.s3_path.to_full_path()),
+ ),
+ (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()),
+ ]);
+
+ self.insert_item(item).await?;
+ Ok(())
+ }
+
+ /// Deletes blob item row. Doesn't delete its holders.
+ pub async fn delete_blob_item(
+ &self,
+ blob_hash: impl Into<String>,
+ ) -> DBResult<()> {
+ let key = PrimaryKey::for_blob_item(blob_hash);
+ self
+ .ddb
+ .delete_item()
+ .table_name(BLOB_TABLE_NAME)
+ .set_key(Some(key.into()))
+ .send()
+ .await
+ .map_err(|err| {
+ debug!("DynamoDB client failed to delete blob item: {:?}", err);
+ DBError::AwsSdk(err.into())
+ })?;
+ Ok(())
+ }
+}
+
+// private helpers
+impl DatabaseClient {
+ /// inserts a new item into the table using PutItem. Returns
+ /// error if the item already exists
+ async fn insert_item(
+ &self,
+ mut item: RawAttributes,
+ ) -> DBResult<PutItemOutput> {
+ // add metadata attributes common for all types of rows
+ let now = Utc::now().timestamp_millis();
+ item.insert(
+ ATTR_CREATED_AT.to_string(),
+ AttributeValue::N(now.to_string()),
+ );
+ item.insert(
+ ATTR_LAST_MODIFIED.to_string(),
+ AttributeValue::N(now.to_string()),
+ );
+
+ self
+ .ddb
+ .put_item()
+ .table_name(BLOB_TABLE_NAME)
+ .set_item(Some(item))
+ // make sure we don't accidentaly overwrite existing row
+ .condition_expression(
+ "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)",
+ )
+ .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH)
+ .expression_attribute_names("#holder", ATTR_HOLDER)
+ .send()
+ .await
+ .map_err(|err| match DynamoDBError::from(err) {
+ DynamoDBError::ConditionalCheckFailedException(e) => {
+ debug!("DynamoDB client failed to insert: item already exists");
+ trace!("Conditional check failed with error: {}", e);
+ DBError::ItemAlreadyExists
+ }
+ err => {
+ debug!("DynamoDB client failed to insert: {:?}", err);
+ DBError::AwsSdk(err)
+ }
+ })
+ }
+
+ /// Gets a single row from the table using GetItem, without parsing it
+ async fn get_raw_item(
+ &self,
+ key: PrimaryKey,
+ ) -> DBResult<Option<RawAttributes>> {
+ self
+ .ddb
+ .get_item()
+ .table_name(BLOB_TABLE_NAME)
+ .set_key(Some(key.into()))
+ .send()
+ .await
+ .map_err(|err| {
+ debug!("DynamoDB client failed to get item: {:?}", err);
+ DBError::AwsSdk(err.into())
+ })
+ .map(|response| response.item)
+ }
+}
diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs
--- a/services/blob/src/database/mod.rs
+++ b/services/blob/src/database/mod.rs
@@ -1,3 +1,7 @@
+pub mod client;
pub mod errors;
pub mod old;
pub mod types;
+
+pub use client::DatabaseClient;
+pub use errors::Error as DBError;
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 23, 9:57 AM (18 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2570150
Default Alt Text
D8450.diff (4 KB)
Attached To
Mode
D8450: [blob-service] Introduce database client
Attached
Detach File
Event Timeline
Log In to Comment