Page MenuHomePhabricator

D6386.diff
No OneTemporary

D6386.diff

diff --git a/services/backup/src/blob/downloader.rs b/services/backup/src/blob/downloader.rs
--- a/services/backup/src/blob/downloader.rs
+++ b/services/backup/src/blob/downloader.rs
@@ -5,7 +5,7 @@
};
use tracing::{instrument, Instrument};
-use super::proto;
+use super::{proto, BlobClient};
use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
pub use proto::put_request::Data as PutRequestData;
@@ -28,13 +28,10 @@
/// Connects to the Blob service and keeps the client connection open
/// in a separate Tokio task.
#[instrument(name = "blob_downloader")]
- pub async fn start(holder: String) -> Result<Self> {
- let service_url = &crate::CONFIG.blob_service_url;
- let mut blob_client =
- proto::blob_service_client::BlobServiceClient::connect(
- service_url.to_string(),
- )
- .await?;
+ pub async fn start(
+ holder: String,
+ mut blob_client: BlobClient,
+ ) -> Result<Self> {
let (blob_res_tx, blob_res_rx) =
mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let client_thread = async move {
diff --git a/services/backup/src/blob/uploader.rs b/services/backup/src/blob/uploader.rs
--- a/services/backup/src/blob/uploader.rs
+++ b/services/backup/src/blob/uploader.rs
@@ -7,7 +7,7 @@
use tonic::Status;
use tracing::{error, instrument, Instrument};
-use super::proto;
+use super::{proto, BlobClient};
use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
pub use proto::put_request::Data as PutRequestData;
@@ -30,13 +30,7 @@
/// Connects to the Blob service and keeps the client connection open
/// in a separate Tokio task.
#[instrument(name = "blob_uploader")]
- pub async fn start() -> Result<Self> {
- let service_url = &crate::CONFIG.blob_service_url;
- let mut blob_client =
- proto::blob_service_client::BlobServiceClient::connect(
- service_url.to_string(),
- )
- .await?;
+ pub async fn start(mut blob_client: BlobClient) -> Result<Self> {
let (blob_req_tx, blob_req_rx) =
mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let (blob_res_tx, blob_res_rx) =
@@ -132,9 +126,10 @@
pub async fn start_simple_uploader(
holder: &str,
blob_hash: &str,
+ blob_client: BlobClient,
) -> Result<Option<BlobUploader>, Status> {
// start client
- let mut uploader = BlobUploader::start().await.map_err(|err| {
+ let mut uploader = BlobUploader::start(blob_client).await.map_err(|err| {
error!("Failed to instantiate uploader: {:?}", err);
Status::aborted("Internal error")
})?;
diff --git a/services/backup/src/service/handlers/add_attachments.rs b/services/backup/src/service/handlers/add_attachments.rs
--- a/services/backup/src/service/handlers/add_attachments.rs
+++ b/services/backup/src/service/handlers/add_attachments.rs
@@ -89,8 +89,12 @@
Some(&log_item.log_id),
);
- if let Some(mut uploader) =
- crate::blob::start_simple_uploader(&holder, &log_item.data_hash).await?
+ if let Some(mut uploader) = crate::blob::start_simple_uploader(
+ &holder,
+ &log_item.data_hash,
+ blob_client.clone(),
+ )
+ .await?
{
let blob_chunk = log_item.value.into_bytes();
uploader.put_data(blob_chunk).await.map_err(|err| {
diff --git a/services/backup/src/service/handlers/create_backup.rs b/services/backup/src/service/handlers/create_backup.rs
--- a/services/backup/src/service/handlers/create_backup.rs
+++ b/services/backup/src/service/handlers/create_backup.rs
@@ -205,7 +205,9 @@
tracing::Span::current().record("backup_id", &backup_id);
tracing::Span::current().record("blob_holder", &holder);
- match start_simple_uploader(&holder, data_hash).await? {
+ match start_simple_uploader(&holder, data_hash, self.blob_client.clone())
+ .await?
+ {
Some(uploader) => {
self.state = HandlerState::ReceivingData { uploader };
trace!("Everything prepared, waiting for data...");
diff --git a/services/backup/src/service/handlers/pull_backup.rs b/services/backup/src/service/handlers/pull_backup.rs
--- a/services/backup/src/service/handlers/pull_backup.rs
+++ b/services/backup/src/service/handlers/pull_backup.rs
@@ -116,7 +116,7 @@
try_stream! {
let mut buffer = ResponseBuffer::default();
let mut downloader =
- BlobDownloader::start(item.get_holder().to_string()).await.map_err(|err| {
+ BlobDownloader::start(item.get_holder().to_string(), blob_client).await.map_err(|err| {
error!(
"Failed to start blob downloader: {:?}", err
);
diff --git a/services/backup/src/service/handlers/send_log.rs b/services/backup/src/service/handlers/send_log.rs
--- a/services/backup/src/service/handlers/send_log.rs
+++ b/services/backup/src/service/handlers/send_log.rs
@@ -236,7 +236,13 @@
debug!("Log too large, switching persistence to Blob");
let holder =
crate::utils::generate_blob_holder(log_hash, backup_id, Some(log_id));
- match crate::blob::start_simple_uploader(&holder, &log_hash).await? {
+ match crate::blob::start_simple_uploader(
+ &holder,
+ &log_hash,
+ self.blob_client.clone(),
+ )
+ .await?
+ {
Some(mut uploader) => {
let blob_chunk = std::mem::take(&mut self.log_buffer);
uploader.put_data(blob_chunk).await.map_err(|err| {

File Metadata

Mime Type
text/plain
Expires
Wed, Nov 27, 5:19 PM (18 h, 47 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2591137
Default Alt Text
D6386.diff (5 KB)

Event Timeline