diff --git a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs --- a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs +++ b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs @@ -1,11 +1,11 @@ use blob::blob_service_client::BlobServiceClient; -use blob::{put_request::Data, PutRequest}; +use blob::{put_request::Data, GetRequest, GetResponse, PutRequest}; use lazy_static::lazy_static; use tokio::runtime::{Builder, Runtime}; use tokio::sync::mpsc; use tokio::task; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::Request; +use tonic::{Request, Streaming}; pub mod blob { tonic::include_proto!("blob"); @@ -27,6 +27,10 @@ receiver_task: task::JoinHandle>, } +pub struct DownloadState { + response_stream: Streaming, +} + async fn initialize_upload_state() -> Result, String> { let mut client = BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) @@ -109,3 +113,44 @@ .map_err(|e| format!("Error occurred on consumer task. Details {}", e))??; Ok(result) } + +async fn initialize_download_state( + holder: String, +) -> Result, String> { + let mut client = + BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) + .await + .map_err(|e| format!("Can't connect to blob service. Details {}", e))?; + + let request = GetRequest { + holder: holder, + }; + let response_stream = client + .get(Request::new(request)) + .await + .map_err(|e| format!("Can't initialize gRPC streaming. Details {}", e))? + .into_inner(); + + Ok(Box::new(DownloadState { + response_stream: response_stream, + })) +} + +async fn pull_chunk( + client: &mut Box, + buffer: &mut Vec, +) -> Result { + // Getting None from response_stream indicates that server closed + // stream, so we return false to inform C++ caller about this fact + if let Some(response) = + client.response_stream.message().await.map_err(|e| { + format!("Failed to pull response from stream. Details {}", e) + })? + { + let chunk = String::from_utf8(response.data_chunk) + .map_err(|e| format!("Invalid bytes received. Details {}", e))?; + buffer.push(chunk); + return Ok(true); + } + Ok(false) +}