diff --git a/services/blob/src/database.rs b/services/blob/src/database.rs --- a/services/blob/src/database.rs +++ b/services/blob/src/database.rs @@ -5,6 +5,9 @@ use crate::{ constants::{ + BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, + BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME, + BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD, }, @@ -128,25 +131,135 @@ &self, reverse_index_item: ReverseIndexItem, ) -> Result<()> { - unimplemented!(); + if self + .find_reverse_index_by_holder(&reverse_index_item.holder) + .await? + .is_some() + { + return Err(anyhow!( + "An item for the given holder [{}] already exists", + reverse_index_item.holder + )); + } + + let item = HashMap::from([ + ( + BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), + AttributeValue::S(reverse_index_item.holder), + ), + ( + BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(), + AttributeValue::S(reverse_index_item.blob_hash), + ), + ]); + self + .client + .put_item() + .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) + .set_item(Some(item)) + .send() + .await + .context("Failed to put reverse index item")?; + + Ok(()) } pub async fn find_reverse_index_by_holder( &self, holder: &str, ) -> Result> { - unimplemented!(); + let item_key = HashMap::from([( + BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), + AttributeValue::S(holder.to_string()), + )]); + match self + .client + .get_item() + .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) + .set_key(Some(item_key)) + .consistent_read(true) + .send() + .await + .with_context(|| { + format!("Failed to find reverse index for holder: [{}]", holder) + })? { + GetItemOutput { + item: Some(mut item), + .. + } => { + let holder = parse_string_attribute( + item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), + )?; + let blob_hash = parse_string_attribute( + item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), + )?; + + Ok(Some(ReverseIndexItem { holder, blob_hash })) + } + _ => Ok(None), + } } pub async fn find_reverse_index_by_hash( &self, blob_hash: &str, ) -> Result> { - unimplemented!(); + let response = self + .client + .query() + .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) + .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME) + .key_condition_expression("#blobHash = :valueToMatch") + .expression_attribute_names( + "#blobHash", + BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, + ) + .expression_attribute_values( + ":valueToMatch", + AttributeValue::S(blob_hash.to_string()), + ) + .send() + .await + .with_context(|| { + format!("Failed to find reverse index for hash: [{}]", blob_hash) + })?; + + if response.count == 0 { + return Ok(vec![]); + } + + let mut results: Vec = + Vec::with_capacity(response.count() as usize); + for mut item in response.items.unwrap_or_default() { + let holder = parse_string_attribute( + item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), + )?; + let blob_hash = parse_string_attribute( + item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), + )?; + + results.push(ReverseIndexItem { holder, blob_hash }); + } + + return Ok(results); } pub async fn remove_reverse_index_item(&self, holder: &str) -> Result<()> { - unimplemented!(); + self + .client + .delete_item() + .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) + .key( + BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, + AttributeValue::S(holder.to_string()), + ) + .send() + .await + .with_context(|| { + format!("Failed to remove reverse index for holder: [{}]", holder) + })?; + + Ok(()) } }