diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs index 3084ca01b..103d6721e 100644 --- a/services/backup/src/service/handlers/create_backup.rs +++ b/services/backup/src/service/handlers/create_backup.rs @@ -1,96 +1,171 @@ use tonic::Status; +use tracing::{debug, error, trace, warn}; -use crate::{blob::PutClient, database::DatabaseClient, service::proto}; +use crate::{ + blob::{start_simple_put_client, PutClient}, + database::DatabaseClient, + service::proto, +}; type CreateBackupResult = Result; enum HandlerState { /// Initial state. Handler is receiving non-data inputs ReceivingParams, /// Handler is receiving data chunks ReceivingData { blob_client: PutClient }, /// A special case when Blob service claims that a blob with given /// [`CreateBackupHandler::data_hash`] already exists DataAlreadyExists, } pub struct CreateBackupHandler { // flow control pub should_close_stream: bool, // inputs user_id: Option, device_id: Option, key_entropy: Option>, data_hash: Option, // client instances db: DatabaseClient, // internal state state: HandlerState, backup_id: String, holder: Option, } impl CreateBackupHandler { pub fn new(db: &DatabaseClient) -> Self { CreateBackupHandler { should_close_stream: false, user_id: None, device_id: None, key_entropy: None, data_hash: None, db: db.clone(), state: HandlerState::ReceivingParams, backup_id: String::new(), holder: None, } } pub async fn handle_user_id( &mut self, user_id: String, ) -> CreateBackupResult { - unimplemented!() + if self.user_id.is_some() { + warn!("user ID already provided"); + return Err(Status::invalid_argument("User ID already provided")); + } + self.user_id = Some(user_id); + self.handle_internal().await } pub async fn handle_device_id( &mut self, device_id: String, ) -> CreateBackupResult { - unimplemented!() + if self.device_id.is_some() { + warn!("Device ID already provided"); + return Err(Status::invalid_argument("Device ID already provided")); + } + tracing::Span::current().record("device_id", &device_id); + self.device_id = Some(device_id); + self.handle_internal().await } pub async fn handle_key_entropy( &mut self, key_entropy: Vec, ) -> CreateBackupResult { - unimplemented!() + if self.key_entropy.is_some() { + warn!("Key entropy already provided"); + return Err(Status::invalid_argument("Key entropy already provided")); + } + self.key_entropy = Some(key_entropy); + self.handle_internal().await } pub async fn handle_data_hash( &mut self, data_hash: Vec, ) -> CreateBackupResult { - unimplemented!() + if self.data_hash.is_some() { + warn!("Data hash already provided"); + return Err(Status::invalid_argument("Data hash already provided")); + } + let hash_str = String::from_utf8(data_hash).map_err(|err| { + error!("Failed to convert data_hash into string: {:?}", err); + Status::aborted("Unexpected error") + })?; + tracing::Span::current().record("data_hash", &hash_str); + self.data_hash = Some(hash_str); + self.handle_internal().await } pub async fn handle_data_chunk( &mut self, data_chunk: Vec, ) -> CreateBackupResult { unimplemented!() } /// This function should be called after the input stream is finished. pub async fn finish(self) -> Result<(), Status> { unimplemented!() } + + // internal param handler helper + async fn handle_internal(&mut self) -> CreateBackupResult { + if !matches!(self.state, HandlerState::ReceivingParams) { + error!("CreateBackupHandler already received all non-data params."); + return Err(Status::failed_precondition("Backup data chunk expected")); + } + + // all non-data inputs must be set before receiving backup data chunks + let (Some(data_hash), Some(device_id), Some(_), Some(_)) = ( + self.data_hash.as_ref(), + self.device_id.as_ref(), + self.user_id.as_ref(), + self.key_entropy.as_ref(), + ) else { + // return empty backup ID when inputs are incomplete + return Ok(proto::CreateNewBackupResponse { + backup_id: "".to_string(), + }); + }; + + let backup_id = generate_backup_id(device_id); + let holder = + crate::utils::generate_blob_holder(data_hash, &backup_id, None); + self.backup_id = backup_id.clone(); + self.holder = Some(holder.clone()); + tracing::Span::current().record("backup_id", &backup_id); + tracing::Span::current().record("blob_holder", &holder); + + match start_simple_put_client(&holder, data_hash).await? { + Some(blob_client) => { + self.state = HandlerState::ReceivingData { blob_client }; + trace!("Everything prepared, waiting for data..."); + } + None => { + // Blob with given data_hash already exists + debug!("Blob already exists, finishing"); + self.should_close_stream = true; + self.state = HandlerState::DataAlreadyExists; + } + }; + + Ok(proto::CreateNewBackupResponse { backup_id }) + } } /// Generates ID for a new backup fn generate_backup_id(device_id: &str) -> String { format!( "{device_id}_{timestamp}", device_id = device_id, timestamp = chrono::Utc::now().timestamp_millis() ) }