Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3290486
D6168.id20840.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Referenced Files
None
Subscribers
None
D6168.id20840.diff
View Options
diff --git a/services/backup/src/blob/put_client.rs b/services/backup/src/blob/put_client.rs
--- a/services/backup/src/blob/put_client.rs
+++ b/services/backup/src/blob/put_client.rs
@@ -4,7 +4,8 @@
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
-use tracing::{instrument, Instrument};
+use tonic::Status;
+use tracing::{error, instrument, Instrument};
use super::proto;
use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY;
@@ -92,6 +93,20 @@
.ok_or_else(|| anyhow!("Blob client channel closed"))
}
+  /// Sends one chunk of blob data through this client.
+  ///
+  /// Shorthand for building the request by hand:
+  /// ```ignore
+  /// client.put(PutRequest {
+  ///   data: Some(PutRequestData::DataChunk(data)),
+  /// })
+  /// ```
+  /// The outcome of that `put` call is returned unchanged.
+  pub async fn put_data(&mut self, data: Vec<u8>) -> Result<PutResponse> {
+    let chunk = PutRequestData::DataChunk(data);
+    self.put(PutRequest { data: Some(chunk) }).await
+  }
+
/// Closes the connection and awaits the blob client task to finish.
pub async fn terminate(self) -> Result<()> {
drop(self.req_tx);
@@ -99,3 +114,62 @@
thread_result
}
}
+
+/// Opens a [`PutClient`] and performs the initial holder/hash exchange.
+///
+/// `holder` is sent first, then `blob_hash`. If the reply to the hash
+/// request has `data_exists` set, the client is terminated and `Ok(None)`
+/// is returned; otherwise the started client is returned for further use.
+///
+/// Every failure is logged and reported as an aborted [`Status`].
+///
+/// ## Example
+/// ```ignore
+/// if let Some(mut client) =
+///   start_simple_put_client("my_holder", "my_hash").await? {
+///   let _ = client.put_data(vec![1, 2, 3, 4]).await;
+///   let status = client.terminate().await;
+/// }
+/// ```
+pub async fn start_simple_put_client(
+  holder: &str,
+  blob_hash: &str,
+) -> Result<Option<PutClient>, Status> {
+  // Step 1: start the client.
+  let mut client = match PutClient::start().await {
+    Ok(started) => started,
+    Err(err) => {
+      error!("Failed to instantiate blob client: {:?}", err);
+      return Err(Status::aborted("Internal error"));
+    }
+  };
+  // Step 2: send the holder; the reply to this request is not used.
+  let holder_request = PutRequest {
+    data: Some(PutRequestData::Holder(holder.to_string())),
+  };
+  if let Err(err) = client.put(holder_request).await {
+    error!("Failed to set blob holder: {:?}", err);
+    return Err(Status::aborted("Internal error"));
+  }
+  // Step 3: send the hash; its reply carries the `data_exists` flag.
+  let hash_request = PutRequest {
+    data: Some(PutRequestData::BlobHash(blob_hash.to_string())),
+  };
+  let PutResponse { data_exists } = match client.put(hash_request).await {
+    Ok(response) => response,
+    Err(err) => {
+      error!("Failed to set blob hash: {:?}", err);
+      return Err(Status::aborted("Internal error"));
+    }
+  };
+  if !data_exists {
+    return Ok(Some(client));
+  }
+  // Nothing to upload. Per the original author's note the server has
+  // already closed the connection; the client task is awaited regardless.
+  if let Err(err) = client.terminate().await {
+    error!("Put client task closed with error: {:?}", err);
+    return Err(Status::aborted("Internal error"));
+  }
+  Ok(None)
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Nov 17, 3:18 PM (21 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2528630
Default Alt Text
D6168.id20840.diff (2 KB)
Attached To
Mode
D6168: [services][backup] Add Put client convenience helpers
Attached
Detach File
Event Timeline
Log In to Comment