diff --git a/services/blob/src/config.rs b/services/blob/src/config.rs index f95a807a9..8ae653781 100644 --- a/services/blob/src/config.rs +++ b/services/blob/src/config.rs @@ -1,70 +1,62 @@ use anyhow::{ensure, Result}; -use clap::{builder::FalseyValueParser, Parser}; +use clap::Parser; use once_cell::sync::Lazy; use tracing::info; use crate::constants::{ - DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, LOCALSTACK_URL, - S3_BUCKET_ENV_VAR, SANDBOX_ENV_VAR, + DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, + S3_BUCKET_ENV_VAR, }; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { /// gRPC server listening port #[arg(long, default_value_t = DEFAULT_GRPC_PORT)] pub grpc_port: u16, /// HTTP server listening port #[arg(long, default_value_t = DEFAULT_HTTP_PORT)] pub http_port: u16, - /// Run the service in sandbox - #[arg(long = "sandbox", default_value_t = false)] - // support the env var for compatibility reasons - #[arg(env = SANDBOX_ENV_VAR)] - #[arg(value_parser = FalseyValueParser::new())] - pub is_sandbox: bool, - /// AWS Localstack service URL, applicable in sandbox mode - #[arg(long, default_value_t = LOCALSTACK_URL.to_string())] - pub localstack_url: String, + /// AWS Localstack service URL + #[arg(env = "LOCALSTACK_ENDPOINT")] + #[arg(long)] + pub localstack_endpoint: Option<String>, #[arg(env = S3_BUCKET_ENV_VAR)] #[arg(long, default_value_t = DEFAULT_S3_BUCKET_NAME.to_string())] pub s3_bucket_name: String, } /// Stores configuration parsed from command-line arguments /// and environment variables pub static CONFIG: Lazy<AppConfig> = Lazy::new(AppConfig::parse); /// Processes the command-line arguments and environment variables. /// Should be called at the beginning of the `main()` function. pub(super) fn parse_cmdline_args() -> Result<()> { // force evaluation of the lazy initialized config let cfg = Lazy::force(&CONFIG); // Perform some additional validation for CLI args ensure!( cfg.grpc_port != cfg.http_port, "gRPC and HTTP ports cannot be the same: {}", cfg.grpc_port ); if cfg.s3_bucket_name != DEFAULT_S3_BUCKET_NAME { info!("Using custom S3 bucket: {}", &cfg.s3_bucket_name); } Ok(()) } /// Provides region/credentials configuration for AWS SDKs pub async fn load_aws_config() -> aws_types::SdkConfig { let mut config_builder = aws_config::from_env(); - if CONFIG.is_sandbox { - info!( - "Running in sandbox environment. Localstack URL: {}", - &CONFIG.localstack_url - ); - config_builder = config_builder.endpoint_url(&CONFIG.localstack_url); + if let Some(endpoint) = &CONFIG.localstack_endpoint { + info!("Using Localstack. AWS endpoint URL: {}", endpoint); + config_builder = config_builder.endpoint_url(endpoint); } config_builder.load().await }
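Note on the change above: the boolean sandbox flag and the hard-coded Localstack URL are collapsed into a single optional `localstack_endpoint` argument, so "is this a local-development run" becomes a plain `Option::is_some()` check. A minimal, self-contained sketch of the same clap pattern (the struct name, binary name, and printed messages are illustrative only; assumes clap 4 with the `derive` and `env` features):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct DemoConfig {
  /// Optional Localstack URL; can come from --localstack-endpoint
  /// or from the LOCALSTACK_ENDPOINT environment variable.
  #[arg(long, env = "LOCALSTACK_ENDPOINT")]
  localstack_endpoint: Option<String>,
}

fn main() {
  let cfg = DemoConfig::parse();
  match &cfg.localstack_endpoint {
    // Flag or env var present: point AWS clients at Localstack
    Some(url) => println!("local development mode, endpoint: {url}"),
    // Neither present: use the real AWS endpoints
    None => println!("production mode"),
  }
}
```

Run as `LOCALSTACK_ENDPOINT=http://localhost:4566 demo` or `demo --localstack-endpoint http://localhost:4566`; with neither set, the option parses to `None`.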
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs index bc9a2c8ad..63b98d413 100644 --- a/services/blob/src/constants.rs +++ b/services/blob/src/constants.rs @@ -1,75 +1,73 @@ // Assorted constants pub const DEFAULT_GRPC_PORT: u16 = 50051; pub const DEFAULT_HTTP_PORT: u16 = 51001; -pub const LOCALSTACK_URL: &str = "http://localstack:4566"; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; /// 4MB limit /// /// WARNING: use keeping in mind that grpc adds its own headers to messages /// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md /// so the message that actually is being sent over the network looks like this /// ``` /// [Compressed-Flag] [Message-Length] [Message] /// [Compressed-Flag] 1 byte - added by grpc /// [Message-Length] 4 bytes - added by grpc /// [Message] N bytes - actual data /// ``` /// so for every message we get 5 additional bytes of data /// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671), /// gRPC stream may contain more than one message pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024; /// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5; // HTTP constants pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // DynamoDB constants pub mod db { /// Reserved holder value that indicates the row is a blob item pub const BLOB_ITEM_ROW_HOLDER_VALUE: &str = "_"; pub const BLOB_TABLE_NAME: &str = "blob-service-blobs"; pub const BLOB_PARTITION_KEY: &str = ATTR_BLOB_HASH; pub const BLOB_SORT_KEY: &str = ATTR_HOLDER; pub const UNCHECKED_INDEX_NAME: &str = "unchecked-index"; pub const UNCHECKED_INDEX_PARTITION_KEY: &str = ATTR_UNCHECKED; pub const UNCHECKED_INDEX_SORT_KEY: &str = ATTR_LAST_MODIFIED; /// attribute names pub const ATTR_BLOB_HASH: &str = "blob_hash"; pub const ATTR_HOLDER: &str = "holder"; pub const ATTR_CREATED_AT: &str = "created_at"; pub const ATTR_LAST_MODIFIED: &str = "last_modified"; pub const ATTR_S3_PATH: &str = "s3_path"; pub const ATTR_UNCHECKED: &str = "unchecked"; } // old DynamoDB constants pub const BLOB_TABLE_NAME: &str = "blob-service-blob"; pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path"; pub const BLOB_TABLE_CREATED_FIELD: &str = "created"; pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index"; pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder"; pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index"; // Environment variables -pub const SANDBOX_ENV_VAR: &str = "COMM_SERVICES_SANDBOX"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; // S3 constants pub const S3_BUCKET_ENV_VAR: &str = "BLOB_S3_BUCKET_NAME"; pub const DEFAULT_S3_BUCKET_NAME: &str = "commapp-blob"; pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
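The `GRPC_CHUNK_SIZE_LIMIT` comment above describes gRPC's 5-byte length-prefixed-message framing. An illustrative sketch (not part of the service; the helper name is made up) of how the two constants are typically combined when sizing data chunks:

```rust
// Values mirror the constants defined in constants.rs above.
const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;

/// Largest data payload that still fits in a single gRPC message
/// once the Compressed-Flag (1 byte) and Message-Length (4 bytes)
/// framing is accounted for.
fn max_payload_per_message() -> u64 {
  GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE
}

fn main() {
  // 4 MiB minus the 5 bytes of per-message framing
  assert_eq!(max_payload_per_message(), 4 * 1024 * 1024 - 5);
  println!("{} bytes of blob data per message", max_payload_per_message());
}
```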
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs index a58c6b66a..78e7151c7 100644 --- a/services/blob/src/http/mod.rs +++ b/services/blob/src/http/mod.rs @@ -1,57 +1,58 @@ use crate::{config::CONFIG, service::BlobService}; use actix_cors::Cors; use actix_web::{web, App, HttpServer}; use anyhow::Result; use tracing::info; mod errors; mod utils; mod handlers { pub(super) mod blob; } fn cors_config() -> Cors { - if CONFIG.is_sandbox { + // For local development, use relaxed CORS config + if CONFIG.localstack_endpoint.is_some() { // All origins, methods, request headers and exposed headers allowed. // Credentials supported. Max age 1 hour. Does not send wildcard. return Cors::permissive(); } Cors::default() .allowed_origin("https://web.comm.app") // for local development using prod service .allowed_origin("http://localhost:3000") .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]) .allow_any_header() .expose_any_header() } pub async fn run_http_server(blob_service: BlobService) -> Result<()> { info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .wrap(cors_config()) .app_data(web::Data::new(blob_service.to_owned())) .service( web::resource("/blob/{holder}") .route(web::get().to(handlers::blob::get_blob_handler)), ) .service( web::resource("/blob") .route(web::put().to(handlers::blob::upload_blob_handler)) .route(web::post().to(handlers::blob::assign_holder_handler)) .route(web::delete().to(handlers::blob::remove_holder_handler)), ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) }
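For reference, the routes registered above map onto plain HTTP calls. A hypothetical client-side sketch (not part of this diff) hitting the `GET /blob/{holder}` route; the port, holder value, and crate choices (`reqwest` 0.11, `tokio` with the `macros` and `rt-multi-thread` features) are assumptions:

```rust
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
  // Assumes a locally running blob service on the default HTTP port
  let base = "http://localhost:51001";
  let client = Client::new();

  // GET /blob/{holder} returns the blob data assigned to that holder
  let response = client
    .get(format!("{base}/blob/some-holder"))
    .send()
    .await?;
  println!("GET /blob/some-holder -> {}", response.status());

  // PUT /blob uploads a blob, POST /blob assigns a holder, and
  // DELETE /blob revokes one; their body formats are defined in the
  // handlers module, which is outside this diff.
  Ok(())
}
```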
diff --git a/services/blob/src/s3.rs b/services/blob/src/s3.rs index 03fa67968..fcb24edf6 100644 --- a/services/blob/src/s3.rs +++ b/services/blob/src/s3.rs @@ -1,338 +1,338 @@ use aws_sdk_s3::{ operation::create_multipart_upload::CreateMultipartUploadOutput, primitives::ByteStream, types::{CompletedMultipartUpload, CompletedPart}, Error as S3Error, }; use std::{ ops::{Bound, RangeBounds}, sync::Arc, }; use tracing::{debug, error, trace}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(S3Error), #[display(...)] ByteStream(std::io::Error), #[display(...)] InvalidPath(S3PathError), #[display(fmt = "There are no parts to upload")] EmptyUpload, #[display(fmt = "Missing upload ID")] MissingUploadID, } #[derive(Debug, derive_more::Error)] pub enum S3PathError { MissingSeparator(#[error(ignore)] String), MissingBucketName(#[error(ignore)] String), MissingObjectName(#[error(ignore)] String), } impl std::fmt::Display for S3PathError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { S3PathError::MissingSeparator(path) => { write!(f, "S3 path: [{}] should contain the '/' separator", path) } S3PathError::MissingBucketName(path) => { write!(f, "Expected bucket name in S3 path: [{}]", path) } S3PathError::MissingObjectName(path) => { write!(f, "Expected object name in S3 path: [{}]", path) } } } } type S3Result<T> = Result<T, Error>; /// A helper structure representing an S3 object path #[derive(Clone, Debug)] pub struct S3Path { pub bucket_name: String, pub object_name: String, } impl S3Path { /// Constructs an [`S3Path`] from given string /// The path should be in the following format: `[bucket_name]/[object_name]` pub fn from_full_path(full_path: &str) -> Result<Self, S3PathError> { if !full_path.contains('/') { return Err(S3PathError::MissingSeparator(full_path.to_string())); } let mut split = full_path.split('/'); Ok(S3Path { bucket_name: split .next() .ok_or_else(|| S3PathError::MissingBucketName(full_path.to_string()))? .to_string(), object_name: split .next() .ok_or_else(|| S3PathError::MissingObjectName(full_path.to_string()))? .to_string(), }) } /// Retrieves full S3 path string in the following format: `[bucket_name]/[object_name]` pub fn to_full_path(&self) -> String { format!("{}/{}", self.bucket_name, self.object_name) } } impl From<&S3Path> for String { fn from(s3_path: &S3Path) -> Self { s3_path.to_full_path() } } impl TryFrom<&str> for S3Path { type Error = S3PathError; fn try_from(full_path: &str) -> Result<Self, Self::Error> { Self::from_full_path(full_path) } } #[derive(Clone)] pub struct S3Client { client: Arc<aws_sdk_s3::Client>, } impl S3Client { pub fn new(aws_config: &aws_types::SdkConfig) -> Self { let s3_config = aws_sdk_s3::config::Builder::from(aws_config) // localstack doesn't support virtual addressing - .force_path_style(crate::config::CONFIG.is_sandbox) + .force_path_style(crate::config::CONFIG.localstack_endpoint.is_some()) .build(); S3Client { client: Arc::new(aws_sdk_s3::Client::from_conf(s3_config)), } } /// Creates a new [`MultiPartUploadSession`] pub async fn start_upload_session( &self, s3_path: &S3Path, ) -> S3Result<MultiPartUploadSession> { MultiPartUploadSession::start(&self.client, s3_path).await } /// Returns object metadata (e.g. file size) without downloading the object itself pub async fn get_object_metadata( &self, s3_path: &S3Path, ) -> S3Result<aws_sdk_s3::operation::head_object::HeadObjectOutput> { let response = self .client .head_object() .bucket(s3_path.bucket_name.clone()) .key(s3_path.object_name.clone()) .send() .await .map_err(|e| { error!("S3 failed to get object metadata"); Error::AwsSdk(e.into()) })?; Ok(response) } /// Downloads object and retrieves data bytes within provided range /// /// * `range` - Range of object bytes to download. pub async fn get_object_bytes( &self, s3_path: &S3Path, range: impl RangeBounds<u64>, ) -> S3Result<Vec<u8>> { let mut request = self .client .get_object() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name); if range.start_bound() != Bound::Unbounded || range.end_bound() != Bound::Unbounded { // Create a valid HTTP Range header let from = match range.start_bound() { Bound::Included(start) => start.to_string(), _ => "0".to_string(), }; let to = match range.end_bound() { Bound::Included(end) => end.to_string(), Bound::Excluded(end) => (end - 1).to_string(), _ => "".to_string(), }; let range = format!("bytes={}-{}", from, to); request = request.range(range); } let response = request.send().await.map_err(|e| { error!("S3 failed to get object"); Error::AwsSdk(e.into()) })?; let data = response.body.collect().await.map_err(|e| { error!("S3 failed to stream object bytes"); Error::ByteStream(e.into()) })?; Ok(data.to_vec()) } /// Deletes object at provided path pub async fn delete_object(&self, s3_path: &S3Path) -> S3Result<()> { self .client .delete_object() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() .await .map_err(|e| { error!("S3 failed to delete object"); Error::AwsSdk(e.into()) })?; Ok(()) } }
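The range handling in `get_object_bytes` above converts a Rust `RangeBounds<u64>` into an HTTP `Range` header with inclusive ends, turning an exclusive end bound into `end - 1`. A standalone sketch of that mapping (the helper function is hypothetical, written only to make the conversion easy to check):

```rust
use std::ops::{Bound, RangeBounds};

// Same conversion as in get_object_bytes: inclusive start, inclusive end;
// an exclusive end bound becomes `end - 1`, a fully open range needs no header.
fn http_range_header(range: impl RangeBounds<u64>) -> Option<String> {
  if range.start_bound() == Bound::Unbounded
    && range.end_bound() == Bound::Unbounded
  {
    return None;
  }
  let from = match range.start_bound() {
    Bound::Included(start) => start.to_string(),
    _ => "0".to_string(),
  };
  let to = match range.end_bound() {
    Bound::Included(end) => end.to_string(),
    Bound::Excluded(end) => (end - 1).to_string(),
    _ => "".to_string(),
  };
  Some(format!("bytes={}-{}", from, to))
}

fn main() {
  assert_eq!(http_range_header(0..1024), Some("bytes=0-1023".to_string()));
  assert_eq!(http_range_header(512..=1023), Some("bytes=512-1023".to_string()));
  assert_eq!(http_range_header(..), None);
}
```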
/// Represents a multipart upload session to the AWS S3 pub struct MultiPartUploadSession { client: Arc<aws_sdk_s3::Client>, bucket_name: String, object_name: String, upload_id: String, upload_parts: Vec<CompletedPart>, } impl MultiPartUploadSession { /// Starts a new upload session and returns its instance /// Don't call this directly, use [`S3Client::start_upload_session()`] instead async fn start( client: &Arc<aws_sdk_s3::Client>, s3_path: &S3Path, ) -> S3Result<Self> { let multipart_upload_res: CreateMultipartUploadOutput = client .create_multipart_upload() .bucket(&s3_path.bucket_name) .key(&s3_path.object_name) .send() .await .map_err(|e| { error!("S3 failed to start upload session"); Error::AwsSdk(e.into()) })?; let upload_id = multipart_upload_res.upload_id().ok_or_else(|| { error!("Upload ID expected to be present"); Error::MissingUploadID })?; debug!("Started multipart upload session with ID: {}", upload_id); Ok(MultiPartUploadSession { client: client.clone(), bucket_name: String::from(&s3_path.bucket_name), object_name: String::from(&s3_path.object_name), upload_id: String::from(upload_id), upload_parts: Vec::new(), }) } /// adds data part to the multipart upload pub async fn add_part(&mut self, part: Vec<u8>) -> S3Result<()> { let stream = ByteStream::from(part); let part_number: i32 = self.upload_parts.len() as i32 + 1; let upload_result = self .client .upload_part() .key(&self.object_name) .bucket(&self.bucket_name) .upload_id(&self.upload_id) .part_number(part_number) .body(stream) .send() .await .map_err(|e| { error!("Failed to add upload part"); Error::AwsSdk(e.into()) })?; let completed_part = CompletedPart::builder() .e_tag(upload_result.e_tag.unwrap_or_default()) .part_number(part_number) .build(); trace!( upload_id = self.upload_id, e_tag = completed_part.e_tag.as_deref().unwrap_or("N/A"), "Uploaded part {}.", part_number ); self.upload_parts.push(completed_part); Ok(()) } /// finishes the upload pub async fn finish_upload(&self) -> S3Result<()> { if self.upload_parts.is_empty() { return Err(Error::EmptyUpload); } let completed_multipart_upload = CompletedMultipartUpload::builder() .set_parts(Some(self.upload_parts.clone())) .build(); self .client .complete_multipart_upload() .bucket(&self.bucket_name) .key(&self.object_name) .multipart_upload(completed_multipart_upload) .upload_id(&self.upload_id) .send() .await .map_err(|e| { error!("Failed to finish upload session"); Error::AwsSdk(e.into()) })?; debug!(upload_id = self.upload_id, "Multipart upload complete"); Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_s3path_from_full_path() { let full_path = "my_bucket/some_object"; let s3_path = S3Path::from_full_path(full_path); assert!(s3_path.is_ok()); let s3_path = s3_path.unwrap(); assert_eq!(&s3_path.bucket_name, "my_bucket"); assert_eq!(&s3_path.object_name, "some_object"); } #[test] fn test_s3path_from_invalid_path() { let result = S3Path::from_full_path("invalid_path"); assert!(result.is_err()) } #[test] fn test_s3path_to_full_path() { let s3_path = S3Path { bucket_name: "my_bucket".to_string(), object_name: "some_object".to_string(), }; let full_path = s3_path.to_full_path(); assert_eq!(full_path, "my_bucket/some_object"); } }
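Putting the pieces above together, a typical multipart upload is: start a session, add parts, finish. A hypothetical usage sketch assuming it lives inside this crate (so `crate::s3` resolves) and that every chunk except the last respects S3's minimum part size (`S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE` in constants.rs):

```rust
use crate::s3::{Error, S3Client, S3Path};

// Drives a full multipart upload through the API defined above.
async fn upload_blob_chunks(
  client: &S3Client,
  path: &S3Path,
  chunks: Vec<Vec<u8>>,
) -> Result<(), Error> {
  // Opens a CreateMultipartUpload and remembers its upload ID
  let mut session = client.start_upload_session(path).await?;
  for chunk in chunks {
    // Each part is numbered sequentially and its ETag is recorded
    session.add_part(chunk).await?;
  }
  // Completes the upload; fails with Error::EmptyUpload if no parts were added
  session.finish_upload().await
}
```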
diff --git a/services/blob/src/tools.rs b/services/blob/src/tools.rs index 966970110..d6414748c 100644 --- a/services/blob/src/tools.rs +++ b/services/blob/src/tools.rs @@ -1,72 +1,60 @@ -use std::{env, error::Error as StdError}; +use std::error::Error as StdError; use tonic::codegen::futures_core::Stream; -use crate::constants; - -fn is_env_flag_set(env_var_name: &str) -> bool { - let flag_value = env::var(env_var_name).unwrap_or_default().to_lowercase(); - return ["1", "true"].contains(&flag_value.as_str()); -} - -/// Returns true if the `COMM_SERVICES_SANDBOX` environment variable is set -pub fn is_sandbox_env() -> bool { - return is_env_flag_set(constants::SANDBOX_ENV_VAR); -} - pub type BoxedError = Box<dyn StdError>; // Trait type aliases aren't supported in Rust, but // we can workaround this by creating an empty trait // that extends the traits we want to alias. #[rustfmt::skip] pub trait ByteStream: Stream<Item = Result<Vec<u8>, BoxedError>> {} #[rustfmt::skip] impl<T> ByteStream for T where T: Stream<Item = Result<Vec<u8>, BoxedError>> {} pub trait MemOps { fn take_out(&mut self) -> Self; } impl<T> MemOps for Vec<T> { /// Moves all the elements of `self` into a new [`Vec`] instance, /// leaving `self` empty. **No copying is performed.** /// The memory capacity of `self` stays unchanged. /// /// In fact, this is the same as [`std::mem::take`] but maintains capacity. /// /// # Example /// ``` /// let mut a = vec![1,2,3,4]; /// let b = a.take_out(); /// assert_eq!(b.len(), 4); /// assert!(a.is_empty()); /// assert_eq!(a.capacity(), b.capacity()); /// ``` fn take_out(&mut self) -> Self { let mut new_vec = Vec::with_capacity(self.capacity()); std::mem::swap(self, &mut new_vec); new_vec } } #[cfg(test)] mod tests { use super::*; #[test] fn test_memops_move_and_clear() { let mut a = vec![1, 2, 3, 4]; let a_ptr_before = a.as_ptr(); let b = a.take_out(); let a_ptr_after = a.as_ptr(); let b_ptr_after = b.as_ptr(); assert_ne!(a_ptr_before, a_ptr_after, "Old ptr didn't change"); assert_eq!(a_ptr_before, b_ptr_after, "Ptr addresses don't match"); assert_eq!(a.capacity(), b.capacity(), "Capacities don't match"); assert!(a.is_empty(), "Original vec isn't empty after move"); assert_eq!(b.len(), 4, "Moved length don't match"); } }
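The empty `ByteStream` trait plus the blanket impl above acts as a trait alias: anything that already implements the `Stream` bound automatically implements `ByteStream`, so the alias can be used directly as a generic bound. An illustrative sketch, assuming the `ByteStream` and `BoxedError` definitions from tools.rs are in scope and that `futures-util` is available for `StreamExt::next`:

```rust
use futures_util::StreamExt;

// Consumes any byte stream and counts the bytes it yields.
// `ByteStream` and `BoxedError` are the aliases defined in tools.rs.
async fn count_bytes(
  mut stream: impl ByteStream + Unpin,
) -> Result<usize, BoxedError> {
  let mut total = 0;
  while let Some(chunk) = stream.next().await {
    // Each stream item is a Result<Vec<u8>, BoxedError>
    total += chunk?.len();
  }
  Ok(total)
}
```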