diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -2,12 +2,11 @@ use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::Arc; -use aws_sdk_dynamodb::error::GetItemError; use aws_sdk_dynamodb::model::AttributeValue; use aws_sdk_dynamodb::output::{ GetItemOutput, PutItemOutput, QueryOutput, UpdateItemOutput, }; -use aws_sdk_dynamodb::types::{Blob, SdkError}; +use aws_sdk_dynamodb::types::Blob; use aws_sdk_dynamodb::{Client, Error as DynamoDBError}; use aws_types::sdk_config::SdkConfig; use chrono::{DateTime, Utc}; @@ -65,7 +64,7 @@ "DynamoDB client failed to get registration data for user {}: {}", user_id, e ); - Err(Error::AwsSdk(e.into())) + Err(e) } } } @@ -391,15 +390,15 @@ "DynamoDB client failed to get user public key for user {}: {}", user_id, e ); - Err(Error::AwsSdk(e.into())) + Err(e) } } } - async fn get_item_from_users_table( + pub async fn get_item_from_users_table( &self, user_id: &str, - ) -> Result<GetItemOutput, SdkError<GetItemError>> { + ) -> Result<GetItemOutput, Error> { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), @@ -412,6 +411,7 @@ .consistent_read(true) .send() .await + .map_err(|e| Error::AwsSdk(e.into())) } } diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -1,3 +1,4 @@ +use aws_sdk_dynamodb::output::GetItemOutput; use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::Utc; use comm_opaque::Cipher; @@ -79,132 +80,44 @@ ) -> Result<Response<Self::RegisterUserStream>, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let config = self.config.clone(); - let client = self.client.clone(); - tokio::spawn(async move { - let mut user_id: String = String::new(); - let mut device_id: String = String::new(); - let mut server_registration: Option<ServerRegistration<Cipher>> = None; - let mut server_login: 
Option<ServerLogin<Cipher>> = None; - let mut username: String = String::new(); - let mut user_public_key: String = String::new(); - let mut num_messages_received = 0; - while let Some(message) = in_stream.next().await { - match message { - Ok(RegistrationRequest { - data: - Some(PakeRegistrationRequestAndUserId( - pake_registration_request_and_user_id, - )), - }) => { - let registration_start_result = pake_registration_start( - config.clone(), - &mut OsRng, - &pake_registration_request_and_user_id.pake_registration_request, - num_messages_received, - ) - .await - .map(|registration_response_and_server_registration| { - server_registration = - Some(registration_response_and_server_registration.1); - registration_response_and_server_registration.0 - }); - if let Err(e) = tx.send(registration_start_result).await { - error!("Response was dropped: {}", e); - break; - } - user_id = pake_registration_request_and_user_id.user_id; - device_id = pake_registration_request_and_user_id.device_id; - username = pake_registration_request_and_user_id.username; - user_public_key = - pake_registration_request_and_user_id.user_public_key; - } - Ok(RegistrationRequest { - data: - Some(PakeRegistrationUploadAndCredentialRequest( - pake_registration_upload_and_credential_request, - )), - }) => { - let registration_finish_and_login_start_result = - match pake_registration_finish( - &user_id, - &device_id, - client.clone(), - &pake_registration_upload_and_credential_request - .pake_registration_upload, - server_registration, - &username, - &user_public_key, - num_messages_received, - ) - .await - { - Ok(_) => pake_login_start( - config.clone(), - client.clone(), - &user_id.clone(), - &pake_registration_upload_and_credential_request - .pake_credential_request, - num_messages_received, - PakeWorkflow::Registration, - ) - .await - .map(|pake_login_response_and_server_login| { - server_login = Some(pake_login_response_and_server_login.1); - RegistrationResponse { - data: Some(PakeRegistrationLoginResponse( - 
pake_login_response_and_server_login.0, - )), - } - }), - Err(e) => Err(e), - }; - if let Err(e) = - tx.send(registration_finish_and_login_start_result).await - { - error!("Response was dropped: {}", e); - break; - } - server_registration = None; - } - Ok(RegistrationRequest { - data: - Some(PakeRegistrationCredentialFinalization( - pake_credential_finalization, - )), - }) => { - let login_finish_result = pake_login_finish( - &user_id, - &device_id, - &user_public_key, - client, - server_login, - &pake_credential_finalization, - &mut OsRng, - num_messages_received, - PakeWorkflow::Registration, - ) - .await - .map(|pake_login_response| RegistrationResponse { - data: Some(PakeRegistrationLoginResponse(pake_login_response)), - }); - if let Err(e) = tx.send(login_finish_result).await { - error!("Response was dropped: {}", e); - } - break; - } - unexpected => { - error!("Received an unexpected Result: {:?}", unexpected); - if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await - { - error!("Response was dropped: {}", e); - } - break; - } - } - num_messages_received += 1; - } - }); + let first_message = in_stream.next().await; + let mut registration_state = registration::handle_registration_request( + first_message, + self.client.clone(), + tx.clone(), + self.config.clone(), + ) + .await?; + // ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we + // have to take the value out of registration_state, replacing it with None + let pake_state = + if let Some(pake_state) = registration_state.pake_state.take() { + pake_state + } else { + error!("registration_state is missing opaque-ke ServerRegistration"); + return Err(Status::failed_precondition("internal error")); + }; + let second_message = in_stream.next().await; + let server_login = + registration::handle_registration_upload_and_credential_request( + second_message, + tx.clone(), + self.client.clone(), + &registration_state, + pake_state, + self.config.clone(), + ) + .await?; + let 
third_message = in_stream.next().await; + registration::handle_credential_finalization( + third_message, + tx, + self.client.clone(), + &registration_state, + server_login, + ) + .await?; + + let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::RegisterUserStream @@ -223,103 +136,23 @@ let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let config = self.config.clone(); let client = self.client.clone(); - tokio::spawn(async move { - let mut user_id: String = String::new(); - let mut device_id: String = String::new(); - let mut server_login: Option<ServerLogin<Cipher>> = None; - let mut user_public_key: String = String::new(); - let mut num_messages_received = 0; - while let Some(message) = in_stream.next().await { - match message { - Ok(LoginRequest { - data: Some(WalletLoginRequest(req)), - }) => { - let wallet_login_result = wallet_login_helper( - client, - req, - &mut OsRng, - num_messages_received, - ) - .await; - if let Err(e) = tx.send(wallet_login_result).await { - error!("Response was dropped: {}", e); - } - break; - } - Ok(LoginRequest { - data: - Some(PakeLoginRequest(PakeLoginRequestStruct { - data: - Some(PakeCredentialRequestAndUserId( - pake_credential_request_and_user_id, - )), - })), - }) => { - let login_start_result = pake_login_start( - config.clone(), - client.clone(), - &pake_credential_request_and_user_id.user_id, - &pake_credential_request_and_user_id.pake_credential_request, - num_messages_received, - PakeWorkflow::Login, - ) - .await - .map(|pake_login_response_and_server_login| { - server_login = Some(pake_login_response_and_server_login.1); - LoginResponse { - data: Some(PakeLoginResponse( - pake_login_response_and_server_login.0, - )), - } - }); - if let Err(e) = tx.send(login_start_result).await { - error!("Response was dropped: {}", e); - break; - } - user_id = pake_credential_request_and_user_id.user_id; - device_id = pake_credential_request_and_user_id.device_id; - user_public_key = - 
pake_credential_request_and_user_id.user_public_key; - } - Ok(LoginRequest { - data: - Some(PakeLoginRequest(PakeLoginRequestStruct { - data: - Some(PakeCredentialFinalization(pake_credential_finalization)), - })), - }) => { - let login_finish_result = pake_login_finish( - &user_id, - &device_id, - &user_public_key, - client, - server_login, - &pake_credential_finalization, - &mut OsRng, - num_messages_received, - PakeWorkflow::Login, - ) - .await - .map(|pake_login_response| LoginResponse { - data: Some(PakeLoginResponse(pake_login_response)), - }); - if let Err(e) = tx.send(login_finish_result).await { - error!("Response was dropped: {}", e); - } - break; - } - unexpected => { - error!("Received an unexpected Result: {:?}", unexpected); - if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await - { - error!("Response was dropped: {}", e); - } - break; - } - } - num_messages_received += 1; - } - }); + + let first_message = in_stream.next().await; + let login_state = login::handle_login_request( + first_message, + tx.clone(), + client.clone(), + config, + ) + .await?; + + // login_state will be None if user is logging in with a wallet + if let Some(state) = login_state { + let second_message = in_stream.next().await; + login::handle_credential_finalization(second_message, tx, client, state) + .await?; + } + let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) } @@ -474,12 +307,7 @@ client: DatabaseClient, wallet_login_request: WalletLoginRequestStruct, rng: &mut (impl Rng + CryptoRng), - num_messages_received: u8, ) -> Result<LoginResponse, Status> { - if num_messages_received != 0 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } parse_and_verify_siwe_message( &wallet_login_request.user_id, &wallet_login_request.device_id, @@ -510,22 +338,17 @@ }) } +struct PakeLoginResponseStructAndServerLogin { + pake_login_response: PakeLoginResponseStruct, + server_login: 
ServerLogin<Cipher>, +} + async fn pake_login_start( config: Config, client: DatabaseClient, user_id: &str, pake_credential_request: &[u8], - num_messages_received: u8, - pake_workflow: PakeWorkflow, -) -> Result<(PakeLoginResponseStruct, ServerLogin<Cipher>), Status> { - if (num_messages_received != 0 - && matches!(pake_workflow, PakeWorkflow::Login)) - || (num_messages_received != 1 - && matches!(pake_workflow, PakeWorkflow::Registration)) - { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } +) -> Result<PakeLoginResponseStructAndServerLogin, Status> { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); @@ -550,17 +373,19 @@ credential_request, ServerLoginStartParameters::default(), ) { - Ok(server_login_start_result) => Ok(( - PakeLoginResponseStruct { - data: Some(PakeCredentialResponse( - server_login_start_result.message.serialize().map_err(|e| { - error!("Failed to serialize PAKE message: {}", e); - Status::failed_precondition("internal error") - })?, - )), - }, - server_login_start_result.state, - )), + Ok(server_login_start_result) => { + Ok(PakeLoginResponseStructAndServerLogin { + pake_login_response: PakeLoginResponseStruct { + data: Some(PakeCredentialResponse( + server_login_start_result.message.serialize().map_err(|e| { + error!("Failed to serialize PAKE message: {}", e); + Status::failed_precondition("internal error") + })?, + )), + }, + server_login: server_login_start_result.state, + }) + } Err(e) => { error!( "Encountered a PAKE protocol error when starting login: {}", @@ -576,20 +401,11 @@ device_id: &str, user_public_key: &str, client: DatabaseClient, - server_login: Option<ServerLogin<Cipher>>, + server_login: ServerLogin<Cipher>, pake_credential_finalization: &[u8], rng: &mut (impl Rng + CryptoRng), - num_messages_received: u8, pake_workflow: PakeWorkflow, ) -> Result<PakeLoginResponseStruct, Status> { - if (num_messages_received != 1 - && matches!(pake_workflow, PakeWorkflow::Login)) - || (num_messages_received != 2 - && 
matches!(pake_workflow, PakeWorkflow::Registration)) - { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID {}, device ID {}", @@ -598,10 +414,6 @@ return Err(Status::aborted("user not found")); } server_login - .ok_or_else(|| { - error!("Server login missing in {:?} PAKE workflow", pake_workflow); - Status::aborted("login failed") - })? .finish( CredentialFinalization::deserialize(pake_credential_finalization) .map_err(|e| { @@ -640,25 +452,22 @@ config: Config, rng: &mut (impl Rng + CryptoRng), registration_request_bytes: &[u8], - num_messages_received: u8, -) -> Result<(RegistrationResponse, ServerRegistration<Cipher>), Status> { - if num_messages_received != 0 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } +) -> Result<RegistrationResponseAndPakeState, Status> { match ServerRegistration::<Cipher>::start( rng, PakeRegistrationRequest::deserialize(registration_request_bytes).unwrap(), config.server_keypair.public(), ) { - Ok(server_registration_start_result) => Ok(( - RegistrationResponse { - data: Some(PakeRegistrationResponse( - server_registration_start_result.message.serialize(), - )), - }, - server_registration_start_result.state, - )), + Ok(server_registration_start_result) => { + Ok(RegistrationResponseAndPakeState { + response: RegistrationResponse { + data: Some(PakeRegistrationResponse( + server_registration_start_result.message.serialize(), + )), + }, + pake_state: server_registration_start_result.state, + }) + } Err(e) => { error!( "Encountered a PAKE protocol error when starting registration: {}", @@ -677,12 +486,7 @@ server_registration: Option<ServerRegistration<Cipher>>, username: &str, user_public_key: &str, - num_messages_received: u8, ) -> Result<(), Status> { - if num_messages_received != 1 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } if user_id.is_empty() 
{ error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); @@ -706,12 +510,12 @@ })?; match client - .update_users_table( + .add_user_to_users_table( user_id.to_string(), device_id.to_string(), - Some(server_registration_finish_result), - Some(username.to_string()), - Some(user_public_key.to_string()), + server_registration_finish_result, + username.to_string(), + user_public_key.to_string(), ) .await { @@ -735,3 +539,272 @@ } } } + +mod registration { + use super::*; + pub struct RegistrationState { + user_id: String, + device_id: String, + username: String, + user_public_key: String, + pub pake_state: Option<ServerRegistration<Cipher>>, + } + + pub async fn handle_registration_request( + message: Option<Result<RegistrationRequest, Status>>, + client: DatabaseClient, + tx: mpsc::Sender<Result<RegistrationResponse, Status>>, + config: Config, + ) -> Result<RegistrationState, Status> { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationRequestAndUserId( + pake_registration_request_and_user_id, + )), + })) => { + let get_item_output = client + .get_item_from_users_table( + &pake_registration_request_and_user_id.user_id, + ) + .await; + match get_item_output { + Ok(GetItemOutput { item: Some(_), .. 
}) => { + error!("User already exists"); + if let Err(e) = tx + .send(Err(Status::already_exists("User already exists"))) + .await + { + error!("Response was dropped: {}", e); + } + } + Err(e) => return Err(handle_db_error(e)), + _ => {} + }; + let response_and_state = pake_registration_start( + config.clone(), + &mut OsRng, + &pake_registration_request_and_user_id.pake_registration_request, + ) + .await?; + if let Err(e) = tx.send(Ok(response_and_state.response)).await { + error!("Response was dropped: {}", e); + } + + Ok(RegistrationState { + user_id: pake_registration_request_and_user_id.user_id, + device_id: pake_registration_request_and_user_id.device_id, + username: pake_registration_request_and_user_id.username, + user_public_key: pake_registration_request_and_user_id + .user_public_key, + pake_state: Some(response_and_state.pake_state), + }) + } + None | Some(_) => Err(Status::aborted("failure")), + } + } + + pub async fn handle_registration_upload_and_credential_request( + message: Option<Result<RegistrationRequest, Status>>, + tx: mpsc::Sender<Result<RegistrationResponse, Status>>, + client: DatabaseClient, + registration_state: &RegistrationState, + pake_state: ServerRegistration<Cipher>, + config: Config, + ) -> Result<ServerLogin<Cipher>, Status> { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationUploadAndCredentialRequest( + pake_registration_upload_and_credential_request, + )), + })) => { + let pake_login_response_and_server_login = + match pake_registration_finish( + &registration_state.user_id, + &registration_state.device_id, + client.clone(), + &pake_registration_upload_and_credential_request + .pake_registration_upload, + Some(pake_state), + &registration_state.username, + &registration_state.user_public_key, + ) + .await + { + Ok(_) => { + pake_login_start( + config.clone(), + client.clone(), + &registration_state.user_id, + &pake_registration_upload_and_credential_request + .pake_credential_request, + ) + .await? 
+ } + + Err(e) => { + return Err(e); + } + }; + let registration_response = RegistrationResponse { + data: Some(PakeRegistrationLoginResponse( + pake_login_response_and_server_login.pake_login_response, + )), + }; + if let Err(e) = tx.send(Ok(registration_response)).await { + error!("Response was dropped: {}", e); + Err(Status::aborted("failure")) + } else { + Ok(pake_login_response_and_server_login.server_login) + } + } + None | Some(_) => Err(Status::aborted("failure")), + } + } + + pub async fn handle_credential_finalization( + message: Option<Result<RegistrationRequest, Status>>, + tx: mpsc::Sender<Result<RegistrationResponse, Status>>, + client: DatabaseClient, + registration_state: &RegistrationState, + server_login: ServerLogin<Cipher>, + ) -> Result<(), Status> { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationCredentialFinalization( + pake_credential_finalization, + )), + })) => { + let login_finish_result = pake_login_finish( + &registration_state.user_id, + &registration_state.device_id, + &registration_state.user_public_key, + client, + server_login, + &pake_credential_finalization, + &mut OsRng, + PakeWorkflow::Registration, + ) + .await + .map(|pake_login_response| RegistrationResponse { + data: Some(PakeRegistrationLoginResponse(pake_login_response)), + }); + if let Err(e) = tx.send(login_finish_result).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("failure")); + } + Ok(()) + } + Some(_) | None => Err(Status::aborted("failure")), + } + } +} + +mod login { + use super::*; + pub struct LoginState { + user_id: String, + device_id: String, + user_public_key: String, + pake_state: ServerLogin<Cipher>, + } + pub async fn handle_login_request( + message: Option<Result<LoginRequest, Status>>, + tx: mpsc::Sender<Result<LoginResponse, Status>>, + client: DatabaseClient, + config: Config, + ) -> Result<Option<LoginState>, Status> { + match message { + Some(Ok(LoginRequest { + data: Some(WalletLoginRequest(req)), + })) => { + let wallet_login_result = + wallet_login_helper(client, req, &mut OsRng).await; + if let Err(e) = tx.send(wallet_login_result).await { + 
error!("Response was dropped: {}", e); + Err(Status::aborted("failure")) + } else { + Ok(None) + } + } + Some(Ok(LoginRequest { + data: + Some(PakeLoginRequest(PakeLoginRequestStruct { + data: + Some(PakeCredentialRequestAndUserId( + pake_credential_request_and_user_id, + )), + })), + })) => { + let pake_login_response_struct_and_server_login = pake_login_start( + config.clone(), + client.clone(), + &pake_credential_request_and_user_id.user_id, + &pake_credential_request_and_user_id.pake_credential_request, + ) + .await?; + let login_response = LoginResponse { + data: Some(PakeLoginResponse( + pake_login_response_struct_and_server_login.pake_login_response, + )), + }; + if let Err(e) = tx.send(Ok(login_response)).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("failure")); + } + + Ok(Some(LoginState { + user_id: pake_credential_request_and_user_id.user_id, + device_id: pake_credential_request_and_user_id.device_id, + user_public_key: pake_credential_request_and_user_id.user_public_key, + pake_state: pake_login_response_struct_and_server_login.server_login, + })) + } + Some(_) | None => Err(Status::aborted("failure")), + } + } + + pub async fn handle_credential_finalization( + message: Option<Result<LoginRequest, Status>>, + tx: mpsc::Sender<Result<LoginResponse, Status>>, + client: DatabaseClient, + login_state: LoginState, + ) -> Result<(), Status> { + match message { + Some(Ok(LoginRequest { + data: + Some(PakeLoginRequest(PakeLoginRequestStruct { + data: Some(PakeCredentialFinalization(pake_credential_finalization)), + })), + })) => { + let login_finish_result = pake_login_finish( + &login_state.user_id, + &login_state.device_id, + &login_state.user_public_key, + client, + login_state.pake_state, + &pake_credential_finalization, + &mut OsRng, + PakeWorkflow::Login, + ) + .await + .map(|pake_login_response| LoginResponse { + data: Some(PakeLoginResponse(pake_login_response)), + }); + if let Err(e) = tx.send(login_finish_result).await { + error!("Response was dropped: {}", e); + return 
Err(Status::aborted("failure")); + } + Ok(()) + } + Some(_) | None => Err(Status::aborted("failure")), + } + } +} + +struct RegistrationResponseAndPakeState { + response: RegistrationResponse, + pake_state: ServerRegistration<Cipher>, +}