diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs index c8b151f57..fb76aa57c 100644 --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -1,234 +1,234 @@ use tonic::Status; use tracing::{debug, error, trace, warn}; use crate::{ blob::{start_simple_uploader, BlobClient, BlobUploader}, database::{BackupItem, DatabaseClient}, service::proto, }; use super::handle_db_error; type CreateBackupResult = Result; enum HandlerState { /// Initial state. Handler is receiving non-data inputs ReceivingParams, /// Handler is receiving data chunks ReceivingData { uploader: BlobUploader }, /// A special case when Blob service claims that a blob with given /// [`CreateBackupHandler::data_hash`] already exists DataAlreadyExists, } pub struct CreateBackupHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, device_id: Option, key_entropy: Option>, data_hash: Option, // client instances db: DatabaseClient, blob_client: BlobClient, // internal state state: HandlerState, backup_id: String, holder: Option, } impl CreateBackupHandler { pub fn new(db: DatabaseClient, blob_client: BlobClient) -> Self { CreateBackupHandler { should_close_stream: false, user_id: None, device_id: None, key_entropy: None, data_hash: None, db, blob_client, state: HandlerState::ReceivingParams, backup_id: String::new(), holder: None, } } pub async fn handle_user_id( &mut self, user_id: String, ) -> CreateBackupResult { if self.user_id.is_some() { warn!("user ID already provided"); return Err(Status::invalid_argument("User ID already provided")); } self.user_id = Some(user_id); self.handle_internal().await } pub async fn handle_device_id( &mut self, device_id: String, ) -> CreateBackupResult { if self.device_id.is_some() { warn!("Device ID already provided"); return Err(Status::invalid_argument("Device ID already provided")); } tracing::Span::current().record("device_id", &device_id); self.device_id = Some(device_id); self.handle_internal().await } pub async fn handle_key_entropy( &mut self, key_entropy: Vec, ) -> CreateBackupResult { if self.key_entropy.is_some() { warn!("Key entropy already provided"); return Err(Status::invalid_argument("Key entropy already provided")); } self.key_entropy = Some(key_entropy); self.handle_internal().await } pub async fn handle_data_hash( &mut self, data_hash: Vec, ) -> CreateBackupResult { if self.data_hash.is_some() { warn!("Data hash already provided"); return Err(Status::invalid_argument("Data hash already provided")); } let hash_str = String::from_utf8(data_hash).map_err(|err| { error!("Failed to convert data_hash into string: {:?}", err); Status::aborted("Unexpected error") })?; - tracing::Span::current().record("data_hash", &hash_str); + debug!("Received data hash: {}", &hash_str); self.data_hash = Some(hash_str); self.handle_internal().await } pub async fn handle_data_chunk( &mut self, data_chunk: Vec, ) -> CreateBackupResult { let HandlerState::ReceivingData { ref mut uploader } = self.state else { self.should_close_stream = true; error!("Data chunk sent before other inputs"); return Err(Status::invalid_argument( "Data chunk sent before other inputs", )); }; // empty chunk ends transmission if data_chunk.is_empty() { self.should_close_stream = true; return Ok(proto::CreateNewBackupResponse { backup_id: self.backup_id.clone(), }); } trace!("Received {} bytes of data", data_chunk.len()); uploader.put_data(data_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; Ok(proto::CreateNewBackupResponse { // actual Backup ID should be sent only once, the time it is generated // see handle_internal() backup_id: String::new(), }) } /// This function should be called after the input stream is finished. pub async fn finish(self) -> Result<(), Status> { match self.state { HandlerState::ReceivingParams => { // client probably aborted early trace!("Nothing to store in database. Finishing early"); return Ok(()); } HandlerState::ReceivingData { uploader } => { uploader.terminate().await.map_err(|err| { error!("Uploader task closed with error: {:?}", err); Status::aborted("Internal error") })?; } HandlerState::DataAlreadyExists => (), } let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else { error!("Holder / UserID absent in data mode. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; if self.backup_id.is_empty() { error!("Backup ID was not generated. This should never happen!"); return Err(Status::failed_precondition("Internal error")); } let backup_item = BackupItem::new(user_id, self.backup_id, holder); self .db .put_backup_item(backup_item) .await .map_err(handle_db_error)?; Ok(()) } // internal param handler helper async fn handle_internal(&mut self) -> CreateBackupResult { if !matches!(self.state, HandlerState::ReceivingParams) { error!("CreateBackupHandler already received all non-data params."); return Err(Status::failed_precondition("Backup data chunk expected")); } // all non-data inputs must be set before receiving backup data chunks let (Some(data_hash), Some(device_id), Some(_), Some(_)) = ( self.data_hash.as_ref(), self.device_id.as_ref(), self.user_id.as_ref(), self.key_entropy.as_ref(), ) else { // return empty backup ID when inputs are incomplete return Ok(proto::CreateNewBackupResponse { backup_id: "".to_string(), }); }; let backup_id = generate_backup_id(device_id); let holder = crate::utils::generate_blob_holder(data_hash, &backup_id, None); self.backup_id = backup_id.clone(); self.holder = Some(holder.clone()); tracing::Span::current().record("backup_id", &backup_id); tracing::Span::current().record("blob_holder", &holder); match start_simple_uploader(&holder, data_hash, self.blob_client.clone()) .await? { Some(uploader) => { self.state = HandlerState::ReceivingData { uploader }; trace!("Everything prepared, waiting for data..."); } None => { // Blob with given data_hash already exists debug!("Blob already exists, finishing"); self.should_close_stream = true; self.state = HandlerState::DataAlreadyExists; } }; Ok(proto::CreateNewBackupResponse { backup_id }) } } /// Generates ID for a new backup fn generate_backup_id(device_id: &str) -> String { format!( "{device_id}_{timestamp}", device_id = device_id, timestamp = chrono::Utc::now().timestamp_millis() ) } diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs index d7e6577a7..bc116891d 100644 --- a/services/backup/src/service/handlers/send_log.rs +++ b/services/backup/src/service/handlers/send_log.rs @@ -1,272 +1,272 @@ use tonic::Status; use tracing::{debug, error, trace, warn}; use uuid::Uuid; use super::handle_db_error; use crate::{ blob::{BlobClient, BlobUploader}, constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, database::{DatabaseClient, LogItem}, service::proto::SendLogResponse, }; enum LogPersistence { /// Log entirely stored in DynamoDB database DB, /// Log contents stored with Blob service BLOB { holder: String }, } pub struct SendLogHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, backup_id: Option, log_hash: Option, // internal state log_id: Option, log_buffer: Vec, persistence_method: LogPersistence, should_receive_data: bool, // client instances db: DatabaseClient, blob_client: BlobClient, uploader: Option, } impl SendLogHandler { pub fn new(db: &DatabaseClient, blob_client: &BlobClient) -> Self { SendLogHandler { db: db.clone(), blob_client: blob_client.clone(), uploader: None, user_id: None, backup_id: None, log_hash: None, log_id: None, log_buffer: Vec::new(), persistence_method: LogPersistence::DB, should_receive_data: false, should_close_stream: false, } } pub async fn handle_user_id( &mut self, user_id: String, ) -> Result<(), Status> { if self.user_id.is_some() { warn!("user ID already provided"); return Err(Status::invalid_argument("User ID already provided")); } self.user_id = Some(user_id); self.handle_internal().await } pub async fn handle_backup_id( &mut self, backup_id: String, ) -> Result<(), Status> { if self.backup_id.is_some() { warn!("backup ID already provided"); return Err(Status::invalid_argument("Backup ID already provided")); } tracing::Span::current().record("backup_id", &backup_id); self.backup_id = Some(backup_id); self.handle_internal().await } pub async fn handle_log_hash( &mut self, log_hash: Vec, ) -> Result<(), Status> { if self.log_hash.is_some() { warn!("Log hash already provided"); return Err(Status::invalid_argument("Log hash already provided")); } let hash_str = String::from_utf8(log_hash).map_err(|err| { error!("Failed to convert data_hash into string: {:?}", err); Status::aborted("Unexpected error") })?; - tracing::Span::current().record("log_hash", &hash_str); + debug!("Received log hash: {}", &hash_str); self.log_hash = Some(hash_str); self.handle_internal().await } pub async fn handle_log_data( &mut self, data_chunk: Vec, ) -> Result<(), Status> { if !self.should_receive_data || self.log_id.is_none() { self.should_close_stream = true; error!("Data chunk sent before other inputs"); return Err(Status::invalid_argument( "Data chunk sent before other inputs", )); } // empty chunk ends transmission if data_chunk.is_empty() { self.should_close_stream = true; return Ok(()); } match self.persistence_method { LogPersistence::DB => { self.log_buffer.extend(data_chunk); self.ensure_size_constraints().await?; } LogPersistence::BLOB { .. } => { let Some(client) = self.uploader.as_mut() else { self.should_close_stream = true; error!("Put client uninitialized. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; client.put_data(data_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; } } Ok(()) } pub async fn finish(self) -> Result { if let Some(client) = self.uploader { client.terminate().await.map_err(|err| { error!("Put client task closed with error: {:?}", err); Status::aborted("Internal error") })?; } else { trace!("No uploader initialized. Skipping termination"); } if !self.should_receive_data { // client probably aborted early trace!("Nothing to store in database. Finishing early"); return Ok(SendLogResponse { log_checkpoint: "".to_string(), }); } let (Some(backup_id), Some(log_id), Some(data_hash)) = ( self.backup_id, self.log_id, self.log_hash ) else { error!("Log info absent in data mode. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; let (log_value, persisted_in_blob) = match self.persistence_method { LogPersistence::BLOB { holder } => (holder, true), LogPersistence::DB => { let contents = String::from_utf8(self.log_buffer).map_err(|err| { error!("Failed to convert log contents data into string: {:?}", err); Status::aborted("Unexpected error") })?; (contents, false) } }; let log_item = LogItem { backup_id, log_id: log_id.clone(), persisted_in_blob, value: log_value, attachment_holders: String::new(), data_hash, }; self .db .put_log_item(log_item) .await .map_err(handle_db_error)?; Ok(SendLogResponse { log_checkpoint: log_id, }) } // internal param handler helper async fn handle_internal(&mut self) -> Result<(), Status> { if self.should_receive_data { error!("SendLogHandler is already expecting data chunks"); return Err(Status::failed_precondition("Log data chunk expected")); } // all non-data inputs must be set before receiving log contents let (Some(backup_id), Some(_), Some(_)) = ( self.backup_id.as_ref(), self.user_id.as_ref(), self.log_hash.as_ref() ) else { return Ok(()); }; let log_id = generate_log_id(backup_id); tracing::Span::current().record("log_id", &log_id); self.log_id = Some(log_id); trace!("Everything prepared, waiting for data..."); self.should_receive_data = true; Ok(()) } /// Ensures log fits db size constraints. If not, it is moved to blob /// persistence async fn ensure_size_constraints(&mut self) -> Result<(), Status> { let (Some(backup_id), Some(log_id), Some(log_hash)) = ( self.backup_id.as_ref(), self.log_id.as_ref(), self.log_hash.as_ref() ) else { self.should_close_stream = true; error!("Log info absent in data mode. This should never happen!"); return Err(Status::failed_precondition("Internal error")); }; let log_size = LogItem::size_from_components( backup_id, log_id, log_hash, &self.log_buffer, ); if log_size > LOG_DATA_SIZE_DATABASE_LIMIT { debug!("Log too large, switching persistence to Blob"); let holder = crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id)); match crate::blob::start_simple_uploader( &holder, &log_hash, self.blob_client.clone(), ) .await? { Some(mut uploader) => { let blob_chunk = std::mem::take(&mut self.log_buffer); uploader.put_data(blob_chunk).await.map_err(|err| { error!("Failed to upload data chunk: {:?}", err); Status::aborted("Internal error") })?; self.uploader = Some(uploader); } None => { debug!("Log hash already exists"); self.should_close_stream = true; } } self.persistence_method = LogPersistence::BLOB { holder }; } Ok(()) } } fn generate_log_id(backup_id: &str) -> String { format!( "{backup_id}{sep}{uuid}", backup_id = backup_id, sep = ID_SEPARATOR, uuid = Uuid::new_v4() ) } diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs index 3b3fc1922..ea03fe8af 100644 --- a/services/backup/src/service/mod.rs +++ b/services/backup/src/service/mod.rs @@ -1,241 +1,241 @@ use aws_sdk_dynamodb::Error as DynamoDBError; use proto::backup_service_server::BackupService; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, instrument, trace, warn}; use tracing_futures::Instrument; use crate::{ blob::BlobClient, constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::{DatabaseClient, Error as DBError}, }; mod proto { tonic::include_proto!("backup"); } pub use proto::backup_service_server::BackupServiceServer; /// submodule containing gRPC endpoint handler implementations mod handlers { pub(super) mod add_attachments; pub(super) mod create_backup; pub(super) mod pull_backup; pub(super) mod send_log; // re-exports for convenient usage in handlers pub(self) use super::handle_db_error; pub(self) use super::proto; } use self::handlers::create_backup::CreateBackupHandler; use self::handlers::pull_backup::PullBackupHandler; use self::handlers::send_log::SendLogHandler; pub struct MyBackupService { db: DatabaseClient, blob_client: BlobClient, } impl MyBackupService { pub fn new(db_client: DatabaseClient, blob_client: BlobClient) -> Self { MyBackupService { db: db_client, blob_client, } } } // gRPC implementation #[tonic::async_trait] impl BackupService for MyBackupService { type CreateNewBackupStream = Pin< Box< dyn Stream> + Send, >, >; - #[instrument(skip_all, fields(device_id, data_hash, backup_id, blob_holder))] + #[instrument(skip_all, fields(device_id, backup_id, blob_holder))] async fn create_new_backup( &self, request: Request>, ) -> Result, Status> { use proto::create_new_backup_request::Data::*; info!("CreateNewBackup request: {:?}", request); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db = self.db.clone(); let blob_client = self.blob_client.clone(); let worker = async move { let mut handler = CreateBackupHandler::new(db, blob_client); while let Some(message) = in_stream.next().await { let response = match message { Ok(proto::CreateNewBackupRequest { data: Some(UserId(user_id)), }) => handler.handle_user_id(user_id).await, Ok(proto::CreateNewBackupRequest { data: Some(DeviceId(device_id)), }) => handler.handle_device_id(device_id).await, Ok(proto::CreateNewBackupRequest { data: Some(KeyEntropy(key_entropy)), }) => handler.handle_key_entropy(key_entropy).await, Ok(proto::CreateNewBackupRequest { data: Some(NewCompactionHash(hash)), }) => handler.handle_data_hash(hash).await, Ok(proto::CreateNewBackupRequest { data: Some(NewCompactionChunk(chunk)), }) => handler.handle_data_chunk(chunk).await, unexpected => { error!("Received an unexpected request: {:?}", unexpected); Err(Status::unknown("unknown error")) } }; trace!("Sending response: {:?}", response); if let Err(e) = tx.send(response).await { error!("Response was dropped: {}", e); break; } if handler.should_close_stream { trace!("Handler requested to close stream"); break; } } if let Err(status) = handler.finish().await { trace!("Sending error response: {:?}", status); let _ = tx.send(Err(status)).await; } debug!("Request finished processing"); }; tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::CreateNewBackupStream )) } - #[instrument(skip_all, fields(backup_id, log_hash, log_id))] + #[instrument(skip_all, fields(backup_id, log_id))] async fn send_log( &self, request: Request>, ) -> Result, Status> { use proto::send_log_request::Data::*; info!("SendLog request: {:?}", request); let mut handler = SendLogHandler::new(&self.db, &self.blob_client); let mut in_stream = request.into_inner(); while let Some(message) = in_stream.next().await { let result = match message { Ok(proto::SendLogRequest { data: Some(UserId(user_id)), }) => handler.handle_user_id(user_id).await, Ok(proto::SendLogRequest { data: Some(BackupId(backup_id)), }) => handler.handle_backup_id(backup_id).await, Ok(proto::SendLogRequest { data: Some(LogHash(log_hash)), }) => handler.handle_log_hash(log_hash).await, Ok(proto::SendLogRequest { data: Some(LogData(chunk)), }) => handler.handle_log_data(chunk).await, unexpected => { error!("Received an unexpected request: {:?}", unexpected); Err(Status::unknown("unknown error")) } }; if let Err(err) = result { error!("An error occurred when processing request: {:?}", err); return Err(err); } if handler.should_close_stream { trace!("Handler requested to close request stream"); break; } } let response = handler.finish().await; debug!("Finished. Sending response: {:?}", response); response.map(|response_body| Response::new(response_body)) } type RecoverBackupKeyStream = Pin< Box< dyn Stream> + Send, >, >; #[instrument(skip(self))] async fn recover_backup_key( &self, _request: Request>, ) -> Result, Status> { Err(Status::unimplemented("unimplemented")) } type PullBackupStream = Pin< Box> + Send>, >; #[instrument(skip_all, fields(backup_id = &request.get_ref().backup_id))] async fn pull_backup( &self, request: Request, ) -> Result, Status> { info!("PullBackup request: {:?}", request); let handler = PullBackupHandler::new(&self.db, &self.blob_client, request.into_inner()) .await?; let stream = handler.into_response_stream().in_current_span(); Ok(Response::new(Box::pin(stream) as Self::PullBackupStream)) } #[instrument(skip_all, fields( backup_id = &request.get_ref().backup_id, log_id = &request.get_ref().log_id) )] async fn add_attachments( &self, request: Request, ) -> Result, Status> { info!( "AddAttachment request. New holders: {}", &request.get_ref().holders ); handlers::add_attachments::handle_add_attachments( &self.db, &self.blob_client, request.into_inner(), ) .await?; info!("Request processed successfully"); Ok(Response::new(())) } } /// A helper converting our Database errors into gRPC responses fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { warn!("AWS transient error occurred"); Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } } diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs index 369d78495..e66bbd3a8 100644 --- a/services/blob/src/service.rs +++ b/services/blob/src/service.rs @@ -1,482 +1,482 @@ use anyhow::Result; use aws_sdk_dynamodb::Error as DynamoDBError; use blob::blob_service_server::BlobService; use chrono::Utc; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, instrument, trace, warn, Instrument}; use crate::{ constants::{ BLOB_S3_BUCKET_NAME, GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE, MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE, }, database::{BlobItem, DatabaseClient, Error as DBError, ReverseIndexItem}, s3::{MultiPartUploadSession, S3Client, S3Path}, tools::MemOps, }; pub mod blob { tonic::include_proto!("blob"); } pub struct MyBlobService { db: DatabaseClient, s3: S3Client, } impl MyBlobService { pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self { MyBlobService { db: db_client, s3: s3_client, } } async fn find_s3_path_by_reverse_index( &self, reverse_index_item: &ReverseIndexItem, ) -> Result { let blob_hash = &reverse_index_item.blob_hash; match self.db.find_blob_item(&blob_hash).await { Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path), Ok(None) => { debug!("No blob found for {:?}", reverse_index_item); Err(Status::not_found("blob not found")) } Err(err) => Err(handle_db_error(err)), } } async fn find_s3_path_by_holder( &self, holder: &str, ) -> Result { match self.db.find_reverse_index_by_holder(holder).await { Ok(Some(reverse_index)) => { self.find_s3_path_by_reverse_index(&reverse_index).await } Ok(None) => { debug!("No db entry found for holder {:?}", holder); Err(Status::not_found("blob not found")) } Err(err) => Err(handle_db_error(err)), } } } // gRPC implementation #[tonic::async_trait] impl BlobService for MyBlobService { type PutStream = Pin> + Send>>; - #[instrument(skip_all, fields(holder, blob_hash))] + #[instrument(skip_all, fields(holder))] async fn put( &self, request: Request>, ) -> Result, Status> { info!("Put blob request: {:?}", request); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db = self.db.clone(); let s3 = self.s3.clone(); let worker = async move { let mut put_handler = PutHandler::new(&db, &s3); while let Some(message) = in_stream.next().await { let response = match message { Ok(blob::PutRequest { data: Some(blob::put_request::Data::Holder(new_holder)), }) => put_handler.handle_holder(new_holder).await, Ok(blob::PutRequest { data: Some(blob::put_request::Data::BlobHash(new_hash)), }) => put_handler.handle_blob_hash(new_hash).await, Ok(blob::PutRequest { data: Some(blob::put_request::Data::DataChunk(new_data)), }) => put_handler.handle_data_chunk(new_data).await, unexpected => { error!("Received an unexpected Result: {:?}", unexpected); Err(Status::unknown("unknown error")) } }; trace!("Sending response: {:?}", response); if let Err(e) = tx.send(response).await { error!("Response was dropped: {}", e); break; } if put_handler.should_close_stream { trace!("Put handler requested to close stream"); break; } } if let Err(status) = put_handler.finish().await { trace!("Sending error response: {:?}", status); let _ = tx.send(Err(status)).await; } }; tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::PutStream)) } type GetStream = Pin> + Send>>; #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))] async fn get( &self, request: Request, ) -> Result, Status> { info!("Get blob request: {:?}", request); let message: blob::GetRequest = request.into_inner(); let s3_path = self.find_s3_path_by_holder(&message.holder).await?; tracing::Span::current().record("s3_path", s3_path.to_full_path()); let object_metadata = self.s3.get_object_metadata(&s3_path).await.map_err(|err| { error!("Failed to get S3 object metadata: {:?}", err); Status::aborted("server error") })?; let file_size: u64 = object_metadata.content_length().try_into().map_err(|err| { error!("Failed to get S3 object content length: {:?}", err); Status::aborted("server error") })?; let chunk_size: u64 = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let s3 = self.s3.clone(); let worker = async move { let mut offset: u64 = 0; while offset < file_size { let next_size = std::cmp::min(chunk_size, file_size - offset); let range = offset..(offset + next_size); trace!(?range, "Getting {} bytes of data", next_size); let response = match s3.get_object_bytes(&s3_path, range).await { Ok(data) => Ok(blob::GetResponse { data_chunk: data }), Err(err) => { error!("Failed to download data chunk: {:?}", err); Err(Status::aborted("download failed")) } }; let should_abort = response.is_err(); if let Err(e) = tx.send(response).await { error!("Response was dropped: {}", e); break; } if should_abort { trace!("Error response, aborting"); break; } offset += chunk_size; } }; tokio::spawn(worker.in_current_span()); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::GetStream)) } #[instrument(skip_all, fields(holder = %request.get_ref().holder))] async fn remove( &self, request: Request, ) -> Result, Status> { info!("Remove blob request: {:?}", request); let message = request.into_inner(); let holder = message.holder.as_str(); let reverse_index_item = self .db .find_reverse_index_by_holder(holder) .await .map_err(handle_db_error)? .ok_or_else(|| { debug!("Blob not found"); Status::not_found("Blob not found") })?; let blob_hash = &reverse_index_item.blob_hash; self .db .remove_reverse_index_item(holder) .await .map_err(handle_db_error)?; // TODO handle cleanup here properly // for now the object's being removed right away // after the last holder was removed if self .db .find_reverse_index_by_hash(blob_hash) .await .map_err(handle_db_error)? .is_empty() { let s3_path = self .find_s3_path_by_reverse_index(&reverse_index_item) .await?; self.s3.delete_object(&s3_path).await.map_err(|err| { error!("Failed to delete S3 object: {:?}", err); Status::aborted("Internal error") })?; self .db .remove_blob_item(blob_hash) .await .map_err(handle_db_error)?; } Ok(Response::new(())) } } type PutResult = Result; enum PutAction { AssignHolder, UploadNewBlob(BlobItem), } /// A helper for handling Put RPC requests struct PutHandler { /// Should the stream be closed by server pub should_close_stream: bool, action: Option, holder: Option, blob_hash: Option, current_chunk: Vec, uploader: Option, db: DatabaseClient, s3: S3Client, } impl PutHandler { fn new(db: &DatabaseClient, s3: &S3Client) -> Self { PutHandler { should_close_stream: false, action: None, holder: None, blob_hash: None, current_chunk: Vec::new(), uploader: None, db: db.clone(), s3: s3.clone(), } } pub async fn handle_holder(&mut self, new_holder: String) -> PutResult { if self.holder.is_some() { warn!("Holder already provided"); return Err(Status::invalid_argument("Holder already provided")); } tracing::Span::current().record("holder", &new_holder); self.holder = Some(new_holder); self.determine_action().await } pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult { if self.blob_hash.is_some() { warn!("Blob hash already provided"); return Err(Status::invalid_argument("Blob hash already provided")); } - tracing::Span::current().record("blob_hash", &new_hash); + debug!("Blob hash: {}", new_hash); self.blob_hash = Some(new_hash); self.determine_action().await } /// private helper function to determine purpose of this RPC call async fn determine_action(&mut self) -> PutResult { // this should be called only if action isn't determined yet // this error should actually never happen if self.action.is_some() { error!("Put action is already started"); return Err(Status::failed_precondition("Put action is already started")); } // holder and hash need both to be set in order to continue // otherwise we send a standard response if self.holder.is_none() || self.blob_hash.is_none() { return Ok(blob::PutResponse { data_exists: false }); } let blob_hash = self .blob_hash .as_ref() .ok_or_else(|| Status::failed_precondition("Internal error"))?; match self.db.find_blob_item(blob_hash).await { // Hash already exists, so we're only assigning a new holder to it Ok(Some(_)) => { debug!("Blob found, assigning holder"); self.action = Some(PutAction::AssignHolder); self.should_close_stream = true; Ok(blob::PutResponse { data_exists: true }) } // Hash doesn't exist, so we're starting a new upload session Ok(None) => { debug!("Blob not found, starting upload action"); self.action = Some(PutAction::UploadNewBlob(BlobItem { blob_hash: blob_hash.to_string(), s3_path: S3Path { bucket_name: BLOB_S3_BUCKET_NAME.to_string(), object_name: blob_hash.to_string(), }, created: Utc::now(), })); Ok(blob::PutResponse { data_exists: false }) } Err(db_err) => { self.should_close_stream = true; Err(handle_db_error(db_err)) } } } pub async fn handle_data_chunk( &mut self, mut new_data: Vec, ) -> PutResult { let blob_item = match &self.action { Some(PutAction::UploadNewBlob(blob_item)) => blob_item, _ => { self.should_close_stream = true; error!("Data chunk sent before upload action is started"); return Err(Status::invalid_argument( "Holder and hash should be provided before data", )); } }; trace!("Received {} bytes of data", new_data.len()); // create upload session if it doesn't already exist if self.uploader.is_none() { debug!("Uploader doesn't exist, starting new session"); self.uploader = match self.s3.start_upload_session(&blob_item.s3_path).await { Ok(session) => Some(session), Err(err) => { self.should_close_stream = true; error!("Failed to create upload session: {:?}", err); return Err(Status::aborted("Internal error")); } } } let uploader = self.uploader.as_mut().unwrap(); // New parts should be added to AWS only if they exceed minimum part size, // Otherwise AWS returns error self.current_chunk.append(&mut new_data); if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE { trace!("Chunk size exceeded, adding new S3 part"); if let Err(err) = uploader.add_part(self.current_chunk.take_out()).await { self.should_close_stream = true; error!("Failed to upload S3 part: {:?}", err); return Err(Status::aborted("Internal error")); } } Ok(blob::PutResponse { data_exists: false }) } /// This function should be called after the input stream is finished. /// This consumes `self` so this put handler instance cannot be used /// after this is called. pub async fn finish(self) -> Result<(), Status> { if self.action.is_none() { debug!("No action to perform, finishing now"); return Ok(()); } let holder = self.holder.ok_or_else(|| { error!("Cannot finish action. No holder provided!"); Status::aborted("Internal error") })?; let blob_hash = self.blob_hash.ok_or_else(|| { error!("Cannot finish action. No blob hash provided!"); Status::aborted("Internal error") })?; let blob_item = match self.action { None => return Ok(()), Some(PutAction::AssignHolder) => { return assign_holder_to_blob(&self.db, holder, blob_hash).await; } Some(PutAction::UploadNewBlob(blob_item)) => blob_item, }; let mut uploader = self.uploader.ok_or_else(|| { // This also happens when client cancels before sending any data chunk warn!("No uploader was created, finishing now"); Status::aborted("Internal error") })?; if !self.current_chunk.is_empty() { if let Err(err) = uploader.add_part(self.current_chunk).await { error!("Failed to upload final part: {:?}", err); return Err(Status::aborted("Internal error")); } } if let Err(err) = uploader.finish_upload().await { error!("Failed to finish upload session: {:?}", err); return Err(Status::aborted("Internal error")); } self .db .put_blob_item(blob_item) .await .map_err(handle_db_error)?; assign_holder_to_blob(&self.db, holder, blob_hash).await?; debug!("Upload finished successfully"); Ok(()) } } async fn assign_holder_to_blob( db: &DatabaseClient, holder: String, blob_hash: String, ) -> Result<(), Status> { let reverse_index_item = ReverseIndexItem { holder, blob_hash }; db.put_reverse_index_item(reverse_index_item) .await .map_err(handle_db_error) } fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { warn!("AWS transient error occurred"); Status::unavailable("please retry") } DBError::Blob(e) => { error!("Encountered Blob database error: {}", e); Status::failed_precondition("Internal error") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } }