diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs --- a/services/backup/blob_client/src/get_client.rs +++ b/services/backup/blob_client/src/get_client.rs @@ -61,7 +61,64 @@ pub fn get_client_initialize_cxx( holder_char: *const c_char, ) -> Result<(), String> { - unimplemented!(); + if is_initialized() { + get_client_terminate_cxx()?; + } + + assert!(!is_initialized(), "client cannot be initialized twice"); + + let holder_cstr: &CStr = unsafe { CStr::from_ptr(holder_char) }; + let holder: String = holder_cstr.to_str().unwrap().to_owned(); + + // grpc + if let Ok(mut grpc_client) = + RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await }) + { + // spawn receiver thread + let (response_thread_tx, response_thread_rx) = + mpsc::channel::>(MPSC_CHANNEL_BUFFER_CAPACITY); + let rx_handle = RUNTIME.spawn(async move { + if let Ok(response) = grpc_client.get(GetRequest { holder }).await { + let mut inner_response = response.into_inner(); + loop { + match inner_response.message().await { + Ok(maybe_data) => { + let mut result = false; + if let Some(data) = maybe_data { + let data: Vec = data.data_chunk; + result = match response_thread_tx.send(data).await { + Ok(_) => true, + Err(err) => { + report_error(err.to_string()); + false + } + } + } + if !result { + break; + } + } + Err(err) => { + report_error(err.to_string()); + break; + } + }; + } + } else { + report_error("couldn't perform grpc get operation".to_string()); + } + }); + + if let Ok(mut client) = CLIENT.lock() { + *client = Some(ReadClient { + rx_handle, + rx: response_thread_rx, + }); + return Ok(()); + } + return Err("could not access client".to_string()); + } + Err("could not successfully connect to the blob server".to_string()) } pub fn get_client_blocking_read_cxx() -> Result, String> {