diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs --- a/services/blob/src/database.rs +++ b/services/blob/src/database.rs @@ -1,8 +1,15 @@ -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; +use aws_sdk_dynamodb::{model::AttributeValue, output::GetItemOutput}; use chrono::{DateTime, Utc}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; -use crate::s3::S3Path; +use crate::{ + constants::{ + BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, + BLOB_TABLE_S3_PATH_FIELD, + }, + s3::S3Path, +}; #[derive(Clone, Debug)] pub struct BlobItem { @@ -32,18 +39,87 @@ // Blob item pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<()> { - unimplemented!(); + let item = HashMap::from([ + ( + BLOB_TABLE_BLOB_HASH_FIELD.to_string(), + AttributeValue::S(blob_item.blob_hash), + ), + ( + BLOB_TABLE_S3_PATH_FIELD.to_string(), + AttributeValue::S(blob_item.s3_path.to_full_path()), + ), + ( + BLOB_TABLE_CREATED_FIELD.to_string(), + AttributeValue::S(blob_item.created.to_rfc3339()), + ), + ]); + + self + .client + .put_item() + .table_name(BLOB_TABLE_NAME) + .set_item(Some(item)) + .send() + .await + .context("Failed to put blob item")?; + + Ok(()) } pub async fn find_blob_item( &self, blob_hash: &str, ) -> Result> { - unimplemented!(); + let item_key = HashMap::from([( + BLOB_TABLE_BLOB_HASH_FIELD.to_string(), + AttributeValue::S(blob_hash.to_string()), + )]); + match self + .client + .get_item() + .table_name(BLOB_TABLE_NAME) + .set_key(Some(item_key)) + .send() + .await + .with_context(|| { + format!("Failed to find blob item with hash: [{}]", blob_hash) + })? { + GetItemOutput { + item: Some(mut item), + .. + } => { + let blob_hash = + parse_string_attribute(item.remove(BLOB_TABLE_BLOB_HASH_FIELD))?; + let s3_path = + parse_string_attribute(item.remove(BLOB_TABLE_S3_PATH_FIELD))?; + let created = + parse_datetime_attribute(item.remove(BLOB_TABLE_CREATED_FIELD))?; + Ok(Some(BlobItem { + blob_hash, + s3_path: S3Path::from_full_path(&s3_path)?, + created, + })) + } + _ => Ok(None), + } } pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<()> { - unimplemented!(); + self + .client + .delete_item() + .table_name(BLOB_TABLE_NAME) + .key( + BLOB_TABLE_BLOB_HASH_FIELD, + AttributeValue::S(blob_hash.to_string()), + ) + .send() + .await + .with_context(|| { + format!("Failed to remove blob item with hash: [{}]", blob_hash) + })?; + + Ok(()) } // Reverse index item @@ -73,3 +149,24 @@ unimplemented!(); } } + +fn parse_string_attribute(attribute: Option) -> Result { + match attribute { + Some(AttributeValue::S(str_value)) => Ok(str_value), + Some(_) => Err(anyhow!("Incorrect type")), + None => Err(anyhow!("Atrribute missing")), + } +} + +fn parse_datetime_attribute( + attribute: Option, +) -> Result> { + if let Some(AttributeValue::S(datetime)) = &attribute { + // parse() accepts a relaxed RFC3339 string + datetime + .parse() + .with_context(|| format!("Invalid RFC 3339 DateTime: {}", datetime)) + } else { + Err(anyhow!("Atrribute missing")) + } +}