diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs --- a/services/backup/src/service/handlers/pull_backup.rs +++ b/services/backup/src/service/handlers/pull_backup.rs @@ -1,6 +1,9 @@ -use tokio_stream::Stream; +use async_stream::try_stream; +use tokio_stream::{Stream, StreamExt}; use tonic::Status; +use tracing::{debug, trace}; +use super::handle_db_error; use super::proto::{self, PullBackupResponse}; use crate::database::{BackupItem, DatabaseClient, LogItem}; @@ -14,7 +17,23 @@ db: &DatabaseClient, request: proto::PullBackupRequest, ) -> Result { - unimplemented!() + let proto::PullBackupRequest { user_id, backup_id } = request; + let backup_item = db + .find_backup_item(&user_id, &backup_id) + .await + .map_err(handle_db_error)? + .ok_or_else(|| { + debug!("Backup item not found"); + Status::not_found("Backup item not found") + })?; + + let backup_id = backup_item.backup_id.as_str(); + let logs = db + .find_log_items_for_backup(backup_id) + .await + .map_err(handle_db_error)?; + + Ok(PullBackupHandler { backup_item, logs }) } /// Consumes the handler and provides a response `Stream`. The stream will @@ -26,9 +45,60 @@ pub fn into_response_stream( self, ) -> impl Stream> { - // the unimplemented!() macro doesnt compile here - async_stream::stream! { - yield Err(Status::unimplemented("not implemented yet")) + use proto::pull_backup_response::*; + + try_stream! { + debug!("Pulling backup..."); + { + let compaction_stream = backup_compaction_stream(&self.backup_item); + tokio::pin!(compaction_stream); + while let Some(response) = compaction_stream.try_next().await? { + yield response; + } + } + trace!("Backup data pull complete."); + + if self.logs.is_empty() { + debug!("No logs to pull. Finishing"); + return; + } + + debug!("Pulling logs..."); + for log in self.logs { + trace!("Pulling log ID={}", &log.log_id); + + if log.persisted_in_blob { + trace!("Log persisted in blob"); + let log_data_stream = log_stream(&log); + tokio::pin!(log_data_stream); + while let Some(response) = log_data_stream.try_next().await? { + yield response; + } + } else { + yield proto::PullBackupResponse { + attachment_holders: Some(log.attachment_holders), + id: Some(Id::LogId(log.log_id)), + data: Some(Data::LogChunk(log.value.into_bytes())), + }; + } + } + trace!("Pulled all logs, done"); } } } + +fn backup_compaction_stream( + backup_item: &BackupItem, +) -> impl Stream> + '_ { + async_stream::stream! { + yield Err(Status::unimplemented("Not implemented yet")); + } +} + +fn log_stream( + log: &LogItem, +) -> impl Stream> + '_ { + async_stream::stream! { + yield Err(Status::unimplemented("Not implemented yet")); + } +}