diff --git a/native/cpp/CommonCpp/grpc/protos/identity.proto b/native/cpp/CommonCpp/grpc/protos/identity.proto index 9d15243aa..c35f5c575 100644 --- a/native/cpp/CommonCpp/grpc/protos/identity.proto +++ b/native/cpp/CommonCpp/grpc/protos/identity.proto @@ -1,121 +1,139 @@ syntax = "proto3"; package identity; service IdentityService { // Called by user to register with the Identity Service (PAKE only) rpc RegisterUser(stream RegistrationRequest) returns (stream RegistrationResponse) {} // Called by user to create an active session and get an access token rpc LoginUser(stream LoginRequest) returns (stream LoginResponse) {} // Called by other services to verify a user's token rpc VerifyUserToken(VerifyUserTokenRequest) returns (VerifyUserTokenResponse) {} + // Called by users and keyservers to get userID corresponding to a wallet + // address or username + rpc GetUserID(GetUserIDRequest) returns (GetUserIDResponse) {} } // Helper types message PakeRegistrationRequestAndUserID { string userID = 1; string deviceID = 2; // Message sent to initiate PAKE registration (step 1) bytes pakeRegistrationRequest = 3; string username = 4; string userPublicKey = 5; } message PakeCredentialRequestAndUserID { string userID = 1; string deviceID = 2; // Message sent to initiate PAKE login (step 1) bytes pakeCredentialRequest = 3; } message PakeLoginRequest { oneof data { PakeCredentialRequestAndUserID pakeCredentialRequestAndUserID = 1; // Final message in PAKE login (step 3) bytes pakeCredentialFinalization = 2; } } message PakeLoginResponse { oneof data { // Answer sent to the user upon reception of the PAKE login attempt, // containing a sealed envelope with the user's private key (step 2) bytes pakeCredentialResponse = 1; string accessToken = 2; } } message PakeRegistrationUploadAndCredentialRequest { // Final message in PAKE registration, containing sealed cryptographic // identifiers (step 3) bytes pakeRegistrationUpload = 1; // Message sent to initiate PAKE login (Same as in // PakeCredentialRequestAndUserID) (step 1) bytes pakeCredentialRequest = 2; } message WalletLoginRequest { string userID = 1; string deviceID = 2; string siweMessage = 3; bytes siweSignature = 4; string userPublicKey = 5; } message WalletLoginResponse { string accessToken = 1; } // RegisterUser message RegistrationRequest { oneof data { PakeRegistrationRequestAndUserID pakeRegistrationRequestAndUserID = 1; // We combine the last step of PAKE registration with the first step of PAKE // login here to reduce the number of messages sent PakeRegistrationUploadAndCredentialRequest pakeRegistrationUploadAndCredentialRequest = 2; // Final message in PAKE login (Same as in PakeLoginRequest) (step 3) bytes pakeCredentialFinalization = 3; } } message RegistrationResponse { oneof data { // Answer sent to the user upon reception of the PAKE registration attempt // (step 2) bytes pakeRegistrationResponse = 1; PakeLoginResponse pakeLoginResponse = 2; } } // LoginUser message LoginRequest { oneof data { PakeLoginRequest pakeLoginRequest = 1; WalletLoginRequest walletLoginRequest = 2; } } message LoginResponse { oneof data { PakeLoginResponse pakeLoginResponse = 1; WalletLoginResponse walletLoginResponse = 2; } } // VerifyUserToken message VerifyUserTokenRequest { string userID = 1; string deviceID = 2; string accessToken = 3; } message VerifyUserTokenResponse { bool tokenValid = 1; } + +// GetUserID + +message GetUserIDRequest { + enum AuthType { + PASSWORD = 0; + WALLET = 1; + } + AuthType authType = 1; + string userInfo = 2; +} + +message GetUserIDResponse { + string userID = 1; +} diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs index 85ac79a47..a17b0f9ec 100644 --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -1,668 +1,677 @@ use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::Utc; use constant_time_eq::constant_time_eq; use futures_core::Stream; use opaque_ke::{ CredentialFinalization, CredentialRequest, RegistrationRequest as PakeRegistrationRequest, ServerLogin, ServerLoginStartParameters, }; use opaque_ke::{RegistrationUpload, ServerRegistration}; use rand::rngs::OsRng; use rand::{CryptoRng, Rng}; use siwe::Message; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{error, info, instrument}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; use crate::database::DatabaseClient; use crate::opaque::Cipher; use crate::token::{AccessTokenData, AuthType}; use crate::{config::Config, database::Error}; pub use proto::identity_service_server::IdentityServiceServer; use proto::{ identity_service_server::IdentityService, login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as PakeRegistrationCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as PakeRegistrationLoginResponse, - registration_response::Data::PakeRegistrationResponse, LoginRequest, - LoginResponse, PakeLoginRequest as PakeLoginRequestStruct, + registration_response::Data::PakeRegistrationResponse, GetUserIdRequest, + GetUserIdResponse, LoginRequest, LoginResponse, + PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, RegistrationRequest, RegistrationResponse, VerifyUserTokenRequest, VerifyUserTokenResponse, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; mod proto { tonic::include_proto!("identity"); } #[derive(Debug)] enum PakeWorkflow { Registration, Login, } #[derive(derive_more::Constructor)] pub struct MyIdentityService { config: Config, client: DatabaseClient, } #[tonic::async_trait] impl IdentityService for MyIdentityService { type RegisterUserStream = Pin< Box< dyn Stream> + Send + 'static, >, >; #[instrument(skip(self))] async fn register_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let config = self.config.clone(); let client = self.client.clone(); tokio::spawn(async move { let mut user_id: String = String::new(); let mut device_id: String = String::new(); let mut server_registration: Option> = None; let mut server_login: Option> = None; let mut num_messages_received = 0; while let Some(message) = in_stream.next().await { match message { Ok(RegistrationRequest { data: Some(PakeRegistrationRequestAndUserId( pake_registration_request_and_user_id, )), }) => { let registration_start_result = pake_registration_start( config.clone(), &mut OsRng, &pake_registration_request_and_user_id.pake_registration_request, num_messages_received, ) .await .map(|registration_response_and_server_registration| { server_registration = Some(registration_response_and_server_registration.1); registration_response_and_server_registration.0 }); if let Err(e) = tx.send(registration_start_result).await { error!("Response was dropped: {}", e); break; } user_id = pake_registration_request_and_user_id.user_id; device_id = pake_registration_request_and_user_id.device_id; } Ok(RegistrationRequest { data: Some(PakeRegistrationUploadAndCredentialRequest( pake_registration_upload_and_credential_request, )), }) => { let registration_finish_and_login_start_result = match pake_registration_finish( &user_id, client.clone(), &pake_registration_upload_and_credential_request .pake_registration_upload, server_registration, num_messages_received, ) .await { Ok(_) => pake_login_start( config.clone(), client.clone(), &user_id.clone(), &pake_registration_upload_and_credential_request .pake_credential_request, num_messages_received, PakeWorkflow::Registration, ) .await .map(|pake_login_response_and_server_login| { server_login = Some(pake_login_response_and_server_login.1); RegistrationResponse { data: Some(PakeRegistrationLoginResponse( pake_login_response_and_server_login.0, )), } }), Err(e) => Err(e), }; if let Err(e) = tx.send(registration_finish_and_login_start_result).await { error!("Response was dropped: {}", e); break; } server_registration = None; } Ok(RegistrationRequest { data: Some(PakeRegistrationCredentialFinalization( pake_credential_finalization, )), }) => { let login_finish_result = pake_login_finish( &user_id, &device_id, client, server_login, &pake_credential_finalization, &mut OsRng, num_messages_received, PakeWorkflow::Registration, ) .await .map(|pake_login_response| RegistrationResponse { data: Some(PakeRegistrationLoginResponse(pake_login_response)), }); if let Err(e) = tx.send(login_finish_result).await { error!("Response was dropped: {}", e); } break; } unexpected => { error!("Received an unexpected Result: {:?}", unexpected); if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await { error!("Response was dropped: {}", e); } break; } } num_messages_received += 1; } }); let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::RegisterUserStream )) } type LoginUserStream = Pin> + Send + 'static>>; #[instrument(skip(self))] async fn login_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let config = self.config.clone(); let client = self.client.clone(); tokio::spawn(async move { let mut user_id: String = String::new(); let mut device_id: String = String::new(); let mut server_login: Option> = None; let mut num_messages_received = 0; while let Some(message) = in_stream.next().await { match message { Ok(LoginRequest { data: Some(WalletLoginRequest(req)), }) => { let wallet_login_result = wallet_login_helper( client, req, &mut OsRng, num_messages_received, ) .await; if let Err(e) = tx.send(wallet_login_result).await { error!("Response was dropped: {}", e); } break; } Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialRequestAndUserId( pake_credential_request_and_user_id, )), })), }) => { let login_start_result = pake_login_start( config.clone(), client.clone(), &pake_credential_request_and_user_id.user_id, &pake_credential_request_and_user_id.pake_credential_request, num_messages_received, PakeWorkflow::Login, ) .await .map(|pake_login_response_and_server_login| { server_login = Some(pake_login_response_and_server_login.1); LoginResponse { data: Some(PakeLoginResponse( pake_login_response_and_server_login.0, )), } }); if let Err(e) = tx.send(login_start_result).await { error!("Response was dropped: {}", e); break; } user_id = pake_credential_request_and_user_id.user_id; device_id = pake_credential_request_and_user_id.device_id; } Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialFinalization(pake_credential_finalization)), })), }) => { let login_finish_result = pake_login_finish( &user_id, &device_id, client, server_login, &pake_credential_finalization, &mut OsRng, num_messages_received, PakeWorkflow::Login, ) .await .map(|pake_login_response| LoginResponse { data: Some(PakeLoginResponse(pake_login_response)), }); if let Err(e) = tx.send(login_finish_result).await { error!("Response was dropped: {}", e); } break; } unexpected => { error!("Received an unexpected Result: {:?}", unexpected); if let Err(e) = tx.send(Err(Status::unknown("unknown error"))).await { error!("Response was dropped: {}", e); } break; } } num_messages_received += 1; } }); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) } #[instrument(skip(self))] async fn verify_user_token( &self, request: Request, ) -> Result, Status> { info!("Received VerifyUserToken request: {:?}", request); let message = request.into_inner(); let token_valid = match self .client .get_access_token_data(message.user_id, message.device_id) .await { Ok(Some(access_token_data)) => constant_time_eq( access_token_data.access_token.as_bytes(), message.access_token.as_bytes(), ), Ok(None) => false, Err(Error::AwsSdk(DynamoDBError::InternalServerError(_))) | Err(Error::AwsSdk( DynamoDBError::ProvisionedThroughputExceededException(_), )) | Err(Error::AwsSdk(DynamoDBError::RequestLimitExceeded(_))) => { return Err(Status::unavailable("please retry")) } Err(e) => { error!("Encountered an unexpected error: {}", e); return Err(Status::failed_precondition("unexpected error")); } }; let response = Response::new(VerifyUserTokenResponse { token_valid }); info!("Sending VerifyUserToken response: {:?}", response); Ok(response) } + + #[instrument(skip(self))] + async fn get_user_id( + &self, + request: Request, + ) -> Result, Status> { + unimplemented!(); + } } async fn put_token_helper( client: DatabaseClient, auth_type: AuthType, user_id: &str, device_id: &str, rng: &mut (impl Rng + CryptoRng), ) -> Result { if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID \"{}\", device ID \"{}\"", user_id, device_id ); return Err(Status::aborted("user not found")); } let access_token_data = AccessTokenData::new( user_id.to_string(), device_id.to_string(), auth_type.clone(), rng, ); match client .put_access_token_data(access_token_data.clone()) .await { Ok(_) => Ok(access_token_data.access_token), Err(Error::AwsSdk(DynamoDBError::InternalServerError(_))) | Err(Error::AwsSdk( DynamoDBError::ProvisionedThroughputExceededException(_), )) | Err(Error::AwsSdk(DynamoDBError::RequestLimitExceeded(_))) => { Err(Status::unavailable("please retry")) } Err(e) => { error!("Encountered an unexpected error: {}", e); Err(Status::failed_precondition("unexpected error")) } } } fn parse_and_verify_siwe_message( user_id: &str, device_id: &str, siwe_message: &str, siwe_signature: Vec, ) -> Result<(), Status> { if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID {}, device ID {}", user_id, device_id ); return Err(Status::aborted("user not found")); } let siwe_message: Message = match siwe_message.parse() { Ok(m) => m, Err(e) => { error!("Failed to parse SIWE message: {}", e); return Err(Status::invalid_argument("invalid message")); } }; match siwe_message.verify( match siwe_signature.try_into() { Ok(s) => s, Err(e) => { error!("Conversion to SIWE signature failed: {:?}", e); return Err(Status::invalid_argument("invalid message")); } }, None, None, Some(&Utc::now()), ) { Err(e) => { error!( "Signature verification failed for user {} on device {}: {}", user_id, device_id, e ); Err(Status::unauthenticated("message not authenticated")) } Ok(_) => Ok(()), } } async fn wallet_login_helper( client: DatabaseClient, wallet_login_request: WalletLoginRequestStruct, rng: &mut (impl Rng + CryptoRng), num_messages_received: u8, ) -> Result { if num_messages_received != 0 { error!("Too many messages received in stream, aborting"); return Err(Status::aborted("please retry")); } match parse_and_verify_siwe_message( &wallet_login_request.user_id, &wallet_login_request.device_id, &wallet_login_request.siwe_message, wallet_login_request.siwe_signature, ) { Ok(()) => Ok(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token: put_token_helper( client, AuthType::Wallet, &wallet_login_request.user_id, &wallet_login_request.device_id, rng, ) .await?, })), }), Err(e) => Err(e), } } async fn pake_login_start( config: Config, client: DatabaseClient, user_id: &str, pake_credential_request: &[u8], num_messages_received: u8, pake_workflow: PakeWorkflow, ) -> Result<(PakeLoginResponseStruct, ServerLogin), Status> { if (num_messages_received != 0 && matches!(pake_workflow, PakeWorkflow::Login)) || (num_messages_received != 1 && matches!(pake_workflow, PakeWorkflow::Registration)) { error!("Too many messages received in stream, aborting"); return Err(Status::aborted("please retry")); } if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration = match client.get_pake_registration(user_id.to_string()).await { Ok(Some(r)) => r, Ok(None) => { return Err(Status::not_found("user not found")); } Err(Error::AwsSdk(DynamoDBError::InternalServerError(_))) | Err(Error::AwsSdk( DynamoDBError::ProvisionedThroughputExceededException(_), )) | Err(Error::AwsSdk(DynamoDBError::RequestLimitExceeded(_))) => { return Err(Status::unavailable("please retry")) } Err(e) => { error!("Encountered an unexpected error: {}", e); return Err(Status::failed_precondition("unexpected error")); } }; let credential_request = CredentialRequest::deserialize(pake_credential_request).map_err(|e| { error!("Failed to deserialize credential request: {}", e); Status::invalid_argument("invalid message") })?; match ServerLogin::start( &mut OsRng, server_registration, config.server_keypair.private(), credential_request, ServerLoginStartParameters::default(), ) { Ok(server_login_start_result) => Ok(( PakeLoginResponseStruct { data: Some(PakeCredentialResponse( server_login_start_result.message.serialize().map_err(|e| { error!("Failed to serialize PAKE message: {}", e); Status::failed_precondition("internal error") })?, )), }, server_login_start_result.state, )), Err(e) => { error!( "Encountered a PAKE protocol error when starting login: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_login_finish( user_id: &str, device_id: &str, client: DatabaseClient, server_login: Option>, pake_credential_finalization: &[u8], rng: &mut (impl Rng + CryptoRng), num_messages_received: u8, pake_workflow: PakeWorkflow, ) -> Result { if (num_messages_received != 1 && matches!(pake_workflow, PakeWorkflow::Login)) || (num_messages_received != 2 && matches!(pake_workflow, PakeWorkflow::Registration)) { error!("Too many messages received in stream, aborting"); return Err(Status::aborted("please retry")); } if user_id.is_empty() || device_id.is_empty() { error!( "Incomplete data: user ID {}, device ID {}", user_id, device_id ); return Err(Status::aborted("user not found")); } match server_login .ok_or_else(|| { error!("Server login missing in {:?} PAKE workflow", pake_workflow); Status::aborted("login failed") })? .finish( CredentialFinalization::deserialize(pake_credential_finalization) .map_err(|e| { error!("Failed to deserialize credential finalization bytes: {}", e); Status::aborted("login failed") })?, ) { Ok(_) => Ok(PakeLoginResponseStruct { data: Some(AccessToken( put_token_helper(client, AuthType::Password, user_id, device_id, rng) .await?, )), }), Err(e) => { error!( "Encountered a PAKE protocol error when finishing login: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_registration_start( config: Config, rng: &mut (impl Rng + CryptoRng), registration_request_bytes: &[u8], num_messages_received: u8, ) -> Result<(RegistrationResponse, ServerRegistration), Status> { if num_messages_received != 0 { error!("Too many messages received in stream, aborting"); return Err(Status::aborted("please retry")); } match ServerRegistration::::start( rng, PakeRegistrationRequest::deserialize(registration_request_bytes).unwrap(), config.server_keypair.public(), ) { Ok(server_registration_start_result) => Ok(( RegistrationResponse { data: Some(PakeRegistrationResponse( server_registration_start_result.message.serialize(), )), }, server_registration_start_result.state, )), Err(e) => { error!( "Encountered a PAKE protocol error when starting registration: {}", e ); Err(Status::aborted("server error")) } } } async fn pake_registration_finish( user_id: &str, client: DatabaseClient, registration_upload_bytes: &[u8], server_registration: Option>, num_messages_received: u8, ) -> Result<(), Status> { if num_messages_received != 1 { error!("Too many messages received in stream, aborting"); return Err(Status::aborted("please retry")); } if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration_finish_result = server_registration .ok_or_else(|| Status::aborted("registration failed"))? .finish( RegistrationUpload::deserialize(registration_upload_bytes).map_err( |e| { error!("Failed to deserialize registration upload bytes: {}", e); Status::aborted("registration failed") }, )?, ) .map_err(|e| { error!( "Encountered a PAKE protocol error when finishing registration: {}", e ); Status::aborted("server error") })?; match client .put_pake_registration( user_id.to_string(), server_registration_finish_result, ) .await { Ok(_) => Ok(()), Err(Error::AwsSdk(DynamoDBError::InternalServerError(_))) | Err(Error::AwsSdk( DynamoDBError::ProvisionedThroughputExceededException(_), )) | Err(Error::AwsSdk(DynamoDBError::RequestLimitExceeded(_))) => { Err(Status::unavailable("please retry")) } Err(_) => Err(Status::failed_precondition("internal error")), } }