diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,11 +1,12 @@
 use async_stream::try_stream;
 use tokio_stream::{Stream, StreamExt};
 use tonic::Status;
-use tracing::{debug, trace, warn};
+use tracing::{debug, error, trace, warn};
 
 use super::handle_db_error;
 use super::proto::{self, PullBackupResponse};
 use crate::{
+  blob::GetClient,
   constants::{
     BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
     GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
@@ -57,7 +58,7 @@
     try_stream! {
       debug!("Pulling backup...");
       {
-        let compaction_stream = backup_compaction_stream(&self.backup_item);
+        let compaction_stream = data_stream(&self.backup_item);
         tokio::pin!(compaction_stream);
         while let Some(response) = compaction_stream.try_next().await? {
           yield response;
@@ -76,7 +77,7 @@
 
         if log.persisted_in_blob {
           trace!("Log persisted in blob");
-          let log_data_stream = log_stream(&log);
+          let log_data_stream = data_stream(&log);
           tokio::pin!(log_data_stream);
           while let Some(response) = log_data_stream.try_next().await? {
             yield response;
@@ -94,19 +95,52 @@
   }
 }
 
-fn backup_compaction_stream(
-  backup_item: &BackupItem,
-) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_ {
-  async_stream::stream! {
-    yield Err(Status::unimplemented("Not implemented yet"));
-  }
-}
+/// Downloads a blob-stored [`BlobStoredItem`] and streams its content as a
+/// stream of [`PullBackupResponse`] objects, handling gRPC message size limits.
+fn data_stream<Item>(
+  item: &Item,
+) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
+where
+  Item: BlobStoredItem,
+{
+  try_stream! {
+    let mut buffer = ResponseBuffer::default();
+    let mut client =
+      GetClient::start(item.get_holder().to_string()).await.map_err(|err| {
+        error!(
+          "Failed to start blob client: {:?}", err
+        );
+        Status::aborted("Internal error")
+      })?;
+
+    let mut is_first_chunk = true;
+    loop {
+      if !buffer.is_saturated() {
+        if let Some(data) = client.get().await {
+          buffer.put(data);
+        }
+      }
+      if buffer.is_empty() {
+        break;
+      }
+
+      // get data chunk, shortened by length of metadata
+      let padding = item.metadata_size(is_first_chunk);
+      let chunk = buffer.get_chunk(padding);
+
+      trace!(
+        with_attachments = is_first_chunk,
+        data_size = chunk.len(),
+        "Sending data chunk"
+      );
+      yield item.to_response(chunk, is_first_chunk);
+      is_first_chunk = false;
+    }
 
-fn log_stream(
-  log: &LogItem,
-) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_ {
-  async_stream::stream! {
-    yield Err(Status::unimplemented("Not implemented yet"));
+    client.terminate().await.map_err(|err| {
+      error!("Blob client failed: {:?}", err);
+      Status::aborted("Internal error")
+    })?;
   }
 }