diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
new file mode 100644
--- /dev/null
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -0,0 +1,95 @@
+use tonic::Status;
+use uuid::Uuid;
+
+use crate::{
+  blob::PutClient, constants::ID_SEPARATOR, database::DatabaseClient,
+  service::proto::SendLogResponse,
+};
+
+/// Where the contents of a log are persisted.
+enum LogPersistence {
+  /// Log entirely stored in DynamoDB database
+  DB,
+  /// Log contents stored with Blob service
+  BLOB { holder: String },
+}
+
+/// State of a single `SendLog` request. The service passes every message of
+/// the request stream to the matching `handle_*` method and calls
+/// [`SendLogHandler::finish`] afterwards to obtain the response.
+pub struct SendLogHandler {
+  // flow control
+  /// The service stops reading the request stream once this is `true`.
+  pub should_close_stream: bool,
+
+  // NOTE(review): the type arguments of the fields below were reconstructed
+  // from how they are used; confirm them, `log_hash` and `log_id` especially.
+  // inputs
+  user_id: Option<String>,
+  backup_id: Option<String>,
+  log_hash: Option<String>,
+
+  // internal state
+  log_id: Option<String>,
+  log_buffer: Vec<u8>,
+  persistence_method: LogPersistence,
+  should_receive_data: bool,
+
+  // client instances
+  db: DatabaseClient,
+  blob_client: Option<PutClient>,
+}
+
+impl SendLogHandler {
+  /// Creates a handler with no inputs received yet. It keeps its own clone of
+  /// `db`, starts with `LogPersistence::DB` and has no blob client.
+  pub fn new(db: &DatabaseClient) -> Self {
+    SendLogHandler {
+      db: db.clone(),
+      blob_client: None,
+      user_id: None,
+      backup_id: None,
+      log_hash: None,
+      log_id: None,
+      log_buffer: Vec::new(),
+      persistence_method: LogPersistence::DB,
+      should_receive_data: false,
+      should_close_stream: false,
+    }
+  }
+
+  /// Handles the `UserId` message. Not implemented yet: panics when called.
+  pub async fn handle_user_id(
+    &mut self,
+    user_id: String,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+  /// Handles the `BackupId` message. Not implemented yet: panics when called.
+  pub async fn handle_backup_id(
+    &mut self,
+    backup_id: String,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+  /// Handles the `LogHash` message. Not implemented yet: panics when called.
+  pub async fn handle_log_hash(
+    &mut self,
+    log_hash: Vec<u8>,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+  /// Handles a `LogData` chunk. Not implemented yet: panics when called.
+  pub async fn handle_log_data(
+    &mut self,
+    data_chunk: Vec<u8>,
+  ) -> Result<(), Status> {
+    unimplemented!()
+  }
+
+  /// Consumes the handler and produces the response for the request. Not
+  /// implemented yet: panics when called.
+  pub async fn finish(self) -> Result<SendLogResponse, Status> {
+    unimplemented!()
+  }
+}
diff --git a/services/backup/src/service/mod.rs b/services/backup/src/service/mod.rs
--- a/services/backup/src/service/mod.rs
+++ b/services/backup/src/service/mod.rs
@@ -20,12 +20,14 @@
 mod handlers {
   pub(super) mod add_attachments;
   pub(super) mod create_backup;
+  pub(super) mod send_log;
 
   // re-exports for convenient usage in handlers
   pub(self) use super::handle_db_error;
   pub(self) use super::proto;
 }
 use self::handlers::create_backup::CreateBackupHandler;
+use self::handlers::send_log::SendLogHandler;
 
 pub struct MyBackupService {
   db: DatabaseClient,
@@ -106,12 +108,50 @@
     ))
   }
 
-  #[instrument(skip(self))]
+  #[instrument(skip_all, fields(backup_id, log_hash, log_id))]
   async fn send_log(
     &self,
-    _request: Request<tonic::Streaming<proto::SendLogRequest>>,
+    request: Request<tonic::Streaming<proto::SendLogRequest>>,
   ) -> Result<Response<proto::SendLogResponse>, Status> {
-    Err(Status::unimplemented("unimplemented"))
+    use proto::send_log_request::Data::*;
+
+    info!("SendLog request: {:?}", request);
+    let mut handler = SendLogHandler::new(&self.db);
+
+    let mut in_stream = request.into_inner();
+    while let Some(message) = in_stream.next().await {
+      let result = match message {
+        Ok(proto::SendLogRequest {
+          data: Some(UserId(user_id)),
+        }) => handler.handle_user_id(user_id).await,
+        Ok(proto::SendLogRequest {
+          data: Some(BackupId(backup_id)),
+        }) => handler.handle_backup_id(backup_id).await,
+        Ok(proto::SendLogRequest {
+          data: Some(LogHash(log_hash)),
+        }) => handler.handle_log_hash(log_hash).await,
+        Ok(proto::SendLogRequest {
+          data: Some(LogData(chunk)),
+        }) => handler.handle_log_data(chunk).await,
+        unexpected => {
+          error!("Received an unexpected request: {:?}", unexpected);
+          Err(Status::unknown("unknown error"))
+        }
+      };
+
+      if let Err(err) = result {
+        error!("An error occurred when processing request: {:?}", err);
+        return Err(err);
+      }
+      if handler.should_close_stream {
+        trace!("Handler requested to close request stream");
+        break;
+      }
+    }
+
+    let response = handler.finish().await;
+    debug!("Finished. Sending response: {:?}", response);
+    response.map(Response::new)
   }
 
   type RecoverBackupKeyStream = Pin<