Page MenuHomePhabricator

D6166.diff
No OneTemporary

D6166.diff

diff --git a/services/backup/src/blob/get_client.rs b/services/backup/src/blob/get_client.rs
--- a/services/backup/src/blob/get_client.rs
+++ b/services/backup/src/blob/get_client.rs
@@ -1,11 +1,24 @@
-use anyhow::Result;
-use tokio::{sync::mpsc::Receiver, task::JoinHandle};
-use tracing::instrument;
+use anyhow::{bail, Result};
+use tokio::{
+ sync::mpsc::{self, Receiver},
+ task::JoinHandle,
+};
+use tracing::{instrument, Instrument};
use super::proto;
+use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
+
pub use proto::put_request::Data as PutRequestData;
pub use proto::{PutRequest, PutResponse};
+/// The GetClient instance is a handle holder of a Tokio task running the
+/// actual blob client instance. The communication is done via an MPSC channel
+/// and is one-sided - the data is transmitted from the client task to the
+/// caller. Blob chunks received from the response stream wait in the client
+/// task until the channel has capacity, so it is recommended to read them
+/// quickly to make room for more.
+/// The client task can be stopped and awaited for result via the `terminate()`
+/// method.
pub struct GetClient {
rx: Receiver<Vec<u8>>,
handle: JoinHandle<anyhow::Result<()>>,
@@ -16,7 +29,37 @@
/// in a separate Tokio task.
#[instrument(name = "get_client")]
pub async fn start(holder: String) -> Result<Self> {
- todo!()
+ let service_url = &crate::CONFIG.blob_service_url;
+ let mut blob_client =
+ proto::blob_service_client::BlobServiceClient::connect(
+ service_url.to_string(),
+ )
+ .await?;
+ let (blob_res_tx, blob_res_rx) =
+ mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let client_thread = async move {
+ let response = blob_client.get(proto::GetRequest { holder }).await?;
+ let mut inner_response = response.into_inner();
+ loop {
+ match inner_response.message().await? {
+ Some(data) => {
+ let data: Vec<u8> = data.data_chunk;
+ if let Err(err) = blob_res_tx.send(data).await {
+ bail!(err);
+ }
+ }
+ // Response stream was closed
+ None => break,
+ }
+ }
+ Ok(())
+ };
+ let handle = tokio::spawn(client_thread.in_current_span());
+
+ Ok(GetClient {
+ rx: blob_res_rx,
+ handle,
+ })
}
/// Receives the next chunk of blob data if ready or sleeps
@@ -27,12 +70,13 @@
/// should be consumed by calling [`GetClient::terminate`] to handle
/// possible errors.
pub async fn get(&mut self) -> Option<Vec<u8>> {
- todo!()
+ self.rx.recv().await
}
/// Stops receiving messages and awaits the client thread to exit
/// and returns its status.
pub async fn terminate(mut self) -> Result<()> {
- todo!()
+ self.rx.close();
+ self.handle.await?
}
}
diff --git a/services/backup/src/constants.rs b/services/backup/src/constants.rs
--- a/services/backup/src/constants.rs
+++ b/services/backup/src/constants.rs
@@ -1,3 +1,7 @@
+// Assorted constants
+
+pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
+
// Configuration defaults
pub const DEFAULT_GRPC_SERVER_PORT: u64 = 50051;

File Metadata

Mime Type
text/plain
Expires
Fri, Dec 20, 7:23 PM (17 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2679111
Default Alt Text
D6166.diff (3 KB)

Event Timeline