diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -56,6 +56,7 @@ mod login; mod registration; +mod update; #[derive(Debug)] enum PakeWorkflow { @@ -302,7 +303,21 @@ &self, request: Request>, ) -> Result, Status> { - unimplemented!(); + let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); + let db_client = self.client.clone(); + /* Run the update workflow on a spawned task that is handed tx; rx is wrapped in a ReceiverStream below and returned as the response stream. */ + tokio::spawn(async move { + update::handle_server_update_user_messages( + request.into_inner(), + db_client, + tx, + ) + .await; + }); + + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new(Box::pin(out_stream) as Self::UpdateUserStream)) } type UpdateUserStream = Pin< diff --git a/services/identity/src/service/update.rs b/services/identity/src/service/update.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/service/update.rs @@ -0,0 +1,296 @@ +use aws_sdk_dynamodb::output::GetItemOutput; +use comm_opaque::Cipher; +use opaque_ke::{ServerLogin, ServerRegistration}; +use rand::rngs::OsRng; +use tokio::sync::mpsc; +use tokio_stream::StreamExt; +use tonic::Streaming; +use tracing::{debug, error, info}; + +use super::proto::{ + pake_login_response::Data::AccessToken, update_user_request, + update_user_response, update_user_response::Data::PakeLoginResponse, + update_user_response::Data::PakeRegistrationResponse, + PakeRegistrationRequestAndUserId, PakeRegistrationUploadAndCredentialRequest, +}; +use crate::service::PakeLoginResponseStruct; +use crate::token::AuthType; +use crate::{database::DatabaseClient, pake_grpc}; + +use super::{ + handle_db_error, pake_login_start, put_token_helper, Status, + UpdateUserRequest, UpdateUserResponse, +}; + /* Forwards response to the client over the tx channel. An Err status has its message logged with error! before it is sent. If the channel send itself fails, the failure is reported as Status::internal with the message disconnection. */ +async fn send_to_client( + tx: &tokio::sync::mpsc::Sender>, + response: Result, +) -> Result<(), Status> { + let transport_result = match response { + Ok(message) => tx.send(Ok(message)).await, + Err(status) => { + error!("{}", status.message()); 
tx.send(Err(status)).await + } + }; + + transport_result.map_err(|_| Status::internal("disconnection")) +} + /* Entry point for the update-user stream. Runs attempt_update_user, logs the user ID at info level on success, and on failure tries to forward the error status to the client through tx. If that forwarding also fails, the function simply returns. */ +pub(crate) async fn handle_server_update_user_messages( + in_stream: Streaming, + client: DatabaseClient, + tx: tokio::sync::mpsc::Sender>, +) { + match attempt_update_user(in_stream, &client, &tx).await { + Ok(user_id) => info!("Successfully updated user {}", user_id), + // Attempt to send client the failure to receive immediate feedback + Err(e) => match send_to_client(&tx, Err(e)).await { + Ok(_) => debug!("Attempted to inform user of failed update"), + Err(_) => return, + }, + }; +} + /* Drives the three-message exchange. Pulls the registration request, then the registration upload with credential request, then the login finalization from in_stream, in that order, and hands each to its handler. The first failing step returns its Status through the ? operator; on success the user_id of the first request is returned. */ +async fn attempt_update_user( + mut in_stream: Streaming, + client: &DatabaseClient, + tx: &tokio::sync::mpsc::Sender>, +) -> Result { + let first_message = in_stream.next().await; + + let (request, registration_state) = + handle_registration_request(first_message, client, tx).await?; + + let second_message = in_stream.next().await; + let registration_upload = get_registration_upload(second_message)?; + let server_login = handle_registration_upload_and_credential_request( + registration_upload, + tx, + &client, + &request, + registration_state, + ) + .await?; + + let third_message = in_stream.next().await; + let finalization_payload = get_finalization_message(third_message)?; + handle_credential_finalization( + finalization_payload, + tx, + client, + &request, + server_login, + ) + .await?; + + Ok(request.user_id) +} + /* Handles the first message of the exchange: extracts the registration request, requires the user to exist, starts the server-side PAKE registration with OsRng, and sends the serialized start message to the client. Returns the request together with the registration state needed by the next step. */ +pub async fn handle_registration_request( + message: Option>, + client: &DatabaseClient, + tx: &mpsc::Sender>, +) -> Result< + (PakeRegistrationRequestAndUserId, ServerRegistration), + Status, +> { + let request = get_register_request(message)?; + user_exists(&request.user_id, &client).await?; + let server_registration_start_result = pake_grpc::server_registration_start( + &mut OsRng, + &request.pake_registration_request, + )?; + let server_start_payload = + server_registration_start_result.message.serialize(); + let server_start_response = 
UpdateUserResponse { + data: Some(PakeRegistrationResponse(server_start_payload)), + }; + send_to_client(&tx, Ok(server_start_response)).await?; + + Ok((request, server_registration_start_result.state)) +} + /* Accepts only a successfully received message whose data is the Request variant and returns its payload. Anything else, including a missing or failed message, is logged and rejected with Status::invalid_argument. */ +fn get_register_request( + message: Option>, +) -> Result { + match message { + Some(Ok(UpdateUserRequest { + data: Some(update_user_request::Data::Request(request)), + })) => Ok(request), + e => { + error!( + "Expected to receive registration request, but instead received {:?}", + e + ); + Err(Status::invalid_argument("server error")) + } + } +} + /* Accepts only a successfully received message whose data is the PakeLoginFinalizationMessage variant and returns its payload. Anything else, including a missing or failed message, is logged and rejected with Status::aborted. */ +fn get_finalization_message( + message: Option>, +) -> Result, Status> { + match message { + Some(Ok(UpdateUserRequest { + data: + Some(update_user_request::Data::PakeLoginFinalizationMessage(request)), + })) => Ok(request), + e => { + error!( + "Expected to receive login finalization message, but instead received {:?}", + e); + Err(Status::aborted("server error")) + } + } +} + /* Looks up user_id in the users table. Yields Ok(true) when an item is present; a missing item is logged and returned as Status::not_found, so Ok(false) is never produced. Database errors are converted by handle_db_error and returned as-is. */ +async fn user_exists( + user_id: &str, + client: &DatabaseClient, +) -> Result { + match client + .get_item_from_users_table(&user_id) + .await + .map_err(handle_db_error) + { + Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), + Ok(GetItemOutput { item: None, .. 
}) => { + error!("Unable to find user: {}", user_id); + return Err(Status::not_found("user not found")); + } + Err(e) => Err(e), + } +} + /* Handles the second message of the exchange: finishes the registration started earlier through pake_registration_finish, starts a PAKE login through pake_login_start, sends the login response to the client, and returns the login state for the finalization step. */ +pub async fn handle_registration_upload_and_credential_request( + message: PakeRegistrationUploadAndCredentialRequest, + tx: &mpsc::Sender>, + client: &DatabaseClient, + request_and_user_info: &PakeRegistrationRequestAndUserId, + pake_state: ServerRegistration, +) -> Result, Status> { + pake_registration_finish( + &request_and_user_info.user_id, + client, + &message.pake_registration_upload, + pake_state, + ) + .await?; + let response_and_state = pake_login_start( + client, + &request_and_user_info.user_id, + &message.pake_credential_request, + ) + .await?; + + let registration_response = UpdateUserResponse { + data: Some(PakeLoginResponse(response_and_state.response)), + }; + + send_to_client(tx, Ok(registration_response)).await?; + Ok(response_and_state.pake_state) +} + /* Rejects an empty user_id with Status::aborted, completes the server registration from the uploaded bytes, and passes the result to update_users_table as its only non-None optional argument. NOTE(review): presumably that argument slot holds the stored registration; confirm against DatabaseClient. */ +async fn pake_registration_finish( + user_id: &str, + client: &DatabaseClient, + registration_upload_bytes: &Vec, + server_registration: ServerRegistration, +) -> Result<(), Status> { + if user_id.is_empty() { + error!("Incomplete data: user ID not provided"); + return Err(Status::aborted("user not found")); + } + let server_registration_finish_result = + pake_grpc::server_registration_finish( + server_registration, + registration_upload_bytes, + )?; + + client + .update_users_table( + user_id.to_string(), + None, + Some(server_registration_finish_result), + None, + None, + ) + .await + .map_err(handle_db_error)?; + Ok(()) +} + /* Accepts only a successfully received message whose data is the PakeRegistrationUploadAndCredentialRequest variant and returns its payload. Anything else, including a missing or failed message, is logged and rejected with Status::aborted. */ +fn get_registration_upload( + message: Option>, +) -> Result< + crate::service::proto::PakeRegistrationUploadAndCredentialRequest, + Status, +> { + match message { + Some(Ok(UpdateUserRequest { + data: + Some( + update_user_request::Data::PakeRegistrationUploadAndCredentialRequest( + upload, + ), + ), + })) => Ok(upload), + e => { + error!( + "Expected to receive registration upload, but instead received {:?}", + e + ); + 
Err(Status::aborted("server error")) + } + } +} + /* Handles the third message of the exchange: completes the login through pake_login_finish and sends the resulting PakeLoginResponse to the client. The return value is the outcome of that final send. */ +pub async fn handle_credential_finalization( + finalization_payload: Vec, + tx: &mpsc::Sender>, + client: &DatabaseClient, + request_and_user_info: &PakeRegistrationRequestAndUserId, + server_login: ServerLogin, +) -> Result<(), Status> { + let login_finish_result = pake_login_finish( + &request_and_user_info.user_id, + &request_and_user_info.signing_public_key, + client, + server_login, + &finalization_payload, + ) + .await?; + let response = UpdateUserResponse { + data: Some(update_user_response::Data::PakeLoginResponse( + login_finish_result, + )), + }; + send_to_client(tx, Ok(response)).await +} + /* Rejects an empty user_id with Status::aborted, runs server_login_finish on the finalization payload (only its error is used; the success value is discarded), then obtains a token from put_token_helper with AuthType::Password and wraps it as AccessToken. NOTE(review): the log line on the empty-ID path interpolates user_id, which is always empty there. */ +async fn pake_login_finish( + user_id: &str, + signing_public_key: &str, + client: &DatabaseClient, + server_login: ServerLogin, + pake_credential_finalization: &Vec, +) -> Result { + if user_id.is_empty() { + error!("Incomplete data: user ID {}", user_id); + return Err(Status::aborted("user not found")); + } + + pake_grpc::server_login_finish(server_login, pake_credential_finalization)?; + let access_token = put_token_helper( + client, + AuthType::Password, + user_id, + signing_public_key, + &mut OsRng, + ) + .await?; + Ok(PakeLoginResponseStruct { + data: Some(AccessToken(access_token)), + }) +}