diff --git a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs --- a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs +++ b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs @@ -1,5 +1,5 @@ use blob::blob_service_client::BlobServiceClient; -use blob::PutRequest; +use blob::{put_request::Data, PutRequest}; use lazy_static::lazy_static; use tokio::runtime::{Builder, Runtime}; use tokio::sync::mpsc; @@ -62,3 +62,54 @@ receiver_task: receiver_task, })) } + +async fn start_upload( + state: &mut Box, + holder: String, + hash: String, +) -> Result<(), String> { + let holder_request = PutRequest { + data: Some(Data::Holder(holder.clone())), + }; + let hash_request = PutRequest { + data: Some(Data::BlobHash(hash.clone())), + }; + state + .sender + .send(holder_request) + .await + .map_err(|e| format!("Failed to send request. Details {}", e))?; + state + .sender + .send(hash_request) + .await + .map_err(|e| format!("Failed to send request. Details {}", e))?; + Ok(()) +} + +async fn upload_chunk( + state: &mut Box, + chunk: &[u8], +) -> Result<(), String> { + let put_request_data_chunk = PutRequest { + data: Some(Data::DataChunk(chunk.to_vec())), + }; + state + .sender + .send(put_request_data_chunk) + .await + .map_err(|e| format!("Failed to send request. Details {}", e))?; + Ok(()) +} + +async fn complete_upload(state: Box) -> Result { + // We call drop on sender to close corresponding receiver + // and hence entire gRPC stream. Without this receiver_task + // would run infinitely + std::mem::drop(state.sender); + let result = state + .receiver_task + .await + .map_err(|e| format!("Error occurred on consumer task. Details {}", e))??; + Ok(result) +}