diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs new file mode 100644 --- /dev/null +++ b/services/backup/src/service/handlers/create_backup.rs @@ -0,0 +1,87 @@ +use tonic::Status; + +use crate::{blob::PutClient, database::DatabaseClient, service::proto}; + +type CreateBackupResult = Result; + +enum HandlerState { + /// Initial state. Handler is receiving non-data inputs + ReceivingParams, + /// Handler is receiving data chunks + ReceivingData { blob_client: PutClient }, + /// A special case when Blob service claims that a blob with given + /// [`CreateBackupHandler::data_hash`] already exists + DataAlreadyExists, +} + +pub struct CreateBackupHandler { + // flow control + pub should_close_stream: bool, + + // inputs + user_id: Option, + device_id: Option, + key_entropy: Option>, + data_hash: Option, + + // client instances + db: DatabaseClient, + + // internal state + state: HandlerState, + backup_id: String, + holder: Option, +} + +impl CreateBackupHandler { + pub fn new(db: &DatabaseClient) -> Self { + CreateBackupHandler { + should_close_stream: false, + user_id: None, + device_id: None, + key_entropy: None, + data_hash: None, + db: db.clone(), + state: HandlerState::ReceivingParams, + backup_id: String::new(), + holder: None, + } + } + + pub async fn handle_user_id( + &mut self, + user_id: String, + ) -> CreateBackupResult { + unimplemented!() + } + pub async fn handle_device_id( + &mut self, + device_id: String, + ) -> CreateBackupResult { + unimplemented!() + } + pub async fn handle_key_entropy( + &mut self, + key_entropy: Vec, + ) -> CreateBackupResult { + unimplemented!() + } + pub async fn handle_data_hash( + &mut self, + data_hash: Vec, + ) -> CreateBackupResult { + unimplemented!() + } + + pub async fn handle_data_chunk( + &mut self, + data_chunk: Vec, + ) -> CreateBackupResult { + unimplemented!() + } + + /// This function should be called after the input stream is finished. + pub async fn finish(self) -> Result<(), Status> { + unimplemented!() + } +} diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs --- a/services/backup/src/service/mod.rs +++ b/services/backup/src/service/mod.rs @@ -1,16 +1,28 @@ use proto::backup_service_server::BackupService; use std::pin::Pin; -use tokio_stream::Stream; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; -use tracing::instrument; +use tracing::{debug, error, info, instrument, trace, Instrument}; -use crate::database::DatabaseClient; +use crate::{ + constants::MPSC_CHANNEL_BUFFER_CAPACITY, database::DatabaseClient, +}; mod proto { tonic::include_proto!("backup"); } pub use proto::backup_service_server::BackupServiceServer; +/// submodule containing gRPC endpoint handler implementations +mod handlers { + pub(super) mod create_backup; + + // re-exports for convenient usage in handlers + pub(self) use super::proto; +} +use self::handlers::create_backup::CreateBackupHandler; + pub struct MyBackupService { db: DatabaseClient, } @@ -30,12 +42,64 @@ >, >; - #[instrument(skip(self))] + #[instrument(skip_all, fields(device_id, data_hash, backup_id, blob_holder))] async fn create_new_backup( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented("unimplemented")) + use proto::create_new_backup_request::Data::*; + + info!("CreateNewBackup request: {:?}", request); + let mut in_stream = request.into_inner(); + let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let db = self.db.clone(); + let worker = async move { + let mut handler = CreateBackupHandler::new(&db); + while let Some(message) = in_stream.next().await { + let response = match message { + Ok(proto::CreateNewBackupRequest { + data: Some(UserId(user_id)), + }) => handler.handle_user_id(user_id).await, + Ok(proto::CreateNewBackupRequest { + data: Some(DeviceId(device_id)), + }) => handler.handle_device_id(device_id).await, + Ok(proto::CreateNewBackupRequest { + data: Some(KeyEntropy(key_entropy)), + }) => handler.handle_key_entropy(key_entropy).await, + Ok(proto::CreateNewBackupRequest { + data: Some(NewCompactionHash(hash)), + }) => handler.handle_data_hash(hash).await, + Ok(proto::CreateNewBackupRequest { + data: Some(NewCompactionChunk(chunk)), + }) => handler.handle_data_chunk(chunk).await, + unexpected => { + error!("Received an unexpected request: {:?}", unexpected); + Err(Status::unknown("unknown error")) + } + }; + + trace!("Sending response: {:?}", response); + if let Err(e) = tx.send(response).await { + error!("Response was dropped: {}", e); + break; + } + if handler.should_close_stream { + trace!("Handler requested to close stream"); + break; + } + } + if let Err(status) = handler.finish().await { + trace!("Sending error response: {:?}", status); + let _ = tx.send(Err(status)).await; + } + debug!("Request finished processing"); + }; + tokio::spawn(worker.in_current_span()); + + let out_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(out_stream) as Self::CreateNewBackupStream + )) } #[instrument(skip(self))]