diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -1,6 +1,11 @@ use tonic::Status; +use tracing::{debug, error, trace, warn}; -use crate::{blob::PutClient, database::DatabaseClient, service::proto}; +use crate::{ + blob::{start_simple_put_client, PutClient}, + database::DatabaseClient, + service::proto, +}; type CreateBackupResult = Result; @@ -52,25 +57,51 @@ &mut self, user_id: String, ) -> CreateBackupResult { - unimplemented!() + if self.user_id.is_some() { + warn!("user ID already provided"); + return Err(Status::invalid_argument("User ID already provided")); + } + self.user_id = Some(user_id); + self.handle_internal().await } pub async fn handle_device_id( &mut self, device_id: String, ) -> CreateBackupResult { - unimplemented!() + if self.device_id.is_some() { + warn!("Device ID already provided"); + return Err(Status::invalid_argument("Device ID already provided")); + } + tracing::Span::current().record("device_id", &device_id); + self.device_id = Some(device_id); + self.handle_internal().await } pub async fn handle_key_entropy( &mut self, key_entropy: Vec, ) -> CreateBackupResult { - unimplemented!() + if self.key_entropy.is_some() { + warn!("Key entropy already provided"); + return Err(Status::invalid_argument("Key entropy already provided")); + } + self.key_entropy = Some(key_entropy); + self.handle_internal().await } pub async fn handle_data_hash( &mut self, data_hash: Vec, ) -> CreateBackupResult { - unimplemented!() + if self.data_hash.is_some() { + warn!("Data hash already provided"); + return Err(Status::invalid_argument("Data hash already provided")); + } + let hash_str = String::from_utf8(data_hash).map_err(|err| { + error!("Failed to convert data_hash into string: {:?}", err); + Status::aborted("Unexpected error") + })?; + tracing::Span::current().record("data_hash", &hash_str); + self.data_hash = Some(hash_str); + self.handle_internal().await } pub async fn handle_data_chunk( @@ -84,6 +115,50 @@ pub async fn finish(self) -> Result<(), Status> { unimplemented!() } + + // internal param handler helper + async fn handle_internal(&mut self) -> CreateBackupResult { + if !matches!(self.state, HandlerState::ReceivingParams) { + error!("CreateBackupHandler already received all non-data params."); + return Err(Status::failed_precondition("Backup data chunk expected")); + } + + // all non-data inputs must be set before receiving backup data chunks + let (Some(data_hash), Some(device_id), Some(_), Some(_)) = ( + self.data_hash.as_ref(), + self.device_id.as_ref(), + self.user_id.as_ref(), + self.key_entropy.as_ref(), + ) else { + // return empty backup ID when inputs are incomplete + return Ok(proto::CreateNewBackupResponse { + backup_id: "".to_string(), + }); + }; + + let backup_id = generate_backup_id(device_id); + let holder = + crate::utils::generate_blob_holder(data_hash, &backup_id, None); + self.backup_id = backup_id.clone(); + self.holder = Some(holder.clone()); + tracing::Span::current().record("backup_id", &backup_id); + tracing::Span::current().record("blob_holder", &holder); + + match start_simple_put_client(&holder, data_hash).await? { + Some(blob_client) => { + self.state = HandlerState::ReceivingData { blob_client }; + trace!("Everything prepared, waiting for data..."); + } + None => { + // Blob with given data_hash already exists + debug!("Blob already exists, finishing"); + self.should_close_stream = true; + self.state = HandlerState::DataAlreadyExists; + } + }; + + Ok(proto::CreateNewBackupResponse { backup_id }) + } } /// Generates ID for a new backup