diff --git a/services/blob/Dockerfile b/services/blob/Dockerfile --- a/services/blob/Dockerfile +++ b/services/blob/Dockerfile @@ -21,7 +21,6 @@ # Copy actual application sources COPY services/blob . -COPY shared/protos/blob.proto ../../shared/protos/ # Remove the previously-built binary so that only the application itself is # rebuilt diff --git a/services/blob/build.rs b/services/blob/build.rs deleted file mode 100644 --- a/services/blob/build.rs +++ /dev/null @@ -1,7 +0,0 @@ -fn main() { - println!("cargo:rerun-if-changed=src/main.rs"); - - println!("cargo:rerun-if-changed=../../shared/protos/blob.proto"); - tonic_build::compile_protos("../../shared/protos/blob.proto") - .expect("Failed to compile protobuf file"); -} diff --git a/services/blob/src/config.rs b/services/blob/src/config.rs --- a/services/blob/src/config.rs +++ b/services/blob/src/config.rs @@ -1,19 +1,15 @@ -use anyhow::{ensure, Result}; +use anyhow::Result; use clap::Parser; use once_cell::sync::Lazy; use tracing::info; use crate::constants::{ - DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, - S3_BUCKET_ENV_VAR, + DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, S3_BUCKET_ENV_VAR, }; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { - /// gRPC server listening port - #[arg(long, default_value_t = DEFAULT_GRPC_PORT)] - pub grpc_port: u16, /// HTTP server listening port #[arg(long, default_value_t = DEFAULT_HTTP_PORT)] pub http_port: u16, @@ -32,21 +28,14 @@ /// Processes the command-line arguments and environment variables. /// Should be called at the beginning of the `main()` function. -pub(super) fn parse_cmdline_args() -> Result<()> { +pub(super) fn parse_cmdline_args() -> Result<&'static AppConfig> { // force evaluation of the lazy initialized config let cfg = Lazy::force(&CONFIG); - // Perform some additional validation for CLI args - ensure!( - cfg.grpc_port != cfg.http_port, - "gRPC and HTTP ports cannot be the same: {}", - cfg.grpc_port - ); - if cfg.s3_bucket_name != DEFAULT_S3_BUCKET_NAME { info!("Using custom S3 bucket: {}", &cfg.s3_bucket_name); } - Ok(()) + Ok(cfg) } /// Provides region/credentials configuration for AWS SDKs diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs --- a/services/blob/src/constants.rs +++ b/services/blob/src/constants.rs @@ -1,28 +1,8 @@ // Assorted constants -pub const DEFAULT_GRPC_PORT: u16 = 50051; -pub const DEFAULT_HTTP_PORT: u16 = 51001; +pub const DEFAULT_HTTP_PORT: u16 = 50053; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; -/// 4MB limit -/// -/// WARNING: use keeping in mind that grpc adds its own headers to messages -/// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md -/// so the message that actually is being sent over the network looks like this -/// ``` -/// [Compressed-Flag] [Message-Length] [Message] -/// [Compressed-Flag] 1 byte - added by grpc -/// [Message-Length] 4 bytes - added by grpc -/// [Message] N bytes - actual data -/// ``` -/// so for every message we get 5 additional bytes of data -/// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671), -/// gRPC stream may contain more than one message -pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024; - -/// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details -pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5; - // HTTP constants pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024; @@ -49,18 +29,6 @@ pub const ATTR_UNCHECKED: &str = "unchecked"; } -// old DynamoDB constants - -pub const BLOB_TABLE_NAME: &str = "blob-service-blob"; -pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; -pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path"; -pub const BLOB_TABLE_CREATED_FIELD: &str = "created"; - -pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index"; -pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder"; -pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash"; -pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index"; - // Environment variables pub const LOG_LEVEL_ENV_VAR: &str = diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs --- a/services/blob/src/database/mod.rs +++ b/services/blob/src/database/mod.rs @@ -1,6 +1,5 @@ pub mod client; pub mod errors; -pub mod old; pub mod types; pub use client::DatabaseClient; diff --git a/services/blob/src/database/old.rs b/services/blob/src/database/old.rs deleted file mode 100644 --- a/services/blob/src/database/old.rs +++ /dev/null @@ -1,308 +0,0 @@ -#![allow(deprecated)] - -use aws_sdk_dynamodb::{ - operation::get_item::GetItemOutput, types::AttributeValue, -}; -use chrono::{DateTime, Utc}; -use comm_services_lib::database; -use std::{collections::HashMap, sync::Arc}; -use tracing::error; - -use crate::{ - config::CONFIG, - constants::{ - BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, - BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME, - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME, - BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME, - BLOB_TABLE_S3_PATH_FIELD, - }, - s3::S3Path, -}; - -use super::errors::{BlobDBError, Error}; - -#[derive(Clone, Debug)] -pub struct BlobItem { - pub blob_hash: String, - pub s3_path: S3Path, - pub created: DateTime, -} - -impl BlobItem { - pub fn new(blob_hash: impl Into) -> Self { - let hash_str = blob_hash.into(); - BlobItem { - blob_hash: hash_str.clone(), - s3_path: S3Path { - bucket_name: CONFIG.s3_bucket_name.clone(), - object_name: hash_str, - }, - created: Utc::now(), - } - } -} - -#[derive(Clone, Debug)] -pub struct ReverseIndexItem { - pub holder: String, - pub blob_hash: String, -} - -#[derive(Clone)] -pub struct DatabaseClient { - client: Arc, -} - -impl DatabaseClient { - pub fn new(aws_config: &aws_types::SdkConfig) -> Self { - DatabaseClient { - client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)), - } - } - - // Blob item - - pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> { - let item = HashMap::from([ - ( - BLOB_TABLE_BLOB_HASH_FIELD.to_string(), - AttributeValue::S(blob_item.blob_hash), - ), - ( - BLOB_TABLE_S3_PATH_FIELD.to_string(), - AttributeValue::S(blob_item.s3_path.to_full_path()), - ), - ( - BLOB_TABLE_CREATED_FIELD.to_string(), - AttributeValue::S(blob_item.created.to_rfc3339()), - ), - ]); - - self - .client - .put_item() - .table_name(BLOB_TABLE_NAME) - .set_item(Some(item)) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to put blob item"); - Error::AwsSdk(e.into()) - })?; - - Ok(()) - } - - pub async fn find_blob_item( - &self, - blob_hash: &str, - ) -> Result, Error> { - let item_key = HashMap::from([( - BLOB_TABLE_BLOB_HASH_FIELD.to_string(), - AttributeValue::S(blob_hash.to_string()), - )]); - match self - .client - .get_item() - .table_name(BLOB_TABLE_NAME) - .set_key(Some(item_key)) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to find blob item"); - Error::AwsSdk(e.into()) - })? { - GetItemOutput { - item: Some(mut item), - .. - } => { - let blob_hash = database::parse_string_attribute( - BLOB_TABLE_BLOB_HASH_FIELD, - item.remove(BLOB_TABLE_BLOB_HASH_FIELD), - )?; - let s3_path = database::parse_string_attribute( - BLOB_TABLE_S3_PATH_FIELD, - item.remove(BLOB_TABLE_S3_PATH_FIELD), - )?; - let created = database::parse_datetime_attribute( - BLOB_TABLE_CREATED_FIELD, - item.remove(BLOB_TABLE_CREATED_FIELD), - )?; - Ok(Some(BlobItem { - blob_hash, - s3_path: S3Path::from_full_path(&s3_path) - .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?, - created, - })) - } - _ => Ok(None), - } - } - - pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> { - self - .client - .delete_item() - .table_name(BLOB_TABLE_NAME) - .key( - BLOB_TABLE_BLOB_HASH_FIELD, - AttributeValue::S(blob_hash.to_string()), - ) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to remove blob item"); - Error::AwsSdk(e.into()) - })?; - - Ok(()) - } - - // Reverse index item - - pub async fn put_reverse_index_item( - &self, - reverse_index_item: ReverseIndexItem, - ) -> Result<(), Error> { - let holder = &reverse_index_item.holder; - if self.find_reverse_index_by_holder(holder).await?.is_some() { - error!("Failed to put reverse index. Holder already exists."); - return Err(Error::Blob(BlobDBError::HolderAlreadyExists( - holder.to_string(), - ))); - } - - let item = HashMap::from([ - ( - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), - AttributeValue::S(reverse_index_item.holder), - ), - ( - BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(), - AttributeValue::S(reverse_index_item.blob_hash), - ), - ]); - self - .client - .put_item() - .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) - .set_item(Some(item)) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to put reverse index"); - Error::AwsSdk(e.into()) - })?; - - Ok(()) - } - - pub async fn find_reverse_index_by_holder( - &self, - holder: &str, - ) -> Result, Error> { - let item_key = HashMap::from([( - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(), - AttributeValue::S(holder.to_string()), - )]); - match self - .client - .get_item() - .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) - .set_key(Some(item_key)) - .consistent_read(true) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to find reverse index by holder"); - Error::AwsSdk(e.into()) - })? { - GetItemOutput { - item: Some(mut item), - .. - } => { - let holder = database::parse_string_attribute( - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, - item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), - )?; - let blob_hash = database::parse_string_attribute( - BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, - item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), - )?; - - Ok(Some(ReverseIndexItem { holder, blob_hash })) - } - _ => Ok(None), - } - } - - pub async fn find_reverse_index_by_hash( - &self, - blob_hash: &str, - ) -> Result, Error> { - let response = self - .client - .query() - .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) - .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME) - .key_condition_expression("#blobHash = :valueToMatch") - .expression_attribute_names( - "#blobHash", - BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, - ) - .expression_attribute_values( - ":valueToMatch", - AttributeValue::S(blob_hash.to_string()), - ) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to find reverse indexes by hash"); - Error::AwsSdk(e.into()) - })?; - - if response.count == 0 { - return Ok(vec![]); - } - - let mut results: Vec = - Vec::with_capacity(response.count() as usize); - for mut item in response.items.unwrap_or_default() { - let holder = database::parse_string_attribute( - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, - item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD), - )?; - let blob_hash = database::parse_string_attribute( - BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD, - item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD), - )?; - - results.push(ReverseIndexItem { holder, blob_hash }); - } - - return Ok(results); - } - - pub async fn remove_reverse_index_item( - &self, - holder: &str, - ) -> Result<(), Error> { - self - .client - .delete_item() - .table_name(BLOB_REVERSE_INDEX_TABLE_NAME) - .key( - BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, - AttributeValue::S(holder.to_string()), - ) - .send() - .await - .map_err(|e| { - error!("DynamoDB client failed to remove reverse index"); - Error::AwsSdk(e.into()) - })?; - - Ok(()) - } -} diff --git a/services/blob/src/grpc.rs b/services/blob/src/grpc.rs deleted file mode 100644 --- a/services/blob/src/grpc.rs +++ /dev/null @@ -1,493 +0,0 @@ -use anyhow::Result; -use aws_sdk_dynamodb::Error as DynamoDBError; -use blob::blob_service_server::BlobService; -use std::{net::SocketAddr, pin::Pin}; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tonic::{transport::Server, Request, Response, Status}; -use tracing::{debug, error, info, instrument, trace, warn, Instrument}; - -use crate::{ - config::CONFIG, - constants::{ - GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, - MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, - }, - database::errors::Error as DBError, - database::old::{BlobItem, DatabaseClient, ReverseIndexItem}, - s3::{MultiPartUploadSession, S3Client, S3Path}, - tools::MemOps, -}; - -mod blob { - tonic::include_proto!("blob"); -} -use blob::blob_service_server::BlobServiceServer; - -pub async fn run_grpc_server( - db_client: DatabaseClient, - s3_client: S3Client, -) -> Result<()> { - let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?; - let blob_service = MyBlobService::new(db_client, s3_client); - - info!("Starting gRPC server listening at {}", CONFIG.grpc_port); - Server::builder() - .add_service(BlobServiceServer::new(blob_service)) - .serve(addr) - .await?; - - Ok(()) -} - -struct MyBlobService { - db: DatabaseClient, - s3: S3Client, -} - -impl MyBlobService { - pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self { - MyBlobService { - db: db_client, - s3: s3_client, - } - } - - async fn find_s3_path_by_reverse_index( - &self, - reverse_index_item: &ReverseIndexItem, - ) -> Result { - let blob_hash = &reverse_index_item.blob_hash; - match self.db.find_blob_item(&blob_hash).await { - Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path), - Ok(None) => { - debug!("No blob found for {:?}", reverse_index_item); - Err(Status::not_found("blob not found")) - } - Err(err) => Err(handle_db_error(err)), - } - } - - async fn find_s3_path_by_holder( - &self, - holder: &str, - ) -> Result { - match self.db.find_reverse_index_by_holder(holder).await { - Ok(Some(reverse_index)) => { - self.find_s3_path_by_reverse_index(&reverse_index).await - } - Ok(None) => { - debug!("No db entry found for holder {:?}", holder); - Err(Status::not_found("blob not found")) - } - Err(err) => Err(handle_db_error(err)), - } - } -} - -// gRPC implementation -#[tonic::async_trait] -impl BlobService for MyBlobService { - type PutStream = - Pin> + Send>>; - - #[instrument(skip_all, fields(holder))] - async fn put( - &self, - request: Request>, - ) -> Result, Status> { - info!("Put blob request: {:?}", request); - let mut in_stream = request.into_inner(); - let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let db = self.db.clone(); - let s3 = self.s3.clone(); - let worker = async move { - let mut put_handler = PutHandler::new(&db, &s3); - - while let Some(message) = in_stream.next().await { - let response = match message { - Ok(blob::PutRequest { - data: Some(blob::put_request::Data::Holder(new_holder)), - }) => put_handler.handle_holder(new_holder).await, - Ok(blob::PutRequest { - data: Some(blob::put_request::Data::BlobHash(new_hash)), - }) => put_handler.handle_blob_hash(new_hash).await, - Ok(blob::PutRequest { - data: Some(blob::put_request::Data::DataChunk(new_data)), - }) => put_handler.handle_data_chunk(new_data).await, - unexpected => { - error!("Received an unexpected Result: {:?}", unexpected); - Err(Status::unknown("unknown error")) - } - }; - trace!("Sending response: {:?}", response); - if let Err(e) = tx.send(response).await { - error!("Response was dropped: {}", e); - break; - } - if put_handler.should_close_stream { - trace!("Put handler requested to close stream"); - break; - } - } - - if let Err(status) = put_handler.finish().await { - trace!("Sending error response: {:?}", status); - let _ = tx.send(Err(status)).await; - } - }; - tokio::spawn(worker.in_current_span()); - - let out_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream) as Self::PutStream)) - } - - type GetStream = - Pin> + Send>>; - - #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))] - async fn get( - &self, - request: Request, - ) -> Result, Status> { - info!("Get blob request: {:?}", request); - let message: blob::GetRequest = request.into_inner(); - let s3_path = self.find_s3_path_by_holder(&message.holder).await?; - tracing::Span::current().record("s3_path", s3_path.to_full_path()); - - let object_metadata = - self.s3.get_object_metadata(&s3_path).await.map_err(|err| { - error!("Failed to get S3 object metadata: {:?}", err); - Status::aborted("server error") - })?; - - let file_size: u64 = - object_metadata.content_length().try_into().map_err(|err| { - error!("Failed to get S3 object content length: {:?}", err); - Status::aborted("server error") - })?; - let chunk_size: u64 = - GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; - - let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let s3 = self.s3.clone(); - - let worker = async move { - let mut offset: u64 = 0; - while offset < file_size { - let next_size = std::cmp::min(chunk_size, file_size - offset); - let range = offset..(offset + next_size); - trace!(?range, "Getting {} bytes of data", next_size); - - let response = match s3.get_object_bytes(&s3_path, range).await { - Ok(data) => Ok(blob::GetResponse { data_chunk: data }), - Err(err) => { - error!("Failed to download data chunk: {:?}", err); - Err(Status::aborted("download failed")) - } - }; - - let should_abort = response.is_err(); - if let Err(e) = tx.send(response).await { - error!("Response was dropped: {}", e); - break; - } - if should_abort { - trace!("Error response, aborting"); - break; - } - - offset += chunk_size; - } - }; - tokio::spawn(worker.in_current_span()); - - let out_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) - } - - #[instrument(skip_all, fields(holder = %request.get_ref().holder))] - async fn remove( - &self, - request: Request, - ) -> Result, Status> { - info!("Remove blob request: {:?}", request); - let message = request.into_inner(); - let holder = message.holder.as_str(); - let reverse_index_item = self - .db - .find_reverse_index_by_holder(holder) - .await - .map_err(handle_db_error)? - .ok_or_else(|| { - debug!("Blob not found"); - Status::not_found("Blob not found") - })?; - let blob_hash = &reverse_index_item.blob_hash; - - self - .db - .remove_reverse_index_item(holder) - .await - .map_err(handle_db_error)?; - - // TODO handle cleanup here properly - // for now the object's being removed right away - // after the last holder was removed - if self - .db - .find_reverse_index_by_hash(blob_hash) - .await - .map_err(handle_db_error)? - .is_empty() - { - let s3_path = self - .find_s3_path_by_reverse_index(&reverse_index_item) - .await?; - - self.s3.delete_object(&s3_path).await.map_err(|err| { - error!("Failed to delete S3 object: {:?}", err); - Status::aborted("Internal error") - })?; - - self - .db - .remove_blob_item(blob_hash) - .await - .map_err(handle_db_error)?; - } - - Ok(Response::new(())) - } -} - -type PutResult = Result; - -enum PutAction { - AssignHolder, - UploadNewBlob(BlobItem), -} - -/// A helper for handling Put RPC requests -struct PutHandler { - /// Should the stream be closed by server - pub should_close_stream: bool, - action: Option, - - holder: Option, - blob_hash: Option, - current_chunk: Vec, - - uploader: Option, - db: DatabaseClient, - s3: S3Client, -} - -impl PutHandler { - fn new(db: &DatabaseClient, s3: &S3Client) -> Self { - PutHandler { - should_close_stream: false, - action: None, - holder: None, - blob_hash: None, - current_chunk: Vec::new(), - uploader: None, - db: db.clone(), - s3: s3.clone(), - } - } - - pub async fn handle_holder(&mut self, new_holder: String) -> PutResult { - if self.holder.is_some() { - warn!("Holder already provided"); - return Err(Status::invalid_argument("Holder already provided")); - } - tracing::Span::current().record("holder", &new_holder); - self.holder = Some(new_holder); - self.determine_action().await - } - - pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult { - if self.blob_hash.is_some() { - warn!("Blob hash already provided"); - return Err(Status::invalid_argument("Blob hash already provided")); - } - debug!("Blob hash: {}", new_hash); - self.blob_hash = Some(new_hash); - self.determine_action().await - } - - /// private helper function to determine purpose of this RPC call - async fn determine_action(&mut self) -> PutResult { - // this should be called only if action isn't determined yet - // this error should actually never happen - if self.action.is_some() { - error!("Put action is already started"); - return Err(Status::failed_precondition("Put action is already started")); - } - - // holder and hash need both to be set in order to continue - // otherwise we send a standard response - if self.holder.is_none() || self.blob_hash.is_none() { - return Ok(blob::PutResponse { data_exists: false }); - } - let blob_hash = self - .blob_hash - .as_ref() - .ok_or_else(|| Status::failed_precondition("Internal error"))?; - - match self.db.find_blob_item(blob_hash).await { - // Hash already exists, so we're only assigning a new holder to it - Ok(Some(_)) => { - debug!("Blob found, assigning holder"); - self.action = Some(PutAction::AssignHolder); - self.should_close_stream = true; - Ok(blob::PutResponse { data_exists: true }) - } - // Hash doesn't exist, so we're starting a new upload session - Ok(None) => { - debug!("Blob not found, starting upload action"); - self.action = Some(PutAction::UploadNewBlob(BlobItem::new(blob_hash))); - Ok(blob::PutResponse { data_exists: false }) - } - Err(db_err) => { - self.should_close_stream = true; - Err(handle_db_error(db_err)) - } - } - } - - pub async fn handle_data_chunk( - &mut self, - mut new_data: Vec, - ) -> PutResult { - let blob_item = match &self.action { - Some(PutAction::UploadNewBlob(blob_item)) => blob_item, - _ => { - self.should_close_stream = true; - error!("Data chunk sent before upload action is started"); - return Err(Status::invalid_argument( - "Holder and hash should be provided before data", - )); - } - }; - trace!("Received {} bytes of data", new_data.len()); - - // create upload session if it doesn't already exist - if self.uploader.is_none() { - debug!("Uploader doesn't exist, starting new session"); - self.uploader = - match self.s3.start_upload_session(&blob_item.s3_path).await { - Ok(session) => Some(session), - Err(err) => { - self.should_close_stream = true; - error!("Failed to create upload session: {:?}", err); - return Err(Status::aborted("Internal error")); - } - } - } - let uploader = self.uploader.as_mut().unwrap(); - - // New parts should be added to AWS only if they exceed minimum part size, - // Otherwise AWS returns error - self.current_chunk.append(&mut new_data); - if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - { - trace!("Chunk size exceeded, adding new S3 part"); - if let Err(err) = uploader.add_part(self.current_chunk.take_out()).await { - self.should_close_stream = true; - error!("Failed to upload S3 part: {:?}", err); - return Err(Status::aborted("Internal error")); - } - } - - Ok(blob::PutResponse { data_exists: false }) - } - - /// This function should be called after the input stream is finished. - /// This consumes `self` so this put handler instance cannot be used - /// after this is called. - pub async fn finish(self) -> Result<(), Status> { - if self.action.is_none() { - debug!("No action to perform, finishing now"); - return Ok(()); - } - let holder = self.holder.ok_or_else(|| { - error!("Cannot finish action. No holder provided!"); - Status::aborted("Internal error") - })?; - let blob_hash = self.blob_hash.ok_or_else(|| { - error!("Cannot finish action. No blob hash provided!"); - Status::aborted("Internal error") - })?; - let blob_item = match self.action { - None => return Ok(()), - Some(PutAction::AssignHolder) => { - return assign_holder_to_blob(&self.db, holder, blob_hash).await; - } - Some(PutAction::UploadNewBlob(blob_item)) => blob_item, - }; - - let mut uploader = self.uploader.ok_or_else(|| { - // This also happens when client cancels before sending any data chunk - warn!("No uploader was created, finishing now"); - Status::aborted("Internal error") - })?; - - if !self.current_chunk.is_empty() { - if let Err(err) = uploader.add_part(self.current_chunk).await { - error!("Failed to upload final part: {:?}", err); - return Err(Status::aborted("Internal error")); - } - } - - if let Err(err) = uploader.finish_upload().await { - error!("Failed to finish upload session: {:?}", err); - return Err(Status::aborted("Internal error")); - } - - self - .db - .put_blob_item(blob_item) - .await - .map_err(handle_db_error)?; - - assign_holder_to_blob(&self.db, holder, blob_hash).await?; - - debug!("Upload finished successfully"); - Ok(()) - } -} - -async fn assign_holder_to_blob( - db: &DatabaseClient, - holder: String, - blob_hash: String, -) -> Result<(), Status> { - let reverse_index_item = ReverseIndexItem { holder, blob_hash }; - - db.put_reverse_index_item(reverse_index_item) - .await - .map_err(handle_db_error) -} - -fn handle_db_error(db_error: DBError) -> Status { - match db_error { - DBError::AwsSdk(DynamoDBError::InternalServerError(_)) - | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( - _, - )) - | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { - warn!("AWS transient error occurred"); - Status::unavailable("please retry") - } - DBError::Blob(e) => { - error!("Encountered Blob database error: {}", e); - Status::failed_precondition("Internal error") - } - e => { - error!("Encountered an unexpected error: {}", e); - Status::failed_precondition("unexpected error") - } - } -} diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs --- a/services/blob/src/main.rs +++ b/services/blob/src/main.rs @@ -1,7 +1,6 @@ pub mod config; pub mod constants; pub mod database; -pub mod grpc; pub mod http; pub mod s3; pub mod service; @@ -29,21 +28,17 @@ config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; - let db = database::old::DatabaseClient::new(&aws_config); + let db = database::DatabaseClient::new(&aws_config); let s3 = s3::S3Client::new(&aws_config); - let new_db = database::DatabaseClient::new(&aws_config); let service = service::BlobService::new( - new_db, - s3.clone(), + db, + s3, BlobServiceConfig { instant_delete_orphaned_blobs: true, ..Default::default() }, ); - tokio::select! { - http_result = crate::http::run_http_server(service) => http_result, - grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result, - } + crate::http::run_http_server(service).await } diff --git a/services/docker-compose.yml b/services/docker-compose.yml --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -44,7 +44,7 @@ - COMM_SERVICES_SANDBOX=${COMM_SERVICES_SANDBOX} image: commapp/blob-server:0.1 ports: - - '${COMM_SERVICES_PORT_BLOB}:50051' + - '${COMM_SERVICES_PORT_BLOB}:50053' volumes: - $HOME/.aws/config:/home/comm/.aws/config:ro - $HOME/.aws/credentials:/home/comm/.aws/credentials:ro diff --git a/shared/protos/blob.proto b/shared/protos/blob.proto deleted file mode 100644 --- a/shared/protos/blob.proto +++ /dev/null @@ -1,41 +0,0 @@ -syntax = "proto3"; - -package blob; - -import "google/protobuf/empty.proto"; - -service BlobService { - rpc Put(stream PutRequest) returns (stream PutResponse) {} - rpc Get(GetRequest) returns (stream GetResponse) {} - rpc Remove(RemoveRequest) returns (google.protobuf.Empty) {} -} - -// Put - -message PutRequest { - oneof data { - string holder = 1; - string blobHash = 2; - bytes dataChunk = 3; - } -} - -message PutResponse { - bool dataExists = 1; -} - -// Get - -message GetRequest { - string holder = 1; -} - -message GetResponse { - bytes dataChunk = 1; -} - -// Remove - -message RemoveRequest { - string holder = 1; -}