Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3529828
D4974.id16042.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Referenced Files
None
Subscribers
None
D4974.id16042.diff
View Options
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -45,18 +45,14 @@
}
fn is_initialized() -> bool {
- if CLIENT.lock().expect("access client").tx.is_none() {
- return false;
- }
- if CLIENT.lock().expect("access client").rx.is_none() {
- return false;
- }
- if CLIENT
- .lock()
- .expect("access client")
- .receiver_handle
- .is_none()
- {
+ if let Ok(client) = CLIENT.lock() {
+ if client.tx.is_none()
+ || client.rx.is_none()
+ || client.receiver_handle.is_none()
+ {
+ return false;
+ }
+ } else {
return false;
}
return true;
@@ -64,145 +60,174 @@
fn report_error(message: String) {
error!("[RUST] Error: {}", message);
- ERROR_MESSAGES
- .lock()
- .expect("access error messages")
- .push(message);
+ if let Ok(mut error_messages) = ERROR_MESSAGES.lock() {
+ error_messages.push(message);
+ }
+ error!("could not access error messages")
}
fn check_error() -> Result<(), String> {
- let errors = ERROR_MESSAGES.lock().expect("access error messages");
- let mut errors_str_value = None;
- if !errors.is_empty() {
- errors_str_value = Some(errors.join("\n"));
+ if let Ok(error_messages) = ERROR_MESSAGES.lock() {
+ if !error_messages.is_empty() {
+ return Err(error_messages.join("\n"));
+ }
+ return Ok(());
+ } else {
+ return Err("could not access error messages".to_string());
}
- return match errors_str_value {
- Some(value) => Err(value),
- None => Ok(()),
- };
}
pub fn put_client_initialize_cxx() -> Result<(), String> {
println!("[RUST] initializing");
assert!(!is_initialized(), "client cannot be initialized twice");
// grpc
- let mut grpc_client: Option<BlobServiceClient<tonic::transport::Channel>> =
- None;
+ let mut maybe_grpc_client: Option<
+ BlobServiceClient<tonic::transport::Channel>,
+ > = None;
RUNTIME.block_on(async {
- grpc_client = Some(
- BlobServiceClient::connect(BLOB_ADDRESS)
- .await
- .expect("successfully connect to the blob server"),
- );
+ maybe_grpc_client = BlobServiceClient::connect(BLOB_ADDRESS).await.ok();
});
+ if let Some(mut grpc_client) = maybe_grpc_client {
+ let (request_thread_tx, mut request_thread_rx): (
+ mpsc::Sender<PutRequestData>,
+ mpsc::Receiver<PutRequestData>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let (request_thread_tx, mut request_thread_rx): (
- mpsc::Sender<PutRequestData>,
- mpsc::Receiver<PutRequestData>,
- ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
-
- let outbound = async_stream::stream! {
- while let Some(data) = request_thread_rx.recv().await {
- println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
- println!("[RUST] [transmitter_thread] data: {:?}", data.data);
- let request_data: put_request::Data = match data.field_index {
- 0 => Holder(String::from_utf8(data.data).expect("Found invalid UTF-8")),
- 1 => BlobHash(String::from_utf8(data.data).expect("Found invalid UTF-8")),
- 2 => DataChunk(data.data),
- _ => panic!("invalid field index value {}", data.field_index)
- };
- let request = PutRequest {
- data: Some(request_data),
- };
- yield request;
- }
- };
-
- // spawn receiver thread
- let (response_thread_tx, response_thread_rx): (
- mpsc::Sender<String>,
- mpsc::Receiver<String>,
- ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let receiver_handle = RUNTIME.spawn(async move {
- println!("[RUST] [receiver_thread] begin");
- let maybe_response: Option<
- tonic::Response<tonic::codec::Streaming<PutResponse>>,
- > = match grpc_client
- .expect("access grpc client")
- .put(tonic::Request::new(outbound))
- .await
- {
- Ok(res) => Some(res),
- Err(err) => {
- report_error(err.to_string());
- None
- }
- };
- if maybe_response.is_none() {
- return;
- }
- match maybe_response {
- Some(response) => {
- let mut inner_response = response.into_inner();
- let mut response_present = true;
- while response_present {
- response_present = match inner_response.message().await {
- Ok(maybe_response_message) => {
- let mut result = false;
- if let Some(response_message) = maybe_response_message {
- println!(
- "[RUST] got response: {}",
- response_message.data_exists
- );
- // warning: this will hang if there's more unread responses than
- // MPSC_CHANNEL_BUFFER_CAPACITY
- // you should then use put_client_blocking_read_cxx in order to dequeue
- // the responses in c++ and make room for more
- if let Ok(_) = response_thread_tx
- .send((response_message.data_exists as i32).to_string())
- .await
- {
- result = true;
- }
- }
- result
+ let outbound = async_stream::stream! {
+ while let Some(data) = request_thread_rx.recv().await {
+ println!("[RUST] [transmitter_thread] field index: {}", data.field_index);
+ println!("[RUST] [transmitter_thread] data: {:?}", data.data);
+ let request_data: Option<put_request::Data> = match data.field_index {
+ 0 => {
+ match String::from_utf8(data.data).ok() {
+ Some(utf8_data) => Some(Holder(utf8_data)),
+ None => {
+ report_error("invalid utf-8".to_string());
+ None
+ },
}
- Err(err) => {
- report_error(err.to_string());
- false
+ }
+ 1 => {
+ match String::from_utf8(data.data).ok() {
+ Some(utf8_data) => Some(BlobHash(utf8_data)),
+ None => {
+ report_error("invalid utf-8".to_string());
+ None
+ },
}
+ }
+ 2 => {
+ Some(DataChunk(data.data))
+ }
+ _ => {
+ report_error(format!("invalid field index value {}", data.field_index));
+ None
+ }
+ };
+ if let Some (unpacked_data) = request_data {
+ let request = PutRequest {
+ data: Some(unpacked_data),
};
+ yield request;
+ } else {
+ report_error("an error occured, aborting connection".to_string());
+ break;
}
}
- unexpected => {
- report_error(format!("unexpected result received: {:?}", unexpected));
- }
};
- println!("[RUST] [receiver_thread] done");
- });
- CLIENT.lock().expect("access client").tx = Some(request_thread_tx);
- CLIENT.lock().expect("access client").receiver_handle = Some(receiver_handle);
- CLIENT.lock().expect("access client").rx = Some(response_thread_rx);
- println!("[RUST] initialized");
- Ok(())
+ // spawn receiver thread
+ let (response_thread_tx, response_thread_rx): (
+ mpsc::Sender<String>,
+ mpsc::Receiver<String>,
+ ) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let receiver_handle = RUNTIME.spawn(async move {
+ println!("[RUST] [receiver_thread] begin");
+ let maybe_response: Option<
+ tonic::Response<tonic::codec::Streaming<PutResponse>>,
+ > = match grpc_client.put(tonic::Request::new(outbound)).await {
+ Ok(res) => Some(res),
+ Err(err) => {
+ report_error(err.to_string());
+ None
+ }
+ };
+ if maybe_response.is_none() {
+ return;
+ }
+ match maybe_response {
+ Some(response) => {
+ let mut inner_response = response.into_inner();
+ let mut response_present = true;
+ while response_present {
+ response_present = match inner_response.message().await {
+ Ok(maybe_response_message) => {
+ let mut result = false;
+ if let Some(response_message) = maybe_response_message {
+ println!(
+ "[RUST] got response: {}",
+ response_message.data_exists
+ );
+ // warning: this will hang if there's more unread responses than
+ // MPSC_CHANNEL_BUFFER_CAPACITY
+ // you should then use put_client_blocking_read_cxx in order to dequeue
+ // the responses in c++ and make room for more
+ if let Ok(_) = response_thread_tx
+ .send((response_message.data_exists as i32).to_string())
+ .await
+ {
+ result = true;
+ }
+ }
+ result
+ }
+ Err(err) => {
+ report_error(err.to_string());
+ false
+ }
+ };
+ }
+ }
+ unexpected => {
+ report_error(format!("unexpected result received: {:?}", unexpected));
+ }
+ };
+ println!("[RUST] [receiver_thread] done");
+ });
+
+ if let Ok(mut client) = CLIENT.lock() {
+ client.tx = Some(request_thread_tx);
+ client.receiver_handle = Some(receiver_handle);
+ client.rx = Some(response_thread_rx);
+ println!("[RUST] initialized");
+ return Ok(());
+ }
+ return Err("could not access client".to_string());
+ }
+ Err("could not successfully connect to the blob server".to_string())
}
pub fn put_client_blocking_read_cxx() -> Result<String, String> {
let mut response: Option<String> = None;
check_error()?;
RUNTIME.block_on(async {
- let mut rx: mpsc::Receiver<String> = CLIENT
- .lock()
- .expect("access client")
- .rx
- .take()
- .expect("access client's receiver");
- if let Some(data) = rx.recv().await {
- println!("received data {}", data);
- response = Some(data);
+ if let Ok(mut client) = CLIENT.lock() {
+ if let Some(mut rx) = client.rx.take() {
+ if let Some(data) = rx.recv().await {
+ println!("received data {}", data);
+ response = Some(data);
+ } else {
+ report_error(
+ "couldn't receive data via client's receiver".to_string(),
+ );
+ }
+ client.rx = Some(rx);
+ } else {
+ report_error("couldn't access client's receiver".to_string());
+ }
+ } else {
+ report_error("couldn't access client".to_string());
}
- CLIENT.lock().expect("access client").rx = Some(rx);
});
response.ok_or("response not received properly".to_string())
}
@@ -219,18 +244,24 @@
println!("[RUST] [put_client_process] data string: {:?}", data_bytes);
RUNTIME.block_on(async {
- CLIENT
- .lock()
- .expect("access client")
- .tx
- .as_ref()
- .expect("access client's transmitter")
- .send(PutRequestData {
- field_index,
- data: data_bytes,
- })
- .await
- .expect("send data to receiver");
+ if let Ok(mut client) = CLIENT.lock() {
+ if let Some(tx) = client.tx.take() {
+ if let Ok(_) = tx
+ .send(PutRequestData {
+ field_index,
+ data: data_bytes,
+ })
+ .await
+ {
+ } else {
+ report_error("send data to receiver failed".to_string());
+ }
+ } else {
+ report_error("couldn't access client's transmitter".to_string());
+ }
+ } else {
+ report_error("couldn't access client".to_string());
+ }
});
println!("[RUST] [put_client_process] end");
Ok(())
@@ -241,17 +272,20 @@
pub fn put_client_terminate_cxx() -> Result<(), String> {
println!("[RUST] put_client_terminating");
check_error()?;
- if let Some(receiver_handle) =
- CLIENT.lock().expect("access client").receiver_handle.take()
- {
- if let Some(tx) = CLIENT.lock().expect("access client").tx.take() {
- drop(tx);
- }
- RUNTIME.block_on(async {
- if receiver_handle.await.is_err() {
- report_error("wait for receiver handle failed".to_string());
+
+ if let Ok(mut client) = CLIENT.lock() {
+ if let Some(receiver_handle) = client.receiver_handle.take() {
+ if let Some(tx) = client.tx.take() {
+ drop(tx);
}
- });
+ RUNTIME.block_on(async {
+ if receiver_handle.await.is_err() {
+ report_error("wait for receiver handle failed".to_string());
+ }
+ });
+ }
+ } else {
+ report_error("couldn't access client".to_string());
}
assert!(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 25, 9:23 PM (4 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2703218
Default Alt Text
D4974.id16042.diff (12 KB)
Attached To
Mode
D4974: [services] Backup - Connect to Blob - Rust - Get rid of expect
Attached
Detach File
Event Timeline
Log In to Comment