Changeset View
Changeset View
Standalone View
Standalone View
services/blob/src/database.rs
use anyhow::Result; | use anyhow::{anyhow, Context, Result}; | ||||
use aws_sdk_dynamodb::{model::AttributeValue, output::GetItemOutput}; | |||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
use std::sync::Arc; | use std::{collections::HashMap, sync::Arc}; | ||||
use crate::s3::S3Path; | use crate::{ | ||||
constants::{ | |||||
BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, | |||||
BLOB_TABLE_S3_PATH_FIELD, | |||||
}, | |||||
s3::S3Path, | |||||
}; | |||||
#[derive(Clone, Debug)] | #[derive(Clone, Debug)] | ||||
pub struct BlobItem { | pub struct BlobItem { | ||||
pub blob_hash: String, | pub blob_hash: String, | ||||
pub s3_path: S3Path, | pub s3_path: S3Path, | ||||
pub created: DateTime<Utc>, | pub created: DateTime<Utc>, | ||||
} | } | ||||
Show All 13 Lines | pub fn new(aws_config: &aws_types::SdkConfig) -> Self { | ||||
DatabaseClient { | DatabaseClient { | ||||
client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), | client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), | ||||
} | } | ||||
} | } | ||||
// Blob item | // Blob item | ||||
pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<()> { | pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<()> { | ||||
unimplemented!(); | let item = HashMap::from([ | ||||
( | |||||
BLOB_TABLE_BLOB_HASH_FIELD.to_string(), | |||||
AttributeValue::S(blob_item.blob_hash), | |||||
), | |||||
( | |||||
BLOB_TABLE_S3_PATH_FIELD.to_string(), | |||||
AttributeValue::S(blob_item.s3_path.to_full_path()), | |||||
), | |||||
( | |||||
BLOB_TABLE_CREATED_FIELD.to_string(), | |||||
AttributeValue::S(blob_item.created.to_rfc3339()), | |||||
), | |||||
]); | |||||
self | |||||
.client | |||||
.put_item() | |||||
.table_name(BLOB_TABLE_NAME) | |||||
.set_item(Some(item)) | |||||
.send() | |||||
.await | |||||
.context("Failed to put blob item")?; | |||||
Ok(()) | |||||
} | } | ||||
pub async fn find_blob_item( | pub async fn find_blob_item( | ||||
&self, | &self, | ||||
blob_hash: &str, | blob_hash: &str, | ||||
) -> Result<Option<BlobItem>> { | ) -> Result<Option<BlobItem>> { | ||||
unimplemented!(); | let item_key = HashMap::from([( | ||||
BLOB_TABLE_BLOB_HASH_FIELD.to_string(), | |||||
AttributeValue::S(blob_hash.to_string()), | |||||
)]); | |||||
match self | |||||
.client | |||||
.get_item() | |||||
.table_name(BLOB_TABLE_NAME) | |||||
.set_key(Some(item_key)) | |||||
.send() | |||||
.await | |||||
.with_context(|| { | |||||
format!("Failed to find blob item with hash: [{}]", blob_hash) | |||||
})? { | |||||
GetItemOutput { | |||||
item: Some(mut item), | |||||
.. | |||||
} => { | |||||
let blob_hash = | |||||
parse_string_attribute(item.remove(BLOB_TABLE_BLOB_HASH_FIELD))?; | |||||
let s3_path = | |||||
parse_string_attribute(item.remove(BLOB_TABLE_S3_PATH_FIELD))?; | |||||
let created = | |||||
parse_datetime_attribute(item.remove(BLOB_TABLE_CREATED_FIELD))?; | |||||
Ok(Some(BlobItem { | |||||
blob_hash, | |||||
s3_path: S3Path::from_full_path(&s3_path)?, | |||||
created, | |||||
})) | |||||
} | |||||
_ => Ok(None), | |||||
} | |||||
} | } | ||||
pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<()> { | pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<()> { | ||||
unimplemented!(); | self | ||||
.client | |||||
.delete_item() | |||||
.table_name(BLOB_TABLE_NAME) | |||||
.key( | |||||
BLOB_TABLE_BLOB_HASH_FIELD, | |||||
AttributeValue::S(blob_hash.to_string()), | |||||
) | |||||
.send() | |||||
.await | |||||
.with_context(|| { | |||||
format!("Failed to remove blob item with hash: [{}]", blob_hash) | |||||
})?; | |||||
Ok(()) | |||||
} | } | ||||
// Reverse index item | // Reverse index item | ||||
pub async fn put_reverse_index_item( | pub async fn put_reverse_index_item( | ||||
&self, | &self, | ||||
reverse_index_item: ReverseIndexItem, | reverse_index_item: ReverseIndexItem, | ||||
) -> Result<()> { | ) -> Result<()> { | ||||
Show All 13 Lines | impl DatabaseClient { | ||||
) -> Result<Vec<ReverseIndexItem>> { | ) -> Result<Vec<ReverseIndexItem>> { | ||||
unimplemented!(); | unimplemented!(); | ||||
} | } | ||||
pub async fn remove_reverse_index_item(&self, holder: &str) -> Result<()> { | pub async fn remove_reverse_index_item(&self, holder: &str) -> Result<()> { | ||||
unimplemented!(); | unimplemented!(); | ||||
} | } | ||||
} | } | ||||
fn parse_string_attribute(attribute: Option<AttributeValue>) -> Result<String> { | |||||
match attribute { | |||||
Some(AttributeValue::S(str_value)) => Ok(str_value), | |||||
Some(_) => Err(anyhow!("Incorrect type")), | |||||
None => Err(anyhow!("Atrribute missing")), | |||||
} | |||||
} | |||||
fn parse_datetime_attribute( | |||||
attribute: Option<AttributeValue>, | |||||
) -> Result<DateTime<Utc>> { | |||||
if let Some(AttributeValue::S(datetime)) = &attribute { | |||||
// parse() accepts a relaxed RFC3339 string | |||||
datetime | |||||
.parse() | |||||
.with_context(|| format!("Invalid RFC 3339 DateTime: {}", datetime)) | |||||
} else { | |||||
Err(anyhow!("Atrribute missing")) | |||||
} | |||||
} |