Page Menu | Home | Phabricator

D6387.diff
No One | Temporary

D6387.diff

diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs
--- a/services/backup/src/blob/downloader.rs
+++ b/services/backup/src/blob/downloader.rs
@@ -3,7 +3,7 @@
sync::mpsc::{self, Receiver},
task::JoinHandle,
};
-use tracing::{instrument, Instrument};
+use tracing::{error, instrument, Instrument};
use super::{proto, BlobClient};
use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
@@ -28,14 +28,17 @@
/// Connects to the Blob service and keeps the client connection open
/// in a separate Tokio task.
#[instrument(name = "blob_downloader")]
- pub async fn start(
- holder: String,
- mut blob_client: BlobClient,
- ) -> Result<Self> {
+ pub fn start(holder: String, mut blob_client: BlobClient) -> Self {
let (blob_res_tx, blob_res_rx) =
mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let client_thread = async move {
- let response = blob_client.get(proto::GetRequest { holder }).await?;
+ let response = blob_client
+ .get(proto::GetRequest { holder })
+ .await
+ .map_err(|err| {
+ error!("Get request failed: {:?}", err);
+ err
+ })?;
let mut inner_response = response.into_inner();
loop {
match inner_response.message().await? {
@@ -53,10 +56,10 @@
};
let handle = tokio::spawn(client_thread.in_current_span());
- Ok(BlobDownloader {
+ BlobDownloader {
rx: blob_res_rx,
handle,
- })
+ }
}
/// Receives the next chunk of blob data if ready or sleeps
diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs
--- a/services/backup/src/blob/uploader.rs
+++ b/services/backup/src/blob/uploader.rs
@@ -30,7 +30,7 @@
/// Connects to the Blob service and keeps the client connection open
/// in a separate Tokio task.
#[instrument(name = "blob_uploader")]
- pub async fn start(mut blob_client: BlobClient) -> Result<Self> {
+ pub fn start(mut blob_client: BlobClient) -> Self {
let (blob_req_tx, blob_req_rx) =
mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let (blob_res_tx, blob_res_rx) =
@@ -61,6 +61,7 @@
}
}
Err(err) => {
+ error!("Put request failed: {:?}", err);
bail!(err.to_string());
}
};
@@ -68,11 +69,11 @@
};
let handle = tokio::spawn(client_thread.in_current_span());
- Ok(BlobUploader {
+ BlobUploader {
req_tx: blob_req_tx,
res_rx: blob_res_rx,
handle,
- })
+ }
}
/// Sends a [`PutRequest`] to the stream and waits for blob service
@@ -128,11 +129,8 @@
blob_hash: &str,
blob_client: BlobClient,
) -> Result<Option<BlobUploader>, Status> {
- // start client
- let mut uploader = BlobUploader::start(blob_client).await.map_err(|err| {
- error!("Failed to instantiate uploader: {:?}", err);
- Status::aborted("Internal error")
- })?;
+ // start upload request
+ let mut uploader = BlobUploader::start(blob_client);
// send holder
uploader
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -116,12 +116,7 @@
try_stream! {
let mut buffer = ResponseBuffer::default();
let mut downloader =
- BlobDownloader::start(item.get_holder().to_string(), blob_client).await.map_err(|err| {
- error!(
- "Failed to start blob downloader: {:?}", err
- );
- Status::aborted("Internal error")
- })?;
+ BlobDownloader::start(item.get_holder().to_string(), blob_client);
let mut is_first_chunk = true;
loop {

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 9:27 PM (16 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2572489
Default Alt Text
D6387.diff (3 KB)

Event Timeline