diff --git a/native/native_rust_library/src/blob_client.rs b/native/native_rust_library/src/blob_client.rs
--- a/native/native_rust_library/src/blob_client.rs
+++ b/native/native_rust_library/src/blob_client.rs
@@ -1,5 +1,5 @@
 use blob::blob_service_client::BlobServiceClient;
-use blob::PutRequest;
+use blob::{put_request::Data, PutRequest};
 use tokio::sync::mpsc;
 use tokio::task;
 use tokio_stream::wrappers::ReceiverStream;
@@ -51,3 +51,77 @@
     receiver_task,
   }))
 }
+
+// NOTE(review): in this patch as received, `Box` and the `Result` returned
+// by `complete_upload` have no type arguments - presumably stripped in
+// transit. Confirm the intended types before applying.
+
+/// Starts an upload by sending the holder and then the blob hash, each as
+/// its own `PutRequest`, through `state.sender`.
+///
+/// # Errors
+///
+/// Returns a description of the failure if either message cannot be sent.
+pub async fn start_upload(
+  state: &mut Box,
+  holder: String,
+  hash: String,
+) -> Result<(), String> {
+  // `holder` and `hash` are owned and not needed afterwards, so they are
+  // moved into the requests rather than cloned.
+  let holder_request = PutRequest {
+    data: Some(Data::Holder(holder)),
+  };
+  let hash_request = PutRequest {
+    data: Some(Data::BlobHash(hash)),
+  };
+  state
+    .sender
+    .send(holder_request)
+    .await
+    .map_err(|e| format!("Failed to send request. Details {}", e))?;
+  state
+    .sender
+    .send(hash_request)
+    .await
+    .map_err(|e| format!("Failed to send request. Details {}", e))?;
+  Ok(())
+}
+
+/// Sends one chunk of data as a `PutRequest` through `state.sender`.
+///
+/// The bytes of `chunk` are copied into the outgoing message.
+///
+/// # Errors
+///
+/// Returns a description of the failure if the message cannot be sent.
+pub async fn upload_chunk(
+  state: &mut Box,
+  chunk: &[u8],
+) -> Result<(), String> {
+  let put_request_data_chunk = PutRequest {
+    data: Some(Data::DataChunk(chunk.to_vec())),
+  };
+  state
+    .sender
+    .send(put_request_data_chunk)
+    .await
+    .map_err(|e| format!("Failed to send request. Details {}", e))
+}
+
+/// Finishes an upload and returns the outcome produced by the receiver task.
+///
+/// # Errors
+///
+/// Returns a description of the failure if awaiting the receiver task
+/// fails; otherwise the task's own result is returned unchanged.
+pub async fn complete_upload(state: Box) -> Result {
+  // We call drop on sender to close corresponding receiver
+  // and hence entire gRPC stream. Without this receiver_task
+  // would run infinitely
+  drop(state.sender);
+  state
+    .receiver_task
+    .await
+    .map_err(|e| format!("Error occurred on consumer task. Details {}", e))?
+}