Page MenuHomePhabricator

D6246.diff
No OneTemporary

D6246.diff

diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -1,11 +1,12 @@
use async_stream::try_stream;
use tokio_stream::{Stream, StreamExt};
use tonic::Status;
-use tracing::{debug, trace, warn};
+use tracing::{debug, error, trace, warn};
use super::handle_db_error;
use super::proto::{self, PullBackupResponse};
use crate::{
+ blob::GetClient,
constants::{
BACKUP_TABLE_FIELD_ATTACHMENT_HOLDERS, BACKUP_TABLE_FIELD_BACKUP_ID,
GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
@@ -57,7 +58,7 @@
try_stream! {
debug!("Pulling backup...");
{
- let compaction_stream = backup_compaction_stream(&self.backup_item);
+ let compaction_stream = data_stream(&self.backup_item);
tokio::pin!(compaction_stream);
while let Some(response) = compaction_stream.try_next().await? {
yield response;
@@ -76,7 +77,7 @@
if log.persisted_in_blob {
trace!("Log persisted in blob");
- let log_data_stream = log_stream(&log);
+ let log_data_stream = data_stream(&log);
tokio::pin!(log_data_stream);
while let Some(response) = log_data_stream.try_next().await? {
yield response;
@@ -94,19 +95,52 @@
}
}
-fn backup_compaction_stream(
- backup_item: &BackupItem,
-) -> impl Stream<Item = Result<proto::PullBackupResponse, Status>> + '_ {
- async_stream::stream! {
- yield Err(Status::unimplemented("Not implemented yet"));
- }
-}
+/// Starts a blob [`GetClient`] for the holder of a [`BlobStoredItem`] and re-emits the data it returns as a
+/// stream of [`PullBackupResponse`] messages; client start/terminate failures are logged and yielded as `Status::aborted`.
+fn data_stream<Item>(
+ item: &Item,
+) -> impl Stream<Item = Result<PullBackupResponse, Status>> + '_
+where
+ Item: BlobStoredItem,
+{
+ try_stream! {
+ let mut buffer = ResponseBuffer::default();
+ let mut client =
+ GetClient::start(item.get_holder().to_string()).await.map_err(|err| {
+ error!(
+ "Failed to start blob client: {:?}", err
+ );
+ Status::aborted("Internal error")
+ })?;
+
+ let mut is_first_chunk = true;
+ loop {
+ if !buffer.is_saturated() {
+ if let Some(data) = client.get().await {
+ buffer.put(data);
+ }
+ }
+ if buffer.is_empty() {
+ break;
+ }
+
+ // Get the next data chunk, shortened by the metadata size the item reports for this message (first chunk vs. later ones).
+ let padding = item.metadata_size(is_first_chunk);
+ let chunk = buffer.get_chunk(padding);
+
+ trace!(
+ with_attachments = is_first_chunk,
+ data_size = chunk.len(),
+ "Sending data chunk"
+ );
+ yield item.to_response(chunk, is_first_chunk);
+ is_first_chunk = false;
+ }
-fn log_stream(
- log: &LogItem,
-) -> impl Stream<Item = Result<proto::PullBackupResponse, Status>> + '_ {
- async_stream::stream! {
- yield Err(Status::unimplemented("Not implemented yet"));
+ client.terminate().await.map_err(|err| {
+ error!("Blob client failed: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
}
}

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 7:48 PM (21 h, 8 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2678653
Default Alt Text
D6246.diff (3 KB)

Event Timeline