Page MenuHomePhabricator

D5009.diff
No OneTemporary

D5009.diff

diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -2,9 +2,16 @@
tonic::include_proto!("blob");
}
+use proto::blob_service_client::BlobServiceClient;
+use proto::put_request;
+use proto::put_request::Data::*;
+use proto::PutRequest;
+
+use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use lazy_static::lazy_static;
use libc;
use libc::c_char;
+use std::ffi::CStr;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
@@ -62,7 +69,110 @@
}
pub fn put_client_initialize_cxx() -> Result<(), String> {
- unimplemented!();
+ if is_initialized() {
+ put_client_terminate_cxx()?;
+ }
+ assert!(!is_initialized(), "client cannot be initialized twice");
+ // grpc
+ if let Ok(mut grpc_client) =
+ RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await })
+ {
+ let (request_thread_tx, mut request_thread_rx) =
+ mpsc::channel::<PutRequestData>(MPSC_CHANNEL_BUFFER_CAPACITY);
+
+ let outbound = async_stream::stream! {
+ while let Some(data) = request_thread_rx.recv().await {
+ let request_data: Option<put_request::Data> = match data.field_index {
+ 1 => {
+ match String::from_utf8(data.data) {
+ Ok(utf8_data) => Some(Holder(utf8_data)),
+ _ => {
+ report_error("invalid utf-8".to_string());
+ None
+ },
+ }
+ }
+ 2 => {
+ match String::from_utf8(data.data).ok() {
+ Some(utf8_data) => Some(BlobHash(utf8_data)),
+ None => {
+ report_error("invalid utf-8".to_string());
+ None
+ },
+ }
+ }
+ 3 => {
+ Some(DataChunk(data.data))
+ }
+ _ => {
+ report_error(format!("invalid field index value {}", data.field_index));
+ None
+ }
+ };
+ if let Some (unpacked_data) = request_data {
+ let request = PutRequest {
+ data: Some(unpacked_data),
+ };
+ yield request;
+ } else {
+ report_error("an error occured, aborting connection".to_string());
+ break;
+ }
+ }
+ };
+
+ // spawn receiver thread
+ let (response_thread_tx, response_thread_rx) =
+ mpsc::channel::<String>(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let rx_handle = RUNTIME.spawn(async move {
+ match grpc_client.put(tonic::Request::new(outbound)).await {
+ Ok(response) => {
+ let mut inner_response = response.into_inner();
+ loop {
+ match inner_response.message().await {
+ Ok(maybe_response_message) => {
+ let mut result = false;
+ if let Some(response_message) = maybe_response_message {
+ // warning: this will produce an error if there's more unread responses than
+ // MPSC_CHANNEL_BUFFER_CAPACITY
+ // you should then use put_client_blocking_read_cxx in order to dequeue
+ // the responses in c++ and make room for more
+ if let Ok(_) = response_thread_tx
+ .try_send((response_message.data_exists as i32).to_string())
+ {
+ result = true;
+ } else {
+ report_error("response queue full".to_string());
+ }
+ }
+ if !result {
+ break;
+ }
+ }
+ Err(err) => {
+ report_error(err.to_string());
+ break;
+ }
+ };
+ }
+ }
+ Err(err) => {
+ report_error(err.to_string());
+ }
+ };
+ });
+
+ if let Ok(mut client) = CLIENT.lock() {
+ *client = Some(BidiClient {
+ tx: request_thread_tx,
+ rx: response_thread_rx,
+ rx_handle,
+ });
+ return Ok(());
+ }
+ return Err("could not access client".to_string());
+ }
+ Err("could not successfully connect to the blob server".to_string())
}
pub fn put_client_blocking_read_cxx() -> Result<String, String> {

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 5:04 AM (21 h, 47 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2583013
Default Alt Text
D5009.diff (4 KB)

Event Timeline