Page MenuHomePhabricator

D4973.diff
No OneTemporary

D4973.diff

diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -10,13 +10,14 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use lazy_static::lazy_static;
+use libc;
+use libc::c_char;
+use std::ffi::CStr;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use libc;
-use libc::c_char;
-use std::ffi::CStr;
+use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -62,6 +63,7 @@
}
fn report_error(message: String) {
+ error!("[RUST] Error: {}", message);
ERROR_MESSAGES
.lock()
.expect("access error messages")
@@ -123,7 +125,7 @@
) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let receiver_handle = RUNTIME.spawn(async move {
println!("[RUST] [receiver_thread] begin");
- let response: Option<
+ let maybe_response: Option<
tonic::Response<tonic::codec::Streaming<PutResponse>>,
> = match grpc_client
.expect("access grpc client")
@@ -133,35 +135,49 @@
Ok(res) => Some(res),
Err(err) => {
report_error(err.to_string());
- println!("ERROR!! {}", err.to_string());
None
}
};
- if response.is_none() {
+ if maybe_response.is_none() {
return;
}
- let mut inbound = response.unwrap().into_inner();
- loop {
- let response: Option<PutResponse> = match inbound.message().await {
- Ok(res) => res,
- Err(err) => {
- report_error(err.to_string());
- println!("ERROR!! {}", err.to_string());
- None
+ match maybe_response {
+ Some(response) => {
+ let mut inner_response = response.into_inner();
+ let mut response_present = true;
+ while response_present {
+ response_present = match inner_response.message().await {
+ Ok(maybe_response_message) => {
+ let mut result = false;
+ if let Some(response_message) = maybe_response_message {
+ println!(
+ "[RUST] got response: {}",
+ response_message.data_exists
+ );
+ // warning: this will hang if there are more unread responses than
+ // MPSC_CHANNEL_BUFFER_CAPACITY;
+ // in that case, use put_client_blocking_read_cxx to dequeue
+ // the responses in C++ and make room for more
+ if let Ok(_) = response_thread_tx
+ .send((response_message.data_exists as i32).to_string())
+ .await
+ {
+ result = true;
+ }
+ }
+ result
+ }
+ Err(err) => {
+ report_error(err.to_string());
+ false
+ }
+ };
}
- };
- if response.is_none() {
- break;
}
- let response: PutResponse = response.unwrap();
- println!("[RUST] got response: {}", response.data_exists);
- // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY
- // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more
- response_thread_tx
- .send((response.data_exists as i32).to_string())
- .await
- .unwrap();
- }
+ unexpected => {
+ report_error(format!("unexpected result received: {:?}", unexpected));
+ }
+ };
println!("[RUST] [receiver_thread] done");
});
@@ -188,10 +204,7 @@
}
CLIENT.lock().expect("access client").rx = Some(rx);
});
- if response.is_none() {
- return Err("response not received properly".to_string());
- }
- Ok(response.unwrap())
+ response.ok_or("response not received properly".to_string())
}
pub fn put_client_write_cxx(
@@ -212,7 +225,10 @@
.tx
.as_ref()
.expect("access client's transmitter")
- .send(PutRequestData{field_index, data: data_bytes})
+ .send(PutRequestData {
+ field_index,
+ data: data_bytes,
+ })
.await
.expect("send data to receiver");
});
@@ -225,16 +241,18 @@
pub fn put_client_terminate_cxx() -> Result<(), String> {
println!("[RUST] put_client_terminating");
check_error()?;
- let receiver_handle = CLIENT
- .lock()
- .expect("access client")
- .receiver_handle
- .take()
- .unwrap();
- drop(CLIENT.lock().expect("access client").tx.take().unwrap());
- RUNTIME.block_on(async {
- receiver_handle.await.unwrap();
- });
+ if let Some(receiver_handle) =
+ CLIENT.lock().expect("access client").receiver_handle.take()
+ {
+ if let Some(tx) = CLIENT.lock().expect("access client").tx.take() {
+ drop(tx);
+ }
+ RUNTIME.block_on(async {
+ if receiver_handle.await.is_err() {
+ report_error("wait for receiver handle failed".to_string());
+ }
+ });
+ }
assert!(
!is_initialized(),

File Metadata

Mime Type
text/plain
Expires
Fri, Dec 27, 5:04 AM (9 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2710638
Default Alt Text
D4973.diff (5 KB)

Event Timeline