diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -2,11 +2,13 @@ use tracing::{debug, error, trace, warn}; use crate::{ - blob::{start_simple_put_client, PutClient}, - database::DatabaseClient, + blob::{start_simple_put_client, PutClient, PutRequest, PutRequestData}, + database::{BackupItem, DatabaseClient}, service::proto, }; +use super::handle_db_error; + type CreateBackupResult = Result; pub struct CreateBackupHandler { @@ -101,12 +103,76 @@ &mut self, data_chunk: Vec, ) -> CreateBackupResult { - unimplemented!() + if !self.is_data_mode || self.backup_id.is_empty() { + self.should_close_stream = true; + error!("Data chunk sent before other inputs"); + return Err(Status::invalid_argument( + "Data chunk sent before other inputs", + )); + } + + // empty chunk ends transmission + if data_chunk.is_empty() { + self.should_close_stream = true; + return Ok(proto::CreateNewBackupResponse { + backup_id: self.backup_id.clone(), + }); + } + + if let Some(client) = self.blob_client.as_mut() { + client + .put(PutRequest { + data: Some(PutRequestData::DataChunk(data_chunk)), + }) + .await + .map_err(|err| { + error!("Failed to upload data chunk: {:?}", err); + Status::aborted("Internal error") + })?; + } else { + self.should_close_stream = true; + error!("Put client uninitialized. This should never happen!"); + return Err(Status::failed_precondition("Internal error")); + } + + Ok(proto::CreateNewBackupResponse { + // actual Backup ID should be sent only once, the time it is generated + // see handle_internal() + backup_id: String::new(), + }) } /// This function should be called after the input stream is finished. pub async fn finish(self) -> Result<(), Status> { - unimplemented!() + if let Some(client) = self.blob_client { + client.terminate().await.map_err(|err| { + error!("Put client task closed with error: {:?}", err); + Status::aborted("Internal error") + })?; + } else { + trace!("No blob client initialized. Skipping termination"); + } + + if !self.is_data_mode { + // client probably aborted early + trace!("Nothing to store in database. Finishing early"); + return Ok(()); + } + + let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else { + error!("Holder / UserID absent in data mode. This should never happen!"); + return Err(Status::failed_precondition("Internal error")); + }; + let backup_id = self.backup_id; + let backup_item = BackupItem::new(user_id, backup_id, holder); + + self + .db + .put_backup_item(backup_item) + .await + .map_err(handle_db_error)?; + + Ok(()) } // internal param handler helper