diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs index 04562718b..5568c3bd1 100644 --- a/services/backup/src/service/handlers/send_log.rs +++ b/services/backup/src/service/handlers/send_log.rs @@ -1,134 +1,209 @@ use tonic::Status; -use tracing::{error, trace, warn}; +use tracing::{debug, error, trace, warn}; use uuid::Uuid; use crate::{ - blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient, + blob::PutClient, + constants::{ID_SEPARATOR, LOG_DATA_SIZE_DATABASE_LIMIT}, + database::{DatabaseClient, LogItem}, service::proto::SendLogResponse, }; enum LogPersistence { /// Log entirely stored in DynamoDB database DB, /// Log contents stored with Blob service BLOB { holder: String }, } pub struct SendLogHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, backup_id: Option, log_hash: Option, // internal state log_id: Option, log_buffer: Vec, persistence_method: LogPersistence, should_receive_data: bool, // client instances db: DatabaseClient, blob_client: Option, } impl SendLogHandler { pub fn new(db: &DatabaseClient) -> Self { SendLogHandler { db: db.clone(), blob_client: None, user_id: None, backup_id: None, log_hash: None, log_id: None, log_buffer: Vec::new(), persistence_method: LogPersistence::DB, should_receive_data: false, should_close_stream: false, } } pub async fn handle_user_id( &mut self, user_id: String, ) -> Result<(), Status> { if self.user_id.is_some() { warn!("user ID already provided"); return Err(Status::invalid_argument("User ID already provided")); } self.user_id = Some(user_id); self.handle_internal().await } pub async fn handle_backup_id( &mut self, backup_id: String, ) -> Result<(), Status> { if self.backup_id.is_some() { warn!("backup ID already provided"); return Err(Status::invalid_argument("Backup ID already provided")); } tracing::Span::current().record("backup_id", &backup_id); self.backup_id = Some(backup_id); self.handle_internal().await } pub async fn handle_log_hash( &mut self, log_hash: Vec, ) -> Result<(), Status> { if self.log_hash.is_some() { warn!("Log hash already provided"); return Err(Status::invalid_argument("Log hash already provided")); } let hash_str = String::from_utf8(log_hash).map_err(|err| { error!("Failed to convert data_hash into string: {:?}", err); Status::aborted("Unexpected error") })?; tracing::Span::current().record("log_hash", &hash_str); self.log_hash = Some(hash_str); self.handle_internal().await } pub async fn handle_log_data( &mut self, data_chunk: Vec, ) -> Result<(), Status> { - unimplemented!() + if !self.should_receive_data || self.log_id.is_none() { + self.should_close_stream = true; + error!("Data chunk sent before other inputs"); + return Err(Status::invalid_argument( + "Data chunk sent before other inputs", + )); + } + + // empty chunk ends transmission + if data_chunk.is_empty() { + self.should_close_stream = true; + return Ok(()); + } + + match self.persistence_method { + LogPersistence::DB => { + self.log_buffer.extend(data_chunk); + self.ensure_size_constraints().await?; + } + LogPersistence::BLOB { .. } => { + let Some(client) = self.blob_client.as_mut() else { + self.should_close_stream = true; + error!("Put client uninitialized. This should never happen!"); + return Err(Status::failed_precondition("Internal error")); + }; + client.put_data(data_chunk).await.map_err(|err| { + error!("Failed to upload data chunk: {:?}", err); + Status::aborted("Internal error") + })?; + } + } + Ok(()) } pub async fn finish(self) -> Result { unimplemented!() } // internal param handler helper async fn handle_internal(&mut self) -> Result<(), Status> { if self.should_receive_data { error!("SendLogHandler is already expecting data chunks"); return Err(Status::failed_precondition("Log data chunk expected")); } // all non-data inputs must be set before receiving log contents let (Some(backup_id), Some(_), Some(_)) = ( self.backup_id.as_ref(), self.user_id.as_ref(), self.log_hash.as_ref() ) else { return Ok(()); }; let log_id = generate_log_id(backup_id); tracing::Span::current().record("log_id", &log_id); self.log_id = Some(log_id); trace!("Everything prepared, waiting for data..."); self.should_receive_data = true; Ok(()) } + + /// Ensures log fits db size constraints. If not, it is moved to blob + /// persistence + async fn ensure_size_constraints(&mut self) -> Result<(), Status> { + let (Some(backup_id), Some(log_id), Some(log_hash)) = ( + self.backup_id.as_ref(), + self.log_id.as_ref(), + self.log_hash.as_ref() + ) else { + self.should_close_stream = true; + error!("Log info absent in data mode. This should never happen!"); + return Err(Status::failed_precondition("Internal error")); + }; + + let log_size = LogItem::size_from_components( + backup_id, + log_id, + log_hash, + &self.log_buffer, + ); + if log_size > LOG_DATA_SIZE_DATABASE_LIMIT { + debug!("Log too large, switching persistence to Blob"); + let holder = + crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id)); + match crate::blob::start_simple_put_client(&holder, &log_hash).await? { + Some(mut put_client) => { + let blob_chunk = std::mem::take(&mut self.log_buffer); + put_client.put_data(blob_chunk).await.map_err(|err| { + error!("Failed to upload data chunk: {:?}", err); + Status::aborted("Internal error") + })?; + self.blob_client = Some(put_client); + } + None => { + debug!("Log hash already exists"); + self.should_close_stream = true; + } + } + self.persistence_method = LogPersistence::BLOB { holder }; + } + Ok(()) + } } fn generate_log_id(backup_id: &str) -> String { format!( "{backup_id}{sep}{uuid}", backup_id = backup_id, sep = ID_SEPARATOR, uuid = Uuid::new_v4() ) }