diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
index 4cf3e0bdc..4837d48b9 100644
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -1,44 +1,53 @@
 // Assorted constants
 
 pub const DEFAULT_HTTP_PORT: u16 = 50053;
 pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
 
 // HTTP constants
 
 pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
 
 // DynamoDB constants
 
 pub mod db {
   /// Reserved holder value that indicates the row is a blob item
   pub const BLOB_ITEM_ROW_HOLDER_VALUE: &str = "_";
 
   pub const BLOB_TABLE_NAME: &str = "blob-service-blobs";
   pub const BLOB_PARTITION_KEY: &str = ATTR_BLOB_HASH;
   pub const BLOB_SORT_KEY: &str = ATTR_HOLDER;
 
   pub const UNCHECKED_INDEX_NAME: &str = "unchecked-index";
   pub const UNCHECKED_INDEX_PARTITION_KEY: &str = ATTR_UNCHECKED;
   pub const UNCHECKED_INDEX_SORT_KEY: &str = ATTR_LAST_MODIFIED;
 
   /// attribute names
   pub const ATTR_BLOB_HASH: &str = "blob_hash";
   pub const ATTR_HOLDER: &str = "holder";
   pub const ATTR_CREATED_AT: &str = "created_at";
   pub const ATTR_LAST_MODIFIED: &str = "last_modified";
   pub const ATTR_S3_PATH: &str = "s3_path";
   pub const ATTR_UNCHECKED: &str = "unchecked";
 }
 
 // Environment variables
 
 pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS";
 pub const LOG_LEVEL_ENV_VAR: &str =
   tracing_subscriber::filter::EnvFilter::DEFAULT_ENV;
 
 // S3 constants
 
 pub const S3_BUCKET_ENV_VAR: &str = "BLOB_S3_BUCKET_NAME";
 pub const DEFAULT_S3_BUCKET_NAME: &str = "commapp-blob";
 pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
 
 pub const INVITE_LINK_BLOB_HASH_PREFIX: &str = "invite_";
+
+// Error Types
+
+pub mod error_types {
+  pub const S3_ERROR: &str = "S3 Error";
+  pub const DDB_ERROR: &str = "DDB Error";
+  pub const HTTP_ERROR: &str = "HTTP Error";
+  pub const OTHER_ERROR: &str = "Other Error";
+}
diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
index a34559586..d419c1036 100644
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -1,429 +1,436 @@
 use aws_sdk_dynamodb::{
   operation::put_item::PutItemOutput,
   types::{
     AttributeValue, Delete, DeleteRequest, PutRequest, TransactWriteItem,
     Update, WriteRequest,
   },
   Error as DynamoDBError,
 };
 use chrono::Utc;
 use comm_lib::database::{
   self, batch_operations::ExponentialBackoffConfig, TryFromAttribute,
 };
 use std::collections::HashMap;
 use tracing::{debug, error, trace};
 
 use crate::constants::db::*;
+use crate::constants::error_types;
 
 use super::errors::{BlobDBError, Error as DBError};
 use super::types::*;
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   ddb: aws_sdk_dynamodb::Client,
 }
 
 /// public interface implementation
 impl DatabaseClient {
   pub fn new(aws_config: &aws_config::SdkConfig) -> Self {
     DatabaseClient {
       ddb: aws_sdk_dynamodb::Client::new(aws_config),
     }
   }
 
   /// Gets a blob item row from the database by its blob hash
   /// Returns None if the blob item is not found
   pub async fn get_blob_item(
     &self,
     blob_hash: impl Into<String>,
   ) -> DBResult<Option<BlobItemRow>> {
     let key = PrimaryKey::for_blob_item(blob_hash);
     self
       .get_raw_item(key.clone())
       .await?
       .map(BlobItemRow::try_from)
       .transpose()
   }
 
   /// Inserts a new blob item row into the database. Returns Error
   /// if the item already exists.
pub async fn put_blob_item(&self, blob_item: BlobItemInput) -> DBResult<()> { let item = HashMap::from([ ( ATTR_BLOB_HASH.to_string(), AttributeValue::S(blob_item.blob_hash), ), ( ATTR_HOLDER.to_string(), AttributeValue::S(BLOB_ITEM_ROW_HOLDER_VALUE.into()), ), ( ATTR_S3_PATH.to_string(), AttributeValue::S(blob_item.s3_path.to_full_path()), ), (ATTR_UNCHECKED.to_string(), UncheckedKind::Blob.into()), ]); self.insert_item(item).await?; Ok(()) } /// Deletes blob item row. Doesn't delete its holders. pub async fn delete_blob_item( &self, blob_hash: impl Into, ) -> DBResult<()> { let key = PrimaryKey::for_blob_item(blob_hash); self .ddb .delete_item() .table_name(BLOB_TABLE_NAME) .set_key(Some(key.into())) .send() .await .map_err(|err| { debug!("DynamoDB client failed to delete blob item: {:?}", err); DBError::AwsSdk(Box::new(err.into())) })?; Ok(()) } // Inserts a new holder assignment row into the database. Returns Error // if the item already exists or holder format is invalid. pub async fn put_holder_assignment( &self, blob_hash: impl Into, holder: impl Into, ) -> DBResult<()> { let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); validate_holder(&holder)?; let item = HashMap::from([ (ATTR_BLOB_HASH.to_string(), AttributeValue::S(blob_hash)), (ATTR_HOLDER.to_string(), AttributeValue::S(holder)), (ATTR_UNCHECKED.to_string(), UncheckedKind::Holder.into()), ]); self.insert_item(item).await?; Ok(()) } /// Deletes a holder assignment row from the table. /// If the blob item for given holder assignment exists, it will be marked as unchecked. /// /// Returns Error if the holder format is invalid or race condition happened. /// Doesn't fail if the holder assignment didn't exist before. pub async fn delete_holder_assignment( &self, blob_hash: impl Into, holder: impl Into, ) -> DBResult<()> { let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); validate_holder(&holder)?; let mut transaction = Vec::new(); // delete the holder row let assignment_key = PrimaryKey { blob_hash: blob_hash.clone(), holder, }; let delete_request = Delete::builder() .table_name(BLOB_TABLE_NAME) .set_key(Some(assignment_key.into())) .build() .expect("key or table_name not set in Delete builder"); transaction .push(TransactWriteItem::builder().delete(delete_request).build()); // mark the blob item as unchecked if exists let blob_primary_key = PrimaryKey::for_blob_item(blob_hash); if self.get_raw_item(blob_primary_key.clone()).await?.is_some() { let update_request = Update::builder() .table_name(BLOB_TABLE_NAME) .set_key(Some(blob_primary_key.into())) // even though we checked that the blob item exists, we still need to check it again // using DDB built-in conditions in case it was deleted in meantime .condition_expression( "attribute_exists(#blob_hash) AND attribute_exists(#holder)", ) .update_expression("SET #unchecked = :unchecked, #last_modified = :now") .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH) .expression_attribute_names("#holder", ATTR_HOLDER) .expression_attribute_names("#unchecked", ATTR_UNCHECKED) .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED) .expression_attribute_values(":unchecked", UncheckedKind::Blob.into()) .expression_attribute_values( ":now", AttributeValue::N(Utc::now().timestamp_millis().to_string()), ) .build() .expect( "key, table_name or update_expression not set in Update builder", ); transaction .push(TransactWriteItem::builder().update(update_request).build()); } self .ddb .transact_write_items() 
.set_transact_items(Some(transaction)) .send() .await .map_err(|err| { debug!("DynamoDB client failed to run transaction: {:?}", err); DBError::AwsSdk(Box::new(err.into())) })?; Ok(()) } /// Queries the table for a list of holders for given blob hash. /// Optionally limits the number of results. pub async fn list_blob_holders( &self, blob_hash: impl Into, limit: Option, ) -> DBResult> { let response = self .ddb .query() .table_name(BLOB_TABLE_NAME) .projection_expression("#holder") .key_condition_expression("#blob_hash = :blob_hash") .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH) .expression_attribute_names("#holder", ATTR_HOLDER) .expression_attribute_values( ":blob_hash", AttributeValue::S(blob_hash.into()), ) .consistent_read(true) // we need to increase limit by 1 because the blob item itself can be fetched too // it is filtered-out later .set_limit(limit.map(|it| it + 1)) .send() .await .map_err(|err| { - error!("DynamoDB client failed to query holders: {:?}", err); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to query holders: {:?}", err + ); DBError::AwsSdk(Box::new(err.into())) })?; let Some(items) = response.items else { return Ok(vec![]); }; items .into_iter() .filter_map(|mut row| { // filter out rows that are blob items // we cannot do it in key condition expression - it doesn't support the <> operator // filter expression doesn't work either - it doesn't support filtering by sort key match String::try_from_attr(ATTR_HOLDER, row.remove(ATTR_HOLDER)) { Ok(value) if value.as_str() == BLOB_ITEM_ROW_HOLDER_VALUE => None, holder => Some(holder), } }) .collect::, _>>() .map_err(DBError::Attribute) } /// Returns a list of primary keys for rows that already exist in the table pub async fn list_existing_keys( &self, keys: impl IntoIterator, ) -> DBResult> { database::batch_operations::batch_get( &self.ddb, BLOB_TABLE_NAME, keys, Some(format!("{}, {}", ATTR_BLOB_HASH, ATTR_HOLDER)), ExponentialBackoffConfig::default(), ) .await? .into_iter() .map(PrimaryKey::try_from) .collect::, _>>() } /// Returns a list of primary keys for "unchecked" items (blob / holder) /// that were last modified at least `min_age` ago. /// We need to specify if we want to get blob or holder items. pub async fn find_unchecked_items( &self, kind: UncheckedKind, min_age: chrono::Duration, ) -> DBResult> { let created_until = Utc::now() - min_age; let timestamp = created_until.timestamp_millis(); let response = self .ddb .query() .table_name(BLOB_TABLE_NAME) .index_name(UNCHECKED_INDEX_NAME) .key_condition_expression( "#unchecked = :kind AND #last_modified < :timestamp", ) .expression_attribute_names("#unchecked", ATTR_UNCHECKED) .expression_attribute_names("#last_modified", ATTR_LAST_MODIFIED) .expression_attribute_values(":kind", kind.into()) .expression_attribute_values( ":timestamp", AttributeValue::N(timestamp.to_string()), ) .send() .await .map_err(|err| { - error!("DynamoDB client failed to query unchecked items: {:?}", err); + error!( + errorType = error_types::DDB_ERROR, + "DynamoDB client failed to query unchecked items: {:?}", err + ); DBError::AwsSdk(Box::new(err.into())) })?; let Some(items) = response.items else { return Ok(vec![]); }; items .into_iter() .map(PrimaryKey::try_from) .collect::, _>>() } /// For all rows in specified set of primary keys, removes /// the "unchecked" attribute using PutItem operation in batch. 
pub async fn batch_mark_checked( &self, keys: impl IntoIterator, ) -> DBResult<()> { let items_to_mark = database::batch_operations::batch_get( &self.ddb, BLOB_TABLE_NAME, keys, None, ExponentialBackoffConfig::default(), ) .await?; let write_requests = items_to_mark .into_iter() .filter_map(|mut row| { // filter out rows that are already checked // to save some write capacity row.remove(ATTR_UNCHECKED)?; let put_request = PutRequest::builder() .set_item(Some(row)) .build() .expect("item not set in PutRequest builder"); let request = WriteRequest::builder().put_request(put_request).build(); Some(request) }) .collect(); database::batch_operations::batch_write( &self.ddb, BLOB_TABLE_NAME, write_requests, ExponentialBackoffConfig::default(), ) .await?; Ok(()) } /// Performs multiple DeleteItem operations in batch pub async fn batch_delete_rows( &self, keys: impl IntoIterator, ) -> DBResult<()> { let write_requests = keys .into_iter() .map(|key| { DeleteRequest::builder() .set_key(Some(key.into())) .build() .expect("key not set in DeleteRequest builder") }) .map(|request| WriteRequest::builder().delete_request(request).build()) .collect::>(); database::batch_operations::batch_write( &self.ddb, BLOB_TABLE_NAME, write_requests, ExponentialBackoffConfig::default(), ) .await?; Ok(()) } } // private helpers impl DatabaseClient { /// inserts a new item into the table using PutItem. Returns /// error if the item already exists async fn insert_item( &self, mut item: RawAttributes, ) -> DBResult { // add metadata attributes common for all types of rows let now = Utc::now().timestamp_millis(); item.insert( ATTR_CREATED_AT.to_string(), AttributeValue::N(now.to_string()), ); item.insert( ATTR_LAST_MODIFIED.to_string(), AttributeValue::N(now.to_string()), ); self .ddb .put_item() .table_name(BLOB_TABLE_NAME) .set_item(Some(item)) // make sure we don't accidentaly overwrite existing row .condition_expression( "attribute_not_exists(#blob_hash) AND attribute_not_exists(#holder)", ) .expression_attribute_names("#blob_hash", ATTR_BLOB_HASH) .expression_attribute_names("#holder", ATTR_HOLDER) .send() .await .map_err(|err| match DynamoDBError::from(err) { DynamoDBError::ConditionalCheckFailedException(e) => { debug!("DynamoDB client failed to insert: item already exists"); trace!("Conditional check failed with error: {}", e); DBError::ItemAlreadyExists } err => { debug!("DynamoDB client failed to insert: {:?}", err); DBError::AwsSdk(Box::new(err)) } }) } /// Gets a single row from the table using GetItem, without parsing it async fn get_raw_item( &self, key: PrimaryKey, ) -> DBResult> { self .ddb .get_item() .table_name(BLOB_TABLE_NAME) .set_key(Some(key.into())) .send() .await .map_err(|err| { debug!("DynamoDB client failed to get item: {:?}", err); DBError::AwsSdk(Box::new(err.into())) }) .map(|response| response.item) } } fn validate_holder(holder: &str) -> DBResult<()> { if holder == BLOB_ITEM_ROW_HOLDER_VALUE { debug!("Invalid holder: {}", holder); return Err(DBError::Blob(BlobDBError::InvalidInput(holder.to_string()))); } Ok(()) } diff --git a/services/blob/src/http/errors.rs b/services/blob/src/http/errors.rs index d00164896..37ad90a42 100644 --- a/services/blob/src/http/errors.rs +++ b/services/blob/src/http/errors.rs @@ -1,86 +1,105 @@ use actix_web::error::{ ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, }; use actix_web::{Error as HttpError, HttpResponse, ResponseError}; use aws_sdk_dynamodb::Error as DynamoDBError; use http::StatusCode; use 
tracing::{debug, error, trace, warn};
 
+use crate::constants::error_types;
 use crate::database::errors::{BlobDBError, Error as DBError};
 use crate::s3::Error as S3Error;
 use crate::service::BlobServiceError;
 
 pub(super) fn handle_blob_service_error(err: &BlobServiceError) -> HttpError {
   trace!("Handling blob service error: {:?}", err);
   match err {
     BlobServiceError::BlobNotFound => ErrorNotFound("not found"),
     BlobServiceError::BlobAlreadyExists
     | BlobServiceError::DB(DBError::ItemAlreadyExists) => {
       ErrorConflict("blob already exists")
     }
     BlobServiceError::DB(db_err) => match db_err {
       DBError::AwsSdk(aws_err) => match aws_err.as_ref() {
         DynamoDBError::InternalServerError(_)
         | DynamoDBError::ProvisionedThroughputExceededException(_)
         | DynamoDBError::RequestLimitExceeded(_) => {
           warn!("AWS transient error occurred");
           ErrorServiceUnavailable("please retry")
         }
         unexpected => {
-          error!("Received an unexpected AWS error: {0:?} - {0}", unexpected);
+          error!(
+            errorType = error_types::OTHER_ERROR,
+            "Received an unexpected AWS error: {0:?} - {0}", unexpected
+          );
           ErrorInternalServerError("server error")
         }
       },
       DBError::Blob(BlobDBError::InvalidInput(_)) => {
         ErrorBadRequest("bad request")
       }
       unexpected => {
-        error!("Received an unexpected DB error: {0:?} - {0}", unexpected);
+        error!(
+          errorType = error_types::DDB_ERROR,
+          "Received an unexpected DB error: {0:?} - {0}", unexpected
+        );
         ErrorInternalServerError("server error")
       }
     },
     BlobServiceError::S3(s3_err) => match s3_err {
       S3Error::AwsSdk(aws_err) => match aws_err.as_ref() {
         aws_sdk_s3::Error::NotFound(_) | aws_sdk_s3::Error::NoSuchKey(_) => {
-          error!("Data inconsistency! Blob is present in database but not present in S3!");
+          error!(
+            errorType = error_types::S3_ERROR,
+            "Data inconsistency! Blob is present in database but not present in S3!"
+ ); ErrorInternalServerError("server error") } err => { - error!("Received an unexpected AWS S3 error: {0:?} - {0}", err); + error!( + errorType = error_types::S3_ERROR, + "Received an unexpected AWS S3 error: {0:?} - {0}", err + ); ErrorInternalServerError("server error") } }, S3Error::EmptyUpload => ErrorBadRequest("empty upload"), unexpected => { - error!("Received an unexpected S3 error: {0:?} - {0}", unexpected); + error!( + errorType = error_types::S3_ERROR, + "Received an unexpected S3 error: {0:?} - {0}", unexpected + ); ErrorInternalServerError("server error") } }, BlobServiceError::InputError(err) => { debug!("Received request input error: {0:?} - {0}", err); ErrorBadRequest("bad request") } BlobServiceError::InviteLinkError(invite_link_error) => { debug!("Received invite link error: {0}", invite_link_error); ErrorBadRequest("bad request") } err => { - error!("Received an unexpected error: {0:?} - {0}", err); + error!( + errorType = error_types::OTHER_ERROR, + "Received an unexpected error: {0:?} - {0}", err + ); ErrorInternalServerError("server error") } } } /// This allow us to `await?` blob service calls in HTTP handlers impl ResponseError for BlobServiceError { fn error_response(&self) -> HttpResponse { handle_blob_service_error(self).error_response() } fn status_code(&self) -> StatusCode { handle_blob_service_error(self) .as_response_error() .status_code() } } diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs index e177d878e..ceef7b524 100644 --- a/services/blob/src/s3.rs +++ b/services/blob/src/s3.rs @@ -1,372 +1,398 @@ use aws_sdk_s3::{ operation::create_multipart_upload::CreateMultipartUploadOutput, primitives::ByteStream, types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}, Error as S3Error, }; use std::ops::{Bound, RangeBounds}; use tracing::{debug, error, trace}; +use crate::constants::error_types; + #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(Box), #[display(...)] ByteStream(std::io::Error), #[display(...)] InvalidPath(S3PathError), #[display(fmt = "There are no parts to upload")] EmptyUpload, #[display(fmt = "Missing upload ID")] MissingUploadID, } #[derive(Debug, derive_more::Error)] pub enum S3PathError { MissingSeparator(#[error(ignore)] String), MissingBucketName(#[error(ignore)] String), MissingObjectName(#[error(ignore)] String), } impl std::fmt::Display for S3PathError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { S3PathError::MissingSeparator(path) => { write!(f, "S3 path: [{}] should contain the '/' separator", path) } S3PathError::MissingBucketName(path) => { write!(f, "Expected bucket name in S3 path: [{}]", path) } S3PathError::MissingObjectName(path) => { write!(f, "Expected object name in S3 path: [{}]", path) } } } } type S3Result = Result; /// A helper structure representing an S3 object path #[derive(Clone, Debug)] pub struct S3Path { pub bucket_name: String, pub object_name: String, } impl S3Path { /// Constructs an [`S3Path`] from given string /// The path should be in the following format: `[bucket_name]/[object_name]` pub fn from_full_path(full_path: &str) -> Result { if !full_path.contains('/') { return Err(S3PathError::MissingSeparator(full_path.to_string())); } let mut split = full_path.split('/'); Ok(S3Path { bucket_name: split .next() .ok_or_else(|| S3PathError::MissingBucketName(full_path.to_string()))? 
.to_string(), object_name: split .next() .ok_or_else(|| S3PathError::MissingObjectName(full_path.to_string()))? .to_string(), }) } /// Retrieves full S3 path string in the following format: `[bucket_name]/[object_name]` pub fn to_full_path(&self) -> String { format!("{}/{}", self.bucket_name, self.object_name) } } impl From<&S3Path> for String { fn from(s3_path: &S3Path) -> Self { s3_path.to_full_path() } } impl TryFrom<&str> for S3Path { type Error = S3PathError; fn try_from(full_path: &str) -> Result { Self::from_full_path(full_path) } } #[derive(Clone)] pub struct S3Client { client: aws_sdk_s3::Client, } impl S3Client { pub fn new(aws_config: &aws_config::SdkConfig) -> Self { let s3_config = aws_sdk_s3::config::Builder::from(aws_config) // localstack doesn't support virtual addressing .force_path_style(crate::config::CONFIG.localstack_endpoint.is_some()) .build(); S3Client { client: aws_sdk_s3::Client::from_conf(s3_config), } } /// Creates a new [`MultiPartUploadSession`] pub async fn start_upload_session( &self, s3_path: &S3Path, ) -> S3Result { MultiPartUploadSession::start(&self.client, s3_path).await } /// Returns object metadata (e.g. file size) without downloading the object itself pub async fn get_object_metadata( &self, s3_path: &S3Path, ) -> S3Result { let response = self .client .head_object() .bucket(s3_path.bucket_name.clone()) .key(s3_path.object_name.clone()) .send() .await .map_err(|e| { - error!("S3 failed to get object metadata"); + error!( + errorType = error_types::S3_ERROR, + "S3 failed to get object metadata" + ); Error::AwsSdk(Box::new(e.into())) })?; Ok(response) } /// Downloads object and retrieves data bytes within provided range /// /// * `range` - Range of object bytes to download. pub async fn get_object_bytes( &self, s3_path: &S3Path, range: impl RangeBounds, ) -> S3Result> { let mut request = self .client .get_object() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name); if range.start_bound() != Bound::Unbounded || range.end_bound() != Bound::Unbounded { // Create a valid HTTP Range header let from = match range.start_bound() { Bound::Included(start) => start.to_string(), _ => "0".to_string(), }; let to = match range.end_bound() { Bound::Included(end) => end.to_string(), Bound::Excluded(end) => (end - 1).to_string(), _ => "".to_string(), }; let range = format!("bytes={}-{}", from, to); request = request.range(range); } let response = request.send().await.map_err(|e| { - error!("S3 failed to get object"); + error!(errorType = error_types::S3_ERROR, "S3 failed to get object"); Error::AwsSdk(Box::new(e.into())) })?; let data = response.body.collect().await.map_err(|e| { - error!("S3 failed to stream object bytes"); + error!( + errorType = error_types::S3_ERROR, + "S3 failed to stream object bytes" + ); Error::ByteStream(e.into()) })?; Ok(data.to_vec()) } /// Deletes object at provided path pub async fn delete_object(&self, s3_path: &S3Path) -> S3Result<()> { self .client .delete_object() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() .await .map_err(|e| { - error!("S3 failed to delete object"); + error!( + errorType = error_types::S3_ERROR, + "S3 failed to delete object" + ); Error::AwsSdk(Box::new(e.into())) })?; Ok(()) } pub async fn batch_delete_objects(&self, paths: Vec) -> S3Result<()> { let Some(first_path) = paths.first() else { debug!("No S3 objects to delete"); return Ok(()); }; let bucket_name = &first_path.bucket_name; let objects = paths .iter() .map(|path| { ObjectIdentifier::builder() .key(&path.object_name) .build() 
.expect("key not set in ObjectIdentifier builder") }) .collect(); self .client .delete_objects() .bucket(bucket_name) .delete( Delete::builder() .set_objects(Some(objects)) .build() .expect("Objects not set in Delete builder"), ) .send() .await .map_err(|e| { - error!("S3 failed to batch delete objects"); + error!( + errorType = error_types::S3_ERROR, + "S3 failed to batch delete objects" + ); Error::AwsSdk(Box::new(e.into())) })?; Ok(()) } } /// Represents a multipart upload session to the AWS S3 pub struct MultiPartUploadSession { client: aws_sdk_s3::Client, bucket_name: String, object_name: String, upload_id: String, upload_parts: Vec, } impl MultiPartUploadSession { /// Starts a new upload session and returns its instance /// Don't call this directly, use [`S3Client::start_upload_session()`] instead async fn start( client: &aws_sdk_s3::Client, s3_path: &S3Path, ) -> S3Result { let multipart_upload_res: CreateMultipartUploadOutput = client .create_multipart_upload() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() .await .map_err(|e| { - error!("S3 failed to start upload session"); + error!( + errorType = error_types::S3_ERROR, + "S3 failed to start upload session" + ); Error::AwsSdk(Box::new(e.into())) })?; let upload_id = multipart_upload_res.upload_id().ok_or_else(|| { - error!("Upload ID expected to be present"); + error!( + errorType = error_types::S3_ERROR, + "Upload ID expected to be present" + ); Error::MissingUploadID })?; debug!("Started multipart upload session with ID: {}", upload_id); Ok(MultiPartUploadSession { client: client.clone(), bucket_name: String::from(&s3_path.bucket_name), object_name: String::from(&s3_path.object_name), upload_id: String::from(upload_id), upload_parts: Vec::new(), }) } /// adds data part to the multipart upload pub async fn add_part(&mut self, part: Vec) -> S3Result<()> { let stream = ByteStream::from(part); let part_number: i32 = self.upload_parts.len() as i32 + 1; let upload_result = self .client .upload_part() .key(&self.object_name) .bucket(&self.bucket_name) .upload_id(&self.upload_id) .part_number(part_number) .body(stream) .send() .await .map_err(|e| { - error!("Failed to add upload part"); + error!( + errorType = error_types::S3_ERROR, + "Failed to add upload part" + ); Error::AwsSdk(Box::new(e.into())) })?; let completed_part = CompletedPart::builder() .e_tag(upload_result.e_tag.unwrap_or_default()) .part_number(part_number) .build(); trace!( upload_id = self.upload_id, e_tag = completed_part.e_tag.as_deref().unwrap_or("N/A"), "Uploaded part {}.", part_number ); self.upload_parts.push(completed_part); Ok(()) } /// finishes the upload pub async fn finish_upload(&self) -> S3Result<()> { if self.upload_parts.is_empty() { return Err(Error::EmptyUpload); } let completed_multipart_upload = CompletedMultipartUpload::builder() .set_parts(Some(self.upload_parts.clone())) .build(); self .client .complete_multipart_upload() .bucket(&self.bucket_name) .key(&self.object_name) .multipart_upload(completed_multipart_upload) .upload_id(&self.upload_id) .send() .await .map_err(|e| { - error!("Failed to finish upload session"); + error!( + errorType = error_types::S3_ERROR, + "Failed to finish upload session" + ); Error::AwsSdk(Box::new(e.into())) })?; debug!(upload_id = self.upload_id, "Multipart upload complete"); Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_s3path_from_full_path() { let full_path = "my_bucket/some_object"; let s3_path = S3Path::from_full_path(full_path); assert!(s3_path.is_ok()); let s3_path = 
s3_path.unwrap(); assert_eq!(&s3_path.bucket_name, "my_bucket"); assert_eq!(&s3_path.object_name, "some_object"); } #[test] fn test_s3path_from_invalid_path() { let result = S3Path::from_full_path("invalid_path"); assert!(result.is_err()) } #[test] fn test_s3path_to_full_path() { let s3_path = S3Path { bucket_name: "my_bucket".to_string(), object_name: "some_object".to_string(), }; let full_path = s3_path.to_full_path(); assert_eq!(full_path, "my_bucket/some_object"); } } diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs index c0587ae6f..1d6717e40 100644 --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,634 +1,643 @@ #![allow(unused)] use regex::RegexSet; use std::collections::{BTreeMap, HashSet}; use std::ops::{Bound, Range, RangeBounds, RangeInclusive}; use std::sync::Arc; use async_stream::try_stream; use chrono::Duration; use comm_lib::http::ByteStream; use comm_lib::shared::reserved_users::RESERVED_USERNAME_SET; use comm_lib::tools::BoxedError; use once_cell::sync::Lazy; use tokio_stream::StreamExt; use tonic::codegen::futures_core::Stream; use tracing::{debug, error, info, trace, warn}; use crate::config::{CONFIG, OFFENSIVE_INVITE_LINKS}; use crate::constants::{ INVITE_LINK_BLOB_HASH_PREFIX, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }; use crate::database::types::{ BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind, }; use crate::database::DBError; use crate::s3::{Error as S3Error, S3Client, S3Path}; use crate::tools::MemOps; -use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient}; +use crate::{ + constants::error_types, constants::BLOB_DOWNLOAD_CHUNK_SIZE, + database::DatabaseClient, +}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum InviteLinkError { Reserved, Offensive, } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum BlobServiceError { BlobNotFound, BlobAlreadyExists, InvalidState, DB(DBError), S3(S3Error), InputError(#[error(ignore)] BoxedError), InviteLinkError(InviteLinkError), } type BlobServiceResult = Result; #[derive(Clone, Debug)] pub struct BlobServiceConfig { /// Blob data is streamed from S3 in chunks of this size. pub download_chunk_size: usize, /// If enabled, orphaned blobs will be deleted immediately after /// last holder is removed. This option should be enabled /// if maintenance garbage collection tasks are not run. pub instant_delete_orphaned_blobs: bool, /// Minimum age that a orphan must stay unmodified /// before it can be deleted by a garbage collection task /// This option is ignored if `instant_delete_orphaned_blobs` is `true` pub orphan_protection_period: chrono::Duration, } static OFFENSIVE_INVITE_LINKS_REGEX_SET: Lazy = Lazy::new(|| { RegexSet::new(OFFENSIVE_INVITE_LINKS.iter().collect::>()).unwrap() }); impl Default for BlobServiceConfig { fn default() -> Self { BlobServiceConfig { download_chunk_size: BLOB_DOWNLOAD_CHUNK_SIZE as usize, instant_delete_orphaned_blobs: false, orphan_protection_period: Duration::hours(1), } } } #[derive(Clone)] pub struct BlobService { db: Arc, s3: S3Client, config: BlobServiceConfig, } impl BlobService { pub fn new( db: DatabaseClient, s3: S3Client, config: BlobServiceConfig, ) -> Self { Self { db: Arc::new(db), s3, config, } } /// Retrieves blob object metadata and returns a download object /// that can be used to download the blob data. pub async fn create_download( &self, blob_hash: impl Into, ) -> BlobServiceResult { // 1. 
Get S3 path let s3_path = match self.db.get_blob_item(blob_hash.into()).await { Ok(Some(BlobItemRow { s3_path, .. })) => Ok(s3_path), Ok(None) => { debug!("Blob not found"); Err(BlobServiceError::BlobNotFound) } Err(err) => Err(BlobServiceError::DB(err)), }?; debug!("S3 path: {:?}", s3_path); // 2. Get S3 Object metadata trace!("Getting S3 object metadata..."); let object_metadata = self.s3.get_object_metadata(&s3_path).await?; let blob_size = object_metadata .content_length() .ok_or_else(|| { - error!("Failed to get S3 object content length"); + error!( + errorType = error_types::S3_ERROR, + "Failed to get S3 object content length" + ); BlobServiceError::InvalidState }) .and_then(|len| { if len >= 0 { Ok(len as u64) } else { - error!("S3 object content length is negative"); + error!( + errorType = error_types::S3_ERROR, + "S3 object content length is negative" + ); Err(BlobServiceError::InvalidState) } })?; debug!("S3 object size: {} bytes", blob_size); // 3. Create download session let session = BlobDownloadObject { s3_path, blob_size, byte_range: 0..blob_size, chunk_size: self.config.download_chunk_size as u64, s3_client: self.s3.clone(), }; Ok(session) } fn validate_invite_link_blob_hash( invite_secret: &str, ) -> Result<(), BlobServiceError> { let lowercase_secret = invite_secret.to_lowercase(); if (OFFENSIVE_INVITE_LINKS_REGEX_SET.is_match(&lowercase_secret)) { debug!("Offensive invite link"); return Err(BlobServiceError::InviteLinkError( InviteLinkError::Offensive, )); } Ok(()) } pub async fn put_blob( &self, blob_hash: impl Into, mut blob_data_stream: impl ByteStream, ) -> Result<(), BlobServiceError> { let blob_hash: String = blob_hash.into(); let blob_item = BlobItemInput::new(&blob_hash); if self.db.get_blob_item(&blob_hash).await?.is_some() { debug!("Blob already exists"); return Err(BlobServiceError::BlobAlreadyExists); } if let Some(invite_secret) = blob_hash.strip_prefix(INVITE_LINK_BLOB_HASH_PREFIX) { Self::validate_invite_link_blob_hash(invite_secret)?; } let mut upload_session = self.s3.start_upload_session(&blob_item.s3_path).await?; trace!(?blob_item, "Started S3 upload session"); tokio::pin!(blob_data_stream); let mut s3_chunk: Vec = Vec::new(); while let Some(chunk) = blob_data_stream.try_next().await.map_err(|err| { warn!("Failed to get data chunk: {:?}", err); BlobServiceError::InputError(err) })? { s3_chunk.extend_from_slice(&chunk); // New parts should be added to AWS only if they exceed minimum part size, // Otherwise AWS returns error if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE { trace!( chunk_size = s3_chunk.len(), "Chunk size exceeded, adding new S3 part" ); upload_session .add_part(s3_chunk.take_out()) .await .map_err(BlobServiceError::from)?; } } trace!("Upload stream drained"); // add the remaining data as the last S3 part if !s3_chunk.is_empty() { trace!("Uploading remaining {} bytes", s3_chunk.len()); upload_session.add_part(s3_chunk).await?; } // Complete the upload session upload_session.finish_upload().await?; trace!("S3 upload complete, putting item to db"); self.db.put_blob_item(blob_item).await?; Ok(()) } pub async fn assign_holder( &self, blob_hash: impl Into, holder: impl Into, ) -> BlobServiceResult { let blob_hash: String = blob_hash.into(); trace!(blob_hash, "Attempting to assign holder"); self .db .put_holder_assignment(blob_hash.clone(), holder.into()) .await?; trace!("Holder assigned. 
Checking if data exists"); let data_exists = self.db.get_blob_item(blob_hash).await?.is_some(); Ok(data_exists) } pub async fn revoke_holder( &self, blob_hash: impl Into, holder: impl Into, instant_delete: bool, ) -> BlobServiceResult<()> { let blob_hash: String = blob_hash.into(); let holder: String = holder.into(); trace!(blob_hash, holder, "Attempting to revoke holder"); self.db.delete_holder_assignment(&blob_hash, holder).await?; if instant_delete || self.config.instant_delete_orphaned_blobs { trace!("Instant orphan deletion enabled. Looking for holders"); let is_orphan = self .db .list_blob_holders(&blob_hash, Some(1)) .await? .is_empty(); if !is_orphan { trace!("Found holders, nothing to do"); return Ok(()); } debug!("No holders left, deleting blob if exists"); trace!("Getting blob item"); let Some(blob_item) = self.db.get_blob_item(&blob_hash).await? else { trace!("Blob item not found, nothing to do"); return Ok(()); }; trace!("Deleting S3 object"); self.s3.delete_object(&blob_item.s3_path).await?; trace!("Deleting blob item entry from DB"); self.db.delete_blob_item(blob_hash).await?; } Ok(()) } pub async fn perform_cleanup(&self) -> anyhow::Result<()> { info!("Starting cleanup..."); // 1. Fetch blobs and holders marked as "unchecked" debug!("Querying for unchecked blobs and holders..."); let protection_periond = self.config.orphan_protection_period; let (unchecked_blobs, unchecked_holders) = tokio::try_join!( self .db .find_unchecked_items(UncheckedKind::Blob, protection_periond), self .db .find_unchecked_items(UncheckedKind::Holder, protection_periond) )?; debug!( "Found {} unchecked blobs and {} unchecked holders", unchecked_blobs.len(), unchecked_holders.len() ); let mut unchecked_items = UncheckedCollection::new(); // 2. construct structures of possibly orphaned blobs debug!("Creating structures of possibly orphaned items..."); for PrimaryKey { blob_hash, .. } in unchecked_blobs { trace!("Creating unchecked item for blob hash '{}'", &blob_hash); unchecked_items.insert( blob_hash.clone(), UncheckedItem { blob_hash: Some(blob_hash), holders: Vec::new(), }, ); } // 3. iterate over possibly orphaned holders and fill the structs for PrimaryKey { blob_hash, holder } in unchecked_holders { if let Some(item) = unchecked_items.get_mut(&blob_hash) { trace!( "Inserting holder '{}' for blob hash '{}'", &holder, &blob_hash ); item.holders.push(holder); } else { trace!( "Creating empty item for holder '{}' (blob hash: '{}')", &holder, &blob_hash ); unchecked_items.insert( blob_hash.clone(), UncheckedItem { blob_hash: None, holders: vec![holder], }, ); } } let mut orphans = HashSet::new(); let mut checked = HashSet::new(); // 4. Filter out items that are for sure not orphaned let checked_items = unchecked_items.filter_out_checked(); debug!("Filtered out {} checked items", checked_items.len()); checked.extend(checked_items); // 5. Query DDB for additional blobs and holders to check if they exist let mut fetch_results = Vec::new(); // 5a. Query holders - Find if possibly orphan blobs have at least one holder debug!("Querying holders for possibly orphaned blobs..."); for blob_hash in unchecked_items.blobs_to_find_holders() { let holders = self .db .list_blob_holders(blob_hash, Some(1)) .await? .into_iter() .map(|holder| PrimaryKey::new(blob_hash.to_string(), holder)); let len_before = fetch_results.len(); fetch_results.extend(holders); trace!( "Found {} holders for blob hash '{}'", fetch_results.len() - len_before, blob_hash ); } // 5b. 
Query blobs - Find if possibly orphaned holders have blobs debug!("Querying blobs for possibly orphaned holders..."); let blobs_to_get = unchecked_items.blobs_to_check_existence(); let queried_blobs_len = blobs_to_get.len(); let existing_blobs = self.db.list_existing_keys(blobs_to_get).await?; debug!( "Found {} existing blobs out of {} queried", existing_blobs.len(), queried_blobs_len ); fetch_results.extend(existing_blobs); // 6. Update the struct with query results // Then do 2nd pass of filtering out checked items (repeat step 4) debug!("Feeding data structure with query results and filtering again..."); unchecked_items.feed_with_query_results(fetch_results); let checked_items = unchecked_items.filter_out_checked(); debug!("Filtered out {} checked items", checked_items.len()); checked.extend(checked_items); // 7. Perform actual cleanup orphans.extend(unchecked_items.into_primary_keys()); let s3_paths: Vec = orphans .iter() .filter(|pk| pk.is_blob_item()) .map(|PrimaryKey { blob_hash, .. }| S3Path { bucket_name: CONFIG.s3_bucket_name.clone(), object_name: blob_hash.to_string(), }) .collect(); let num_orphans = orphans.len(); let num_checked = checked.len(); let num_s3_blobs = s3_paths.len(); // 7a. Make changes to database debug!("Cleaning up database... Marking {} items as checked and deleting {} orphans", num_checked, num_orphans); tokio::try_join!( self.db.batch_delete_rows(orphans), self.db.batch_mark_checked(checked) )?; // 7b. Delete orphaned blobs from S3 debug!("Cleaning up S3... Deleting {} blobs", num_s3_blobs); self.s3.batch_delete_objects(s3_paths).await?; info!( "Cleanup complete. Deleted orphaned {} DB items and marked {} items as checked. {} blobs were deleted from S3", num_orphans, num_checked, num_s3_blobs ); Ok(()) } } // A B-tree map performs well for both random and sequential access. type BlobHash = String; type UncheckedCollection = BTreeMap; /// Represents an "unchecked" blob entity. It might miss either /// blob hash or holders. #[derive(Debug)] struct UncheckedItem { blob_hash: Option, holders: Vec, } impl UncheckedItem { fn has_blob_hash(&self) -> bool { self.blob_hash.is_some() } fn has_holders(&self) -> bool { !self.holders.is_empty() } /// Returns primary keys for this item. It contains primary heys for holders /// and for blob item (if it has hash). /// A fallback hash is required for holders if item's blob hash is None. fn as_primary_keys(&self, fallback_blob_hash: &str) -> Vec { if !self.has_holders() && !self.has_blob_hash() { warn!( fallback_blob_hash, "Item has no hash and no holders, this should never happen!" ); return Vec::new(); } let hash_for_holders = self.blob_hash.as_deref().unwrap_or(fallback_blob_hash); let mut keys = self .holders .iter() .map(|holder| PrimaryKey { blob_hash: hash_for_holders.to_string(), holder: holder.to_string(), }) .collect::>(); if let Some(blob_hash) = &self.blob_hash { keys.push(PrimaryKey::for_blob_item(blob_hash.to_string())); } keys } } trait CleanupOperations { /// Retains only items that should remain unchecked /// (missing blob hash or holders). /// /// Returns removed items - these items are checked /// (contain both blob hash and at least one holder). fn filter_out_checked(&mut self) -> Vec; /// Returns list of blob hashes for which we need to query if they contain /// at least one holder fn blobs_to_find_holders(&self) -> Vec<&BlobHash>; /// Returns primary keys for blob items that need to be checked if they exist /// /// Technically, this returns all items that have holders but no hash. 
fn blobs_to_check_existence(&self) -> Vec; /// Updates the structure after fetching additional data from database. fn feed_with_query_results( &mut self, fetched_items: impl IntoIterator, ); /// Turns this collection into a list of DB primary keys fn into_primary_keys(self) -> Vec; } impl CleanupOperations for UncheckedCollection { /// Retains only items that should remain unchecked /// (missing blob hash or holders). /// /// Returns removed items - these items are checked /// (contain both blob hash and at least one holder). fn filter_out_checked(&mut self) -> Vec { let mut checked = Vec::new(); self.retain(|blob_hash, item| { if !item.has_blob_hash() || !item.has_holders() { // blob hash or holder missing, leave unchecked return true; } checked.extend(item.as_primary_keys(blob_hash)); false }); checked } /// Returns list of blob hashes for which we need to query if they contain /// at least one holder fn blobs_to_find_holders(&self) -> Vec<&BlobHash> { self .iter() .filter_map(|(blob_hash, item)| { if item.has_blob_hash() && !item.has_holders() { Some(blob_hash) } else { None } }) .collect() } /// Returns primary keys for blob items that need to be checked if they exist /// /// Technically, this returns all blob items that have holders but no hash. fn blobs_to_check_existence(&self) -> Vec { self .iter() .filter_map(|(blob_hash, item)| { if item.has_holders() && !item.has_blob_hash() { Some(PrimaryKey::for_blob_item(blob_hash)) } else { None } }) .collect() } /// Updates the structure after fetching additional data from database. fn feed_with_query_results( &mut self, fetched_items: impl IntoIterator, ) { for pk in fetched_items.into_iter() { let Some(item) = self.get_mut(&pk.blob_hash) else { warn!("Got fetched item that was not requested: {:?}", pk); continue; }; if pk.is_blob_item() { item.blob_hash = Some(pk.blob_hash) } else { item.holders.push(pk.holder); } } } fn into_primary_keys(self) -> Vec { self .into_iter() .flat_map(|(blob_hash, item)| item.as_primary_keys(&blob_hash)) .collect() } } pub struct BlobDownloadObject { /// Size of the whole blob object in bytes. pub blob_size: u64, /// Range of bytes to be downloaded (exclusive end). byte_range: Range, chunk_size: u64, s3_client: S3Client, s3_path: S3Path, } impl BlobDownloadObject { pub fn set_byte_range(&mut self, range: impl RangeBounds) { let range_start = match range.start_bound() { Bound::Included(&start) => start, Bound::Excluded(&start) => start + 1, Bound::Unbounded => 0, }; let range_end = match range.end_bound() { Bound::Included(&end) => end + 1, Bound::Excluded(&end) => end, Bound::Unbounded => self.blob_size, }; // Clamp range to blob size let start = std::cmp::max(range_start, 0); let end_exclusive = std::cmp::min(range_end, self.blob_size); self.byte_range = start..end_exclusive; debug!("Requested byte range: {}..{}", start, end_exclusive); } /// Size of the data to be downloaded in bytes. pub fn download_size(&self) -> u64 { self.byte_range.end - self.byte_range.start } pub fn into_stream(self) -> impl Stream>> { let BlobDownloadObject { byte_range, chunk_size, s3_path, s3_client, .. } = self; try_stream! { trace!("Starting download stream"); let mut offset: u64 = byte_range.start; while offset < byte_range.end { let next_size = std::cmp::min(chunk_size, byte_range.end - offset); let range = offset..(offset + next_size); trace!(?range, "Getting {} bytes of data", next_size); yield s3_client.get_object_bytes(&s3_path, range).await?; offset += next_size; } } } }
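
Note on the logging pattern introduced by this patch: each updated error! call now carries the new error_types constant as a structured errorType field, so a JSON log layer (or any downstream pipeline grouping on that field) can categorize failures by subsystem. The standalone sketch below shows how such a log line could be emitted and rendered; the subscriber setup, the tracing-subscriber "json" and "env-filter" features, the local error_types stand-in module, and the idea that the pipeline keys off errorType are illustrative assumptions, not part of this diff.

// Sketch only: mirrors the errorType field convention used above.
// Assumes tracing-subscriber is built with the "json" and "env-filter" features.
use tracing::error;
use tracing_subscriber::EnvFilter;

// Local stand-in for the constants added in services/blob/src/constants.rs.
mod error_types {
  pub const S3_ERROR: &str = "S3 Error";
}

fn main() {
  // JSON output makes errorType show up as a structured field; the real
  // service switches to JSON logs via COMM_SERVICES_USE_JSON_LOGS.
  tracing_subscriber::fmt()
    .json()
    .with_env_filter(EnvFilter::from_default_env())
    .init();

  // Same call shape as the updated sites in s3.rs and database/client.rs.
  error!(
    errorType = error_types::S3_ERROR,
    "S3 failed to get object metadata"
  );
}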