diff --git a/native/native_rust_library/src/blob_client.rs b/native/native_rust_library/src/blob_client.rs --- a/native/native_rust_library/src/blob_client.rs +++ b/native/native_rust_library/src/blob_client.rs @@ -1,9 +1,10 @@ +use crate::ffi; use blob::blob_service_client::BlobServiceClient; -use blob::{put_request::Data, PutRequest}; +use blob::{put_request::Data, GetRequest, GetResponse, PutRequest}; use tokio::sync::mpsc; use tokio::task; use tokio_stream::wrappers::ReceiverStream; -use tonic::Request; +use tonic::{Request, Streaming}; pub mod blob { tonic::include_proto!("blob"); @@ -17,6 +18,10 @@ receiver_task: task::JoinHandle>, } +pub struct DownloadState { + response_stream: Streaming, +} + pub async fn initialize_upload_state() -> Result, String> { let mut client = BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) @@ -100,3 +105,42 @@ .await .map_err(|e| format!("Error occurred on consumer task. Details {}", e))? } + +pub async fn initialize_download_state( + holder: String, +) -> Result, String> { + let mut client = + BlobServiceClient::connect(BLOB_SERVICE_SOCKET_ADDR) + .await + .map_err(|e| format!("Can't connect to blob service. Details {}", e))?; + + let request = GetRequest { holder }; + let response_stream = client + .get(Request::new(request)) + .await + .map_err(|e| format!("Can't initialize gRPC streaming. Details {}", e))? + .into_inner(); + + Ok(Box::new(DownloadState { response_stream })) +} + +pub async fn pull_chunk( + client: &mut Box, +) -> Result { + // Getting None from response_stream indicates that the server closed + // the stream, so we return a response with stream_end set to true + // (and empty data) to inform the C++ caller about this fact + if let Some(response) = + client.response_stream.message().await.map_err(|e| { + format!("Failed to pull response from stream. Details {}", e) + })? 
+ { + return Ok(ffi::BlobChunkResponse { + stream_end: false, + data: response.data_chunk, + }); + } + Ok(ffi::BlobChunkResponse { + stream_end: true, + data: vec![], + }) +} diff --git a/native/native_rust_library/src/lib.rs b/native/native_rust_library/src/lib.rs --- a/native/native_rust_library/src/lib.rs +++ b/native/native_rust_library/src/lib.rs @@ -29,6 +29,17 @@ #[cxx::bridge] mod ffi { + // data structures whose fields need to be accessible in both + // Rust and C++ must be declared outside extern "C++/Rust" + + // Blob Service Client + struct BlobChunkResponse { + stream_end: bool, + data: Vec, + } + + // data structures declared here will be accessible in C++ + // but their fields will not be visible extern "Rust" { type Client; fn initialize_client() -> Box;