diff --git a/native/cpp/CommonCpp/grpc/grpc_client/src/lib.rs b/native/cpp/CommonCpp/grpc/grpc_client/src/lib.rs --- a/native/cpp/CommonCpp/grpc/grpc_client/src/lib.rs +++ b/native/cpp/CommonCpp/grpc/grpc_client/src/lib.rs @@ -93,6 +93,133 @@ }) .await } + + #[instrument(skip(self))] + async fn register_user( + &mut self, + user_id: String, + device_id: String, + username: String, + password: String, + user_public_key: String, + ) -> Result { + // Create a RegistrationRequest channel and use ReceiverStream to turn the + // MPSC receiver into a Stream for outbound messages + let (tx, rx) = mpsc::channel(1); + let stream = ReceiverStream::new(rx); + let request = Request::new(stream); + + // `response` is the Stream for inbound messages + let mut response = self + .identity_client + .register_user(request) + .await? + .into_inner(); + + // Start PAKE registration on client and send initial registration request + // to Identity service + let mut client_rng = OsRng; + let (registration_request, mut client_registration) = + pake_registration_start( + &mut client_rng, + user_id, + &password, + device_id, + username, + user_public_key, + )?; + if let Err(e) = tx.send(registration_request).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("Dropped response")); + } + + let mut client_login: Option> = None; + let mut num_messages_received = 0; + + // Handle responses from Identity service sequentially, making sure we get + // messages in the correct order + while let Some(message) = response.message().await? { + match message.data { + Some(PakeRegistrationResponse(registration_response_bytes)) => { + if num_messages_received != 0 { + error!("Too many messages received in stream, aborting"); + return Err(Status::aborted("please retry")); + } + + // Finish PAKE registration and begin PAKE login; send the + // final registration request and initial login request together to + // reduce the number of trips + let registration_request = RegistrationRequest { + data: Some(PakeRegistrationUploadAndCredentialRequest( + PakeRegistrationUploadAndCredentialRequestStruct { + pake_registration_upload: pake_registration_finish( + &mut client_rng, + ®istration_response_bytes, + client_registration, + )? + .serialize(), + pake_credential_request: pake_login_start( + &mut client_rng, + &password, + ) + .map(|res| { + client_login = res.1; + res.0 + })? + .serialize() + .map_err(|e| { + error!("Could not serialize credential request: {}", e); + Status::failed_precondition("PAKE failure") + })?, + }, + )), + }; + if let Err(e) = tx.send(registration_request).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("Dropped response")); + } + client_registration = None; + } + Some(PakeLoginResponse(PakeLoginResponseStruct { + data: Some(PakeCredentialResponse(credential_response_bytes)), + })) => { + if num_messages_received != 1 { + error!("Wrong number of messages received in stream, aborting"); + return Err(Status::aborted("please retry")); + } + + // Finish PAKE login; send final login request to Identity service + let registration_request = RegistrationRequest { + data: Some(PakeCredentialFinalization( + pake_login_finish(&credential_response_bytes, client_login)? + .serialize() + .map_err(|e| { + error!("Could not serialize credential request: {}", e); + Status::failed_precondition("PAKE failure") + })?, + )), + }; + if let Err(e) = tx.send(registration_request).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("Dropped response")); + } + client_login = None; + } + Some(PakeLoginResponse(PakeLoginResponseStruct { + data: Some(AccessToken(access_token)), + })) => { + if num_messages_received != 2 { + error!("Wrong number of messages received in stream, aborting"); + return Err(Status::aborted("please retry")); + } + return Ok(access_token); + } + _ => return Err(Status::invalid_argument("Invalid response data")), + } + num_messages_received += 1; + } + Err(Status::unknown("Unexpected error")) + } } fn pake_registration_start(