Page MenuHomePhabricator

D6168.id20840.diff
No OneTemporary

D6168.id20840.diff

diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/put_client.rs
--- a/services/backup/src/blob/put_client.rs
+++ b/services/backup/src/blob/put_client.rs
@@ -4,7 +4,8 @@
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
-use tracing::{instrument, Instrument};
+use tonic::Status;
+use tracing::{error, instrument, Instrument};
use super::proto;
use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
@@ -92,6 +93,20 @@
.ok_or_else(|| anyhow!("Blob client channel closed"))
}
+ /// Convenience wrapper for
+ /// ```
+ /// BlobClient::put(PutRequest {
+ /// data: Some(PutRequestData::DataChunk(data))
+ /// })
+ /// ```
+ pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
+ self
+ .put(PutRequest {
+ data: Some(PutRequestData::DataChunk(data)),
+ })
+ .await
+ }
+
/// Closes the connection and awaits the blob client task to finish.
pub async fn terminate(self) -> Result<()> {
drop(self.req_tx);
@@ -99,3 +114,62 @@
thread_result
}
}
+
+/// Starts a put client instance. Fulfills request with blob hash and holder.
+///
+/// `None` is returned if given `holder` already exists.
+///
+/// ## Example
+/// ```
+/// if let Some(mut client) =
+/// start_simple_put_client("my_holder", "my_hash").await? {
+/// let my_data = vec![1,2,3,4];
+/// let _ = client.put_data(my_data).await;
+///
+/// let status = client.terminate().await;
+/// }
+/// ```
+pub async fn start_simple_put_client(
+ holder: &str,
+ blob_hash: &str,
+) -> Result<Option<PutClient>, Status> {
+ // start client
+ let mut put_client = PutClient::start().await.map_err(|err| {
+ error!("Failed to instantiate blob client: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+
+ // send holder
+ put_client
+ .put(PutRequest {
+ data: Some(PutRequestData::Holder(holder.to_string())),
+ })
+ .await
+ .map_err(|err| {
+ error!("Failed to set blob holder: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+
+ // send hash
+ let PutResponse { data_exists } = put_client
+ .put(PutRequest {
+ data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
+ })
+ .await
+ .map_err(|err| {
+ error!("Failed to set blob hash: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+
+ // Blob with given holder already exists, nothing to do
+ if data_exists {
+ // the connection is already terminated by server,
+ // but it's good to await it anyway
+ put_client.terminate().await.map_err(|err| {
+ error!("Put client task closed with error: {:?}", err);
+ Status::aborted("Internal error")
+ })?;
+ return Ok(None);
+ }
+ Ok(Some(put_client))
+}

File Metadata

Mime Type
text/plain
Expires
Sun, Nov 17, 3:18 PM (21 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2528630
Default Alt Text
D6168.id20840.diff (2 KB)

Event Timeline