diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -3,10 +3,12 @@
 
 use crate::{
   blob::{start_simple_put_client, PutClient},
-  database::DatabaseClient,
+  database::{BackupItem, DatabaseClient},
   service::proto,
 };
 
+use super::handle_db_error;
+
 type CreateBackupResult = Result<proto::CreateNewBackupResponse, Status>;
 
 enum HandlerState {
@@ -108,12 +110,87 @@
     &mut self,
     data_chunk: Vec<u8>,
   ) -> CreateBackupResult {
-    unimplemented!()
+    // Data chunks are accepted only in the `ReceivingData` state; in any
+    // other state the stream is closed and the request is rejected.
+    let HandlerState::ReceivingData { ref mut blob_client } = self.state else {
+      self.should_close_stream = true;
+      error!("Data chunk sent before other inputs");
+      return Err(Status::invalid_argument(
+        "Data chunk sent before other inputs",
+      ));
+    };
+
+    // empty chunk ends transmission
+    if data_chunk.is_empty() {
+      self.should_close_stream = true;
+      return Ok(proto::CreateNewBackupResponse {
+        backup_id: self.backup_id.clone(),
+      });
+    }
+
+    trace!("Received {} bytes of data", data_chunk.len());
+    // Forward the chunk to the blob put client; upload failures are logged
+    // and reported to the caller as `aborted`.
+    blob_client.put_data(data_chunk).await.map_err(|err| {
+      error!("Failed to upload data chunk: {:?}", err);
+      Status::aborted("Internal error")
+    })?;
+
+    Ok(proto::CreateNewBackupResponse {
+      // actual Backup ID should be sent only once, the time it is generated
+      // see handle_internal()
+      backup_id: String::new(),
+    })
   }
 
   /// This function should be called after the input stream is finished.
+  ///
+  /// Terminates the blob put client if an upload is in progress, then stores
+  /// a `BackupItem` built from the user ID, backup ID and holder in the
+  /// database. If the handler is still in the `ReceivingParams` state, it
+  /// returns `Ok(())` without storing anything.
+  ///
+  /// # Errors
+  ///
+  /// - `aborted` if terminating the blob put client fails,
+  /// - `failed_precondition` if the user ID, holder or backup ID is missing,
+  /// - the status produced by `handle_db_error` if the database write fails.
   pub async fn finish(self) -> Result<(), Status> {
-    unimplemented!()
+    match self.state {
+      HandlerState::ReceivingParams => {
+        // client probably aborted early
+        trace!("Nothing to store in database. Finishing early");
+        return Ok(());
+      }
+      HandlerState::ReceivingData { blob_client } => {
+        // Terminate the put client before writing to the database.
+        blob_client.terminate().await.map_err(|err| {
+          error!("Put client task closed with error: {:?}", err);
+          Status::aborted("Internal error")
+        })?;
+      }
+      // No put client to terminate in this state.
+      HandlerState::DataAlreadyExists => (),
+    }
+
+    // All three identifiers are required to build the database item.
+    let (Some(user_id), Some(holder)) = (self.user_id, self.holder) else {
+      error!("Holder / UserID absent in data mode. This should never happen!");
+      return Err(Status::failed_precondition("Internal error"));
+    };
+    if self.backup_id.is_empty() {
+      error!("Backup ID was not generated. This should never happen!");
+      return Err(Status::failed_precondition("Internal error"));
+    }
+    let backup_item = BackupItem::new(user_id, self.backup_id, holder);
+
+    self
+      .db
+      .put_backup_item(backup_item)
+      .await
+      .map_err(handle_db_error)?;
+
+    Ok(())
   }
 
   // internal param handler helper