diff --git a/services/blob/src/database/errors.rs b/services/blob/src/database/errors.rs
new file mode 100644
index 000000000..f31956947
--- /dev/null
+++ b/services/blob/src/database/errors.rs
@@ -0,0 +1,39 @@
+use std::fmt::{Display, Formatter};
+
+use aws_sdk_dynamodb::Error as DynamoDBError;
+use comm_services_lib::database::DBItemError;
+
+use crate::s3::S3PathError;
+
+#[derive(
+  Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum Error {
+  #[display(...)]
+  AwsSdk(DynamoDBError),
+  #[display(...)]
+  Attribute(DBItemError),
+  #[display(...)]
+  Blob(BlobDBError),
+  #[display(...)]
+  ItemAlreadyExists,
+}
+
+#[derive(Debug)]
+pub enum BlobDBError {
+  HolderAlreadyExists(String),
+  InvalidS3Path(S3PathError),
+}
+
+impl Display for BlobDBError {
+  fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+    match self {
+      BlobDBError::HolderAlreadyExists(holder) => {
+        write!(f, "Item for given holder [{}] already exists", holder)
+      }
+      BlobDBError::InvalidS3Path(err) => err.fmt(f),
+    }
+  }
+}
+
+impl std::error::Error for BlobDBError {}
diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs
new file mode 100644
index 000000000..afbb99d29
--- /dev/null
+++ b/services/blob/src/database/mod.rs
@@ -0,0 +1,2 @@
+pub mod errors;
+pub mod old;
diff --git a/services/blob/src/database.rs b/services/blob/src/database/old.rs
similarity index 90%
rename from services/blob/src/database.rs
rename to services/blob/src/database/old.rs
index dbd12f40d..dac3f5f82 100644
--- a/services/blob/src/database.rs
+++ b/services/blob/src/database/old.rs
@@ -1,339 +1,305 @@
 use aws_sdk_dynamodb::{
   operation::get_item::GetItemOutput, types::AttributeValue,
-  Error as DynamoDBError,
 };
 use chrono::{DateTime, Utc};
-use comm_services_lib::database::{self, DBItemError};
-use std::{
-  collections::HashMap,
-  fmt::{Display, Formatter},
-  sync::Arc,
-};
+use comm_services_lib::database;
+use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use crate::{
   constants::{
     BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
     BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME,
     BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME,
     BLOB_S3_BUCKET_NAME, BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD,
    BLOB_TABLE_NAME, BLOB_TABLE_S3_PATH_FIELD,
   },
-  s3::{S3Path, S3PathError},
+  s3::S3Path,
 };
 
+use super::errors::{BlobDBError, Error};
+
 #[derive(Clone, Debug)]
 pub struct BlobItem {
   pub blob_hash: String,
   pub s3_path: S3Path,
   pub created: DateTime<Utc>,
 }
 
 impl BlobItem {
   pub fn new(blob_hash: impl Into<String>) -> Self {
     let hash_str = blob_hash.into();
     BlobItem {
       blob_hash: hash_str.clone(),
       s3_path: S3Path {
         bucket_name: BLOB_S3_BUCKET_NAME.to_string(),
         object_name: hash_str,
       },
       created: Utc::now(),
     }
   }
 }
 
 #[derive(Clone, Debug)]
 pub struct ReverseIndexItem {
   pub holder: String,
   pub blob_hash: String,
 }
 
 #[derive(Clone)]
 pub struct DatabaseClient {
   client: Arc<aws_sdk_dynamodb::Client>,
 }
 
 impl DatabaseClient {
   pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
     DatabaseClient {
       client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
     }
   }
 
   // Blob item
 
   pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> {
     let item = HashMap::from([
       (
         BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
         AttributeValue::S(blob_item.blob_hash),
       ),
       (
         BLOB_TABLE_S3_PATH_FIELD.to_string(),
         AttributeValue::S(blob_item.s3_path.to_full_path()),
       ),
       (
         BLOB_TABLE_CREATED_FIELD.to_string(),
         AttributeValue::S(blob_item.created.to_rfc3339()),
       ),
     ]);
 
     self
       .client
       .put_item()
       .table_name(BLOB_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put blob item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_blob_item(
     &self,
     blob_hash: &str,
   ) -> Result<Option<BlobItem>, Error> {
     let item_key = HashMap::from([(
       BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
       AttributeValue::S(blob_hash.to_string()),
     )]);
     match self
       .client
       .get_item()
       .table_name(BLOB_TABLE_NAME)
       .set_key(Some(item_key))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find blob item");
         Error::AwsSdk(e.into())
       })? {
       GetItemOutput {
         item: Some(mut item),
         ..
       } => {
         let blob_hash = database::parse_string_attribute(
           BLOB_TABLE_BLOB_HASH_FIELD,
           item.remove(BLOB_TABLE_BLOB_HASH_FIELD),
         )?;
         let s3_path = database::parse_string_attribute(
           BLOB_TABLE_S3_PATH_FIELD,
           item.remove(BLOB_TABLE_S3_PATH_FIELD),
         )?;
         let created = database::parse_datetime_attribute(
           BLOB_TABLE_CREATED_FIELD,
           item.remove(BLOB_TABLE_CREATED_FIELD),
         )?;
         Ok(Some(BlobItem {
           blob_hash,
           s3_path: S3Path::from_full_path(&s3_path)
             .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?,
           created,
         }))
       }
       _ => Ok(None),
     }
   }
 
   pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(BLOB_TABLE_NAME)
       .key(
         BLOB_TABLE_BLOB_HASH_FIELD,
         AttributeValue::S(blob_hash.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove blob item");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   // Reverse index item
 
   pub async fn put_reverse_index_item(
     &self,
     reverse_index_item: ReverseIndexItem,
   ) -> Result<(), Error> {
     let holder = &reverse_index_item.holder;
     if self.find_reverse_index_by_holder(holder).await?.is_some() {
       error!("Failed to put reverse index. Holder already exists.");
       return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
         holder.to_string(),
       )));
     }
 
     let item = HashMap::from([
       (
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
         AttributeValue::S(reverse_index_item.holder),
       ),
       (
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(),
         AttributeValue::S(reverse_index_item.blob_hash),
       ),
     ]);
     self
       .client
       .put_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .set_item(Some(item))
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to put reverse index");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 
   pub async fn find_reverse_index_by_holder(
     &self,
     holder: &str,
   ) -> Result<Option<ReverseIndexItem>, Error> {
     let item_key = HashMap::from([(
       BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
       AttributeValue::S(holder.to_string()),
     )]);
     match self
       .client
       .get_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .set_key(Some(item_key))
       .consistent_read(true)
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find reverse index by holder");
         Error::AwsSdk(e.into())
       })? {
       GetItemOutput {
         item: Some(mut item),
         ..
       } => {
         let holder = database::parse_string_attribute(
           BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
           item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
         )?;
         let blob_hash = database::parse_string_attribute(
           BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
           item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
         )?;
         Ok(Some(ReverseIndexItem { holder, blob_hash }))
       }
       _ => Ok(None),
     }
   }
 
   pub async fn find_reverse_index_by_hash(
     &self,
     blob_hash: &str,
   ) -> Result<Vec<ReverseIndexItem>, Error> {
     let response = self
       .client
       .query()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME)
       .key_condition_expression("#blobHash = :valueToMatch")
       .expression_attribute_names(
         "#blobHash",
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
       )
       .expression_attribute_values(
         ":valueToMatch",
         AttributeValue::S(blob_hash.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to find reverse indexes by hash");
         Error::AwsSdk(e.into())
       })?;
 
     if response.count == 0 {
       return Ok(vec![]);
     }
 
     let mut results: Vec<ReverseIndexItem> =
       Vec::with_capacity(response.count() as usize);
     for mut item in response.items.unwrap_or_default() {
       let holder = database::parse_string_attribute(
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
         item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
       )?;
       let blob_hash = database::parse_string_attribute(
         BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
         item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
       )?;
       results.push(ReverseIndexItem { holder, blob_hash });
     }
 
     return Ok(results);
   }
 
   pub async fn remove_reverse_index_item(
     &self,
     holder: &str,
   ) -> Result<(), Error> {
     self
       .client
       .delete_item()
       .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
       .key(
         BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
         AttributeValue::S(holder.to_string()),
       )
       .send()
       .await
       .map_err(|e| {
         error!("DynamoDB client failed to remove reverse index");
         Error::AwsSdk(e.into())
       })?;
 
     Ok(())
   }
 }
-
-#[derive(
-  Debug, derive_more::Display, derive_more::From, derive_more::Error,
-)]
-pub enum Error {
-  #[display(...)]
-  AwsSdk(DynamoDBError),
-  #[display(...)]
-  Attribute(DBItemError),
-  #[display(...)]
-  Blob(BlobDBError),
-}
-
-#[derive(Debug)]
-pub enum BlobDBError {
-  HolderAlreadyExists(String),
-  InvalidS3Path(S3PathError),
-}
-
-impl Display for BlobDBError {
-  fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-    match self {
-      BlobDBError::HolderAlreadyExists(holder) => {
-        write!(f, "Item for given holder [{}] already exists", holder)
-      }
-      BlobDBError::InvalidS3Path(err) => err.fmt(f),
-    }
-  }
-}
-
-impl std::error::Error for BlobDBError {}
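
// A minimal usage sketch (hypothetical, not part of this patch) of the
// relocated `database::errors` types above. The `is_permanent` helper and its
// classification policy are illustrative assumptions only.
use crate::database::errors::{BlobDBError, Error};

fn is_permanent(err: &Error) -> bool {
  match err {
    // Blob-specific conflicts and malformed S3 paths will not succeed on retry.
    Error::Blob(BlobDBError::HolderAlreadyExists(_))
    | Error::Blob(BlobDBError::InvalidS3Path(_))
    | Error::ItemAlreadyExists => true,
    // AWS SDK and attribute-parsing failures may be transient.
    Error::AwsSdk(_) | Error::Attribute(_) => false,
  }
}
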
diff --git a/services/blob/src/grpc.rs b/services/blob/src/grpc.rs
index baa7ce99a..7ecf0622f 100644
--- a/services/blob/src/grpc.rs
+++ b/services/blob/src/grpc.rs
@@ -1,492 +1,493 @@
 use anyhow::Result;
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use blob::blob_service_server::BlobService;
 use std::{net::SocketAddr, pin::Pin};
 use tokio::sync::mpsc;
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tonic::{transport::Server, Request, Response, Status};
 use tracing::{debug, error, info, instrument, trace, warn, Instrument};
 
 use crate::{
   config::CONFIG,
   constants::{
     GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
     MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
   },
-  database::{BlobItem, DatabaseClient, Error as DBError, ReverseIndexItem},
+  database::errors::Error as DBError,
+  database::old::{BlobItem, DatabaseClient, ReverseIndexItem},
   s3::{MultiPartUploadSession, S3Client, S3Path},
   tools::MemOps,
 };
 
 mod blob {
   tonic::include_proto!("blob");
 }
 
 use blob::blob_service_server::BlobServiceServer;
 
 pub async fn run_grpc_server(
   db_client: DatabaseClient,
   s3_client: S3Client,
 ) -> Result<()> {
   let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?;
   let blob_service = MyBlobService::new(db_client, s3_client);
   info!("Starting gRPC server listening at {}", CONFIG.grpc_port);
   Server::builder()
     .add_service(BlobServiceServer::new(blob_service))
     .serve(addr)
     .await?;
   Ok(())
 }
 
 struct MyBlobService {
   db: DatabaseClient,
   s3: S3Client,
 }
 
 impl MyBlobService {
   pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self {
     MyBlobService {
       db: db_client,
       s3: s3_client,
     }
   }
 
   async fn find_s3_path_by_reverse_index(
     &self,
     reverse_index_item: &ReverseIndexItem,
   ) -> Result<S3Path, Status> {
     let blob_hash = &reverse_index_item.blob_hash;
     match self.db.find_blob_item(&blob_hash).await {
       Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
         debug!("No blob found for {:?}", reverse_index_item);
         Err(Status::not_found("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 
   async fn find_s3_path_by_holder(
     &self,
     holder: &str,
   ) -> Result<S3Path, Status> {
     match self.db.find_reverse_index_by_holder(holder).await {
       Ok(Some(reverse_index)) => {
         self.find_s3_path_by_reverse_index(&reverse_index).await
       }
       Ok(None) => {
         debug!("No db entry found for holder {:?}", holder);
         Err(Status::not_found("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 }
 
 // gRPC implementation
 #[tonic::async_trait]
 impl BlobService for MyBlobService {
   type PutStream =
     Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
 
   #[instrument(skip_all, fields(holder))]
   async fn put(
     &self,
     request: Request<tonic::Streaming<blob::PutRequest>>,
   ) -> Result<Response<Self::PutStream>, Status> {
     info!("Put blob request: {:?}", request);
     let mut in_stream = request.into_inner();
     let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let db = self.db.clone();
     let s3 = self.s3.clone();
     let worker = async move {
       let mut put_handler = PutHandler::new(&db, &s3);
 
       while let Some(message) = in_stream.next().await {
         let response = match message {
           Ok(blob::PutRequest {
             data: Some(blob::put_request::Data::Holder(new_holder)),
           }) => put_handler.handle_holder(new_holder).await,
           Ok(blob::PutRequest {
             data: Some(blob::put_request::Data::BlobHash(new_hash)),
           }) => put_handler.handle_blob_hash(new_hash).await,
           Ok(blob::PutRequest {
             data: Some(blob::put_request::Data::DataChunk(new_data)),
           }) => put_handler.handle_data_chunk(new_data).await,
           unexpected => {
             error!("Received an unexpected Result: {:?}", unexpected);
             Err(Status::unknown("unknown error"))
           }
         };
         trace!("Sending response: {:?}", response);
         if let Err(e) = tx.send(response).await {
           error!("Response was dropped: {}", e);
           break;
         }
         if put_handler.should_close_stream {
           trace!("Put handler requested to close stream");
           break;
         }
       }
 
       if let Err(status) = put_handler.finish().await {
         trace!("Sending error response: {:?}", status);
         let _ = tx.send(Err(status)).await;
       }
     };
     tokio::spawn(worker.in_current_span());
 
     let out_stream = ReceiverStream::new(rx);
     Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
   }
 
   type GetStream =
     Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
 
   #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))]
   async fn get(
     &self,
     request: Request<blob::GetRequest>,
   ) -> Result<Response<Self::GetStream>, Status> {
     info!("Get blob request: {:?}", request);
     let message: blob::GetRequest = request.into_inner();
     let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
     tracing::Span::current().record("s3_path", s3_path.to_full_path());
 
     let object_metadata =
       self.s3.get_object_metadata(&s3_path).await.map_err(|err| {
         error!("Failed to get S3 object metadata: {:?}", err);
         Status::aborted("server error")
       })?;
 
     let file_size: u64 =
       object_metadata.content_length().try_into().map_err(|err| {
         error!("Failed to get S3 object content length: {:?}", err);
         Status::aborted("server error")
       })?;
     let chunk_size: u64 = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
 
     let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
     let s3 = self.s3.clone();
 
     let worker = async move {
       let mut offset: u64 = 0;
       while offset < file_size {
         let next_size = std::cmp::min(chunk_size, file_size - offset);
         let range = offset..(offset + next_size);
         trace!(?range, "Getting {} bytes of data", next_size);
 
         let response = match s3.get_object_bytes(&s3_path, range).await {
           Ok(data) => Ok(blob::GetResponse { data_chunk: data }),
           Err(err) => {
             error!("Failed to download data chunk: {:?}", err);
             Err(Status::aborted("download failed"))
           }
         };
 
         let should_abort = response.is_err();
         if let Err(e) = tx.send(response).await {
           error!("Response was dropped: {}", e);
           break;
         }
         if should_abort {
           trace!("Error response, aborting");
           break;
         }
 
         offset += chunk_size;
       }
     };
     tokio::spawn(worker.in_current_span());
 
     let out_stream = ReceiverStream::new(rx);
     Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
   }
 
   #[instrument(skip_all, fields(holder = %request.get_ref().holder))]
   async fn remove(
     &self,
     request: Request<blob::RemoveRequest>,
   ) -> Result<Response<()>, Status> {
     info!("Remove blob request: {:?}", request);
     let message = request.into_inner();
     let holder = message.holder.as_str();
     let reverse_index_item = self
       .db
       .find_reverse_index_by_holder(holder)
       .await
       .map_err(handle_db_error)?
       .ok_or_else(|| {
         debug!("Blob not found");
         Status::not_found("Blob not found")
       })?;
     let blob_hash = &reverse_index_item.blob_hash;
 
     self
       .db
       .remove_reverse_index_item(holder)
       .await
       .map_err(handle_db_error)?;
 
     // TODO handle cleanup here properly
     // for now the object's being removed right away
     // after the last holder was removed
     if self
       .db
       .find_reverse_index_by_hash(blob_hash)
       .await
       .map_err(handle_db_error)?
       .is_empty()
     {
       let s3_path = self
         .find_s3_path_by_reverse_index(&reverse_index_item)
         .await?;
       self.s3.delete_object(&s3_path).await.map_err(|err| {
         error!("Failed to delete S3 object: {:?}", err);
         Status::aborted("Internal error")
       })?;
 
       self
         .db
         .remove_blob_item(blob_hash)
         .await
         .map_err(handle_db_error)?;
     }
 
     Ok(Response::new(()))
   }
 }
 
 type PutResult = Result<blob::PutResponse, Status>;
 
 enum PutAction {
   AssignHolder,
   UploadNewBlob(BlobItem),
 }
 
 /// A helper for handling Put RPC requests
 struct PutHandler {
   /// Should the stream be closed by server
   pub should_close_stream: bool,
   action: Option<PutAction>,
 
   holder: Option<String>,
   blob_hash: Option<String>,
   current_chunk: Vec<u8>,
 
   uploader: Option<MultiPartUploadSession>,
   db: DatabaseClient,
   s3: S3Client,
 }
 
 impl PutHandler {
   fn new(db: &DatabaseClient, s3: &S3Client) -> Self {
     PutHandler {
       should_close_stream: false,
       action: None,
       holder: None,
       blob_hash: None,
       current_chunk: Vec::new(),
       uploader: None,
       db: db.clone(),
       s3: s3.clone(),
     }
   }
 
   pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
     if self.holder.is_some() {
       warn!("Holder already provided");
       return Err(Status::invalid_argument("Holder already provided"));
     }
     tracing::Span::current().record("holder", &new_holder);
     self.holder = Some(new_holder);
     self.determine_action().await
   }
 
   pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
     if self.blob_hash.is_some() {
       warn!("Blob hash already provided");
       return Err(Status::invalid_argument("Blob hash already provided"));
     }
     debug!("Blob hash: {}", new_hash);
     self.blob_hash = Some(new_hash);
     self.determine_action().await
   }
 
   /// private helper function to determine purpose of this RPC call
   async fn determine_action(&mut self) -> PutResult {
     // this should be called only if action isn't determined yet
     // this error should actually never happen
     if self.action.is_some() {
       error!("Put action is already started");
       return Err(Status::failed_precondition("Put action is already started"));
     }
 
     // holder and hash need both to be set in order to continue
     // otherwise we send a standard response
     if self.holder.is_none() || self.blob_hash.is_none() {
       return Ok(blob::PutResponse { data_exists: false });
     }
     let blob_hash = self
       .blob_hash
       .as_ref()
       .ok_or_else(|| Status::failed_precondition("Internal error"))?;
 
     match self.db.find_blob_item(blob_hash).await {
       // Hash already exists, so we're only assigning a new holder to it
       Ok(Some(_)) => {
         debug!("Blob found, assigning holder");
         self.action = Some(PutAction::AssignHolder);
         self.should_close_stream = true;
         Ok(blob::PutResponse { data_exists: true })
       }
       // Hash doesn't exist, so we're starting a new upload session
       Ok(None) => {
         debug!("Blob not found, starting upload action");
         self.action = Some(PutAction::UploadNewBlob(BlobItem::new(blob_hash)));
         Ok(blob::PutResponse { data_exists: false })
       }
       Err(db_err) => {
         self.should_close_stream = true;
         Err(handle_db_error(db_err))
       }
     }
   }
 
   pub async fn handle_data_chunk(
     &mut self,
     mut new_data: Vec<u8>,
   ) -> PutResult {
     let blob_item = match &self.action {
       Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
       _ => {
         self.should_close_stream = true;
         error!("Data chunk sent before upload action is started");
         return Err(Status::invalid_argument(
           "Holder and hash should be provided before data",
         ));
       }
     };
     trace!("Received {} bytes of data", new_data.len());
 
     // create upload session if it doesn't already exist
     if self.uploader.is_none() {
       debug!("Uploader doesn't exist, starting new session");
       self.uploader =
         match self.s3.start_upload_session(&blob_item.s3_path).await {
           Ok(session) => Some(session),
           Err(err) => {
             self.should_close_stream = true;
             error!("Failed to create upload session: {:?}", err);
             return Err(Status::aborted("Internal error"));
           }
         }
     }
     let uploader = self.uploader.as_mut().unwrap();
 
     // New parts should be added to AWS only if they exceed minimum part size,
     // Otherwise AWS returns error
     self.current_chunk.append(&mut new_data);
     if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
     {
       trace!("Chunk size exceeded, adding new S3 part");
       if let Err(err) = uploader.add_part(self.current_chunk.take_out()).await
       {
         self.should_close_stream = true;
         error!("Failed to upload S3 part: {:?}", err);
         return Err(Status::aborted("Internal error"));
       }
     }
 
     Ok(blob::PutResponse { data_exists: false })
   }
 
   /// This function should be called after the input stream is finished.
   /// This consumes `self` so this put handler instance cannot be used
   /// after this is called.
   pub async fn finish(self) -> Result<(), Status> {
     if self.action.is_none() {
       debug!("No action to perform, finishing now");
       return Ok(());
     }
     let holder = self.holder.ok_or_else(|| {
       error!("Cannot finish action. No holder provided!");
       Status::aborted("Internal error")
     })?;
     let blob_hash = self.blob_hash.ok_or_else(|| {
       error!("Cannot finish action. No blob hash provided!");
       Status::aborted("Internal error")
     })?;
     let blob_item = match self.action {
       None => return Ok(()),
       Some(PutAction::AssignHolder) => {
         return assign_holder_to_blob(&self.db, holder, blob_hash).await;
       }
       Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
     };
 
     let mut uploader = self.uploader.ok_or_else(|| {
       // This also happens when client cancels before sending any data chunk
       warn!("No uploader was created, finishing now");
       Status::aborted("Internal error")
     })?;
 
     if !self.current_chunk.is_empty() {
       if let Err(err) = uploader.add_part(self.current_chunk).await {
         error!("Failed to upload final part: {:?}", err);
         return Err(Status::aborted("Internal error"));
       }
     }
 
     if let Err(err) = uploader.finish_upload().await {
       error!("Failed to finish upload session: {:?}", err);
       return Err(Status::aborted("Internal error"));
     }
 
     self
       .db
       .put_blob_item(blob_item)
       .await
       .map_err(handle_db_error)?;
 
     assign_holder_to_blob(&self.db, holder, blob_hash).await?;
 
     debug!("Upload finished successfully");
     Ok(())
   }
 }
 
 async fn assign_holder_to_blob(
   db: &DatabaseClient,
   holder: String,
   blob_hash: String,
 ) -> Result<(), Status> {
   let reverse_index_item = ReverseIndexItem { holder, blob_hash };
 
   db.put_reverse_index_item(reverse_index_item)
     .await
     .map_err(handle_db_error)
 }
 
 fn handle_db_error(db_error: DBError) -> Status {
   match db_error {
     DBError::AwsSdk(DynamoDBError::InternalServerError(_))
     | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
       _,
     ))
     | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
       warn!("AWS transient error occurred");
       Status::unavailable("please retry")
     }
     DBError::Blob(e) => {
       error!("Encountered Blob database error: {}", e);
       Status::failed_precondition("Internal error")
     }
     e => {
       error!("Encountered an unexpected error: {}", e);
       Status::failed_precondition("unexpected error")
     }
   }
 }
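
// A hypothetical client-side sketch (not part of this patch) of the message
// order the Put RPC above expects: holder and blob hash first, then data
// chunks. It assumes a tonic-generated client for the same blob.proto
// (exposed as `blob_service_client`) and an example endpoint address.
use blob::blob_service_client::BlobServiceClient;
use blob::{put_request::Data, PutRequest};

async fn put_example(chunks: Vec<Vec<u8>>) -> anyhow::Result<()> {
  let mut client = BlobServiceClient::connect("http://localhost:50051").await?;

  // Metadata messages come first; the server determines its action once both
  // the holder and the blob hash have arrived.
  let mut requests = vec![
    PutRequest {
      data: Some(Data::Holder("example-holder".to_string())),
    },
    PutRequest {
      data: Some(Data::BlobHash("example-blob-hash".to_string())),
    },
  ];
  requests.extend(chunks.into_iter().map(|chunk| PutRequest {
    data: Some(Data::DataChunk(chunk)),
  }));

  let mut responses =
    client.put(tokio_stream::iter(requests)).await?.into_inner();
  while let Some(response) = responses.message().await? {
    // `data_exists: true` means the hash is already stored; the server only
    // assigns the holder and closes the stream early.
    if response.data_exists {
      break;
    }
  }
  Ok(())
}
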
diff --git a/services/blob/src/http/context.rs b/services/blob/src/http/context.rs
index 25f2440bf..03efc9620 100644
--- a/services/blob/src/http/context.rs
+++ b/services/blob/src/http/context.rs
@@ -1,70 +1,70 @@
-use crate::database::ReverseIndexItem;
-use crate::database::{BlobItem, DatabaseClient, Error as DBError};
+use crate::database::errors::Error as DBError;
+use crate::database::old::{BlobItem, DatabaseClient, ReverseIndexItem};
 use crate::s3::{Error as S3Error, S3Client, S3Path};
 
 use actix_web::error::{
   ErrorBadRequest, ErrorInternalServerError, ErrorNotFound,
   ErrorServiceUnavailable,
 };
 use actix_web::Error as HttpError;
 use anyhow::Result;
 use aws_sdk_dynamodb::Error as DynamoDBError;
 use tracing::{debug, error, warn};
 
 /// This structure is passed to every HTTP request handler
 /// It should be cloneable because each HTTP worker thread receives a copy
 #[derive(Clone)]
 pub struct AppContext {
   pub db: DatabaseClient,
   pub s3: S3Client,
 }
 
 impl AppContext {
   pub async fn find_s3_path_by_reverse_index(
     &self,
     reverse_index_item: &ReverseIndexItem,
   ) -> Result<S3Path, HttpError> {
     let blob_hash = &reverse_index_item.blob_hash;
     match self.db.find_blob_item(&blob_hash).await {
       Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
       Ok(None) => {
         debug!("No blob found for {:?}", reverse_index_item);
         Err(ErrorNotFound("blob not found"))
       }
       Err(err) => Err(handle_db_error(err)),
     }
   }
 }
 
 pub fn handle_db_error(db_error: DBError) -> HttpError {
   match db_error {
     DBError::AwsSdk(DynamoDBError::InternalServerError(_))
     | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
       _,
     ))
     | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
       warn!("AWS transient error occurred");
       ErrorServiceUnavailable("please retry")
     }
     DBError::Blob(blob_err) => {
       error!("Encountered Blob database error: {}", blob_err);
       ErrorInternalServerError("Internal error")
     }
     err => {
       error!("Encountered an unexpected error: {}", err);
       ErrorInternalServerError("unexpected error")
     }
   }
 }
 
 pub fn handle_s3_error(s3_error: S3Error) -> HttpError {
   match s3_error {
     S3Error::EmptyUpload => {
       warn!("Empty upload. Aborting");
       ErrorBadRequest("Empty upload")
     }
     err => {
       error!("Encountered S3 error: {:?}", err);
       ErrorInternalServerError("Internal error")
     }
   }
 }
diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs
index e5d9c1e86..394d94383 100644
--- a/services/blob/src/http/handlers/blob.rs
+++ b/services/blob/src/http/handlers/blob.rs
@@ -1,379 +1,379 @@
 use crate::constants::{
   BLOB_DOWNLOAD_CHUNK_SIZE, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
 };
-use crate::database::{BlobItem, ReverseIndexItem};
+use crate::database::old::{BlobItem, ReverseIndexItem};
 use crate::http::context::handle_s3_error;
 use crate::tools::MemOps;
 use crate::validate_identifier;
 
 use super::{handle_db_error, AppContext};
 use actix_web::error::{
   ErrorBadRequest, ErrorConflict, ErrorInternalServerError, ErrorNotFound,
   ErrorRangeNotSatisfiable,
 };
 use actix_web::{
   http::header::{ByteRangeSpec, Range},
   web, Error as HttpError, HttpResponse,
 };
 use anyhow::Result;
 use async_stream::{try_stream, AsyncStream};
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
 use tracing::{debug, error, info, instrument, trace, warn};
 use tracing_futures::Instrument;
 
 /// Returns a tuple of first and last byte number (inclusive) represented by
 /// given range header.
 fn parse_range_header(
   range_header: &Option<web::Header<Range>>,
   file_size: u64,
 ) -> actix_web::Result<(u64, u64)> {
   let (range_start, range_end): (u64, u64) = match range_header {
     Some(web::Header(Range::Bytes(ranges))) => {
       if ranges.len() > 1 {
         return Err(ErrorBadRequest("Multiple ranges not supported"));
       }
       match ranges[0] {
         ByteRangeSpec::FromTo(start, end) => {
           if end >= file_size || start > end {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, end)
         }
         ByteRangeSpec::From(start) => {
           if start >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (start, file_size - 1)
         }
         ByteRangeSpec::Last(length) => {
           if length >= file_size {
             return Err(ErrorRangeNotSatisfiable("Range not satisfiable"));
           }
           (file_size - length, file_size - 1)
         }
       }
     }
     Some(web::Header(Range::Unregistered(..))) => {
       return Err(ErrorBadRequest("Use ranges registered at IANA"));
     }
     None => (0, file_size - 1),
   };
   Ok((range_start, range_end))
 }
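
// A brief illustration (hypothetical test, not part of this patch) of the
// inclusive byte ranges produced by `parse_range_header` above for a
// 100-byte file:
#[cfg(test)]
mod parse_range_header_tests {
  use super::*;

  #[test]
  fn resolves_inclusive_ranges() {
    let size = 100;
    let from_to =
      Some(web::Header(Range::Bytes(vec![ByteRangeSpec::FromTo(0, 9)])));
    assert_eq!(parse_range_header(&from_to, size).unwrap(), (0, 9));

    let from = Some(web::Header(Range::Bytes(vec![ByteRangeSpec::From(90)])));
    assert_eq!(parse_range_header(&from, size).unwrap(), (90, 99));

    // "Last N bytes" counts from the end of the file.
    let last = Some(web::Header(Range::Bytes(vec![ByteRangeSpec::Last(10)])));
    assert_eq!(parse_range_header(&last, size).unwrap(), (90, 99));

    // No Range header means the whole file.
    assert_eq!(parse_range_header(&None, size).unwrap(), (0, 99));
  }
}
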
 
 #[instrument(
   name = "get_blob",
   skip_all,
   fields(blob_hash = %params.as_ref().as_str(), s3_path))
 ]
 pub async fn get_blob_handler(
   ctx: web::Data<AppContext>,
   params: web::Path<String>,
   range_header: Option<web::Header<Range>>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Get blob request");
   let blob_hash = params.into_inner();
   validate_identifier!(blob_hash);
   let s3_path = match ctx.db.find_blob_item(&blob_hash).await {
     Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
     Ok(None) => {
       debug!("Blob with given hash not found in database");
       Err(ErrorNotFound("blob not found"))
     }
     Err(err) => Err(handle_db_error(err)),
   }?;
   tracing::Span::current().record("s3_path", s3_path.to_full_path());
 
   let object_metadata = ctx
     .s3
     .get_object_metadata(&s3_path)
     .await
     .map_err(handle_s3_error)?;
   let file_size: u64 =
     object_metadata.content_length().try_into().map_err(|err| {
       error!("Failed to get S3 object content length: {:?}", err);
       ErrorInternalServerError("server error")
     })?;
 
   // Stream the data in chunks to avoid loading the whole file into memory.
   let chunk_size: u64 = BLOB_DOWNLOAD_CHUNK_SIZE;
 
   let s3 = ctx.s3.clone();
 
   let (range_start, range_end): (u64, u64) =
     parse_range_header(&range_header, file_size)?;
   let stream: AsyncStream<Result<web::Bytes, HttpError>, _> = try_stream! {
     debug!(?range_start, ?range_end, "Getting range of data");
     let mut offset: u64 = range_start;
     while offset < range_end {
       let next_size = std::cmp::min(chunk_size, range_end - offset + 1);
       let range = offset..(offset + next_size);
       trace!(?range, "Getting {} bytes of data", next_size);
 
       let data = s3.get_object_bytes(&s3_path, range).await.map_err(handle_s3_error)?;
       yield web::Bytes::from(data);
 
       offset += chunk_size;
     }
   };
 
   let content_length = (range_end - range_start + 1).to_string();
   if range_header.is_some() {
     return Ok(
       HttpResponse::PartialContent()
         .content_type("application/octet-stream")
         .append_header(("Content-Length", content_length))
         .append_header((
           "Content-Range",
           format!("bytes {}-{}/{}", range_start, range_end, file_size),
         ))
         .streaming(Box::pin(stream.in_current_span())),
     );
   }
   Ok(
     HttpResponse::Ok()
       .content_type("application/octet-stream")
       .append_header(("Content-Length", content_length))
       .streaming(Box::pin(stream.in_current_span())),
   )
 }
 
 #[derive(Deserialize, Debug)]
 pub struct AssignHolderPayload {
   holder: String,
   blob_hash: String,
 }
 
 #[derive(Serialize)]
 struct AssignHolderResponnse {
   data_exists: bool,
 }
 
 #[instrument(name = "assign_holder", skip(ctx))]
 pub async fn assign_holder_handler(
   ctx: web::Data<AppContext>,
   payload: web::Json<AssignHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Assign holder request");
   let AssignHolderPayload { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
   if ctx
     .db
     .find_reverse_index_by_holder(&holder)
     .await
     .map_err(handle_db_error)?
     .is_some()
   {
     warn!("holder already assigned");
     return Err(ErrorConflict("holder already assigned"));
   }
 
   let data_exists = ctx
     .db
     .find_blob_item(&blob_hash)
     .await
     .map_err(handle_db_error)?
     .is_some();
   debug!(data_exists, "Checked blob item existence");
 
   let reverse_index_item = ReverseIndexItem { holder, blob_hash };
   ctx
     .db
     .put_reverse_index_item(reverse_index_item)
     .await
     .map_err(handle_db_error)?;
 
   Ok(HttpResponse::Ok().json(web::Json(AssignHolderResponnse { data_exists })))
 }
 
 async fn get_blob_hash_field(
   multipart_payload: &mut actix_multipart::Multipart,
 ) -> Result<String, HttpError> {
   let Some(mut field) = multipart_payload.try_next().await? else {
     debug!("Malfolmed multipart request");
     return Err(ErrorBadRequest("Bad request"));
   };
 
   if field.name() != "blob_hash" {
     warn!("Blob hash is required as a first form field");
     return Err(ErrorBadRequest("Bad request"));
   }
 
   let mut buf = Vec::new();
   while let Some(chunk) = field.try_next().await? {
     buf.extend_from_slice(&chunk);
   }
 
   let blob_hash = String::from_utf8(buf)
     .map_err(|_| ErrorInternalServerError("Internal error"))?;
 
   validate_identifier!(blob_hash);
   return Ok(blob_hash);
 }
 
 async fn process_blob_data(
   multipart_payload: &mut actix_multipart::Multipart,
   upload_session: &mut crate::s3::MultiPartUploadSession,
 ) -> Result<(), HttpError> {
   let mut s3_chunk: Vec<u8> = Vec::new();
   while let Some(mut field) = multipart_payload.try_next().await? {
     let field_name = field.name();
     if field_name != "blob_data" {
       warn!(
         field_name,
         "Malfolmed request: 'blob_data' multipart field expected."
       );
       return Err(ErrorBadRequest("Bad request"));
     }
 
     while let Some(chunk) = field.try_next().await? {
       let mut chunk = chunk.to_vec();
       s3_chunk.append(&mut chunk);
 
       // New parts should be added to AWS only if they exceed minimum part size,
       // Otherwise AWS returns error
       if s3_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE {
         trace!(
           chunk_size = s3_chunk.len(),
           "Chunk size exceeded, adding new S3 part"
         );
         upload_session
           .add_part(s3_chunk.take_out())
           .await
           .map_err(handle_s3_error)?;
       }
     }
   }
 
   // add the remaining data as the last S3 part
   if !s3_chunk.is_empty() {
     upload_session
       .add_part(s3_chunk)
       .await
       .map_err(handle_s3_error)?;
   }
 
   Ok(())
 }
 
 #[instrument(skip_all, name = "upload_blob", fields(blob_hash))]
 pub async fn upload_blob_handler(
   ctx: web::Data<AppContext>,
   mut payload: actix_multipart::Multipart,
 ) -> actix_web::Result<HttpResponse> {
   info!("Upload blob request");
 
   let blob_hash = get_blob_hash_field(&mut payload).await?;
   debug!("Received blob_hash: {}", &blob_hash);
   tracing::Span::current().record("blob_hash", &blob_hash);
 
   if ctx
     .db
     .find_blob_item(&blob_hash)
     .await
     .map_err(handle_db_error)?
     .is_some()
   {
     warn!("Blob with given hash already exists");
     return Err(ErrorConflict("Conflict"));
   }
 
   let blob_item = BlobItem::new(blob_hash);
   let mut upload_session = ctx
     .s3
     .start_upload_session(&blob_item.s3_path)
     .await
     .map_err(handle_s3_error)?;
 
   trace!("Receiving blob data");
   process_blob_data(&mut payload, &mut upload_session).await?;
 
   upload_session
     .finish_upload()
     .await
     .map_err(handle_s3_error)?;
 
   trace!("Upload finished, saving blob item to DB: {:?}", &blob_item);
   ctx
     .db
     .put_blob_item(blob_item)
     .await
     .map_err(handle_db_error)?;
 
   Ok(HttpResponse::NoContent().finish())
 }
 
 #[derive(Deserialize, Debug)]
 pub struct RemoveHolderPayload {
   holder: String,
   blob_hash: String,
 }
 
 #[instrument(name = "remove_holder", skip(ctx))]
 pub async fn remove_holder_handler(
   ctx: web::Data<AppContext>,
   payload: web::Json<RemoveHolderPayload>,
 ) -> actix_web::Result<HttpResponse> {
   info!("Remove holder request");
   let RemoveHolderPayload { holder, blob_hash } = payload.into_inner();
   validate_identifier!(holder);
   validate_identifier!(blob_hash);
 
   let reverse_index_item = ctx
     .db
     .find_reverse_index_by_holder(&holder)
     .await
     .map_err(handle_db_error)?
     .ok_or_else(|| {
       debug!("Blob not found");
       ErrorNotFound("Blob not found")
     })?;
   let blob_hash = &reverse_index_item.blob_hash;
 
   ctx
     .db
     .remove_reverse_index_item(&holder)
     .await
     .map_err(handle_db_error)?;
 
   // TODO: handle cleanup here properly
   // for now the object's being removed right away
   // after the last holder was removed
   if ctx
     .db
     .find_reverse_index_by_hash(blob_hash)
     .await
     .map_err(handle_db_error)?
     .is_empty()
   {
     let s3_path = ctx
       .find_s3_path_by_reverse_index(&reverse_index_item)
       .await?;
     debug!("Last holder removed. Deleting S3 object: {:?}", &s3_path);
 
     ctx
       .s3
       .delete_object(&s3_path)
       .await
       .map_err(handle_s3_error)?;
 
     ctx
       .db
       .remove_blob_item(blob_hash)
       .await
       .map_err(handle_db_error)?;
   } else {
     debug!("Blob still has holders, S3 object not deleted");
   }
 
   Ok(HttpResponse::NoContent().finish())
 }
diff --git a/services/blob/src/http/mod.rs b/services/blob/src/http/mod.rs
index eadd518ac..663a9ffc5 100644
--- a/services/blob/src/http/mod.rs
+++ b/services/blob/src/http/mod.rs
@@ -1,71 +1,71 @@
 use crate::config::CONFIG;
-use crate::database::DatabaseClient;
+use crate::database::old::DatabaseClient;
 use crate::s3::S3Client;
 
 use actix_cors::Cors;
 use actix_web::{web, App, HttpServer};
 use anyhow::Result;
 use tracing::info;
 
 mod context;
 use context::AppContext;
 mod utils;
 
 mod handlers {
   pub(super) mod blob;
 
   // convenience exports to be used in handlers
   use super::context::{handle_db_error, AppContext};
 }
 
 fn cors_config() -> Cors {
   if CONFIG.is_sandbox {
     // All origins, methods, request headers and exposed headers allowed.
     // Credentials supported. Max age 1 hour. Does not send wildcard.
     return Cors::permissive();
   }
 
   Cors::default()
     .allowed_origin("https://web.comm.app")
     // for local development using prod service
     .allowed_origin("http://localhost:3000")
     .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
     .allow_any_header()
     .expose_any_header()
 }
 
 pub async fn run_http_server(
   db_client: DatabaseClient,
   s3_client: S3Client,
 ) -> Result<()> {
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
   HttpServer::new(move || {
     // context that is passed to every handler
     let ctx = AppContext {
       db: db_client.to_owned(),
       s3: s3_client.to_owned(),
     };
     App::new()
       .wrap(tracing_actix_web::TracingLogger::default())
       .wrap(cors_config())
       .app_data(web::Data::new(ctx))
       .service(
         web::resource("/blob/{holder}")
           .route(web::get().to(handlers::blob::get_blob_handler)),
       )
       .service(
         web::resource("/blob")
           .route(web::put().to(handlers::blob::upload_blob_handler))
           .route(web::post().to(handlers::blob::assign_holder_handler))
           .route(web::delete().to(handlers::blob::remove_holder_handler)),
       )
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
 
   Ok(())
 }
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
index 276723fa8..2f7437ee5 100644
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -1,36 +1,36 @@
 pub mod config;
 pub mod constants;
 pub mod database;
 pub mod grpc;
 pub mod http;
 pub mod s3;
 pub mod tools;
 
 use anyhow::Result;
 use tracing_subscriber::filter::{EnvFilter, LevelFilter};
 
 fn configure_logging() -> Result<()> {
   let filter = EnvFilter::builder()
     .with_default_directive(LevelFilter::INFO.into())
     .with_env_var(constants::LOG_LEVEL_ENV_VAR)
     .from_env_lossy();
 
   let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish();
   tracing::subscriber::set_global_default(subscriber)?;
   Ok(())
 }
 
 #[tokio::main]
 async fn main() -> Result<()> {
   configure_logging()?;
   config::parse_cmdline_args()?;
 
   let aws_config = config::load_aws_config().await;
-  let db = database::DatabaseClient::new(&aws_config);
+  let db = database::old::DatabaseClient::new(&aws_config);
   let s3 = s3::S3Client::new(&aws_config);
 
   tokio::select! {
     http_result = crate::http::run_http_server(db.clone(), s3.clone()) => http_result,
     grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
   }
 }
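
// A hypothetical HTTP client sketch (not part of this patch) of the multipart
// upload that `upload_blob_handler` expects: `blob_hash` must be the first
// form field, followed by one or more `blob_data` fields. It assumes the
// `reqwest` crate (with its `multipart` feature) and an example base URL; the
// real port comes from CONFIG.http_port.
use reqwest::multipart::{Form, Part};

async fn upload_blob_example(
  blob_hash: &str,
  data: Vec<u8>,
) -> anyhow::Result<()> {
  let form = Form::new()
    .text("blob_hash", blob_hash.to_string())
    .part("blob_data", Part::bytes(data));

  let response = reqwest::Client::new()
    .put("http://localhost:8080/blob")
    .multipart(form)
    .send()
    .await?;

  // The handler replies 204 No Content on success and 409 Conflict when a
  // blob with the same hash is already stored.
  println!("upload status: {}", response.status());
  Ok(())
}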