Page MenuHomePhabricator

Define data structure for holding multimedia upload process state and method to initialize
AbandonedPublic

Authored by marcin on Sep 5 2022, 8:13 AM.
Tags
None
Referenced Files
F3396116: D5054.diff
Sun, Dec 1, 10:30 AM
Unknown Object (File)
Fri, Nov 29, 7:56 AM
Unknown Object (File)
Fri, Nov 29, 6:53 AM
Unknown Object (File)
Tue, Nov 26, 5:14 AM
Unknown Object (File)
Fri, Nov 22, 12:42 PM
Unknown Object (File)
Tue, Nov 19, 2:49 PM
Unknown Object (File)
Sat, Nov 9, 7:40 AM
Unknown Object (File)
Thu, Nov 7, 1:07 AM

Details

Summary

This differential defines rust data structure thet holds internal state of gRPC stream of media upload and a function to initialize it. The state involves write end of request stream and tokio task that constantly tries to read request, send to blob yield blob response.

Test Plan

Start blob service locally. Paste the following code at the end of lib.rs file:

#[tokio::test]
async fn init_test() {
  let upload_state = initialize_upload_state().await.unwrap();
}

Run cargo test from project root. Test should pass. Now kill blob service and run test again. Test should fail with message that it cannot connect to blob service.

Diff Detail

Repository
rCOMM Comm
Lint
No Lint Coverage
Unit
No Test Coverage

Event Timeline

marcin requested review of this revision.Sep 5 2022, 8:23 AM
tomek added a reviewer: karol.
tomek added a subscriber: karol.
tomek added inline comments.
native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
33 ↗(On Diff #16331)

Why do we need to use Box?

39 ↗(On Diff #16331)

Is it safe to use unbounded channel? I saw in other places that we're using a bounded one @karol

native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
33 ↗(On Diff #16331)

In this particular differential it is not needed, but this function is intended to be be exposed to C++ by cxx bridge in further diffs in this stack. UploadState is a Rust type that is opaque to C++. Returning opaque Rust type to C++ is not supported by cxx bridge. Wrapping in a box is a solution suggested by Rust compiler.

39 ↗(On Diff #16331)

The difference (https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.unbounded_channel.html) is that channel is a queue with fixed size and unbounded_channel is a queue that can "stretch" itself. Theoretically there is a threat that we will overflow memory by adding more and more messages the queue. On the other hand it is not probable since there is already a simultaneous tokio task that constantly tries to consume whatever is in the queue. Moreover using channel would require us to provide an estimate on the number of messages queue can safely hold, which is hard since messages can have different sizes. We can slice our multimedia file into lots of small chunks that together take less memory than a few large chunks. Therefore I do not think we can provide any reasonable number here. That said I do not think there is a reason to use channel here.
As a first iteration I would use unbounded_channel and if we happen to see out-of-memory crashes we can do some performance tests to see how much is too much and add a check that rejects message to be added to the queue if real size (max(message_size) * message_count) exceeds some treshold.

karol added inline comments.
native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
32–34 ↗(On Diff #16410)

What is the profit in using map_err?

39 ↗(On Diff #16331)

Going out of memory is one thing. Another thing is that with a bounded channel we have more control over it and we can spot problems quicker. For example, if there's a case when the channel grows and grows, we may not go out of mem but we may never know about this as well. I don't know, deferring to you, but a bounded channel feels safer.

This revision is now accepted and ready to land.Sep 8 2022, 4:26 AM
native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
32–34 ↗(On Diff #16410)

If this function throws error it is tonic::transport:Error. According to https://cxx.rs/binding/result.html any error thrown in Rust will be propagated to C++ as ordinary std::exception with e.what(), holding stringified details about rust error. If I didn't use map_error here I would need to declare this function as returning Result<Box<dyn Error>> or create my custom error structure. But as I said it is not very beneficial since at the end all we get in C++ is std::exception with string holding the details. Therefore I stringify in Rust.

native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
39 ↗(On Diff #16331)

Ok, I didn't think about loosing control when channel grows too large, I will use bounded channel here but need to think of reasonable estimate for the number of messages.

Use bounded channel and provide constant for channel size

tomek added inline comments.
native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
14 ↗(On Diff #16492)

Could you describe why this limit was chosen?

60–63 ↗(On Diff #16492)

We can use a shorthand

native/cpp/CommonCpp/grpc/blob_client/rust/src/lib.rs
14 ↗(On Diff #16492)

Maximal size for one put request is ~4MB, so setting limit to 10 means we allow to store 40MB od multimedia data in RAM ot once. Most devices have between 4GB - 6GB so it sounded reasonable for me to choose this limit. However I am about to update this differential and set limit here to 1 message. Limiting queue size to 1 message means we cannot send new message before previous one is consumed from queue and processed be blob service. I have several reasons:

  1. Blob service uses PutReactor which extends ServerBidiReactorBase, which implements 1:1 streaming (request-response-request-response ..). Therefore it is safe to wait to receive response from server before sending another request in the stream.
  2. By waiting to receive response before sending new request we are not overwhelming server with and adjusting to its current processing capabilities.
  3. In my experimentation I didn't see performance difference between limit 1 and limit 10.

Refactor after varun's changes

Diff from 2022 what I was assigned encrypted blob upload work.