Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3540451
D4973.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Referenced Files
None
Subscribers
None
D4973.diff
View Options
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -10,13 +10,14 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use lazy_static::lazy_static;
+use libc;
+use libc::c_char;
+use std::ffi::CStr;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use libc;
-use libc::c_char;
-use std::ffi::CStr;
+use tracing::error;
#[derive(Debug)]
struct PutRequestData {
@@ -62,6 +63,7 @@
}
fn report_error(message: String) {
+ error!("[RUST] Error: {}", message);
ERROR_MESSAGES
.lock()
.expect("access error messages")
@@ -123,7 +125,7 @@
) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
let receiver_handle = RUNTIME.spawn(async move {
println!("[RUST] [receiver_thread] begin");
- let response: Option<
+ let maybe_response: Option<
tonic::Response<tonic::codec::Streaming<PutResponse>>,
> = match grpc_client
.expect("access grpc client")
@@ -133,35 +135,49 @@
Ok(res) => Some(res),
Err(err) => {
report_error(err.to_string());
- println!("ERROR!! {}", err.to_string());
None
}
};
- if response.is_none() {
+ if maybe_response.is_none() {
return;
}
- let mut inbound = response.unwrap().into_inner();
- loop {
- let response: Option<PutResponse> = match inbound.message().await {
- Ok(res) => res,
- Err(err) => {
- report_error(err.to_string());
- println!("ERROR!! {}", err.to_string());
- None
+ match maybe_response {
+ Some(response) => {
+ let mut inner_response = response.into_inner();
+ let mut response_present = true;
+ while response_present {
+ response_present = match inner_response.message().await {
+ Ok(maybe_response_message) => {
+ let mut result = false;
+ if let Some(response_message) = maybe_response_message {
+ println!(
+ "[RUST] got response: {}",
+ response_message.data_exists
+ );
+ // warning: this will hang if there are more unread responses than
+ // MPSC_CHANNEL_BUFFER_CAPACITY
+ // you should then use put_client_blocking_read_cxx in order to dequeue
+ // the responses in C++ and make room for more
+ if let Ok(_) = response_thread_tx
+ .send((response_message.data_exists as i32).to_string())
+ .await
+ {
+ result = true;
+ }
+ }
+ result
+ }
+ Err(err) => {
+ report_error(err.to_string());
+ false
+ }
+ };
}
- };
- if response.is_none() {
- break;
}
- let response: PutResponse = response.unwrap();
- println!("[RUST] got response: {}", response.data_exists);
- // warning: this will hang if there's more unread responses than MPSC_CHANNEL_BUFFER_CAPACITY
- // you should then use put_client_blocking_read_cxx in order to dequeue the responses in c++ and make room for more
- response_thread_tx
- .send((response.data_exists as i32).to_string())
- .await
- .unwrap();
- }
+ unexpected => {
+ report_error(format!("unexpected result received: {:?}", unexpected));
+ }
+ };
println!("[RUST] [receiver_thread] done");
});
@@ -188,10 +204,7 @@
}
CLIENT.lock().expect("access client").rx = Some(rx);
});
- if response.is_none() {
- return Err("response not received properly".to_string());
- }
- Ok(response.unwrap())
+ response.ok_or("response not received properly".to_string())
}
pub fn put_client_write_cxx(
@@ -212,7 +225,10 @@
.tx
.as_ref()
.expect("access client's transmitter")
- .send(PutRequestData{field_index, data: data_bytes})
+ .send(PutRequestData {
+ field_index,
+ data: data_bytes,
+ })
.await
.expect("send data to receiver");
});
@@ -225,16 +241,18 @@
pub fn put_client_terminate_cxx() -> Result<(), String> {
println!("[RUST] put_client_terminating");
check_error()?;
- let receiver_handle = CLIENT
- .lock()
- .expect("access client")
- .receiver_handle
- .take()
- .unwrap();
- drop(CLIENT.lock().expect("access client").tx.take().unwrap());
- RUNTIME.block_on(async {
- receiver_handle.await.unwrap();
- });
+ if let Some(receiver_handle) =
+ CLIENT.lock().expect("access client").receiver_handle.take()
+ {
+ if let Some(tx) = CLIENT.lock().expect("access client").tx.take() {
+ drop(tx);
+ }
+ RUNTIME.block_on(async {
+ if receiver_handle.await.is_err() {
+ report_error("wait for receiver handle failed".to_string());
+ }
+ });
+ }
assert!(
!is_initialized(),
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Dec 27, 5:04 AM (9 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2710638
Default Alt Text
D4973.diff (5 KB)
Attached To
Mode
D4973: [services] Backup - Connect to Blob - Rust - Get rid of unwrap
Attached
Detach File
Event Timeline
Log In to Comment