diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -1,9 +1,11 @@
 use tonic::Status;
-use tracing::{error, trace, warn};
+use tracing::{debug, error, trace, warn};
 use uuid::Uuid;
 
 use crate::{
-  blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient,
+  blob::PutClient,
+  constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT},
+  database::{DatabaseClient, LogItem},
   service::proto::SendLogResponse,
 };
 
@@ -93,7 +95,38 @@
     &mut self,
     data_chunk: Vec<u8>,
   ) -> Result<(), Status> {
-    unimplemented!()
+    if !self.should_receive_data || self.log_id.is_none() {
+      self.should_close_stream = true;
+      error!("Data chunk sent before other inputs");
+      return Err(Status::invalid_argument(
+        "Data chunk sent before other inputs",
+      ));
+    }
+
+    // empty chunk ends transmission
+    if data_chunk.is_empty() {
+      self.should_close_stream = true;
+      return Ok(());
+    }
+
+    match self.persistence_method {
+      LogPersistence::DB => {
+        self.log_buffer.extend(data_chunk);
+        self.ensure_size_constraints().await?;
+      }
+      LogPersistence::BLOB { .. } => {
+        let Some(client) = self.blob_client.as_mut() else {
+          self.should_close_stream = true;
+          error!("Put client uninitialized. This should never happen!");
+          return Err(Status::failed_precondition("Internal error"));
+        };
+        client.put_data(data_chunk).await.map_err(|err| {
+          error!("Failed to upload data chunk: {:?}", err);
+          Status::aborted("Internal error")
+        })?;
+      }
+    }
+    Ok(())
   }
 
   pub async fn finish(self) -> Result<SendLogResponse, Status> {
@@ -122,6 +155,49 @@
     self.should_receive_data = true;
     Ok(())
   }
+
+  /// Ensures log fits db size constraints. If not, it is moved to blob
+  /// persistence.
+  async fn ensure_size_constraints(&mut self) -> Result<(), Status> {
+    let (Some(backup_id), Some(log_id), Some(log_hash)) = (
+      self.backup_id.as_ref(),
+      self.log_id.as_ref(),
+      self.log_hash.as_ref()
+    ) else {
+      self.should_close_stream = true;
+      error!("Log info absent in data mode. This should never happen!");
+      return Err(Status::failed_precondition("Internal error"));
+    };
+
+    let log_size = LogItem::size_from_components(
+      backup_id,
+      log_id,
+      log_hash,
+      &self.log_buffer,
+    );
+    if log_size > LOG_DATA_SIZE_DATABASE_LIMIT {
+      debug!("Log too large, switching persistence to Blob");
+      let holder =
+        crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
+      match crate::blob::start_simple_put_client(&holder, &log_hash).await? {
+        Some(mut put_client) => {
+          let blob_chunk = std::mem::take(&mut self.log_buffer);
+          put_client.put_data(blob_chunk).await.map_err(|err| {
+            error!("Failed to upload data chunk: {:?}", err);
+            Status::aborted("Internal error")
+          })?;
+
+          self.persistence_method = LogPersistence::BLOB { holder };
+          self.blob_client = Some(put_client);
+        }
+        None => {
+          debug!("Log holder already exists");
+          self.should_close_stream = true;
+        }
+      }
+    }
+    Ok(())
+  }
 }
 
 fn generate_log_id(backup_id: &str) -> String {
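
Reviewer note: below is a minimal, self-contained sketch of the buffering pattern this diff implements, assuming only the behavior visible in the hunks: chunks accumulate in memory while the log fits the database limit, and once the limit is crossed the buffered bytes are flushed to a blob client and later chunks stream there directly. All names in the sketch (`SIZE_LIMIT`, `FakeBlobClient`, `LogSink`) are illustrative stand-ins, not types from this service.

```rust
// Illustrative stand-ins only; not the service's real types or limits.
const SIZE_LIMIT: usize = 4096;

enum Persistence {
  Db,
  Blob,
}

// Fake synchronous blob client that just records what was uploaded.
struct FakeBlobClient {
  uploaded: Vec<u8>,
}

impl FakeBlobClient {
  fn put_data(&mut self, chunk: Vec<u8>) {
    self.uploaded.extend(chunk);
  }
}

struct LogSink {
  persistence: Persistence,
  buffer: Vec<u8>,
  blob_client: Option<FakeBlobClient>,
}

impl LogSink {
  fn new() -> Self {
    Self {
      persistence: Persistence::Db,
      buffer: Vec::new(),
      blob_client: None,
    }
  }

  fn handle_chunk(&mut self, chunk: Vec<u8>) {
    match self.persistence {
      Persistence::Db => {
        // Buffer in memory while the log is small enough for the database.
        self.buffer.extend(chunk);
        if self.buffer.len() > SIZE_LIMIT {
          // Limit exceeded: move the accumulated buffer to the blob client
          // and switch persistence for all subsequent chunks.
          let mut client = FakeBlobClient { uploaded: Vec::new() };
          client.put_data(std::mem::take(&mut self.buffer));
          self.blob_client = Some(client);
          self.persistence = Persistence::Blob;
        }
      }
      Persistence::Blob => {
        // Already spilled: stream chunks straight to the blob client.
        self
          .blob_client
          .as_mut()
          .expect("blob client initialized on switch")
          .put_data(chunk);
      }
    }
  }
}

fn main() {
  let mut sink = LogSink::new();
  for _ in 0..3 {
    sink.handle_chunk(vec![0u8; 2048]);
  }
  assert!(matches!(sink.persistence, Persistence::Blob));
  if let Some(client) = &sink.blob_client {
    println!("spilled {} bytes to blob storage", client.uploaded.len());
  }
}
```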