diff --git a/services/blob/src/database/types.rs b/services/blob/src/database/types.rs
index 501132313..f93012569 100644
--- a/services/blob/src/database/types.rs
+++ b/services/blob/src/database/types.rs
@@ -1,237 +1,241 @@
 use aws_sdk_dynamodb::types::AttributeValue;
 use chrono::{DateTime, Utc};
 use comm_services_lib::database::{
   parse_timestamp_attribute, AttributeTryInto, DBItemError, Value,
 };
 use derive_more::Constructor;
 use std::collections::HashMap;

 use crate::{config::CONFIG, constants::db::*, s3::S3Path};

 use super::errors::Error as DBError;

 /// Represents a database row in the DynamoDB SDK format
 pub(super) type RawAttributes = HashMap<String, AttributeValue>;
 /// A convenience Result type for database operations
 pub(super) type DBResult<T> = Result<T, DBError>;

 /// Represents a type-safe version of a DynamoDB blob table item.
 /// Each row can be either a blob item or a holder assignment.
 ///
 /// It implements the `TryFrom` trait to convert from raw DynamoDB
 /// `AttributeValue`s to the type-safe version.
 pub enum DBRow {
   BlobItem(BlobItemRow),
   HolderAssignment(HolderAssignmentRow),
 }

 impl TryFrom<RawAttributes> for DBRow {
   type Error = DBError;

   fn try_from(attributes: RawAttributes) -> Result<Self, Self::Error> {
     let holder: String = attributes
       .get(ATTR_HOLDER)
       .cloned()
       .attr_try_into(ATTR_HOLDER)?;
     let row = match holder.as_str() {
       BLOB_ITEM_ROW_HOLDER_VALUE => DBRow::BlobItem(attributes.try_into()?),
       _ => DBRow::HolderAssignment(attributes.try_into()?),
     };
     Ok(row)
   }
 }

 /// Represents an input payload for inserting a blob item into the database.
 /// This contains only the business-logic-related attributes.
 #[derive(Debug)]
 pub struct BlobItemInput {
   pub blob_hash: String,
   pub s3_path: S3Path,
 }

 impl BlobItemInput {
   pub fn new(blob_hash: impl Into<String>) -> Self {
     let blob_hash: String = blob_hash.into();
     BlobItemInput {
       blob_hash: blob_hash.clone(),
       s3_path: S3Path {
         bucket_name: CONFIG.s3_bucket_name.clone(),
         object_name: blob_hash,
       },
     }
   }
 }

 /// A struct representing a blob item row in the table in a type-safe way
 ///
 /// It implements the `TryFrom` trait to convert from raw DynamoDB
 /// `AttributeValue`s to the type-safe version.
 #[derive(Debug)]
 pub struct BlobItemRow {
   pub blob_hash: String,
   pub s3_path: S3Path,
   pub unchecked: bool,
   pub created_at: DateTime<Utc>,
   pub last_modified: DateTime<Utc>,
 }

 impl TryFrom<RawAttributes> for BlobItemRow {
   type Error = DBError;

   fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
     let blob_hash = attributes
       .remove(ATTR_BLOB_HASH)
       .attr_try_into(ATTR_BLOB_HASH)?;
     let s3_path: String = attributes
       .remove(ATTR_S3_PATH)
       .attr_try_into(ATTR_S3_PATH)?;
     let created_at = parse_timestamp_attribute(
       ATTR_CREATED_AT,
       attributes.remove(ATTR_CREATED_AT),
     )?;
     let last_modified = parse_timestamp_attribute(
       ATTR_LAST_MODIFIED,
       attributes.remove(ATTR_LAST_MODIFIED),
     )?;
     let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Blob)?;
     let s3_path = S3Path::from_full_path(&s3_path).map_err(DBError::from)?;

     Ok(BlobItemRow {
       blob_hash,
       s3_path,
       unchecked,
       created_at,
       last_modified,
     })
   }
 }

 /// A struct representing a holder assignment table row in a type-safe way
 ///
 /// It implements the `TryFrom` trait to convert from raw DynamoDB
 /// `AttributeValue`s to the type-safe version.
 #[derive(Debug)]
 pub struct HolderAssignmentRow {
   pub blob_hash: String,
   pub holder: String,
   pub unchecked: bool,
   pub created_at: DateTime<Utc>,
   pub last_modified: DateTime<Utc>,
 }

 impl TryFrom<RawAttributes> for HolderAssignmentRow {
   type Error = DBError;

   fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
     let holder = attributes.remove(ATTR_HOLDER).attr_try_into(ATTR_HOLDER)?;
     let blob_hash = attributes
       .remove(ATTR_BLOB_HASH)
       .attr_try_into(ATTR_BLOB_HASH)?;
     let created_at = parse_timestamp_attribute(
       ATTR_CREATED_AT,
       attributes.remove(ATTR_CREATED_AT),
     )?;
     let last_modified = parse_timestamp_attribute(
       ATTR_LAST_MODIFIED,
       attributes.remove(ATTR_LAST_MODIFIED),
     )?;
     let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Holder)?;

     Ok(HolderAssignmentRow {
       blob_hash,
       holder,
       unchecked,
       created_at,
       last_modified,
     })
   }
 }

 /// Represents a composite primary key for a DynamoDB table row
 ///
 /// It implements `TryFrom` and `Into` traits to conveniently use it
 /// in DynamoDB queries
-#[derive(Clone, Constructor, Debug)]
+#[derive(Clone, Constructor, Debug, Hash, Eq, PartialEq)]
 pub struct PrimaryKey {
   pub blob_hash: String,
   pub holder: String,
 }

 impl PrimaryKey {
   /// Creates a primary key for a row containing blob item data.
   /// Rows queried by primary keys created by this function will
   /// be of type `BlobItemRow`
   pub fn for_blob_item(blob_hash: impl Into<String>) -> Self {
     PrimaryKey {
       blob_hash: blob_hash.into(),
       holder: BLOB_ITEM_ROW_HOLDER_VALUE.to_string(),
     }
   }
+
+  pub fn is_blob_item(&self) -> bool {
+    self.holder == BLOB_ITEM_ROW_HOLDER_VALUE
+  }
 }

 impl TryFrom<RawAttributes> for PrimaryKey {
   type Error = DBError;

   fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> {
     let blob_hash = attributes
       .remove(ATTR_BLOB_HASH)
       .attr_try_into(ATTR_BLOB_HASH)?;
     let holder = attributes.remove(ATTR_HOLDER).attr_try_into(ATTR_HOLDER)?;
     Ok(PrimaryKey { blob_hash, holder })
   }
 }

 // useful for convenient calls:
 // ddb.get_item().set_key(Some(partition_key.into()))
 impl Into<RawAttributes> for PrimaryKey {
   fn into(self) -> RawAttributes {
     HashMap::from([
       (
         ATTR_BLOB_HASH.to_string(),
         AttributeValue::S(self.blob_hash),
       ),
       (ATTR_HOLDER.to_string(), AttributeValue::S(self.holder)),
     ])
   }
 }

 /// Represents possible values for the `unchecked` attribute
 pub enum UncheckedKind {
   Blob,
   Holder,
 }

 impl UncheckedKind {
   pub fn str_value(&self) -> &'static str {
     match self {
       UncheckedKind::Blob => "blob",
       UncheckedKind::Holder => "holder",
     }
   }
 }

 impl Into<AttributeValue> for UncheckedKind {
   fn into(self) -> AttributeValue {
     AttributeValue::S(self.str_value().to_string())
   }
 }

 fn is_raw_row_unchecked(
   row: &RawAttributes,
   kind: UncheckedKind,
 ) -> DBResult<bool> {
   let Some(AttributeValue::S(value)) = row.get(ATTR_UNCHECKED) else {
     // The `unchecked` attribute does not exist
     return Ok(false);
   };

   if value != kind.str_value() {
     // The `unchecked` attribute exists but has an incorrect value
     return Err(DBError::Attribute(DBItemError::new(
       ATTR_UNCHECKED.to_string(),
       Value::String(value.to_string()),
       comm_services_lib::database::DBItemAttributeError::IncorrectType,
     )));
   }

   Ok(true)
 }
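For illustration: the new `Hash`, `Eq`, and `PartialEq` derives make `PrimaryKey` usable as a set or map key, and `is_blob_item()` distinguishes blob-item rows (whose holder is the `BLOB_ITEM_ROW_HOLDER_VALUE` sentinel) from holder-assignment rows. A minimal sketch under that assumption; `split_fetched_keys` is a hypothetical helper, not something defined in this diff:

use std::collections::HashSet;

// Deduplicate a batch of fetched keys and split them into blob-item keys
// and holder-assignment keys, relying on the `Hash`/`Eq` derives and the
// `is_blob_item()` helper introduced above.
fn split_fetched_keys(
  fetched_keys: Vec<PrimaryKey>,
) -> (HashSet<PrimaryKey>, HashSet<PrimaryKey>) {
  fetched_keys.into_iter().partition(|pk| pk.is_blob_item())
}
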
diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
index 09d6f7353..2b57e1a41 100644
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,365 +1,444 @@
 #![allow(unused)]
 use std::collections::BTreeMap;
 use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
 use std::sync::Arc;

 use async_stream::try_stream;
 use chrono::Duration;
 use comm_services_lib::http::ByteStream;
 use comm_services_lib::tools::BoxedError;
 use tokio_stream::StreamExt;
 use tonic::codegen::futures_core::Stream;
 use tracing::{debug, error, trace, warn};

 use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE;
 use crate::database::types::{
   BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind,
 };
 use crate::database::DBError;
 use crate::s3::{Error as S3Error, S3Client, S3Path};
 use crate::tools::MemOps;
 use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient};

 #[derive(
   Debug, derive_more::Display, derive_more::From, derive_more::Error,
 )]
 pub enum BlobServiceError {
   BlobNotFound,
   BlobAlreadyExists,
   InvalidState,
   DB(DBError),
   S3(S3Error),
   InputError(#[error(ignore)] BoxedError),
 }

 type BlobServiceResult<T> = Result<T, BlobServiceError>;

 #[derive(Clone, Debug)]
 pub struct BlobServiceConfig {
   /// Blob data is streamed from S3 in chunks of this size.
   pub download_chunk_size: usize,
   /// If enabled, orphaned blobs will be deleted immediately after
   /// the last holder is removed. This option should be enabled
   /// if maintenance garbage collection tasks are not run.
   pub instant_delete_orphaned_blobs: bool,
   /// Minimum age that an orphan must stay unmodified
   /// before it can be deleted by a garbage collection task.
   /// This option is ignored if `instant_delete_orphaned_blobs` is `true`.
   pub orphan_protection_period: chrono::Duration,
 }

 impl Default for BlobServiceConfig {
   fn default() -> Self {
     BlobServiceConfig {
       download_chunk_size: BLOB_DOWNLOAD_CHUNK_SIZE as usize,
       instant_delete_orphaned_blobs: false,
       orphan_protection_period: Duration::hours(1),
     }
   }
 }

 #[derive(Clone)]
 pub struct BlobService {
   db: Arc<DatabaseClient>,
   s3: S3Client,
   config: BlobServiceConfig,
 }

 impl BlobService {
   pub fn new(
     db: DatabaseClient,
     s3: S3Client,
     config: BlobServiceConfig,
   ) -> Self {
     Self {
       db: Arc::new(db),
       s3,
       config,
     }
   }

   /// Retrieves blob object metadata and returns a download object
   /// that can be used to download the blob data.
   pub async fn create_download(
     &self,
     blob_hash: impl Into<String>,
   ) -> BlobServiceResult<BlobDownloadObject> {
     // 1. Get S3 path
     let s3_path = match self.db.get_blob_item(blob_hash.into()).await {
       Ok(Some(BlobItemRow { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
         debug!("Blob not found");
         Err(BlobServiceError::BlobNotFound)
       }
       Err(err) => Err(BlobServiceError::DB(err)),
     }?;
     debug!("S3 path: {:?}", s3_path);

     // 2. Get S3 object metadata
     trace!("Getting S3 object metadata...");
     let object_metadata = self.s3.get_object_metadata(&s3_path).await?;
     let blob_size: u64 =
       object_metadata.content_length().try_into().map_err(|err| {
         error!("Failed to parse S3 object content length: {:?}", err);
         BlobServiceError::InvalidState
       })?;
     debug!("S3 object size: {} bytes", blob_size);

     // 3. Create download session
     let session = BlobDownloadObject {
       s3_path,
       blob_size,
       byte_range: 0..blob_size,
       chunk_size: self.config.download_chunk_size as u64,
       s3_client: self.s3.clone(),
     };
     Ok(session)
   }

   pub async fn put_blob(
     &self,
     blob_hash: impl Into<String>,
     mut blob_data_stream: impl ByteStream,
   ) -> Result<(), BlobServiceError> {
     let blob_hash: String = blob_hash.into();
     let blob_item = BlobItemInput::new(&blob_hash);

     if self.db.get_blob_item(blob_hash).await?.is_some() {
       debug!("Blob already exists");
       return Err(BlobServiceError::BlobAlreadyExists);
     }

     let mut upload_session =
       self.s3.start_upload_session(&blob_item.s3_path).await?;
     trace!(?blob_item, "Started S3 upload session");

     tokio::pin!(blob_data_stream);
     let mut s3_chunk: Vec<u8> = Vec::new();
     while let Some(chunk) =
       blob_data_stream.try_next().await.map_err(|err| {
         warn!("Failed to get data chunk: {:?}", err);
         BlobServiceError::InputError(err)
       })?
     {
       s3_chunk.extend_from_slice(&chunk);

       // New parts should be added to AWS only if they exceed the minimum
       // part size; otherwise AWS returns an error.
       if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
         trace!(
           chunk_size = s3_chunk.len(),
           "Chunk size exceeded, adding new S3 part"
         );
         upload_session
           .add_part(s3_chunk.take_out())
           .await
           .map_err(BlobServiceError::from)?;
       }
     }
     trace!("Upload stream drained");

     // add the remaining data as the last S3 part
     if !s3_chunk.is_empty() {
       trace!("Uploading remaining {} bytes", s3_chunk.len());
       upload_session.add_part(s3_chunk).await?;
     }

     // Complete the upload session
     upload_session.finish_upload().await?;

     trace!("S3 upload complete, putting item to db");
     self.db.put_blob_item(blob_item).await?;
     Ok(())
   }

   pub async fn assign_holder(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> BlobServiceResult<bool> {
     let blob_hash: String = blob_hash.into();
     trace!(blob_hash, "Attempting to assign holder");
     self
       .db
       .put_holder_assignment(blob_hash.clone(), holder.into())
       .await?;

     trace!("Holder assigned. Checking if data exists");
     let data_exists = self.db.get_blob_item(blob_hash).await?.is_some();
     Ok(data_exists)
   }

   pub async fn revoke_holder(
     &self,
     blob_hash: impl Into<String>,
     holder: impl Into<String>,
   ) -> BlobServiceResult<()> {
     let blob_hash: String = blob_hash.into();
     let holder: String = holder.into();
     trace!(blob_hash, holder, "Attempting to revoke holder");

     self.db.delete_holder_assignment(&blob_hash, holder).await?;

     if self.config.instant_delete_orphaned_blobs {
       trace!("Instant orphan deletion enabled. Looking for holders");
       let is_orphan = self
         .db
         .list_blob_holders(&blob_hash, Some(1))
         .await?
         .is_empty();
       if !is_orphan {
         trace!("Found holders, nothing to do");
         return Ok(());
       }

       debug!("No holders left, deleting blob if exists");
       trace!("Getting blob item");
       let Some(blob_item) = self.db.get_blob_item(&blob_hash).await? else {
         trace!("Blob item not found, nothing to do");
         return Ok(());
       };

       trace!("Deleting S3 object");
       self.s3.delete_object(&blob_item.s3_path).await?;
       trace!("Deleting blob item entry from DB");
       self.db.delete_blob_item(blob_hash).await?;
     }
     Ok(())
   }
 }

 // A B-tree map performs well for both random and sequential access.
 type BlobHash = String;
 type UncheckedCollection = BTreeMap<BlobHash, UncheckedItem>;

 /// Represents an "unchecked" blob entity. It might be missing either
 /// the blob hash or holders.
 #[derive(Debug)]
 struct UncheckedItem {
   blob_hash: Option<BlobHash>,
   holders: Vec<String>,
 }

 impl UncheckedItem {
   fn has_blob_hash(&self) -> bool {
     self.blob_hash.is_some()
   }

   fn has_holders(&self) -> bool {
     !self.holders.is_empty()
   }

   /// Returns primary keys for this item. It contains primary keys for holders
   /// and for the blob item (if it has a hash).
   /// A fallback hash is required for holders if the item's blob hash is None.
   fn as_primary_keys(&self, fallback_blob_hash: &str) -> Vec<PrimaryKey> {
     if !self.has_holders() && !self.has_blob_hash() {
       warn!(
         fallback_blob_hash,
         "Item has no hash and no holders, this should never happen!"
       );
       return Vec::new();
     }

     let hash_for_holders =
       self.blob_hash.as_deref().unwrap_or(fallback_blob_hash);
     let mut keys = self
       .holders
       .iter()
       .map(|holder| PrimaryKey {
         blob_hash: hash_for_holders.to_string(),
         holder: holder.to_string(),
       })
       .collect::<Vec<_>>();

     if let Some(blob_hash) = &self.blob_hash {
       keys.push(PrimaryKey::for_blob_item(blob_hash.to_string()));
     }
     keys
   }
 }

 trait CleanupOperations {
   /// Retains only items that should remain unchecked
   /// (missing blob hash or holders).
   ///
   /// Returns removed items - these items are checked
   /// (contain both blob hash and at least one holder).
   fn filter_out_checked(&mut self) -> Vec<PrimaryKey>;

   /// Returns a list of blob hashes for which we need to query if they have
   /// at least one holder
   fn blobs_to_find_holders(&self) -> Vec<&BlobHash>;

   /// Returns primary keys for blob items that need to be checked if they exist
   ///
   /// Technically, this returns all items that have holders but no hash.
   fn blobs_to_check_existence(&self) -> Vec<PrimaryKey>;

   /// Updates the structure after fetching additional data from the database.
   fn feed_with_query_results(
     &mut self,
     fetched_items: impl IntoIterator<Item = PrimaryKey>,
   );

   /// Turns this collection into a list of DB primary keys
   fn into_primary_keys(self) -> Vec<PrimaryKey>;
 }

+impl CleanupOperations for UncheckedCollection {
+  /// Retains only items that should remain unchecked
+  /// (missing blob hash or holders).
+  ///
+  /// Returns removed items - these items are checked
+  /// (contain both blob hash and at least one holder).
+  fn filter_out_checked(&mut self) -> Vec<PrimaryKey> {
+    let mut checked = Vec::new();
+
+    self.retain(|blob_hash, item| {
+      if !item.has_blob_hash() || !item.has_holders() {
+        // blob hash or holder missing, leave unchecked
+        return true;
+      }
+
+      checked.extend(item.as_primary_keys(blob_hash));
+      false
+    });
+    checked
+  }
+
+  /// Returns a list of blob hashes for which we need to query if they have
+  /// at least one holder
+  fn blobs_to_find_holders(&self) -> Vec<&BlobHash> {
+    self
+      .iter()
+      .filter_map(|(blob_hash, item)| {
+        if item.has_blob_hash() && !item.has_holders() {
+          Some(blob_hash)
+        } else {
+          None
+        }
+      })
+      .collect()
+  }
+
+  /// Returns primary keys for blob items that need to be checked if they exist
+  ///
+  /// Technically, this returns all blob items that have holders but no hash.
+  fn blobs_to_check_existence(&self) -> Vec<PrimaryKey> {
+    self
+      .iter()
+      .filter_map(|(blob_hash, item)| {
+        if item.has_holders() && !item.has_blob_hash() {
+          Some(PrimaryKey::for_blob_item(blob_hash))
+        } else {
+          None
+        }
+      })
+      .collect()
+  }
+
+  /// Updates the structure after fetching additional data from the database.
+  fn feed_with_query_results(
+    &mut self,
+    fetched_items: impl IntoIterator<Item = PrimaryKey>,
+  ) {
+    for pk in fetched_items.into_iter() {
+      let Some(item) = self.get_mut(&pk.blob_hash) else {
+        warn!("Got fetched item that was not requested: {:?}", pk);
+        continue;
+      };
+
+      if pk.is_blob_item() {
+        item.blob_hash = Some(pk.blob_hash)
+      } else {
+        item.holders.push(pk.holder);
+      }
+    }
+  }
+
+  fn into_primary_keys(self) -> Vec<PrimaryKey> {
+    self
+      .into_iter()
+      .flat_map(|(blob_hash, item)| item.as_primary_keys(&blob_hash))
+      .collect()
+  }
+}
+
 pub struct BlobDownloadObject {
   /// Size of the whole blob object in bytes.
   pub blob_size: u64,
   /// Range of bytes to be downloaded (exclusive end).
   byte_range: Range<u64>,
   chunk_size: u64,
   s3_client: S3Client,
   s3_path: S3Path,
 }

 impl BlobDownloadObject {
   pub fn set_byte_range(&mut self, range: impl RangeBounds<u64>) {
     let range_start = match range.start_bound() {
       Bound::Included(&start) => start,
       Bound::Excluded(&start) => start + 1,
       Bound::Unbounded => 0,
     };
     let range_end = match range.end_bound() {
       Bound::Included(&end) => end + 1,
       Bound::Excluded(&end) => end,
       Bound::Unbounded => self.blob_size,
     };
     // Clamp range to blob size
     let start = std::cmp::max(range_start, 0);
     let end_exclusive = std::cmp::min(range_end, self.blob_size);

     self.byte_range = start..end_exclusive;
     debug!("Requested byte range: {}..{}", start, end_exclusive);
   }

   /// Size of the data to be downloaded in bytes.
   pub fn download_size(&self) -> u64 {
     self.byte_range.end - self.byte_range.start
   }

   pub fn into_stream(self) -> impl Stream<Item = BlobServiceResult<Vec<u8>>> {
     let BlobDownloadObject {
       byte_range,
       chunk_size,
       s3_path,
       s3_client,
       ..
     } = self;

     try_stream! {
       trace!("Starting download stream");
       let mut offset: u64 = byte_range.start;
       while offset < byte_range.end {
         let next_size = std::cmp::min(chunk_size, byte_range.end - offset);
         let range = offset..(offset + next_size);
         trace!(?range, "Getting {} bytes of data", next_size);

         yield s3_client.get_object_bytes(&s3_path, range).await?;

         offset += next_size;
       }
     }
   }
 }
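To illustrate how the new `CleanupOperations` methods fit together: a cleanup pass would presumably first drop already-complete rows, query the database for the missing halves of the rest, merge the results back, and treat whatever is still incomplete as orphaned. A rough sketch under that assumption, written as if it lived next to the impl in service.rs; `find_rows_for_keys` is a hypothetical `DatabaseClient` helper, since the actual query methods are not shown in this diff:

// Illustrative sketch only; `find_rows_for_keys` is a hypothetical helper.
async fn cleanup_pass(
  db: &DatabaseClient,
  mut unchecked: UncheckedCollection,
) -> BlobServiceResult<Vec<PrimaryKey>> {
  // Rows that already have both a blob hash and at least one holder are
  // complete; pull their keys out right away.
  let mut checked = unchecked.filter_out_checked();

  // For the remaining rows, ask the database for the missing halves:
  // holders for known blob hashes, and blob items for known holders.
  let hashes_to_query = unchecked.blobs_to_find_holders();
  let items_to_query = unchecked.blobs_to_check_existence();
  // Hypothetical helper; replace with the service's actual batch queries.
  let fetched: Vec<PrimaryKey> =
    db.find_rows_for_keys(hashes_to_query, items_to_query).await?;

  // Merge the query results back, then split again: rows that became
  // complete join `checked`, the rest are orphaned and can be deleted.
  unchecked.feed_with_query_results(fetched);
  checked.extend(unchecked.filter_out_checked());
  let orphaned_keys = unchecked.into_primary_keys();

  debug!(
    checked = checked.len(),
    orphaned = orphaned_keys.len(),
    "Cleanup pass finished"
  );
  Ok(orphaned_keys)
}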