diff --git a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs --- a/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs +++ b/native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs @@ -1,11 +1,11 @@ use blob::blob_service_client::BlobServiceClient; -use blob::{put_request::Data, PutRequest}; +use blob::{put_request::Data, GetRequest, GetResponse, PutRequest}; use lazy_static::lazy_static; use tokio::runtime::{Builder, Runtime}; use tokio::sync::mpsc; use tokio::task; use tokio_stream::wrappers::ReceiverStream; -use tonic::Request; +use tonic::{Request, Streaming}; pub mod blob { tonic::include_proto!("blob"); @@ -23,11 +23,23 @@ .unwrap(); } +#[cxx::bridge(namespace = "blob")] +mod ffi { + struct BlobChunkResponse { + stream_end: bool, + data: Vec, + } +} + pub struct UploadState { sender: mpsc::Sender, receiver_task: task::JoinHandle>, } +pub struct DownloadState { + response_stream: Streaming, +} + async fn initialize_upload_state() -> Result, String> { let mut client = BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) @@ -113,3 +125,44 @@ .map_err(|e| format!("Error occurred on consumer task. Details {}", e))??; Ok(result) } + +async fn initialize_download_state( + holder: String, +) -> Result, String> { + let mut client = + BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) + .await + .map_err(|e| format!("Can't connect to blob service. Details {}", e))?; + + let request = GetRequest { holder: holder }; + let response_stream = client + .get(Request::new(request)) + .await + .map_err(|e| format!("Can't initialize gRPC streaming. Details {}", e))? + .into_inner(); + + Ok(Box::new(DownloadState { + response_stream: response_stream, + })) +} + +async fn pull_chunk( + client: &mut Box, +) -> Result { + // Getting None from response_stream indicates that server closed + // stream, so we return false to inform C++ caller about this fact + if let Some(response) = + client.response_stream.message().await.map_err(|e| { + format!("Failed to pull response from stream. Details {}", e) + })? + { + return Ok(ffi::BlobChunkResponse { + stream_end: false, + data: response.data_chunk, + }); + } + Ok(ffi::BlobChunkResponse { + stream_end: true, + data: vec![], + }) +}