diff --git a/services/blob/src/config.rs b/services/blob/src/config.rs index f7293e353..f95a807a9 100644 --- a/services/blob/src/config.rs +++ b/services/blob/src/config.rs @@ -1,62 +1,70 @@ use anyhow::{ensure, Result}; use clap::{builder::FalseyValueParser, Parser}; use once_cell::sync::Lazy; use tracing::info; use crate::constants::{ - DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, LOCALSTACK_URL, SANDBOX_ENV_VAR, + DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, LOCALSTACK_URL, + S3_BUCKET_ENV_VAR, SANDBOX_ENV_VAR, }; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { /// gRPC server listening port #[arg(long, default_value_t = DEFAULT_GRPC_PORT)] pub grpc_port: u16, /// HTTP server listening port #[arg(long, default_value_t = DEFAULT_HTTP_PORT)] pub http_port: u16, /// Run the service in sandbox #[arg(long = "sandbox", default_value_t = false)] // support the env var for compatibility reasons #[arg(env = SANDBOX_ENV_VAR)] #[arg(value_parser = FalseyValueParser::new())] pub is_sandbox: bool, /// AWS Localstack service URL, applicable in sandbox mode #[arg(long, default_value_t = LOCALSTACK_URL.to_string())] pub localstack_url: String, + #[arg(env = S3_BUCKET_ENV_VAR)] + #[arg(long, default_value_t = DEFAULT_S3_BUCKET_NAME.to_string())] + pub s3_bucket_name: String, } /// Stores configuration parsed from command-line arguments /// and environment variables pub static CONFIG: Lazy<AppConfig> = Lazy::new(AppConfig::parse); /// Processes the command-line arguments and environment variables. /// Should be called at the beginning of the `main()` function. 
pub(super) fn parse_cmdline_args() -> Result<()> { // force evaluation of the lazy initialized config let cfg = Lazy::force(&CONFIG); // Perform some additional validation for CLI args ensure!( cfg.grpc_port != cfg.http_port, "gRPC and HTTP ports cannot be the same: {}", cfg.grpc_port ); + + if cfg.s3_bucket_name != DEFAULT_S3_BUCKET_NAME { + info!("Using custom S3 bucket: {}", &cfg.s3_bucket_name); + } Ok(()) } /// Provides region/credentials configuration for AWS SDKs pub async fn load_aws_config() -> aws_types::SdkConfig { let mut config_builder = aws_config::from_env(); if CONFIG.is_sandbox { info!( "Running in sandbox environment. Localstack URL: {}", &CONFIG.localstack_url ); config_builder = config_builder.endpoint_url(&CONFIG.localstack_url); } config_builder.load().await } diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs index d02f45e3e..bc9a2c8ad 100644 --- a/services/blob/src/constants.rs +++ b/services/blob/src/constants.rs @@ -1,74 +1,75 @@ // Assorted constants pub const DEFAULT_GRPC_PORT: u16 = 50051; pub const DEFAULT_HTTP_PORT: u16 = 51001; pub const LOCALSTACK_URL: &str = "http://localstack:4566"; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; /// 4MB limit /// /// WARNING: use keeping in mind that grpc adds its own headers to messages /// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md /// so the message that actually is being sent over the network looks like this /// ``` /// [Compressed-Flag] [Message-Length] [Message] /// [Compressed-Flag] 1 byte - added by grpc /// [Message-Length] 4 bytes - added by grpc /// [Message] N bytes - actual data /// ``` /// so for every message we get 5 additional bytes of data /// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671), /// gRPC stream may contain more than one message pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024; /// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details pub const GRPC_METADATA_SIZE_PER_MESSAGE: 
u64 = 5; // HTTP constants pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // DynamoDB constants pub mod db { /// Reserved holder value that indicates the row is a blob item pub const BLOB_ITEM_ROW_HOLDER_VALUE: &str = "_"; pub const BLOB_TABLE_NAME: &str = "blob-service-blobs"; pub const BLOB_PARTITION_KEY: &str = ATTR_BLOB_HASH; pub const BLOB_SORT_KEY: &str = ATTR_HOLDER; pub const UNCHECKED_INDEX_NAME: &str = "unchecked-index"; pub const UNCHECKED_INDEX_PARTITION_KEY: &str = ATTR_UNCHECKED; pub const UNCHECKED_INDEX_SORT_KEY: &str = ATTR_LAST_MODIFIED; /// attribute names pub const ATTR_BLOB_HASH: &str = "blob_hash"; pub const ATTR_HOLDER: &str = "holder"; pub const ATTR_CREATED_AT: &str = "created_at"; pub const ATTR_LAST_MODIFIED: &str = "last_modified"; pub const ATTR_S3_PATH: &str = "s3_path"; pub const ATTR_UNCHECKED: &str = "unchecked"; } // old DynamoDB constants pub const BLOB_TABLE_NAME: &str = "blob-service-blob"; pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path"; pub const BLOB_TABLE_CREATED_FIELD: &str = "created"; pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index"; pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder"; pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index"; // Environment variables pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; // S3 constants -pub const BLOB_S3_BUCKET_NAME: &str = "commapp-blob"; +pub const S3_BUCKET_ENV_VAR: &str = "BLOB_S3_BUCKET_NAME"; +pub const DEFAULT_S3_BUCKET_NAME: &str = "commapp-blob"; pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024; diff --git a/services/blob/src/database/old.rs b/services/blob/src/database/old.rs index dac3f5f82..4d5c743a9 100644 --- a/services/blob/src/database/old.rs 
+++ b/services/blob/src/database/old.rs @@ -1,305 +1,306 @@ use aws_sdk_dynamodb::{ operation::get_item::GetItemOutput, types::AttributeValue, }; use chrono::{DateTime, Utc}; use comm_services_lib::database; use std::{collections::HashMap, sync::Arc}; use tracing::error; use crate::{ + config::CONFIG, constants::{ BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME, BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME, - BLOB_S3_BUCKET_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, - BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD, + BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, + BLOB_TABLE_S3_PATH_FIELD, }, s3::S3Path, }; use super::errors::{BlobDBError, Error}; #[derive(Clone, Debug)] pub struct BlobItem { pub blob_hash: String, pub s3_path: S3Path, pub created: DateTime<Utc>, } impl BlobItem { pub fn new(blob_hash: impl Into<String>) -> Self { let hash_str = blob_hash.into(); BlobItem { blob_hash: hash_str.clone(), s3_path: S3Path { - bucket_name: BLOB_S3_BUCKET_NAME.to_string(), + bucket_name: CONFIG.s3_bucket_name.clone(), object_name: hash_str, }, created: Utc::now(), } } } #[derive(Clone, Debug)] pub struct ReverseIndexItem { pub holder: String, pub blob_hash: String, } #[derive(Clone)] pub struct DatabaseClient { client: Arc<aws_sdk_dynamodb::Client>, } impl DatabaseClient { pub fn new(aws_config: &aws_types::SdkConfig) -> Self { DatabaseClient { client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), } } // Blob item pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> { let item = HashMap::from([ ( BLOB_TABLE_BLOB_HASH_FIELD.to_string(), AttributeValue::S(blob_item.blob_hash), ), ( BLOB_TABLE_S3_PATH_FIELD.to_string(), AttributeValue::S(blob_item.s3_path.to_full_path()), ), ( BLOB_TABLE_CREATED_FIELD.to_string(), AttributeValue::S(blob_item.created.to_rfc3339()), ), ]); self .client .put_item() .table_name(BLOB_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { 
error!("DynamoDB client failed to put blob item"); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_blob_item( &self, blob_hash: &str, ) -> Result<Option<BlobItem>, Error> { let item_key = HashMap::from([( BLOB_TABLE_BLOB_HASH_FIELD.to_string(), AttributeValue::S(blob_hash.to_string()), )]); match self .client .get_item() .table_name(BLOB_TABLE_NAME) .set_key(Some(item_key)) .send() .await .map_err(|e| { error!("DynamoDB client failed to find blob item"); Error::AwsSdk(e.into()) })? { GetItemOutput { item: Some(mut item), .. } => { let blob_hash = database::parse_string_attribute( BLOB_TABLE_BLOB_HASH_FIELD, item.remove(BLOB_TABLE_BLOB_HASH_FIELD), )?; let s3_path = database::parse_string_attribute( BLOB_TABLE_S3_PATH_FIELD, item.remove(BLOB_TABLE_S3_PATH_FIELD), )?; let created = database::parse_datetime_attribute( BLOB_TABLE_CREATED_FIELD, item.remove(BLOB_TABLE_CREATED_FIELD), )?; Ok(Some(BlobItem { blob_hash, s3_path: S3Path::from_full_path(&s3_path) .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?, created, })) } _ => Ok(None), } } pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> { self .client .delete_item() .table_name(BLOB_TABLE_NAME) .key( BLOB_TABLE_BLOB_HASH_FIELD, AttributeValue::S(blob_hash.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove blob item"); Error::AwsSdk(e.into()) })?; Ok(()) } // Reverse index item pub async fn put_reverse_index_item( &self, reverse_index_item: ReverseIndexItem, ) -> Result<(), Error> { let holder = &reverse_index_item.holder; if self.find_reverse_index_by_holder(holder).await?.is_some() { error!("Failed to put reverse index. 
Holder already exists."); return Err(Error::Blob(BlobDBError::HolderAlreadyExists( holder.to_string(), ))); } let item = HashMap::from([ ( BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), AttributeValue::S(reverse_index_item.holder), ), ( BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(), AttributeValue::S(reverse_index_item.blob_hash), ), ]); self .client .put_item() .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) .set_item(Some(item)) .send() .await .map_err(|e| { error!("DynamoDB client failed to put reverse index"); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn find_reverse_index_by_holder( &self, holder: &str, ) -> Result<Option<ReverseIndexItem>, Error> { let item_key = HashMap::from([( BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), AttributeValue::S(holder.to_string()), )]); match self .client .get_item() .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) .set_key(Some(item_key)) .consistent_read(true) .send() .await .map_err(|e| { error!("DynamoDB client failed to find reverse index by holder"); Error::AwsSdk(e.into()) })? { GetItemOutput { item: Some(mut item), .. 
} => { let holder = database::parse_string_attribute( BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), )?; let blob_hash = database::parse_string_attribute( BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), )?; Ok(Some(ReverseIndexItem { holder, blob_hash })) } _ => Ok(None), } } pub async fn find_reverse_index_by_hash( &self, blob_hash: &str, ) -> Result<Vec<ReverseIndexItem>, Error> { let response = self .client .query() .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME) .key_condition_expression("#blobHash = :valueToMatch") .expression_attribute_names( "#blobHash", BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, ) .expression_attribute_values( ":valueToMatch", AttributeValue::S(blob_hash.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to find reverse indexes by hash"); Error::AwsSdk(e.into()) })?; if response.count == 0 { return Ok(vec![]); } let mut results: Vec<ReverseIndexItem> = Vec::with_capacity(response.count() as usize); for mut item in response.items.unwrap_or_default() { let holder = database::parse_string_attribute( BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), )?; let blob_hash = database::parse_string_attribute( BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), )?; results.push(ReverseIndexItem { holder, blob_hash }); } return Ok(results); } pub async fn remove_reverse_index_item( &self, holder: &str, ) -> Result<(), Error> { self .client .delete_item() .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) .key( BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, AttributeValue::S(holder.to_string()), ) .send() .await .map_err(|e| { error!("DynamoDB client failed to remove reverse index"); Error::AwsSdk(e.into()) })?; Ok(()) } } diff --git a/services/blob/src/database/types.rs b/services/blob/src/database/types.rs index 88ac1b69c..e9e6c5cd3 100644 --- 
a/services/blob/src/database/types.rs +++ b/services/blob/src/database/types.rs @@ -1,244 +1,241 @@ use aws_sdk_dynamodb::types::AttributeValue; use chrono::{DateTime, Utc}; use comm_services_lib::database::{ parse_string_attribute, parse_timestamp_attribute, DBItemError, Value, }; use derive_more::Constructor; use std::collections::HashMap; -use crate::{ - constants::{db::*, BLOB_S3_BUCKET_NAME}, - s3::S3Path, -}; +use crate::{config::CONFIG, constants::db::*, s3::S3Path}; use super::errors::Error as DBError; /// Represents a database row in the DynamoDB SDK format pub(super) type RawAttributes = HashMap<String, AttributeValue>; /// A convenience Result type for database operations pub(super) type DBResult<T> = Result<T, DBError>; /// Represents a type-safe version of a DynamoDB blob table item. /// Each row can be either a blob item or a holder assignment. /// /// It implements the `TryFrom` trait to convert from raw DynamoDB /// `AttributeValue`s to the type-safe version. pub enum DBRow { BlobItem(BlobItemRow), HolderAssignment(HolderAssignmentRow), } impl TryFrom<RawAttributes> for DBRow { type Error = DBError; fn try_from(attributes: RawAttributes) -> Result<Self, Self::Error> { let holder = parse_string_attribute( ATTR_HOLDER, attributes.get(ATTR_HOLDER).cloned(), )?; let row = match holder.as_str() { BLOB_ITEM_ROW_HOLDER_VALUE => DBRow::BlobItem(attributes.try_into()?), _ => DBRow::HolderAssignment(attributes.try_into()?), }; Ok(row) } } /// Represents an input payload for inserting a blob item into the database. 
/// This contains only the business logic related attributes #[derive(Debug)] pub struct BlobItemInput { pub blob_hash: String, pub s3_path: S3Path, } impl BlobItemInput { pub fn new(blob_hash: impl Into<String>) -> Self { let blob_hash: String = blob_hash.into(); BlobItemInput { blob_hash: blob_hash.clone(), s3_path: S3Path { - bucket_name: BLOB_S3_BUCKET_NAME.into(), + bucket_name: CONFIG.s3_bucket_name.clone(), object_name: blob_hash, }, } } } /// A struct representing a blob item row in the table in a type-safe way /// /// It implements the `TryFrom` trait to convert from raw DynamoDB /// `AttributeValue`s to the type-safe version. #[derive(Debug)] pub struct BlobItemRow { pub blob_hash: String, pub s3_path: S3Path, pub unchecked: bool, pub created_at: DateTime<Utc>, pub last_modified: DateTime<Utc>, } impl TryFrom<RawAttributes> for BlobItemRow { type Error = DBError; fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> { let blob_hash = parse_string_attribute( ATTR_BLOB_HASH, attributes.remove(ATTR_BLOB_HASH), )?; let s3_path = parse_string_attribute(ATTR_S3_PATH, attributes.remove(ATTR_S3_PATH))?; let created_at = parse_timestamp_attribute( ATTR_CREATED_AT, attributes.remove(ATTR_CREATED_AT), )?; let last_modified = parse_timestamp_attribute( ATTR_LAST_MODIFIED, attributes.remove(ATTR_LAST_MODIFIED), )?; let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Blob)?; let s3_path = S3Path::from_full_path(&s3_path).map_err(DBError::from)?; Ok(BlobItemRow { blob_hash, s3_path, unchecked, created_at, last_modified, }) } } /// A struct representing a holder assignment table row in a type-safe way /// /// It implements the `TryFrom` trait to convert from raw DynamoDB /// `AttributeValue`s to the type-safe version. 
#[derive(Debug)] pub struct HolderAssignmentRow { pub blob_hash: String, pub holder: String, pub unchecked: bool, pub created_at: DateTime<Utc>, pub last_modified: DateTime<Utc>, } impl TryFrom<RawAttributes> for HolderAssignmentRow { type Error = DBError; fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> { let holder = parse_string_attribute(ATTR_HOLDER, attributes.remove(ATTR_HOLDER))?; let blob_hash = parse_string_attribute( ATTR_BLOB_HASH, attributes.remove(ATTR_BLOB_HASH), )?; let created_at = parse_timestamp_attribute( ATTR_CREATED_AT, attributes.remove(ATTR_CREATED_AT), )?; let last_modified = parse_timestamp_attribute( ATTR_LAST_MODIFIED, attributes.remove(ATTR_LAST_MODIFIED), )?; let unchecked = is_raw_row_unchecked(&attributes, UncheckedKind::Holder)?; Ok(HolderAssignmentRow { blob_hash, holder, unchecked, created_at, last_modified, }) } } /// Represents a composite primary key for a DynamoDB table row /// /// It implements `TryFrom` and `Into` traits to conveniently use it /// in DynamoDB queries #[derive(Clone, Constructor, Debug)] pub struct PrimaryKey { pub blob_hash: String, pub holder: String, } impl PrimaryKey { /// Creates a primary key for a row containing a blob item data /// Rows queried by primary keys created by this function will /// be of type `BlobItemRow` pub fn for_blob_item(blob_hash: impl Into<String>) -> Self { PrimaryKey { blob_hash: blob_hash.into(), holder: BLOB_ITEM_ROW_HOLDER_VALUE.to_string(), } } } impl TryFrom<RawAttributes> for PrimaryKey { type Error = DBError; fn try_from(mut attributes: RawAttributes) -> Result<Self, Self::Error> { let blob_hash = parse_string_attribute( ATTR_BLOB_HASH, attributes.remove(ATTR_BLOB_HASH), )?; let holder = parse_string_attribute(ATTR_HOLDER, attributes.remove(ATTR_HOLDER))?; Ok(PrimaryKey { blob_hash, holder }) } } // useful for convenient calls: // ddb.get_item().set_key(Some(partition_key.into())) impl Into<RawAttributes> for PrimaryKey { fn into(self) -> RawAttributes { HashMap::from([ ( ATTR_BLOB_HASH.to_string(), AttributeValue::S(self.blob_hash), ), 
(ATTR_HOLDER.to_string(), AttributeValue::S(self.holder)), ]) } } /// Represents possible values for the `unchecked` attribute value pub enum UncheckedKind { Blob, Holder, } impl UncheckedKind { pub fn str_value(&self) -> &'static str { match self { UncheckedKind::Blob => "blob", UncheckedKind::Holder => "holder", } } } impl Into<AttributeValue> for UncheckedKind { fn into(self) -> AttributeValue { AttributeValue::S(self.str_value().to_string()) } } fn is_raw_row_unchecked( row: &RawAttributes, kind: UncheckedKind, ) -> DBResult<bool> { let Some(AttributeValue::S(value)) = row.get(ATTR_UNCHECKED) else { // The unchecked attribute does not exist return Ok(false); }; if value != kind.str_value() { // The unchecked attribute exists but has an incorrect value return Err(DBError::Attribute(DBItemError::new( ATTR_UNCHECKED, Value::String(value.to_string()), comm_services_lib::database::DBItemAttributeError::IncorrectType, ))); } Ok(true) }