diff --git a/keyserver/addons/rust-node-addon/src/identity_client.rs b/keyserver/addons/rust-node-addon/src/identity_client.rs index 4c08570bc..dda609bc9 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client.rs @@ -1,300 +1,308 @@ use crate::identity::identity_service_client::IdentityServiceClient; use crate::identity::{ pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as RegistrationPakeCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as RegistrationPakeLoginResponse, registration_response::Data::PakeRegistrationResponse, PakeLoginResponse as PakeLoginResponseStruct, PakeRegistrationRequestAndUserId as PakeRegistrationRequestAndUserIdStruct, PakeRegistrationUploadAndCredentialRequest as PakeRegistrationUploadAndCredentialRequestStruct, RegistrationRequest, RegistrationResponse as RegistrationResponseMessage, + SessionInitializationInfo, }; use crate::{AUTH_TOKEN, IDENTITY_SERVICE_SOCKET_ADDR}; use comm_opaque::Cipher; use napi::bindgen_prelude::*; use opaque_ke::{ ClientLogin, ClientLoginFinishParameters, ClientLoginStartParameters, ClientLoginStartResult, ClientRegistration, ClientRegistrationFinishParameters, CredentialFinalization, CredentialResponse, RegistrationResponse, RegistrationUpload, }; use rand::{rngs::OsRng, CryptoRng, Rng}; +use std::collections::HashMap; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{metadata::MetadataValue, transport::Channel, Request}; use tracing::{error, instrument}; #[napi] #[instrument(skip_all)] pub async fn register_user( user_id: String, signing_public_key: String, username: String, password: String, + session_initialization_info: HashMap, ) -> Result { let channel = Channel::from_static(&IDENTITY_SERVICE_SOCKET_ADDR) .connect() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; let token: MetadataValue<_> = AUTH_TOKEN .parse() .map_err(|_| Error::from_status(Status::GenericFailure))?; let mut identity_client = IdentityServiceClient::with_interceptor(channel, |mut req: Request<()>| { req.metadata_mut().insert("authorization", token.clone()); Ok(req) }); // Create a RegistrationRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = identity_client .register_user(request) .await .map_err(|_| Error::from_status(Status::GenericFailure))? .into_inner(); // Start PAKE registration on client and send initial registration request // to Identity service let mut client_rng = OsRng; let (registration_request, client_registration) = pake_registration_start( &mut client_rng, user_id, signing_public_key, &password, username, + SessionInitializationInfo { + info: session_initialization_info, + }, )?; send_to_mpsc(tx.clone(), registration_request).await?; // Handle responses from Identity service sequentially, making sure we get // messages in the correct order // Finish PAKE registration and begin PAKE login; send the final // registration request and initial login request together to reduce the // number of trips let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; let client_login = handle_registration_response( message, &mut client_rng, client_registration, &password, tx.clone(), ) .await?; // Finish PAKE login; send final login request to Identity service let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; handle_registration_credential_response(message, client_login, tx) .await .map_err(|_| Error::from_status(Status::GenericFailure))?; // Return access token let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; handle_registration_token_response(message) } fn handle_unexpected_response(message: Option) -> Error { error!("Received an unexpected message: {:?}", message); Error::from_status(Status::GenericFailure) } async fn send_to_mpsc(tx: mpsc::Sender, request: T) -> Result<()> { if let Err(e) = tx.send(request).await { error!("Response was dropped: {}", e); return Err(Error::from_status(Status::GenericFailure)); } Ok(()) } fn pake_login_start( rng: &mut (impl Rng + CryptoRng), password: &str, ) -> Result> { ClientLogin::::start( rng, password.as_bytes(), ClientLoginStartParameters::default(), ) .map_err(|e| { error!("Failed to start PAKE login: {}", e); Error::from_status(Status::GenericFailure) }) } fn pake_login_finish( credential_response_bytes: &[u8], client_login: ClientLogin, ) -> Result> { client_login .finish( CredentialResponse::deserialize(credential_response_bytes).map_err( |e| { error!("Could not deserialize credential response bytes: {}", e); Error::from_status(Status::GenericFailure) }, )?, ClientLoginFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE login: {}", e); Error::from_status(Status::GenericFailure) }) .map(|res| res.message) } fn pake_registration_start( rng: &mut (impl Rng + CryptoRng), user_id: String, signing_public_key: String, password: &str, username: String, + session_initialization_info: SessionInitializationInfo, ) -> Result<(RegistrationRequest, ClientRegistration)> { let client_registration_start_result = ClientRegistration::::start(rng, password.as_bytes()).map_err( |e| { error!("Failed to start PAKE registration: {}", e); Error::from_status(Status::GenericFailure) }, )?; let pake_registration_request = client_registration_start_result.message.serialize(); Ok(( RegistrationRequest { data: Some(PakeRegistrationRequestAndUserId( PakeRegistrationRequestAndUserIdStruct { user_id, pake_registration_request, username, signing_public_key, + session_initialization_info: Some(session_initialization_info), }, )), }, client_registration_start_result.state, )) } async fn handle_registration_response( message: Option, client_rng: &mut (impl Rng + CryptoRng), client_registration: ClientRegistration, password: &str, tx: mpsc::Sender, ) -> Result> { if let Some(RegistrationResponseMessage { data: Some(PakeRegistrationResponse(registration_response_bytes)), .. }) = message { let pake_registration_upload = pake_registration_finish( client_rng, ®istration_response_bytes, client_registration, )? .serialize(); let client_login_start_result = pake_login_start(client_rng, password)?; // `registration_request` is a gRPC message containing serialized bytes to // complete PAKE registration and begin PAKE login let registration_request = RegistrationRequest { data: Some(PakeRegistrationUploadAndCredentialRequest( PakeRegistrationUploadAndCredentialRequestStruct { pake_registration_upload, pake_credential_request: client_login_start_result .message .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Error::from_status(Status::GenericFailure) })?, }, )), }; send_to_mpsc(tx, registration_request).await?; Ok(client_login_start_result.state) } else { Err(handle_unexpected_response(message)) } } async fn handle_registration_credential_response( message: Option, client_login: ClientLogin, tx: mpsc::Sender, ) -> Result<()> { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response_bytes)), })), }) = message { let registration_request = RegistrationRequest { data: Some(RegistrationPakeCredentialFinalization( pake_login_finish(&credential_response_bytes, client_login)? .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Error::from_status(Status::GenericFailure) })?, )), }; send_to_mpsc(tx, registration_request).await } else { Err(handle_unexpected_response(message)) } } fn handle_registration_token_response( message: Option, ) -> Result { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } fn pake_registration_finish( rng: &mut (impl Rng + CryptoRng), registration_response_bytes: &[u8], client_registration: ClientRegistration, ) -> Result> { client_registration .finish( rng, RegistrationResponse::deserialize(registration_response_bytes).map_err( |e| { error!("Could not deserialize registration response bytes: {}", e); Error::from_status(Status::GenericFailure) }, )?, ClientRegistrationFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE registration: {}", e); Error::from_status(Status::GenericFailure) }) .map(|res| res.message) } diff --git a/native/native_rust_library/src/identity_client.rs b/native/native_rust_library/src/identity_client.rs index fa3456f27..27d98bc8c 100644 --- a/native/native_rust_library/src/identity_client.rs +++ b/native/native_rust_library/src/identity_client.rs @@ -1,495 +1,501 @@ use opaque_ke::{ ClientLogin, ClientLoginFinishParameters, ClientLoginStartParameters, ClientLoginStartResult, ClientRegistration, ClientRegistrationFinishParameters, CredentialFinalization, CredentialResponse, RegistrationResponse, RegistrationUpload, }; use rand::{rngs::OsRng, CryptoRng, Rng}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Status}; use tracing::error; use crate::identity::{ login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse as LoginPakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization as LoginPakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as RegistrationPakeCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as RegistrationPakeLoginResponse, registration_response::Data::PakeRegistrationResponse, GetUserIdRequest, LoginRequest, LoginResponse, PakeCredentialRequestAndUserId as PakeCredentialRequestAndUserIdStruct, PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, PakeRegistrationRequestAndUserId as PakeRegistrationRequestAndUserIdStruct, PakeRegistrationUploadAndCredentialRequest as PakeRegistrationUploadAndCredentialRequestStruct, RegistrationRequest, RegistrationResponse as RegistrationResponseMessage, VerifyUserTokenRequest, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; use crate::IdentityClient; use comm_opaque::Cipher; pub async fn get_user_id( mut client: Box, auth_type: i32, user_info: String, ) -> Result { Ok( client .identity_client .get_user_id(GetUserIdRequest { auth_type, user_info, }) .await? .into_inner() .user_id, ) } pub async fn verify_user_token( mut client: Box, user_id: String, signing_public_key: String, access_token: String, ) -> Result { Ok( client .identity_client .verify_user_token(VerifyUserTokenRequest { user_id, signing_public_key, access_token, }) .await? .into_inner() .token_valid, ) } pub async fn register_user( mut client: Box, user_id: String, signing_public_key: String, username: String, password: String, ) -> Result { // Create a RegistrationRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = client .identity_client .register_user(request) .await? .into_inner(); // Start PAKE registration on client and send initial registration request // to Identity service let mut client_rng = OsRng; let (registration_request, client_registration) = pake_registration_start( &mut client_rng, user_id, &password, signing_public_key, username, )?; if let Err(e) = tx.send(registration_request).await { error!("Response was dropped: {}", e); return Err(Status::aborted("Dropped response")); } // Handle responses from Identity service sequentially, making sure we get // messages in the correct order // Finish PAKE registration and begin PAKE login; send the final // registration request and initial login request together to reduce the // number of trips let message = response.message().await?; let client_login = handle_registration_response( message, &mut client_rng, client_registration, &password, tx.clone(), ) .await?; // Finish PAKE login; send final login request to Identity service let message = response.message().await?; handle_registration_credential_response(message, client_login, tx).await?; // Return access token let message = response.message().await?; handle_registration_token_response(message) } pub async fn login_user_pake( mut client: Box, user_id: String, signing_public_key: String, password: String, ) -> Result { // Create a LoginRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = client .identity_client .login_user(request) .await? .into_inner(); // Start PAKE login on client and send initial login request to Identity // service let mut client_rng = OsRng; let client_login_start_result = pake_login_start(&mut client_rng, &password)?; let login_request = LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialRequestAndUserId( PakeCredentialRequestAndUserIdStruct { user_id, signing_public_key, pake_credential_request: client_login_start_result .message .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Status::failed_precondition("PAKE failure") })?, + // Todo: provide actual session initialization info + session_initialization_info: None, }, )), })), }; if let Err(e) = tx.send(login_request).await { error!("Response was dropped: {}", e); return Err(Status::aborted("Dropped response")); } // Handle responses from Identity service sequentially, making sure we get // messages in the correct order // Finish PAKE login; send final login request to Identity service let message = response.message().await?; handle_login_credential_response( message, client_login_start_result.state, tx, ) .await?; // Return access token let message = response.message().await?; handle_login_token_response(message) } pub async fn login_user_wallet( mut client: Box, user_id: String, signing_public_key: String, siwe_message: String, siwe_signature: Vec, ) -> Result { // Create a LoginRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = client .identity_client .login_user(request) .await? .into_inner(); // Start wallet login on client and send initial login request to Identity // service let login_request = LoginRequest { data: Some(WalletLoginRequest(WalletLoginRequestStruct { user_id, signing_public_key, siwe_message, siwe_signature, + // Todo: provide actual session initialization info + session_initialization_info: None, })), }; if let Err(e) = tx.send(login_request).await { error!("Response was dropped: {}", e); return Err(Status::aborted("Dropped response")); } // Return access token let message = response.message().await?; handle_wallet_login_response(message) } fn pake_registration_start( rng: &mut (impl Rng + CryptoRng), user_id: String, password: &str, signing_public_key: String, username: String, ) -> Result<(RegistrationRequest, ClientRegistration), Status> { let client_registration_start_result = ClientRegistration::::start(rng, password.as_bytes()).map_err( |e| { error!("Failed to start PAKE registration: {}", e); Status::failed_precondition("PAKE failure") }, )?; let pake_registration_request = client_registration_start_result.message.serialize(); Ok(( RegistrationRequest { data: Some(PakeRegistrationRequestAndUserId( PakeRegistrationRequestAndUserIdStruct { user_id, signing_public_key, pake_registration_request, username, + // Todo: provide actual session initialization info + session_initialization_info: None, }, )), }, client_registration_start_result.state, )) } fn pake_registration_finish( rng: &mut (impl Rng + CryptoRng), registration_response_bytes: &[u8], client_registration: ClientRegistration, ) -> Result, Status> { client_registration .finish( rng, RegistrationResponse::deserialize(registration_response_bytes).map_err( |e| { error!("Could not deserialize registration response bytes: {}", e); Status::aborted("Invalid response bytes") }, )?, ClientRegistrationFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE registration: {}", e); Status::aborted("PAKE failure") }) .map(|res| res.message) } fn pake_login_start( rng: &mut (impl Rng + CryptoRng), password: &str, ) -> Result, Status> { ClientLogin::::start( rng, password.as_bytes(), ClientLoginStartParameters::default(), ) .map_err(|e| { error!("Failed to start PAKE login: {}", e); Status::failed_precondition("PAKE failure") }) } fn pake_login_finish( credential_response_bytes: &[u8], client_login: ClientLogin, ) -> Result, Status> { client_login .finish( CredentialResponse::deserialize(credential_response_bytes).map_err( |e| { error!("Could not deserialize credential response bytes: {}", e); Status::aborted("Invalid response bytes") }, )?, ClientLoginFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE login: {}", e); Status::aborted("PAKE failure") }) .map(|res| res.message) } fn handle_unexpected_response( message: Option, ) -> Status { error!("Received an unexpected message: {:?}", message); Status::invalid_argument("Invalid response data") } async fn handle_registration_response( message: Option, client_rng: &mut (impl Rng + CryptoRng), client_registration: ClientRegistration, password: &str, tx: mpsc::Sender, ) -> Result, Status> { if let Some(RegistrationResponseMessage { data: Some(PakeRegistrationResponse(registration_response_bytes)), .. }) = message { let pake_registration_upload = pake_registration_finish( client_rng, ®istration_response_bytes, client_registration, )? .serialize(); let client_login_start_result = pake_login_start(client_rng, password)?; // `registration_request` is a gRPC message containing serialized bytes to // complete PAKE registration and begin PAKE login let registration_request = RegistrationRequest { data: Some(PakeRegistrationUploadAndCredentialRequest( PakeRegistrationUploadAndCredentialRequestStruct { pake_registration_upload, pake_credential_request: client_login_start_result .message .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Status::failed_precondition("PAKE failure") })?, }, )), }; if let Err(e) = tx.send(registration_request).await { error!("Response was dropped: {}", e); return Err(Status::aborted("Dropped response")); } Ok(client_login_start_result.state) } else { Err(handle_unexpected_response(message)) } } async fn handle_registration_credential_response( message: Option, client_login: ClientLogin, tx: mpsc::Sender, ) -> Result<(), Status> { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response_bytes)), })), }) = message { let registration_request = RegistrationRequest { data: Some(RegistrationPakeCredentialFinalization( pake_login_finish(&credential_response_bytes, client_login)? .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Status::failed_precondition("PAKE failure") })?, )), }; send_to_mpsc(tx, registration_request).await } else { Err(handle_unexpected_response(message)) } } async fn handle_login_credential_response( message: Option, client_login: ClientLogin, tx: mpsc::Sender, ) -> Result<(), Status> { if let Some(LoginResponse { data: Some(LoginPakeLoginResponse(PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response_bytes)), })), }) = message { let login_request = LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(LoginPakeCredentialFinalization( pake_login_finish(&credential_response_bytes, client_login)? .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Status::failed_precondition("PAKE failure") })?, )), })), }; send_to_mpsc(tx, login_request).await } else { Err(handle_unexpected_response(message)) } } fn handle_registration_token_response( message: Option, ) -> Result { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } fn handle_login_token_response( message: Option, ) -> Result { if let Some(LoginResponse { data: Some(LoginPakeLoginResponse(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } fn handle_wallet_login_response( message: Option, ) -> Result { if let Some(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } async fn send_to_mpsc( tx: mpsc::Sender, request: T, ) -> Result<(), Status> { if let Err(e) = tx.send(request).await { error!("Response was dropped: {}", e); return Err(Status::aborted("Dropped response")); } Ok(()) } diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index 5f1f412f1..4cbc57a99 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,746 +1,761 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::sync::Arc; use aws_sdk_dynamodb::model::AttributeValue; use aws_sdk_dynamodb::output::{ DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, UpdateItemOutput, }; use aws_sdk_dynamodb::types::Blob; use aws_sdk_dynamodb::{Client, Error as DynamoDBError}; use aws_types::sdk_config::SdkConfig; use chrono::{DateTime, Utc}; use opaque_ke::{errors::ProtocolError, ServerRegistration}; use tracing::{debug, error, info, warn}; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, USERS_TABLE, USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, USERS_TABLE_DEVICE_ATTRIBUTE_NAME, USERS_TABLE_INITIALIZATION_INFO, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::nonce::NonceData; use crate::token::{AccessTokenData, AuthType}; use comm_opaque::Cipher; #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &SdkConfig) -> Self { DatabaseClient { client: Arc::new(Client::new(aws_config)), } } pub async fn get_pake_registration( &self, user_id: String, ) -> Result>, Error> { match self.get_item_from_users_table(&user_id).await { Ok(GetItemOutput { item: Some(mut item), .. }) => parse_registration_data_attribute( item.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), ) .map(Some) .map_err(Error::Attribute), Ok(_) => { info!( "No item found for user {} in PAKE registration table", user_id ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", user_id, e ); Err(e) } } } pub async fn get_session_initialization_info( &self, user_id: &str, ) -> Result>>, Error> { match self.get_item_from_users_table(user_id).await { Ok(GetItemOutput { item: Some(mut item), .. }) => parse_devices_attribute(item.remove(USERS_TABLE_DEVICES_ATTRIBUTE)) .map(Some) .map_err(Error::Attribute), Ok(_) => { info!("No item found for user {} in users table", user_id); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get session initialization info for user {}: {}", user_id, e ); Err(e) } } } pub async fn update_users_table( &self, user_id: String, signing_public_key: Option, registration: Option>, username: Option, + session_initialization_info: Option<&HashMap>, ) -> Result { let mut update_expression_parts = Vec::new(); let mut expression_attribute_names = HashMap::new(); let mut expression_attribute_values = HashMap::new(); if let Some(reg) = registration { update_expression_parts .push(format!("{} = :r", USERS_TABLE_REGISTRATION_ATTRIBUTE)); expression_attribute_values.insert( ":r".to_string(), AttributeValue::B(Blob::new(reg.serialize())), ); }; if let Some(username) = username { update_expression_parts .push(format!("{} = :u", USERS_TABLE_USERNAME_ATTRIBUTE)); expression_attribute_values .insert(":u".to_string(), AttributeValue::S(username)); }; if let Some(public_key) = signing_public_key { update_expression_parts.push(format!( "{}.#{} = :k", USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, )); expression_attribute_names.insert( format!("#{}", USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME), public_key, ); + let device_info = match session_initialization_info { + Some(info) => info + .iter() + .map(|(k, v)| (k.to_string(), AttributeValue::S(v.to_string()))) + .collect(), + None => HashMap::new(), + }; expression_attribute_values - .insert(":k".to_string(), AttributeValue::M(HashMap::new())); + .insert(":k".to_string(), AttributeValue::M(device_info)); }; self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(format!("SET {}", update_expression_parts.join(","))) .set_expression_attribute_names( if expression_attribute_names.is_empty() { None } else { Some(expression_attribute_names) }, ) .set_expression_attribute_values( if expression_attribute_values.is_empty() { None } else { Some(expression_attribute_values) }, ) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn add_user_to_users_table( &self, user_id: String, registration: ServerRegistration, username: String, signing_public_key: String, + session_initialization_info: &HashMap, ) -> Result { + let device_info: HashMap = + session_initialization_info + .iter() + .map(|(k, v)| (k.to_string(), AttributeValue::S(v.to_string()))) + .collect(); + let item = HashMap::from([ ( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ), ( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ), ( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(Blob::new(registration.serialize())), ), ( USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), AttributeValue::M(HashMap::from([( signing_public_key, - AttributeValue::M(HashMap::new()), + AttributeValue::M(device_info), )])), ), ]); self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn delete_user( &self, user_id: String, ) -> Result { debug!("Attempting to delete user: {}", user_id); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete user {}", user_id); Err(Error::AwsSdk(e.into())) } } } pub async fn get_access_token_data( &self, user_id: String, signing_public_key: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), signing_public_key.clone(), ), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = parse_created_attribute( item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, signing_public_key, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and signing public key {} in token table", user_id, signing_public_key ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.signing_public_key), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: AuthType, ) -> Result, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(mut items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } parse_string_attribute( USERS_TABLE_PARTITION_KEY, items[0].remove(USERS_TABLE_PARTITION_KEY), ) .map(Some) .map_err(Error::Attribute) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user ID from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_users(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_PARTITION_KEY) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { let id = parse_string_attribute( USERS_TABLE_PARTITION_KEY, attribute.remove(USERS_TABLE_PARTITION_KEY), ) .map_err(Error::Attribute)?; result.push(id); } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { attribute_name: &'static str, attribute_value: Option, attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> FmtResult { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] InvalidTimestamp(chrono::ParseError), #[display(...)] Pake(ProtocolError), } type AttributeName = String; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_created_attribute( attribute: Option, ) -> Result, DBItemError> { if let Some(AttributeValue::S(created)) = &attribute { created.parse().map_err(|e| { DBItemError::new( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, attribute, DBItemAttributeError::InvalidTimestamp(e), ) }) } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )) } } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match &attribute { Some(AttributeValue::B(server_registration_bytes)) => { match ServerRegistration::::deserialize( server_registration_bytes.as_ref(), ) { Ok(server_registration) => Ok(server_registration), Err(e) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::Pake(e), )), } } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE, attribute, DBItemAttributeError::Missing, )), } } fn parse_devices_attribute( attribute: Option, ) -> Result>, DBItemError> { let mut devices = HashMap::new(); let ddb_devices = parse_map_attribute(USERS_TABLE_DEVICES_ATTRIBUTE, attribute)?; for (signing_public_key, session_initialization_info) in ddb_devices { let session_initialization_info_map = parse_map_attribute( USERS_TABLE_DEVICE_ATTRIBUTE_NAME, Some(session_initialization_info), )?; let mut inner_hash_map = HashMap::new(); for (initialization_component_name, initialization_component_value) in session_initialization_info_map { let initialization_piece_value_string = parse_string_attribute( USERS_TABLE_INITIALIZATION_INFO, Some(initialization_component_value), )?; inner_hash_map.insert( initialization_component_name, initialization_piece_value_string, ); } devices.insert(signing_public_key, inner_hash_map); } Ok(devices) } fn parse_map_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result, DBItemError> { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } fn parse_string_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name, attribute_value, DBItemAttributeError::Missing, )), } } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } } diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs index 7ab4feab3..66cc182f5 100644 --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -1,609 +1,619 @@ use aws_sdk_dynamodb::output::GetItemOutput; use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::Utc; use comm_opaque::Cipher; use constant_time_eq::constant_time_eq; use futures_core::Stream; use opaque_ke::{ CredentialFinalization, CredentialRequest, RegistrationRequest as PakeRegistrationRequest, ServerLogin, ServerLoginStartParameters, }; use opaque_ke::{RegistrationUpload, ServerRegistration}; use rand::rngs::OsRng; use rand::{CryptoRng, Rng}; use siwe::Message; use std::collections::{HashMap, HashSet}; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{error, info, instrument}; use crate::config::CONFIG; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; use crate::database::{DatabaseClient, Error as DBError}; use crate::nonce::generate_nonce_data; use crate::token::{AccessTokenData, AuthType}; pub use proto::identity_service_server::IdentityServiceServer; use proto::{ get_user_id_request::AuthType as ProtoAuthType, identity_service_server::IdentityService, login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as PakeRegistrationCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as PakeRegistrationLoginResponse, registration_response::Data::PakeRegistrationResponse, CompareUsersRequest, CompareUsersResponse, DeleteUserRequest, DeleteUserResponse, GenerateNonceRequest, GenerateNonceResponse, GetSessionInitializationInfoRequest, GetSessionInitializationInfoResponse, GetUserIdRequest, GetUserIdResponse, LoginRequest, LoginResponse, PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, RegistrationRequest, RegistrationResponse, SessionInitializationInfo, VerifyUserTokenRequest, VerifyUserTokenResponse, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; mod proto { tonic::include_proto!("identity"); } mod login; mod registration; #[derive(Debug)] enum PakeWorkflow { Registration, Login, } #[derive(derive_more::Constructor)] pub struct MyIdentityService { client: DatabaseClient, } #[tonic::async_trait] impl IdentityService for MyIdentityService { type RegisterUserStream = Pin< Box< dyn Stream> + Send + 'static, >, >; #[instrument(skip(self))] async fn register_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let first_message = in_stream.next().await; let mut registration_state = registration::handle_registration_request( first_message, self.client.clone(), tx.clone(), ) .await?; // ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we // have to take the value out of registration_state, replacing it with None let pake_state = if let Some(pake_state) = registration_state.pake_state.take() { pake_state } else { error!("registration_state is missing opaque-ke ServerRegistration"); return Err(Status::failed_precondition("internal error")); }; let second_message = in_stream.next().await; let server_login = registration::handle_registration_upload_and_credential_request( second_message, tx.clone(), self.client.clone(), ®istration_state, pake_state, ) .await?; let third_message = in_stream.next().await; registration::handle_credential_finalization( third_message, tx, self.client.clone(), ®istration_state, server_login, ) .await?; let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::RegisterUserStream )) } type LoginUserStream = Pin> + Send + 'static>>; #[instrument(skip(self))] async fn login_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let first_message = in_stream.next().await; let login_state = login::handle_login_request( first_message, tx.clone(), self.client.clone(), ) .await?; // login_state will be None if user is logging in with a wallet if let Some(state) = login_state { let second_message = in_stream.next().await; login::handle_credential_finalization( second_message, tx, self.client.clone(), state, ) .await?; } let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) } #[instrument(skip(self))] async fn verify_user_token( &self, request: Request, ) -> Result, Status> { info!("Received VerifyUserToken request: {:?}", request); let message = request.into_inner(); let token_valid = match self .client .get_access_token_data(message.user_id, message.signing_public_key) .await { Ok(Some(access_token_data)) => constant_time_eq( access_token_data.access_token.as_bytes(), message.access_token.as_bytes(), ), Ok(None) => false, Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(VerifyUserTokenResponse { token_valid }); info!("Sending VerifyUserToken response: {:?}", response); Ok(response) } #[instrument(skip(self))] async fn get_user_id( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let auth_type = match ProtoAuthType::from_i32(message.auth_type) { Some(ProtoAuthType::Password) => AuthType::Password, Some(ProtoAuthType::Wallet) => AuthType::Wallet, None => { error!( "Unable to parse AuthType from message: {}", message.auth_type ); return Err(Status::invalid_argument("invalid message")); } }; let user_id = match self .client .get_user_id_from_user_info(message.user_info, auth_type) .await { Ok(Some(user_id)) => user_id, Ok(None) => return Err(Status::not_found("no user ID found")), Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(GetUserIdResponse { user_id }); Ok(response) } #[instrument(skip(self))] async fn delete_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); match self.client.delete_user(message.user_id).await { Ok(_) => Ok(Response::new(DeleteUserResponse {})), Err(e) => Err(handle_db_error(e)), } } #[instrument(skip(self))] async fn compare_users( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let mut mysql_users_vec = message.users; let mut ddb_users_vec = match self.client.get_users().await { Ok(user_list) => user_list, Err(e) => return Err(handle_db_error(e)), }; // We use HashSets here for faster lookups let mysql_users_set = HashSet::::from_iter(mysql_users_vec.clone()); let ddb_users_set = HashSet::::from_iter(ddb_users_vec.clone()); ddb_users_vec.retain(|user| !mysql_users_set.contains(user)); mysql_users_vec.retain(|user| !ddb_users_set.contains(user)); Ok(Response::new(CompareUsersResponse { users_missing_from_keyserver: ddb_users_vec, users_missing_from_identity: mysql_users_vec, })) } #[instrument(skip(self))] async fn generate_nonce( &self, _request: Request, ) -> Result, Status> { let nonce_data = generate_nonce_data(&mut OsRng); match self .client .add_nonce_to_nonces_table(nonce_data.clone()) .await { Ok(_) => Ok(Response::new(GenerateNonceResponse { nonce: nonce_data.nonce, })), Err(e) => Err(handle_db_error(e)), } } #[instrument(skip(self))] async fn get_session_initialization_info( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); match self .client .get_session_initialization_info(&message.user_id) .await { Ok(Some(session_initialization_info)) => { let mut devices = HashMap::new(); for (device, info) in session_initialization_info { devices.insert(device, SessionInitializationInfo { info }); } Ok(Response::new(GetSessionInitializationInfoResponse { devices, })) } Ok(None) => return Err(Status::not_found("user not found")), Err(e) => Err(handle_db_error(e)), } } } async fn put_token_helper( client: DatabaseClient, auth_type: AuthType, user_id: &str, signing_public_key: &str, rng: &mut (impl Rng + CryptoRng), ) -> Result { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID \"{}\", signing public key \"{}\"", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } let access_token_data = AccessTokenData::new( user_id.to_string(), signing_public_key.to_string(), auth_type, rng, ); match client .put_access_token_data(access_token_data.clone()) .await { Ok(_) => Ok(access_token_data.access_token), Err(e) => Err(handle_db_error(e)), } } fn parse_and_verify_siwe_message( user_id: &str, signing_public_key: &str, siwe_message: &str, siwe_signature: Vec, ) -> Result<(), Status> { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID {}, signing public key {}", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } let siwe_message: Message = match siwe_message.parse() { Ok(m) => m, Err(e) => { error!("Failed to parse SIWE message: {}", e); return Err(Status::invalid_argument("invalid message")); } }; match siwe_message.verify( match siwe_signature.try_into() { Ok(s) => s, Err(e) => { error!("Conversion to SIWE signature failed: {:?}", e); return Err(Status::invalid_argument("invalid message")); } }, None, None, Some(&Utc::now()), ) { Err(e) => { error!( "Signature verification failed for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Status::unauthenticated("message not authenticated")) } Ok(_) => Ok(()), } } async fn wallet_login_helper( client: DatabaseClient, wallet_login_request: WalletLoginRequestStruct, rng: &mut (impl Rng + CryptoRng), ) -> Result { parse_and_verify_siwe_message( &wallet_login_request.user_id, &wallet_login_request.signing_public_key, &wallet_login_request.siwe_message, wallet_login_request.siwe_signature, )?; client .update_users_table( wallet_login_request.user_id.clone(), Some(wallet_login_request.signing_public_key.clone()), None, None, + Some( + &wallet_login_request + .session_initialization_info + .ok_or_else(|| Status::invalid_argument("Invalid message"))? + .info, + ), ) .await .map_err(handle_db_error)?; Ok(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token: put_token_helper( client, AuthType::Wallet, &wallet_login_request.user_id, &wallet_login_request.signing_public_key, rng, ) .await?, })), }) } async fn pake_login_start( client: DatabaseClient, user_id: &str, pake_credential_request: &[u8], ) -> Result { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration = match client.get_pake_registration(user_id.to_string()).await { Ok(Some(r)) => r, Ok(None) => { return Err(Status::not_found("user not found")); } Err(e) => return Err(handle_db_error(e)), }; let credential_request = CredentialRequest::deserialize(pake_credential_request).map_err(|e| { error!("Failed to deserialize credential request: {}", e); Status::invalid_argument("invalid message") })?; match ServerLogin::start( &mut OsRng, server_registration, CONFIG.server_keypair.private(), credential_request, ServerLoginStartParameters::default(), ) { Ok(server_login_start_result) => Ok(LoginResponseAndPakeState { response: PakeLoginResponseStruct { data: Some(PakeCredentialResponse( server_login_start_result.message.serialize().map_err(|e| { error!("Failed to serialize PAKE message: {}", e); Status::failed_precondition("internal error") })?, )), }, pake_state: server_login_start_result.state, }), Err(e) => { error!( "Encountered a PAKE protocol error when starting login: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_login_finish( user_id: &str, signing_public_key: &str, client: DatabaseClient, server_login: ServerLogin, pake_credential_finalization: &[u8], rng: &mut (impl Rng + CryptoRng), pake_workflow: PakeWorkflow, + session_initialization_info: &HashMap, ) -> Result { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID {}, signing public key {}", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } server_login .finish( CredentialFinalization::deserialize(pake_credential_finalization) .map_err(|e| { error!("Failed to deserialize credential finalization bytes: {}", e); Status::aborted("login failed") })?, ) .map_err(|e| { error!( "Encountered a PAKE protocol error when finishing login: {}", e ); Status::aborted("server error") })?; if matches!(pake_workflow, PakeWorkflow::Login) { client .update_users_table( user_id.to_string(), Some(signing_public_key.to_string()), None, None, + Some(session_initialization_info), ) .await .map_err(handle_db_error)?; } Ok(PakeLoginResponseStruct { data: Some(AccessToken( put_token_helper( client, AuthType::Password, user_id, signing_public_key, rng, ) .await?, )), }) } async fn pake_registration_start( rng: &mut (impl Rng + CryptoRng), registration_request_bytes: &[u8], ) -> Result { match ServerRegistration::::start( rng, PakeRegistrationRequest::deserialize(registration_request_bytes).unwrap(), CONFIG.server_keypair.public(), ) { Ok(server_registration_start_result) => { Ok(RegistrationResponseAndPakeState { response: RegistrationResponse { data: Some(PakeRegistrationResponse( server_registration_start_result.message.serialize(), )), }, pake_state: server_registration_start_result.state, }) } Err(e) => { error!( "Encountered a PAKE protocol error when starting registration: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_registration_finish( user_id: &str, client: DatabaseClient, registration_upload_bytes: &[u8], server_registration: Option>, username: &str, signing_public_key: &str, + session_initialization_info: &HashMap, ) -> Result<(), Status> { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration_finish_result = server_registration .ok_or_else(|| Status::aborted("registration failed"))? .finish( RegistrationUpload::deserialize(registration_upload_bytes).map_err( |e| { error!("Failed to deserialize registration upload bytes: {}", e); Status::aborted("registration failed") }, )?, ) .map_err(|e| { error!( "Encountered a PAKE protocol error when finishing registration: {}", e ); Status::aborted("server error") })?; match client .add_user_to_users_table( user_id.to_string(), server_registration_finish_result, username.to_string(), signing_public_key.to_string(), + session_initialization_info, ) .await { Ok(_) => Ok(()), Err(e) => Err(handle_db_error(e)), } } fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } } struct RegistrationResponseAndPakeState { response: RegistrationResponse, pake_state: ServerRegistration, } struct LoginResponseAndPakeState { response: PakeLoginResponseStruct, pake_state: ServerLogin, } diff --git a/services/identity/src/service/login.rs b/services/identity/src/service/login.rs index 8873b59e1..001be5f95 100644 --- a/services/identity/src/service/login.rs +++ b/services/identity/src/service/login.rs @@ -1,93 +1,99 @@ use super::*; pub struct LoginState { user_id: String, signing_public_key: String, pake_state: ServerLogin, + session_initialization_info: HashMap, } pub async fn handle_login_request( message: Option>, tx: mpsc::Sender>, client: DatabaseClient, ) -> Result, Status> { match message { Some(Ok(LoginRequest { data: Some(WalletLoginRequest(req)), })) => { let wallet_login_result = wallet_login_helper(client, req, &mut OsRng).await; if let Err(e) = tx.send(wallet_login_result).await { error!("Response was dropped: {}", e); Err(Status::aborted("failure")) } else { Ok(None) } } Some(Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialRequestAndUserId( pake_credential_request_and_user_id, )), })), })) => { let response_and_state = pake_login_start( client, &pake_credential_request_and_user_id.user_id, &pake_credential_request_and_user_id.pake_credential_request, ) .await?; let login_response = LoginResponse { data: Some(PakeLoginResponse(response_and_state.response)), }; if let Err(e) = tx.send(Ok(login_response)).await { error!("Response was dropped: {}", e); return Err(Status::aborted("failure")); } Ok(Some(LoginState { user_id: pake_credential_request_and_user_id.user_id, signing_public_key: pake_credential_request_and_user_id .signing_public_key, pake_state: response_and_state.pake_state, + session_initialization_info: pake_credential_request_and_user_id + .session_initialization_info + .ok_or_else(|| Status::invalid_argument("Invalid message"))? + .info, })) } Some(_) | None => Err(Status::aborted("failure")), } } pub async fn handle_credential_finalization( message: Option>, tx: mpsc::Sender>, client: DatabaseClient, login_state: LoginState, ) -> Result<(), Status> { match message { Some(Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialFinalization(pake_credential_finalization)), })), })) => { let login_finish_result = pake_login_finish( &login_state.user_id, &login_state.signing_public_key, client, login_state.pake_state, &pake_credential_finalization, &mut OsRng, PakeWorkflow::Login, + &login_state.session_initialization_info, ) .await .map(|pake_login_response| LoginResponse { data: Some(PakeLoginResponse(pake_login_response)), }); if let Err(e) = tx.send(login_finish_result).await { error!("Response was dropped: {}", e); return Err(Status::aborted("failure")); } Ok(()) } Some(_) | None => Err(Status::aborted("failure")), } } diff --git a/services/identity/src/service/registration.rs b/services/identity/src/service/registration.rs index ff9383120..f663b9b71 100644 --- a/services/identity/src/service/registration.rs +++ b/services/identity/src/service/registration.rs @@ -1,146 +1,153 @@ use super::*; pub struct RegistrationState { user_id: String, username: String, signing_public_key: String, pub pake_state: Option>, + session_initialization_info: HashMap, } pub async fn handle_registration_request( message: Option>, client: DatabaseClient, tx: mpsc::Sender>, ) -> Result { match message { Some(Ok(RegistrationRequest { data: Some(PakeRegistrationRequestAndUserId( pake_registration_request_and_user_id, )), })) => { let get_item_output = client .get_item_from_users_table( &pake_registration_request_and_user_id.user_id, ) .await; match get_item_output { Ok(GetItemOutput { item: Some(_), .. }) => { error!("User already exists"); if let Err(e) = tx .send(Err(Status::already_exists("User already exists"))) .await { error!("Response was dropped: {}", e); } } Err(e) => return Err(handle_db_error(e)), _ => {} }; let response_and_state = pake_registration_start( &mut OsRng, &pake_registration_request_and_user_id.pake_registration_request, ) .await?; if let Err(e) = tx.send(Ok(response_and_state.response)).await { error!("Response was dropped: {}", e); } Ok(RegistrationState { user_id: pake_registration_request_and_user_id.user_id, username: pake_registration_request_and_user_id.username, signing_public_key: pake_registration_request_and_user_id .signing_public_key, pake_state: Some(response_and_state.pake_state), + session_initialization_info: pake_registration_request_and_user_id + .session_initialization_info + .ok_or_else(|| Status::invalid_argument("Invalid message"))? + .info, }) } None | Some(_) => Err(Status::aborted("failure")), } } pub async fn handle_registration_upload_and_credential_request( message: Option>, tx: mpsc::Sender>, client: DatabaseClient, registration_state: &RegistrationState, pake_state: ServerRegistration, ) -> Result, Status> { match message { Some(Ok(RegistrationRequest { data: Some(PakeRegistrationUploadAndCredentialRequest( pake_registration_upload_and_credential_request, )), })) => { let response_and_state = match pake_registration_finish( ®istration_state.user_id, client.clone(), &pake_registration_upload_and_credential_request .pake_registration_upload, Some(pake_state), ®istration_state.username, ®istration_state.signing_public_key, + ®istration_state.session_initialization_info, ) .await { Ok(_) => { pake_login_start( client, ®istration_state.user_id, &pake_registration_upload_and_credential_request .pake_credential_request, ) .await? } Err(e) => { return Err(e); } }; let registration_response = RegistrationResponse { data: Some(PakeRegistrationLoginResponse(response_and_state.response)), }; if let Err(e) = tx.send(Ok(registration_response)).await { error!("Response was dropped: {}", e); Err(Status::aborted("failure")) } else { Ok(response_and_state.pake_state) } } None | Some(_) => Err(Status::aborted("failure")), } } pub async fn handle_credential_finalization( message: Option>, tx: mpsc::Sender>, client: DatabaseClient, registration_state: &RegistrationState, server_login: ServerLogin, ) -> Result<(), Status> { match message { Some(Ok(RegistrationRequest { data: Some(PakeRegistrationCredentialFinalization(pake_credential_finalization)), })) => { let login_finish_result = pake_login_finish( ®istration_state.user_id, ®istration_state.signing_public_key, client, server_login, &pake_credential_finalization, &mut OsRng, PakeWorkflow::Registration, + ®istration_state.session_initialization_info, ) .await .map(|pake_login_response| RegistrationResponse { data: Some(PakeRegistrationLoginResponse(pake_login_response)), }); if let Err(e) = tx.send(login_finish_result).await { error!("Response was dropped: {}", e); return Err(Status::aborted("failure")); } Ok(()) } Some(_) | None => Err(Status::aborted("failure")), } } diff --git a/shared/protos/identity.proto b/shared/protos/identity.proto index b9dd603df..f338b8f05 100644 --- a/shared/protos/identity.proto +++ b/shared/protos/identity.proto @@ -1,202 +1,211 @@ syntax = "proto3"; package identity; service IdentityService { // Called by user to register with the Identity Service (PAKE only) rpc RegisterUser(stream RegistrationRequest) returns (stream RegistrationResponse) {} // Called by user to create an active session and get an access token rpc LoginUser(stream LoginRequest) returns (stream LoginResponse) {} // Called by other services to verify a user's token rpc VerifyUserToken(VerifyUserTokenRequest) returns (VerifyUserTokenResponse) {} // Called by users and keyservers to get userID corresponding to a wallet // address or username rpc GetUserID(GetUserIDRequest) returns (GetUserIDResponse) {} rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse) {} // Called by Ashoat's keyserver with a list of user IDs in MySQL and returns: // 1. a list of user IDs that are in DynamoDB but not in the supplied list // 2. a list of user IDs that are in the supplied list but not in DynamoDB rpc CompareUsers(CompareUsersRequest) returns (CompareUsersResponse) {} // Called by clients to get a nonce for a Sign-In with Ethereum message rpc GenerateNonce(GenerateNonceRequest) returns (GenerateNonceResponse) {} // Called by clients to get session initialization info needed to open a new // channel of communication with a given user rpc GetSessionInitializationInfo(GetSessionInitializationInfoRequest) returns (GetSessionInitializationInfoResponse) {} } // Helper types message PakeRegistrationRequestAndUserID { string userID = 1; // ed25519 key for the given user's device string signingPublicKey = 2; // Message sent to initiate PAKE registration (step 1) bytes pakeRegistrationRequest = 3; string username = 4; + // Information specific to a user's device needed to open a new channel of + // communication with this user + SessionInitializationInfo sessionInitializationInfo = 5; } message PakeCredentialRequestAndUserID { string userID = 1; // ed25519 key for the given user's device string signingPublicKey = 2; // Message sent to initiate PAKE login (step 1) bytes pakeCredentialRequest = 3; + // Information specific to a user's device needed to open a new channel of + // communication with this user + SessionInitializationInfo sessionInitializationInfo = 4; } message PakeLoginRequest { oneof data { PakeCredentialRequestAndUserID pakeCredentialRequestAndUserID = 1; // Final message in PAKE login (step 3) bytes pakeCredentialFinalization = 2; } } message PakeLoginResponse { oneof data { // Answer sent to the user upon reception of the PAKE login attempt, // containing a sealed envelope with the user's private key (step 2) bytes pakeCredentialResponse = 1; string accessToken = 2; } } message PakeRegistrationUploadAndCredentialRequest { // Final message in PAKE registration, containing sealed cryptographic // identifiers (step 3) bytes pakeRegistrationUpload = 1; // Message sent to initiate PAKE login (Same as in // PakeCredentialRequestAndUserID) (step 1) bytes pakeCredentialRequest = 2; } message WalletLoginRequest { string userID = 1; // ed25519 key for the given user's device string signingPublicKey = 2; string siweMessage = 3; bytes siweSignature = 4; + // Information specific to a user's device needed to open a new channel of + // communication with this user + SessionInitializationInfo sessionInitializationInfo = 5; } message WalletLoginResponse { string accessToken = 1; } message SessionInitializationInfo { // Initially, the key-value pairs will be as follows: // payload -> stringified JSON containing primary and notification public keys // signature -> above payload signed with the signing ed25519 key // socialProof -> a signed message used for SIWE (optional) map info = 1; } // RegisterUser message RegistrationRequest { oneof data { PakeRegistrationRequestAndUserID pakeRegistrationRequestAndUserID = 1; // We combine the last step of PAKE registration with the first step of PAKE // login here to reduce the number of messages sent PakeRegistrationUploadAndCredentialRequest pakeRegistrationUploadAndCredentialRequest = 2; // Final message in PAKE login (Same as in PakeLoginRequest) (step 3) bytes pakeCredentialFinalization = 3; } } message RegistrationResponse { oneof data { // Answer sent to the user upon reception of the PAKE registration attempt // (step 2) bytes pakeRegistrationResponse = 1; PakeLoginResponse pakeLoginResponse = 2; } } // LoginUser message LoginRequest { oneof data { PakeLoginRequest pakeLoginRequest = 1; WalletLoginRequest walletLoginRequest = 2; } } message LoginResponse { oneof data { PakeLoginResponse pakeLoginResponse = 1; WalletLoginResponse walletLoginResponse = 2; } } // VerifyUserToken message VerifyUserTokenRequest { string userID = 1; // ed25519 key for the given user's device string signingPublicKey = 2; string accessToken = 3; } message VerifyUserTokenResponse { bool tokenValid = 1; } // GetUserID message GetUserIDRequest { enum AuthType { PASSWORD = 0; WALLET = 1; } AuthType authType = 1; string userInfo = 2; } message GetUserIDResponse { string userID = 1; } // DeleteUser message DeleteUserRequest { string userID = 1; } // Need to respond with a message to show success, an // empty reponse should work just fine message DeleteUserResponse {} // CompareUsers message CompareUsersRequest { repeated string users = 1; } message CompareUsersResponse { repeated string usersMissingFromKeyserver = 1; repeated string usersMissingFromIdentity = 2; } // GenerateNonce message GenerateNonceRequest { } message GenerateNonceResponse{ string nonce = 1; } // GetSessionInitializationInfo message GetSessionInitializationInfoRequest { string userID = 1; } message GetSessionInitializationInfoResponse { // Map is keyed on devices' public ed25519 key used for signing map devices = 1; }