diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index 833e4f050..99bac8a2a 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,667 +1,667 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::Arc; -use aws_sdk_dynamodb::error::GetItemError; use aws_sdk_dynamodb::model::AttributeValue; use aws_sdk_dynamodb::output::{ GetItemOutput, PutItemOutput, QueryOutput, UpdateItemOutput, }; -use aws_sdk_dynamodb::types::{Blob, SdkError}; +use aws_sdk_dynamodb::types::Blob; use aws_sdk_dynamodb::{Client, Error as DynamoDBError}; use aws_types::sdk_config::SdkConfig; use chrono::{DateTime, Utc}; use opaque_ke::{errors::ProtocolError, ServerRegistration}; use tracing::{error, info, warn}; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, USERS_TABLE_DEVICE_ATTRIBUTE, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USER_PUBLIC_KEY_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::token::{AccessTokenData, AuthType}; use comm_opaque::Cipher; #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &SdkConfig) -> Self { DatabaseClient { client: Arc::new(Client::new(aws_config)), } } pub async fn get_pake_registration( &self, user_id: String, ) -> Result>, Error> { match self.get_item_from_users_table(&user_id).await { Ok(GetItemOutput { item: Some(mut item), .. }) => parse_registration_data_attribute( item.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), ) .map(Some) .map_err(Error::Attribute), Ok(_) => { info!( "No item found for user {} in PAKE registration table", user_id ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", user_id, e ); - Err(Error::AwsSdk(e.into())) + Err(e) } } } pub async fn update_users_table( &self, user_id: String, device_id: String, registration: Option>, username: Option, user_public_key: Option, ) -> Result { let mut update_expression_parts = Vec::new(); let mut expression_attribute_names = HashMap::new(); let mut expression_attribute_values = HashMap::new(); if let Some(reg) = registration { update_expression_parts .push(format!("{} = :r", USERS_TABLE_REGISTRATION_ATTRIBUTE)); expression_attribute_values.insert( ":r".to_string(), AttributeValue::B(Blob::new(reg.serialize())), ); }; if let Some(username) = username { update_expression_parts .push(format!("{} = :u", USERS_TABLE_USERNAME_ATTRIBUTE)); expression_attribute_values .insert(":u".to_string(), AttributeValue::S(username)); }; if let Some(public_key) = user_public_key { update_expression_parts.push(format!( "{}.#{} = :k", USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, )); expression_attribute_names.insert( format!("#{}", USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME), device_id, ); expression_attribute_values.insert( ":k".to_string(), AttributeValue::M(HashMap::from([( USERS_TABLE_USER_PUBLIC_KEY_ATTRIBUTE.to_string(), AttributeValue::S(public_key), )])), ); }; self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(format!("SET {}", update_expression_parts.join(","))) .set_expression_attribute_names( if expression_attribute_names.is_empty() { None } else { Some(expression_attribute_names) }, ) .set_expression_attribute_values( if expression_attribute_values.is_empty() { None } else { Some(expression_attribute_values) }, ) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn add_user_to_users_table( &self, user_id: String, device_id: String, registration: ServerRegistration, username: String, user_public_key: String, ) -> Result { let item = HashMap::from([ ( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ), ( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ), ( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(Blob::new(registration.serialize())), ), ( USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), AttributeValue::M(HashMap::from([( device_id, AttributeValue::M(HashMap::from([( USERS_TABLE_USER_PUBLIC_KEY_ATTRIBUTE.to_string(), AttributeValue::S(user_public_key), )])), )])), ), ]); self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_access_token_data( &self, user_id: String, device_id: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), (ACCESS_TOKEN_SORT_KEY.to_string(), device_id.clone()), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = parse_created_attribute( item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, device_id, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and device {} in token table", user_id, device_id ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} on device {}: {}", user_id, device_id, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.device_id), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: AuthType, ) -> Result, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(mut items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } parse_string_attribute( USERS_TABLE_PARTITION_KEY, items[0].remove(USERS_TABLE_PARTITION_KEY), ) .map(Some) .map_err(Error::Attribute) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user ID from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_user_public_key( &self, user_id: String, device_id: String, ) -> Result, Error> { match self.get_item_from_users_table(&user_id).await { Ok(GetItemOutput { item: Some(mut item), .. }) => { // `devices` is a HashMap that maps device IDs to a HashMap of // device-specific attributes let mut devices = parse_map_attribute( USERS_TABLE_DEVICES_ATTRIBUTE, item.remove(USERS_TABLE_DEVICES_ATTRIBUTE), )?; if devices.get(&device_id).is_none() { return Ok(None); } let mut device = parse_map_attribute( USERS_TABLE_DEVICE_ATTRIBUTE, devices.remove(&device_id), )?; parse_string_attribute( USERS_TABLE_USER_PUBLIC_KEY_ATTRIBUTE, device.remove(USERS_TABLE_USER_PUBLIC_KEY_ATTRIBUTE), ) .map(Some) .map_err(Error::Attribute) } Ok(_) => { info!( "No item found for user {} and device {} in users table", user_id, device_id ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user public key for user {}: {}", user_id, e ); - Err(Error::AwsSdk(e.into())) + Err(e) } } } - async fn get_item_from_users_table( + pub async fn get_item_from_users_table( &self, user_id: &str, - ) -> Result> { + ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await + .map_err(|e| Error::AwsSdk(e.into())) } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { attribute_name: &'static str, attribute_value: Option, attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> FmtResult { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] InvalidTimestamp(chrono::ParseError), #[display(...)] Pake(ProtocolError), } type AttributeName = String; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_created_attribute( attribute: Option, ) -> Result, DBItemError> { if let Some(AttributeValue::S(created)) = &attribute { created.parse().map_err(|e| { DBItemError::new( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, attribute, DBItemAttributeError::InvalidTimestamp(e), ) }) } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )) } } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match &attribute { Some(AttributeValue::B(server_registration_bytes)) => { match ServerRegistration::::deserialize( server_registration_bytes.as_ref(), ) { Ok(server_registration) => Ok(server_registration), Err(e) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::Pake(e), )), } } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_string_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } fn parse_map_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result, DBItemError> { match attribute_value { Some(AttributeValue::M(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } } diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs index 62786cfc0..bda7ae3a2 100644 --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -1,737 +1,550 @@ +use aws_sdk_dynamodb::output::GetItemOutput; use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::Utc; use comm_opaque::Cipher; use constant_time_eq::constant_time_eq; use futures_core::Stream; use opaque_ke::{ CredentialFinalization, CredentialRequest, RegistrationRequest as PakeRegistrationRequest, ServerLogin, ServerLoginStartParameters, }; use opaque_ke::{RegistrationUpload, ServerRegistration}; use rand::rngs::OsRng; use rand::{CryptoRng, Rng}; use siwe::Message; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{error, info, instrument}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; use crate::database::DatabaseClient; use crate::token::{AccessTokenData, AuthType}; use crate::{config::Config, database::Error as DBError}; pub use proto::identity_service_server::IdentityServiceServer; use proto::{ get_user_id_request::AuthType as ProtoAuthType, identity_service_server::IdentityService, login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as PakeRegistrationCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as PakeRegistrationLoginResponse, registration_response::Data::PakeRegistrationResponse, GetUserIdRequest, GetUserIdResponse, GetUserPublicKeyRequest, GetUserPublicKeyResponse, LoginRequest, LoginResponse, PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, RegistrationRequest, RegistrationResponse, VerifyUserTokenRequest, VerifyUserTokenResponse, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; mod proto { tonic::include_proto!("identity"); } +mod login; +mod registration; + #[derive(Debug)] enum PakeWorkflow { Registration, Login, } #[derive(derive_more::Constructor)] pub struct MyIdentityService { config: Config, client: DatabaseClient, } #[tonic::async_trait] impl IdentityService for MyIdentityService { type RegisterUserStream = Pin< Box< dyn Stream> + Send + 'static, >, >; #[instrument(skip(self))] async fn register_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let config = self.config.clone(); - let client = self.client.clone(); - tokio::spawn(async move { - let mut user_id: String = String::new(); - let mut device_id: String = String::new(); - let mut server_registration: Option> = None; - let mut server_login: Option> = None; - let mut username: String = String::new(); - let mut user_public_key: String = String::new(); - let mut num_messages_received = 0; - while let Some(message) = in_stream.next().await { - match message { - Ok(RegistrationRequest { - data: - Some(PakeRegistrationRequestAndUserId( - pake_registration_request_and_user_id, - )), - }) => { - let registration_start_result = pake_registration_start( - config.clone(), - &mut OsRng, - &pake_registration_request_and_user_id.pake_registration_request, - num_messages_received, - ) - .await - .map(|registration_response_and_server_registration| { - server_registration = - Some(registration_response_and_server_registration.1); - registration_response_and_server_registration.0 - }); - if let Err(e) = tx.send(registration_start_result).await { - error!("Response was dropped: {}", e); - break; - } - user_id = pake_registration_request_and_user_id.user_id; - device_id = pake_registration_request_and_user_id.device_id; - username = pake_registration_request_and_user_id.username; - user_public_key = - pake_registration_request_and_user_id.user_public_key; - } - Ok(RegistrationRequest { - data: - Some(PakeRegistrationUploadAndCredentialRequest( - pake_registration_upload_and_credential_request, - )), - }) => { - let registration_finish_and_login_start_result = - match pake_registration_finish( - &user_id, - &device_id, - client.clone(), - &pake_registration_upload_and_credential_request - .pake_registration_upload, - server_registration, - &username, - &user_public_key, - num_messages_received, - ) - .await - { - Ok(_) => pake_login_start( - config.clone(), - client.clone(), - &user_id.clone(), - &pake_registration_upload_and_credential_request - .pake_credential_request, - num_messages_received, - PakeWorkflow::Registration, - ) - .await - .map(|pake_login_response_and_server_login| { - server_login = Some(pake_login_response_and_server_login.1); - RegistrationResponse { - data: Some(PakeRegistrationLoginResponse( - pake_login_response_and_server_login.0, - )), - } - }), - Err(e) => Err(e), - }; - if let Err(e) = - tx.send(registration_finish_and_login_start_result).await - { - error!("Response was dropped: {}", e); - break; - } - server_registration = None; - } - Ok(RegistrationRequest { - data: - Some(PakeRegistrationCredentialFinalization( - pake_credential_finalization, - )), - }) => { - let login_finish_result = pake_login_finish( - &user_id, - &device_id, - &user_public_key, - client, - server_login, - &pake_credential_finalization, - &mut OsRng, - num_messages_received, - PakeWorkflow::Registration, - ) - .await - .map(|pake_login_response| RegistrationResponse { - data: Some(PakeRegistrationLoginResponse(pake_login_response)), - }); - if let Err(e) = tx.send(login_finish_result).await { - error!("Response was dropped: {}", e); - } - break; - } - unexpected => { - error!("Received an unexpected Result: {:?}", unexpected); - if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await - { - error!("Response was dropped: {}", e); - } - break; - } - } - num_messages_received += 1; - } - }); + let first_message = in_stream.next().await; + let mut registration_state = registration::handle_registration_request( + first_message, + self.client.clone(), + tx.clone(), + &self.config, + ) + .await?; + // ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we + // have to take the value out of registration_state, replacing it with None + let pake_state = + if let Some(pake_state) = registration_state.pake_state.take() { + pake_state + } else { + error!("registration_state is missing opaque-ke ServerRegistration"); + return Err(Status::failed_precondition("internal error")); + }; + let second_message = in_stream.next().await; + let server_login = + registration::handle_registration_upload_and_credential_request( + second_message, + tx.clone(), + self.client.clone(), + ®istration_state, + pake_state, + &self.config, + ) + .await?; + let third_message = in_stream.next().await; + registration::handle_credential_finalization( + third_message, + tx, + self.client.clone(), + ®istration_state, + server_login, + ) + .await?; + let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::RegisterUserStream )) } type LoginUserStream = Pin> + Send + 'static>>; #[instrument(skip(self))] async fn login_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); - let config = self.config.clone(); - let client = self.client.clone(); - tokio::spawn(async move { - let mut user_id: String = String::new(); - let mut device_id: String = String::new(); - let mut server_login: Option> = None; - let mut user_public_key: String = String::new(); - let mut num_messages_received = 0; - while let Some(message) = in_stream.next().await { - match message { - Ok(LoginRequest { - data: Some(WalletLoginRequest(req)), - }) => { - let wallet_login_result = wallet_login_helper( - client, - req, - &mut OsRng, - num_messages_received, - ) - .await; - if let Err(e) = tx.send(wallet_login_result).await { - error!("Response was dropped: {}", e); - } - break; - } - Ok(LoginRequest { - data: - Some(PakeLoginRequest(PakeLoginRequestStruct { - data: - Some(PakeCredentialRequestAndUserId( - pake_credential_request_and_user_id, - )), - })), - }) => { - let login_start_result = pake_login_start( - config.clone(), - client.clone(), - &pake_credential_request_and_user_id.user_id, - &pake_credential_request_and_user_id.pake_credential_request, - num_messages_received, - PakeWorkflow::Login, - ) - .await - .map(|pake_login_response_and_server_login| { - server_login = Some(pake_login_response_and_server_login.1); - LoginResponse { - data: Some(PakeLoginResponse( - pake_login_response_and_server_login.0, - )), - } - }); - if let Err(e) = tx.send(login_start_result).await { - error!("Response was dropped: {}", e); - break; - } - user_id = pake_credential_request_and_user_id.user_id; - device_id = pake_credential_request_and_user_id.device_id; - user_public_key = - pake_credential_request_and_user_id.user_public_key; - } - Ok(LoginRequest { - data: - Some(PakeLoginRequest(PakeLoginRequestStruct { - data: - Some(PakeCredentialFinalization(pake_credential_finalization)), - })), - }) => { - let login_finish_result = pake_login_finish( - &user_id, - &device_id, - &user_public_key, - client, - server_login, - &pake_credential_finalization, - &mut OsRng, - num_messages_received, - PakeWorkflow::Login, - ) - .await - .map(|pake_login_response| LoginResponse { - data: Some(PakeLoginResponse(pake_login_response)), - }); - if let Err(e) = tx.send(login_finish_result).await { - error!("Response was dropped: {}", e); - } - break; - } - unexpected => { - error!("Received an unexpected Result: {:?}", unexpected); - if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await - { - error!("Response was dropped: {}", e); - } - break; - } - } - num_messages_received += 1; - } - }); + + let first_message = in_stream.next().await; + let login_state = login::handle_login_request( + first_message, + tx.clone(), + self.client.clone(), + &self.config, + ) + .await?; + + // login_state will be None if user is logging in with a wallet + if let Some(state) = login_state { + let second_message = in_stream.next().await; + login::handle_credential_finalization( + second_message, + tx, + self.client.clone(), + state, + ) + .await?; + } + let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) } #[instrument(skip(self))] async fn verify_user_token( &self, request: Request, ) -> Result, Status> { info!("Received VerifyUserToken request: {:?}", request); let message = request.into_inner(); let token_valid = match self .client .get_access_token_data(message.user_id, message.device_id) .await { Ok(Some(access_token_data)) => constant_time_eq( access_token_data.access_token.as_bytes(), message.access_token.as_bytes(), ), Ok(None) => false, Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(VerifyUserTokenResponse { token_valid }); info!("Sending VerifyUserToken response: {:?}", response); Ok(response) } #[instrument(skip(self))] async fn get_user_id( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let auth_type = match ProtoAuthType::from_i32(message.auth_type) { Some(ProtoAuthType::Password) => AuthType::Password, Some(ProtoAuthType::Wallet) => AuthType::Wallet, None => { error!( "Unable to parse AuthType from message: {}", message.auth_type ); return Err(Status::invalid_argument("invalid message")); } }; let user_id = match self .client .get_user_id_from_user_info(message.user_info, auth_type) .await { Ok(Some(user_id)) => user_id, Ok(None) => return Err(Status::not_found("no user ID found")), Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(GetUserIdResponse { user_id }); Ok(response) } #[instrument(skip(self))] async fn get_user_public_key( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let public_key = match self .client .get_user_public_key(message.user_id, message.device_id) .await { Ok(Some(public_key)) => public_key, Ok(None) => return Err(Status::not_found("no public key found")), Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(GetUserPublicKeyResponse { public_key }); Ok(response) } } async fn put_token_helper( client: DatabaseClient, auth_type: AuthType, user_id: &str, device_id: &str, rng: &mut (impl Rng + CryptoRng), ) -> Result { if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID \"{}\", device ID \"{}\"", user_id, device_id ); return Err(Status::aborted("user not found")); } let access_token_data = AccessTokenData::new( user_id.to_string(), device_id.to_string(), - auth_type.clone(), + auth_type, rng, ); match client .put_access_token_data(access_token_data.clone()) .await { Ok(_) => Ok(access_token_data.access_token), Err(e) => Err(handle_db_error(e)), } } fn parse_and_verify_siwe_message( user_id: &str, device_id: &str, siwe_message: &str, siwe_signature: Vec, ) -> Result<(), Status> { if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID {}, device ID {}", user_id, device_id ); return Err(Status::aborted("user not found")); } let siwe_message: Message = match siwe_message.parse() { Ok(m) => m, Err(e) => { error!("Failed to parse SIWE message: {}", e); return Err(Status::invalid_argument("invalid message")); } }; match siwe_message.verify( match siwe_signature.try_into() { Ok(s) => s, Err(e) => { error!("Conversion to SIWE signature failed: {:?}", e); return Err(Status::invalid_argument("invalid message")); } }, None, None, Some(&Utc::now()), ) { Err(e) => { error!( "Signature verification failed for user {} on device {}: {}", user_id, device_id, e ); Err(Status::unauthenticated("message not authenticated")) } Ok(_) => Ok(()), } } async fn wallet_login_helper( client: DatabaseClient, wallet_login_request: WalletLoginRequestStruct, rng: &mut (impl Rng + CryptoRng), - num_messages_received: u8, ) -> Result { - if num_messages_received != 0 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } parse_and_verify_siwe_message( &wallet_login_request.user_id, &wallet_login_request.device_id, &wallet_login_request.siwe_message, wallet_login_request.siwe_signature, )?; client .update_users_table( wallet_login_request.user_id.clone(), wallet_login_request.device_id.clone(), None, None, Some(wallet_login_request.user_public_key), ) .await .map_err(handle_db_error)?; Ok(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token: put_token_helper( client, AuthType::Wallet, &wallet_login_request.user_id, &wallet_login_request.device_id, rng, ) .await?, })), }) } async fn pake_login_start( - config: Config, + config: &Config, client: DatabaseClient, user_id: &str, pake_credential_request: &[u8], - num_messages_received: u8, - pake_workflow: PakeWorkflow, -) -> Result<(PakeLoginResponseStruct, ServerLogin), Status> { - if (num_messages_received != 0 - && matches!(pake_workflow, PakeWorkflow::Login)) - || (num_messages_received != 1 - && matches!(pake_workflow, PakeWorkflow::Registration)) - { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } +) -> Result { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration = match client.get_pake_registration(user_id.to_string()).await { Ok(Some(r)) => r, Ok(None) => { return Err(Status::not_found("user not found")); } Err(e) => return Err(handle_db_error(e)), }; let credential_request = CredentialRequest::deserialize(pake_credential_request).map_err(|e| { error!("Failed to deserialize credential request: {}", e); Status::invalid_argument("invalid message") })?; match ServerLogin::start( &mut OsRng, server_registration, config.server_keypair.private(), credential_request, ServerLoginStartParameters::default(), ) { - Ok(server_login_start_result) => Ok(( - PakeLoginResponseStruct { + Ok(server_login_start_result) => Ok(LoginResponseAndPakeState { + response: PakeLoginResponseStruct { data: Some(PakeCredentialResponse( server_login_start_result.message.serialize().map_err(|e| { error!("Failed to serialize PAKE message: {}", e); Status::failed_precondition("internal error") })?, )), }, - server_login_start_result.state, - )), + pake_state: server_login_start_result.state, + }), Err(e) => { error!( "Encountered a PAKE protocol error when starting login: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_login_finish( user_id: &str, device_id: &str, user_public_key: &str, client: DatabaseClient, - server_login: Option>, + server_login: ServerLogin, pake_credential_finalization: &[u8], rng: &mut (impl Rng + CryptoRng), - num_messages_received: u8, pake_workflow: PakeWorkflow, ) -> Result { - if (num_messages_received != 1 - && matches!(pake_workflow, PakeWorkflow::Login)) - || (num_messages_received != 2 - && matches!(pake_workflow, PakeWorkflow::Registration)) - { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID {}, device ID {}", user_id, device_id ); return Err(Status::aborted("user not found")); } server_login - .ok_or_else(|| { - error!("Server login missing in {:?} PAKE workflow", pake_workflow); - Status::aborted("login failed") - })? .finish( CredentialFinalization::deserialize(pake_credential_finalization) .map_err(|e| { error!("Failed to deserialize credential finalization bytes: {}", e); Status::aborted("login failed") })?, ) .map_err(|e| { error!( "Encountered a PAKE protocol error when finishing login: {}", e ); Status::aborted("server error") })?; if matches!(pake_workflow, PakeWorkflow::Login) { client .update_users_table( user_id.to_string(), device_id.to_string(), None, None, Some(user_public_key.to_string()), ) .await .map_err(handle_db_error)?; } Ok(PakeLoginResponseStruct { data: Some(AccessToken( put_token_helper(client, AuthType::Password, user_id, device_id, rng) .await?, )), }) } async fn pake_registration_start( - config: Config, + config: &Config, rng: &mut (impl Rng + CryptoRng), registration_request_bytes: &[u8], - num_messages_received: u8, -) -> Result<(RegistrationResponse, ServerRegistration), Status> { - if num_messages_received != 0 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } +) -> Result { match ServerRegistration::::start( rng, PakeRegistrationRequest::deserialize(registration_request_bytes).unwrap(), config.server_keypair.public(), ) { - Ok(server_registration_start_result) => Ok(( - RegistrationResponse { - data: Some(PakeRegistrationResponse( - server_registration_start_result.message.serialize(), - )), - }, - server_registration_start_result.state, - )), + Ok(server_registration_start_result) => { + Ok(RegistrationResponseAndPakeState { + response: RegistrationResponse { + data: Some(PakeRegistrationResponse( + server_registration_start_result.message.serialize(), + )), + }, + pake_state: server_registration_start_result.state, + }) + } Err(e) => { error!( "Encountered a PAKE protocol error when starting registration: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_registration_finish( user_id: &str, device_id: &str, client: DatabaseClient, registration_upload_bytes: &[u8], server_registration: Option>, username: &str, user_public_key: &str, - num_messages_received: u8, ) -> Result<(), Status> { - if num_messages_received != 1 { - error!("Too many messages received in stream, aborting"); - return Err(Status::aborted("please retry")); - } if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration_finish_result = server_registration .ok_or_else(|| Status::aborted("registration failed"))? .finish( RegistrationUpload::deserialize(registration_upload_bytes).map_err( |e| { error!("Failed to deserialize registration upload bytes: {}", e); Status::aborted("registration failed") }, )?, ) .map_err(|e| { error!( "Encountered a PAKE protocol error when finishing registration: {}", e ); Status::aborted("server error") })?; match client - .update_users_table( + .add_user_to_users_table( user_id.to_string(), device_id.to_string(), - Some(server_registration_finish_result), - Some(username.to_string()), - Some(user_public_key.to_string()), + server_registration_finish_result, + username.to_string(), + user_public_key.to_string(), ) .await { Ok(_) => Ok(()), Err(e) => Err(handle_db_error(e)), } } fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } } + +struct RegistrationResponseAndPakeState { + response: RegistrationResponse, + pake_state: ServerRegistration, +} + +struct LoginResponseAndPakeState { + response: PakeLoginResponseStruct, + pake_state: ServerLogin, +} diff --git a/services/identity/src/service/login.rs b/services/identity/src/service/login.rs new file mode 100644 index 000000000..a30e89235 --- /dev/null +++ b/services/identity/src/service/login.rs @@ -0,0 +1,97 @@ +use super::*; +pub struct LoginState { + user_id: String, + device_id: String, + user_public_key: String, + pake_state: ServerLogin, +} +pub async fn handle_login_request( + message: Option>, + tx: mpsc::Sender>, + client: DatabaseClient, + config: &Config, +) -> Result, Status> { + match message { + Some(Ok(LoginRequest { + data: Some(WalletLoginRequest(req)), + })) => { + let wallet_login_result = + wallet_login_helper(client, req, &mut OsRng).await; + if let Err(e) = tx.send(wallet_login_result).await { + error!("Response was dropped: {}", e); + Err(Status::aborted("failure")) + } else { + Ok(None) + } + } + Some(Ok(LoginRequest { + data: + Some(PakeLoginRequest(PakeLoginRequestStruct { + data: + Some(PakeCredentialRequestAndUserId( + pake_credential_request_and_user_id, + )), + })), + })) => { + let response_and_state = pake_login_start( + config, + client, + &pake_credential_request_and_user_id.user_id, + &pake_credential_request_and_user_id.pake_credential_request, + ) + .await?; + let login_response = LoginResponse { + data: Some(PakeLoginResponse(response_and_state.response)), + }; + if let Err(e) = tx.send(Ok(login_response)).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("failure")); + } + + Ok(Some(LoginState { + user_id: pake_credential_request_and_user_id.user_id, + device_id: pake_credential_request_and_user_id.device_id, + user_public_key: pake_credential_request_and_user_id.user_public_key, + pake_state: response_and_state.pake_state, + })) + } + Some(_) | None => Err(Status::aborted("failure")), + } +} + +pub async fn handle_credential_finalization( + message: Option>, + tx: mpsc::Sender>, + client: DatabaseClient, + login_state: LoginState, +) -> Result<(), Status> { + match message { + Some(Ok(LoginRequest { + data: + Some(PakeLoginRequest(PakeLoginRequestStruct { + data: Some(PakeCredentialFinalization(pake_credential_finalization)), + })), + })) => { + let login_finish_result = pake_login_finish( + &login_state.user_id, + &login_state.device_id, + &login_state.user_public_key, + client, + login_state.pake_state, + &pake_credential_finalization, + &mut OsRng, + PakeWorkflow::Login, + ) + .await + .map(|pake_login_response| LoginResponse { + data: Some(PakeLoginResponse(pake_login_response)), + }); + if let Err(e) = tx.send(login_finish_result).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("failure")); + } + Ok(()) + } + Some(_) | None => Err(Status::aborted("failure")), + } +} diff --git a/services/identity/src/service/registration.rs b/services/identity/src/service/registration.rs new file mode 100644 index 000000000..6dfcfcde2 --- /dev/null +++ b/services/identity/src/service/registration.rs @@ -0,0 +1,153 @@ +use super::*; +pub struct RegistrationState { + user_id: String, + device_id: String, + username: String, + user_public_key: String, + pub pake_state: Option>, +} + +pub async fn handle_registration_request( + message: Option>, + client: DatabaseClient, + tx: mpsc::Sender>, + config: &Config, +) -> Result { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationRequestAndUserId( + pake_registration_request_and_user_id, + )), + })) => { + let get_item_output = client + .get_item_from_users_table( + &pake_registration_request_and_user_id.user_id, + ) + .await; + match get_item_output { + Ok(GetItemOutput { item: Some(_), .. }) => { + error!("User already exists"); + if let Err(e) = tx + .send(Err(Status::already_exists("User already exists"))) + .await + { + error!("Response was dropped: {}", e); + } + } + Err(e) => return Err(handle_db_error(e)), + _ => {} + }; + let response_and_state = pake_registration_start( + config, + &mut OsRng, + &pake_registration_request_and_user_id.pake_registration_request, + ) + .await?; + if let Err(e) = tx.send(Ok(response_and_state.response)).await { + error!("Response was dropped: {}", e); + } + + Ok(RegistrationState { + user_id: pake_registration_request_and_user_id.user_id, + device_id: pake_registration_request_and_user_id.device_id, + username: pake_registration_request_and_user_id.username, + user_public_key: pake_registration_request_and_user_id.user_public_key, + pake_state: Some(response_and_state.pake_state), + }) + } + None | Some(_) => Err(Status::aborted("failure")), + } +} + +pub async fn handle_registration_upload_and_credential_request( + message: Option>, + tx: mpsc::Sender>, + client: DatabaseClient, + registration_state: &RegistrationState, + pake_state: ServerRegistration, + config: &Config, +) -> Result, Status> { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationUploadAndCredentialRequest( + pake_registration_upload_and_credential_request, + )), + })) => { + let response_and_state = match pake_registration_finish( + ®istration_state.user_id, + ®istration_state.device_id, + client.clone(), + &pake_registration_upload_and_credential_request + .pake_registration_upload, + Some(pake_state), + ®istration_state.username, + ®istration_state.user_public_key, + ) + .await + { + Ok(_) => { + pake_login_start( + config, + client, + ®istration_state.user_id, + &pake_registration_upload_and_credential_request + .pake_credential_request, + ) + .await? + } + + Err(e) => { + return Err(e); + } + }; + let registration_response = RegistrationResponse { + data: Some(PakeRegistrationLoginResponse(response_and_state.response)), + }; + if let Err(e) = tx.send(Ok(registration_response)).await { + error!("Response was dropped: {}", e); + Err(Status::aborted("failure")) + } else { + Ok(response_and_state.pake_state) + } + } + None | Some(_) => Err(Status::aborted("failure")), + } +} + +pub async fn handle_credential_finalization( + message: Option>, + tx: mpsc::Sender>, + client: DatabaseClient, + registration_state: &RegistrationState, + server_login: ServerLogin, +) -> Result<(), Status> { + match message { + Some(Ok(RegistrationRequest { + data: + Some(PakeRegistrationCredentialFinalization(pake_credential_finalization)), + })) => { + let login_finish_result = pake_login_finish( + ®istration_state.user_id, + ®istration_state.device_id, + ®istration_state.user_public_key, + client, + server_login, + &pake_credential_finalization, + &mut OsRng, + PakeWorkflow::Registration, + ) + .await + .map(|pake_login_response| RegistrationResponse { + data: Some(PakeRegistrationLoginResponse(pake_login_response)), + }); + if let Err(e) = tx.send(login_finish_result).await { + error!("Response was dropped: {}", e); + return Err(Status::aborted("failure")); + } + Ok(()) + } + Some(_) | None => Err(Status::aborted("failure")), + } +}