diff --git a/keyserver/addons/rust-node-addon/src/identity_client/login_user.rs b/keyserver/addons/rust-node-addon/src/identity_client/login_user.rs index dbd824737..115fbf794 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client/login_user.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client/login_user.rs @@ -1,203 +1,208 @@ use super::*; #[napi] #[instrument(skip_all)] async fn login_user_wallet( user_id: String, signing_public_key: String, siwe_message: String, siwe_signature: String, mut session_initialization_info: HashMap, social_proof: String, ) -> Result { let channel = get_identity_service_channel().await?; let token: MetadataValue<_> = IDENTITY_SERVICE_CONFIG .identity_auth_token .parse() .map_err(|_| Error::from_status(Status::GenericFailure))?; let mut identity_client = IdentityServiceClient::with_interceptor(channel, |mut req: Request<()>| { req.metadata_mut().insert("authorization", token.clone()); Ok(req) }); // Create a LoginRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); let mut response_stream = identity_client .login_user(request) .await .map_err(|_| Error::from_status(Status::GenericFailure))? .into_inner(); // Start wallet login on client and send initial login request to Identity // service session_initialization_info.insert("socialProof".to_string(), social_proof); let login_request = LoginRequest { data: Some(WalletLoginRequest(WalletLoginRequestStruct { user_id, signing_public_key, siwe_message, siwe_signature, session_initialization_info: Some(SessionInitializationInfo { info: session_initialization_info, }), })), }; if let Err(e) = tx.send(login_request).await { error!("Response was dropped: {}", e); return Err(Error::from_status(Status::GenericFailure)); } // Return access token let message = response_stream.message().await.map_err(|e| { error!("Received an error from inbound message stream: {}", e); Error::from_status(Status::GenericFailure) })?; get_wallet_access_token(message) } #[napi] #[instrument(skip_all)] async fn login_user_pake( user_id: String, signing_public_key: String, password: String, session_initialization_info: HashMap, ) -> Result { let channel = get_identity_service_channel().await?; let token: MetadataValue<_> = IDENTITY_SERVICE_CONFIG .identity_auth_token .parse() .map_err(|_| Error::from_status(Status::GenericFailure))?; let mut identity_client = IdentityServiceClient::with_interceptor(channel, |mut req: Request<()>| { req.metadata_mut().insert("authorization", token.clone()); Ok(req) }); // Create a LoginRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = identity_client .login_user(request) .await .map_err(|_| Error::from_status(Status::GenericFailure))? .into_inner(); // Start PAKE login on client and send initial login request to Identity // service let mut client_rng = OsRng; let client_login_start_result = pake_login_start(&mut client_rng, &password)?; let pake_credential_request = client_login_start_result.message.serialize().map_err(|e| { error!("Could not serialize credential request: {}", e); Error::new(Status::GenericFailure, e.to_string()) })?; let login_request = LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialRequestAndUserId( PakeCredentialRequestAndUserIdStruct { user_id, signing_public_key, pake_credential_request, session_initialization_info: Some(SessionInitializationInfo { info: session_initialization_info, }), }, )), })), }; send_to_mpsc(tx.clone(), login_request).await?; // Handle responses from Identity service sequentially, making sure we get // messages in the correct order // Finish PAKE login; send final login request to Identity service let message = response.message().await.map_err(|e| { error!("Received an error from inbound message stream: {}", e); - Error::from_status(Status::GenericFailure) + match e.code() { + Code::NotFound => { + Error::new(Status::InvalidArg, "user not found".to_string()) + } + _ => Error::new(Status::GenericFailure, e.to_string()), + } })?; handle_login_credential_response( message, client_login_start_result.state, tx, ) .await?; // Return access token let message = response.message().await.map_err(|e| { error!("Received an error from inbound message stream: {}", e); Error::from_status(Status::GenericFailure) })?; handle_login_token_response(message) } async fn handle_login_credential_response( message: Option, client_login: ClientLogin, tx: mpsc::Sender, ) -> Result<(), Status> { if let Some(LoginResponse { data: Some(LoginPakeLoginResponse(PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response_bytes)), })), }) = message { let credential_finalization_bytes = pake_login_finish(&credential_response_bytes, client_login)? .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Error::from_status(Status::GenericFailure) })?; let login_request = LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(LoginPakeCredentialFinalization( credential_finalization_bytes, )), })), }; send_to_mpsc(tx, login_request).await } else { Err(handle_unexpected_response(message)) } } fn handle_login_token_response( message: Option, ) -> Result { if let Some(LoginResponse { data: Some(LoginPakeLoginResponse(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } fn get_wallet_access_token( message: Option, ) -> Result { if let Some(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } diff --git a/keyserver/addons/rust-node-addon/src/identity_client/mod.rs b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs index 385c560c8..4468f6f87 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client/mod.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs @@ -1,138 +1,138 @@ pub mod delete_user; pub mod login_user; pub mod register_user; pub mod identity { tonic::include_proto!("identity"); } pub mod update_user; use comm_opaque::Cipher; use identity::identity_service_client::IdentityServiceClient; use identity::{ login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse as LoginPakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization as LoginPakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as RegistrationPakeCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as RegistrationPakeLoginResponse, registration_response::Data::PakeRegistrationResponse, DeleteUserRequest, LoginRequest, LoginResponse, PakeCredentialRequestAndUserId as PakeCredentialRequestAndUserIdStruct, PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, PakeRegistrationRequestAndUserId as PakeRegistrationRequestAndUserIdStruct, PakeRegistrationUploadAndCredentialRequest as PakeRegistrationUploadAndCredentialRequestStruct, RegistrationRequest, RegistrationResponse as RegistrationResponseMessage, SessionInitializationInfo, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; use lazy_static::lazy_static; use napi::bindgen_prelude::*; use opaque_ke::{ ClientLogin, ClientLoginFinishParameters, ClientLoginStartParameters, ClientLoginStartResult, ClientRegistration, ClientRegistrationFinishParameters, CredentialFinalization, CredentialResponse, RegistrationResponse, RegistrationUpload, }; use rand::{rngs::OsRng, CryptoRng, Rng}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env::var; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tonic::{metadata::MetadataValue, transport::Channel, Request}; +use tonic::{metadata::MetadataValue, transport::Channel, Code, Request}; use tracing::{error, instrument}; lazy_static! { static ref IDENTITY_SERVICE_CONFIG: IdentityServiceConfig = { let config_json_string = var("COMM_JSONCONFIG_secrets_identity_service_config"); match config_json_string { Ok(json) => serde_json::from_str(&json).unwrap(), Err(_) => IdentityServiceConfig::default(), } }; } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct IdentityServiceConfig { identity_socket_addr: String, identity_auth_token: String, } impl Default for IdentityServiceConfig { fn default() -> Self { Self { identity_socket_addr: "https://[::1]:50051".to_string(), identity_auth_token: "test".to_string(), } } } fn handle_unexpected_response(message: Option) -> Error { error!("Received an unexpected message: {:?}", message); Error::from_status(Status::GenericFailure) } async fn send_to_mpsc(tx: mpsc::Sender, request: T) -> Result<()> { if let Err(e) = tx.send(request).await { error!("Response was dropped: {}", e); return Err(Error::from_status(Status::GenericFailure)); } Ok(()) } fn pake_login_start( rng: &mut (impl Rng + CryptoRng), password: &str, ) -> Result> { ClientLogin::::start( rng, password.as_bytes(), ClientLoginStartParameters::default(), ) .map_err(|e| { error!("Failed to start PAKE login: {}", e); Error::from_status(Status::GenericFailure) }) } fn pake_login_finish( credential_response_bytes: &[u8], client_login: ClientLogin, ) -> Result> { client_login .finish( CredentialResponse::deserialize(credential_response_bytes).map_err( |e| { error!("Could not deserialize credential response bytes: {}", e); Error::from_status(Status::GenericFailure) }, )?, ClientLoginFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE login: {}", e); Error::from_status(Status::GenericFailure) }) .map(|res| res.message) } async fn get_identity_service_channel() -> Result { Channel::from_static(&IDENTITY_SERVICE_CONFIG.identity_socket_addr) .connect() .await .map_err(|_| { Error::new( Status::GenericFailure, "Unable to connect to identity service".to_string(), ) }) } diff --git a/keyserver/src/responders/user-responders.js b/keyserver/src/responders/user-responders.js index 58c1cc12c..f6cca5239 100644 --- a/keyserver/src/responders/user-responders.js +++ b/keyserver/src/responders/user-responders.js @@ -1,674 +1,686 @@ // @flow import invariant from 'invariant'; import { getRustAPI } from 'rust-node-addon'; import { ErrorTypes, SiweMessage } from 'siwe'; import t from 'tcomb'; import bcrypt from 'twin-bcrypt'; import { baseLegalPolicies, policies } from 'lib/facts/policies.js'; import { hasMinCodeVersion } from 'lib/shared/version-utils.js'; import type { ResetPasswordRequest, LogOutResponse, DeleteAccountRequest, RegisterResponse, RegisterRequest, LogInResponse, LogInRequest, UpdatePasswordRequest, UpdateUserSettingsRequest, PolicyAcknowledgmentRequest, } from 'lib/types/account-types.js'; import { userSettingsTypes, notificationTypeValues, logInActionSources, } from 'lib/types/account-types.js'; import type { IdentityKeysBlob, SignedIdentityKeysBlob, } from 'lib/types/crypto-types.js'; import type { CalendarQuery } from 'lib/types/entry-types.js'; import { defaultNumberPerThread } from 'lib/types/message-types.js'; import type { SIWEAuthRequest, SIWEMessage, SIWESocialProof, } from 'lib/types/siwe-types.js'; import type { SubscriptionUpdateRequest, SubscriptionUpdateResponse, } from 'lib/types/subscription-types.js'; import type { PasswordUpdate } from 'lib/types/user-types.js'; import { identityKeysBlobValidator, signedIdentityKeysBlobValidator, } from 'lib/utils/crypto-utils.js'; import { ServerError } from 'lib/utils/errors.js'; import { values } from 'lib/utils/objects.js'; import { promiseAll } from 'lib/utils/promises.js'; import { getPublicKeyFromSIWEStatement, isValidSIWEMessage, isValidSIWEStatementWithPublicKey, primaryIdentityPublicKeyRegex, } from 'lib/utils/siwe-utils.js'; import { tShape, tPlatformDetails, tPassword, tEmail, tOldValidUsername, tRegex, } from 'lib/utils/validation-utils.js'; import { entryQueryInputValidator, newEntryQueryInputValidator, normalizeCalendarQuery, verifyCalendarQueryThreadIDs, } from './entry-responders.js'; import { handleAsyncPromise } from './handlers.js'; import { createAccount, processSIWEAccountCreation, } from '../creators/account-creator.js'; import { dbQuery, SQL } from '../database/database.js'; import { deleteAccount } from '../deleters/account-deleters.js'; import { deleteCookie } from '../deleters/cookie-deleters.js'; import { checkAndInvalidateSIWENonceEntry } from '../deleters/siwe-nonce-deleters.js'; import { fetchEntryInfos } from '../fetchers/entry-fetchers.js'; import { fetchMessageInfos } from '../fetchers/message-fetchers.js'; import { fetchNotAcknowledgedPolicies } from '../fetchers/policy-acknowledgment-fetchers.js'; import { fetchThreadInfos } from '../fetchers/thread-fetchers.js'; import { fetchKnownUserInfos, fetchLoggedInUserInfo, fetchUserIDForEthereumAddress, } from '../fetchers/user-fetchers.js'; import { createNewAnonymousCookie, createNewUserCookie, setNewSession, } from '../session/cookies.js'; import type { Viewer } from '../session/viewer.js'; import { accountUpdater, checkAndSendVerificationEmail, checkAndSendPasswordResetEmail, updatePassword, updateUserSettings, } from '../updaters/account-updaters.js'; import { userSubscriptionUpdater } from '../updaters/user-subscription-updaters.js'; import { viewerAcknowledgmentUpdater } from '../updaters/viewer-acknowledgment-updater.js'; import { getOLMUtility } from '../utils/olm-utils.js'; import type { OLMUtility } from '../utils/olm-utils.js'; import { validateInput } from '../utils/validation-utils.js'; const subscriptionUpdateRequestInputValidator = tShape({ threadID: t.String, updatedFields: tShape({ pushNotifs: t.maybe(t.Boolean), home: t.maybe(t.Boolean), }), }); async function userSubscriptionUpdateResponder( viewer: Viewer, input: any, ): Promise { const request: SubscriptionUpdateRequest = input; await validateInput(viewer, subscriptionUpdateRequestInputValidator, request); const threadSubscription = await userSubscriptionUpdater(viewer, request); return { threadSubscription }; } const accountUpdateInputValidator = tShape({ updatedFields: tShape({ email: t.maybe(tEmail), password: t.maybe(tPassword), }), currentPassword: tPassword, }); async function passwordUpdateResponder( viewer: Viewer, input: any, ): Promise { const request: PasswordUpdate = input; await validateInput(viewer, accountUpdateInputValidator, request); await accountUpdater(viewer, request); } async function sendVerificationEmailResponder(viewer: Viewer): Promise { await validateInput(viewer, null, null); await checkAndSendVerificationEmail(viewer); } const resetPasswordRequestInputValidator = tShape({ usernameOrEmail: t.union([tEmail, tOldValidUsername]), }); async function sendPasswordResetEmailResponder( viewer: Viewer, input: any, ): Promise { const request: ResetPasswordRequest = input; await validateInput(viewer, resetPasswordRequestInputValidator, request); await checkAndSendPasswordResetEmail(request); } async function logOutResponder(viewer: Viewer): Promise { await validateInput(viewer, null, null); if (viewer.loggedIn) { const [anonymousViewerData] = await Promise.all([ createNewAnonymousCookie({ platformDetails: viewer.platformDetails, deviceToken: viewer.deviceToken, }), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(anonymousViewerData); } return { currentUserInfo: { id: viewer.id, anonymous: true, }, }; } const deleteAccountRequestInputValidator = tShape({ password: t.maybe(tPassword), }); async function accountDeletionResponder( viewer: Viewer, input: any, ): Promise { const request: DeleteAccountRequest = input; await validateInput(viewer, deleteAccountRequestInputValidator, request); const result = await deleteAccount(viewer, request); invariant(result, 'deleteAccount should return result if handed request'); return result; } const deviceTokenUpdateRequestInputValidator = tShape({ deviceType: t.maybe(t.enums.of(['ios', 'android'])), deviceToken: t.String, }); const registerRequestInputValidator = tShape({ username: t.String, email: t.maybe(tEmail), password: tPassword, calendarQuery: t.maybe(newEntryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, // We include `primaryIdentityPublicKey` to avoid breaking // old clients, but we no longer do anything with it. primaryIdentityPublicKey: t.maybe(tRegex(primaryIdentityPublicKeyRegex)), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), }); async function accountCreationResponder( viewer: Viewer, input: any, ): Promise { const request: RegisterRequest = input; await validateInput(viewer, registerRequestInputValidator, request); const { signedIdentityKeysBlob } = request; if (signedIdentityKeysBlob) { const identityKeys: IdentityKeysBlob = JSON.parse( signedIdentityKeysBlob.payload, ); if (!identityKeysBlobValidator.is(identityKeys)) { throw new ServerError('invalid_identity_keys_blob'); } const olmUtil: OLMUtility = getOLMUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } return await createAccount(viewer, request); } type ProcessSuccessfulLoginParams = { +viewer: Viewer, +input: any, +userID: string, +calendarQuery: ?CalendarQuery, +socialProof?: ?SIWESocialProof, +signedIdentityKeysBlob?: ?SignedIdentityKeysBlob, }; async function processSuccessfulLogin( params: ProcessSuccessfulLoginParams, ): Promise { const { viewer, input, userID, calendarQuery, socialProof, signedIdentityKeysBlob, } = params; const request: LogInRequest = input; const newServerTime = Date.now(); const deviceToken = request.deviceTokenUpdateRequest ? request.deviceTokenUpdateRequest.deviceToken : viewer.deviceToken; const [userViewerData, notAcknowledgedPolicies] = await Promise.all([ createNewUserCookie(userID, { platformDetails: request.platformDetails, deviceToken, socialProof, signedIdentityKeysBlob, }), fetchNotAcknowledgedPolicies(userID, baseLegalPolicies), deleteCookie(viewer.cookieID), ]); viewer.setNewCookie(userViewerData); if ( notAcknowledgedPolicies.length && hasMinCodeVersion(viewer.platformDetails, 181) ) { const currentUserInfo = await fetchLoggedInUserInfo(viewer); return { notAcknowledgedPolicies, currentUserInfo: currentUserInfo, rawMessageInfos: [], truncationStatuses: {}, userInfos: [], rawEntryInfos: [], serverTime: 0, cookieChange: { threadInfos: {}, userInfos: [], }, }; } if (calendarQuery) { await setNewSession(viewer, calendarQuery, newServerTime); } const threadCursors = {}; for (const watchedThreadID of request.watchedIDs) { threadCursors[watchedThreadID] = null; } const messageSelectionCriteria = { threadCursors, joinedThreads: true }; const [ threadsResult, messagesResult, entriesResult, userInfos, currentUserInfo, ] = await Promise.all([ fetchThreadInfos(viewer), fetchMessageInfos(viewer, messageSelectionCriteria, defaultNumberPerThread), calendarQuery ? fetchEntryInfos(viewer, [calendarQuery]) : undefined, fetchKnownUserInfos(viewer), fetchLoggedInUserInfo(viewer), ]); const rawEntryInfos = entriesResult ? entriesResult.rawEntryInfos : null; const response: LogInResponse = { currentUserInfo, rawMessageInfos: messagesResult.rawMessageInfos, truncationStatuses: messagesResult.truncationStatuses, serverTime: newServerTime, userInfos: values(userInfos), cookieChange: { threadInfos: threadsResult.threadInfos, userInfos: [], }, }; if (rawEntryInfos) { return { ...response, rawEntryInfos, }; } return response; } const logInRequestInputValidator = tShape({ username: t.maybe(t.String), usernameOrEmail: t.maybe(t.union([tEmail, tOldValidUsername])), password: tPassword, watchedIDs: t.list(t.String), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, source: t.maybe(t.enums.of(values(logInActionSources))), // We include `primaryIdentityPublicKey` to avoid breaking // old clients, but we no longer do anything with it. primaryIdentityPublicKey: t.maybe(tRegex(primaryIdentityPublicKeyRegex)), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), }); async function logInResponder( viewer: Viewer, input: any, ): Promise { await validateInput(viewer, logInRequestInputValidator, input); const request: LogInRequest = input; let identityKeys: ?IdentityKeysBlob; const { signedIdentityKeysBlob } = request; if (signedIdentityKeysBlob) { identityKeys = JSON.parse(signedIdentityKeysBlob.payload); const olmUtil: OLMUtility = getOLMUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } const calendarQuery = request.calendarQuery ? normalizeCalendarQuery(request.calendarQuery) : null; const promises = {}; if (calendarQuery) { promises.verifyCalendarQueryThreadIDs = verifyCalendarQueryThreadIDs(calendarQuery); } const username = request.username ?? request.usernameOrEmail; if (!username) { if (hasMinCodeVersion(viewer.platformDetails, 150)) { throw new ServerError('invalid_credentials'); } else { throw new ServerError('invalid_parameters'); } } const userQuery = SQL` SELECT id, hash, username FROM users WHERE LCASE(username) = LCASE(${username}) `; promises.userQuery = dbQuery(userQuery); const { userQuery: [userResult], } = await promiseAll(promises); if (userResult.length === 0) { if (hasMinCodeVersion(viewer.platformDetails, 150)) { throw new ServerError('invalid_credentials'); } else { throw new ServerError('invalid_parameters'); } } const userRow = userResult[0]; if (!userRow.hash || !bcrypt.compareSync(request.password, userRow.hash)) { throw new ServerError('invalid_credentials'); } const id = userRow.id.toString(); if (identityKeys && signedIdentityKeysBlob) { const constIdentityKeys = identityKeys; handleAsyncPromise( (async () => { const rustAPI = await getRustAPI(); - await rustAPI.loginUserPake( - id, - constIdentityKeys.primaryIdentityPublicKeys.ed25519, - request.password, - signedIdentityKeysBlob, - ); + try { + await rustAPI.loginUserPake( + id, + constIdentityKeys.primaryIdentityPublicKeys.ed25519, + request.password, + signedIdentityKeysBlob, + ); + } catch (e) { + if (e.code === 'InvalidArg' && e.message === 'user not found') { + await rustAPI.registerUser( + id, + constIdentityKeys.primaryIdentityPublicKeys.ed25519, + username, + request.password, + signedIdentityKeysBlob, + ); + } + } })(), ); } return await processSuccessfulLogin({ viewer, input, userID: id, calendarQuery, signedIdentityKeysBlob, }); } const siweAuthRequestInputValidator = tShape({ signature: t.String, message: t.String, calendarQuery: entryQueryInputValidator, deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, watchedIDs: t.list(t.String), signedIdentityKeysBlob: t.maybe(signedIdentityKeysBlobValidator), }); async function siweAuthResponder( viewer: Viewer, input: any, ): Promise { await validateInput(viewer, siweAuthRequestInputValidator, input); const request: SIWEAuthRequest = input; const { message, signature, deviceTokenUpdateRequest, platformDetails, signedIdentityKeysBlob, } = request; const calendarQuery = normalizeCalendarQuery(request.calendarQuery); // 1. Ensure that `message` is a well formed Comm SIWE Auth message. const siweMessage: SIWEMessage = new SiweMessage(message); if (!isValidSIWEMessage(siweMessage)) { throw new ServerError('invalid_parameters'); } // 2. Ensure that the `nonce` exists in the `siwe_nonces` table // AND hasn't expired. If those conditions are met, delete the entry to // ensure that the same `nonce` can't be re-used in a future request. const wasNonceCheckedAndInvalidated = await checkAndInvalidateSIWENonceEntry( siweMessage.nonce, ); if (!wasNonceCheckedAndInvalidated) { throw new ServerError('invalid_parameters'); } // 3. Validate SIWEMessage signature and handle possible errors. try { await siweMessage.validate(signature); } catch (error) { if (error === ErrorTypes.EXPIRED_MESSAGE) { // Thrown when the `expirationTime` is present and in the past. throw new ServerError('expired_message'); } else if (error === ErrorTypes.INVALID_SIGNATURE) { // Thrown when the `validate()` function can't verify the message. throw new ServerError('invalid_signature'); } else if (error === ErrorTypes.MALFORMED_SESSION) { // Thrown when some required field is missing. throw new ServerError('malformed_session'); } else { throw new ServerError('unknown_error'); } } // 4. Pull `primaryIdentityPublicKey` out from SIWEMessage `statement`. // We expect it to be included for BOTH native and web clients. const { statement } = siweMessage; const primaryIdentityPublicKey = statement && isValidSIWEStatementWithPublicKey(statement) ? getPublicKeyFromSIWEStatement(statement) : null; if (!primaryIdentityPublicKey) { throw new ServerError('invalid_siwe_statement_public_key'); } // 5. Verify `signedIdentityKeysBlob.payload` with included `signature` // if `signedIdentityKeysBlob` was included in the `SIWEAuthRequest`. let identityKeys: ?IdentityKeysBlob; if (signedIdentityKeysBlob) { identityKeys = JSON.parse(signedIdentityKeysBlob.payload); if (!identityKeysBlobValidator.is(identityKeys)) { throw new ServerError('invalid_identity_keys_blob'); } const olmUtil: OLMUtility = getOLMUtility(); try { olmUtil.ed25519_verify( identityKeys.primaryIdentityPublicKeys.ed25519, signedIdentityKeysBlob.payload, signedIdentityKeysBlob.signature, ); } catch (e) { throw new ServerError('invalid_signature'); } } // 6. Ensure that `primaryIdentityPublicKeys.ed25519` matches SIWE // statement `primaryIdentityPublicKey` if `identityKeys` exists. if ( identityKeys && identityKeys.primaryIdentityPublicKeys.ed25519 !== primaryIdentityPublicKey ) { throw new ServerError('primary_public_key_mismatch'); } // 7. Construct `SIWESocialProof` object with the stringified // SIWEMessage and the corresponding signature. const socialProof: SIWESocialProof = { siweMessage: siweMessage.toMessage(), siweMessageSignature: signature, }; // 8. Create account with call to `processSIWEAccountCreation(...)` // if address does not correspond to an existing user. let userID = await fetchUserIDForEthereumAddress(siweMessage.address); if (!userID) { const siweAccountCreationRequest = { address: siweMessage.address, calendarQuery, deviceTokenUpdateRequest, platformDetails, socialProof, }; userID = await processSIWEAccountCreation( viewer, siweAccountCreationRequest, ); } // 9. Try to double-write SIWE account info to the Identity service. const userIDCopy = userID; if (identityKeys && signedIdentityKeysBlob) { const identityKeysCopy = identityKeys; handleAsyncPromise( (async () => { const rustAPI = await getRustAPI(); await rustAPI.loginUserWallet( userIDCopy, identityKeysCopy.primaryIdentityPublicKeys.ed25519, siweMessage.toMessage(), signature, signedIdentityKeysBlob, JSON.stringify(socialProof), ); })(), ); } // 10. Complete login with call to `processSuccessfulLogin(...)`. return await processSuccessfulLogin({ viewer, input, userID, calendarQuery, socialProof, signedIdentityKeysBlob, }); } const updatePasswordRequestInputValidator = tShape({ code: t.String, password: tPassword, watchedIDs: t.list(t.String), calendarQuery: t.maybe(entryQueryInputValidator), deviceTokenUpdateRequest: t.maybe(deviceTokenUpdateRequestInputValidator), platformDetails: tPlatformDetails, }); async function oldPasswordUpdateResponder( viewer: Viewer, input: any, ): Promise { await validateInput(viewer, updatePasswordRequestInputValidator, input); const request: UpdatePasswordRequest = input; if (request.calendarQuery) { request.calendarQuery = normalizeCalendarQuery(request.calendarQuery); } return await updatePassword(viewer, request); } const updateUserSettingsInputValidator = tShape({ name: t.irreducible( userSettingsTypes.DEFAULT_NOTIFICATIONS, x => x === userSettingsTypes.DEFAULT_NOTIFICATIONS, ), data: t.enums.of(notificationTypeValues), }); async function updateUserSettingsResponder( viewer: Viewer, input: any, ): Promise { const request: UpdateUserSettingsRequest = input; await validateInput(viewer, updateUserSettingsInputValidator, request); return await updateUserSettings(viewer, request); } const policyAcknowledgmentRequestInputValidator = tShape({ policy: t.maybe(t.enums.of(policies)), }); async function policyAcknowledgmentResponder( viewer: Viewer, input: any, ): Promise { const request: PolicyAcknowledgmentRequest = input; await validateInput( viewer, policyAcknowledgmentRequestInputValidator, request, ); await viewerAcknowledgmentUpdater(viewer, request.policy); } export { userSubscriptionUpdateResponder, passwordUpdateResponder, sendVerificationEmailResponder, sendPasswordResetEmailResponder, logOutResponder, accountDeletionResponder, accountCreationResponder, logInResponder, siweAuthResponder, oldPasswordUpdateResponder, updateUserSettingsResponder, policyAcknowledgmentResponder, }; diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs index 829ccf288..ee871226a 100644 --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -1,602 +1,617 @@ use aws_sdk_dynamodb::output::GetItemOutput; use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::Utc; use comm_opaque::Cipher; use constant_time_eq::constant_time_eq; use futures_core::Stream; use opaque_ke::{ServerLogin, ServerRegistration}; use rand::rngs::OsRng; use rand::{CryptoRng, Rng}; use siwe::Message; use std::collections::{HashMap, HashSet}; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{Request, Response, Status}; use tracing::{error, info, instrument}; use crate::constants::MPSC_CHANNEL_BUFFER_CAPACITY; use crate::database::{DatabaseClient, Error as DBError}; use crate::nonce::generate_nonce_data; use crate::pake_grpc; use crate::token::{AccessTokenData, AuthType}; pub use proto::identity_service_server::IdentityServiceServer; use proto::{ get_user_id_request::AuthType as ProtoAuthType, identity_service_server::IdentityService, login_request::Data::PakeLoginRequest, login_request::Data::WalletLoginRequest, login_response::Data::PakeLoginResponse, login_response::Data::WalletLoginResponse, pake_login_request::Data::PakeCredentialFinalization, pake_login_request::Data::PakeCredentialRequestAndUserId, pake_login_response::Data::AccessToken, pake_login_response::Data::PakeCredentialResponse, registration_request::Data::PakeCredentialFinalization as PakeRegistrationCredentialFinalization, registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as PakeRegistrationLoginResponse, registration_response::Data::PakeRegistrationResponse, CompareUsersRequest, CompareUsersResponse, DeleteUserRequest, DeleteUserResponse, GenerateNonceRequest, GenerateNonceResponse, GetSessionInitializationInfoRequest, GetSessionInitializationInfoResponse, GetUserIdRequest, GetUserIdResponse, LoginRequest, LoginResponse, PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, RegistrationRequest, RegistrationResponse, SessionInitializationInfo, UpdateUserRequest, UpdateUserResponse, VerifyUserTokenRequest, VerifyUserTokenResponse, WalletLoginRequest as WalletLoginRequestStruct, WalletLoginResponse as WalletLoginResponseStruct, }; mod proto { tonic::include_proto!("identity"); } mod login; mod registration; mod update; #[derive(Debug)] enum PakeWorkflow { Registration, Login, } #[derive(derive_more::Constructor)] pub struct MyIdentityService { client: DatabaseClient, } #[tonic::async_trait] impl IdentityService for MyIdentityService { type RegisterUserStream = Pin< Box< dyn Stream> + Send + 'static, >, >; #[instrument(skip(self))] async fn register_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let client = self.client.clone(); tokio::spawn(async move { let first_message = in_stream.next().await; let mut registration_state = registration::handle_registration_request( first_message, &client, tx.clone(), ) .await?; // ServerRegistration in opaque-ke v1.2 doesn't implement Clone, so we // have to take the value out of registration_state, replacing it with None let pake_state = if let Some(pake_state) = registration_state.pake_state.take() { pake_state } else { error!("registration_state is missing opaque-ke ServerRegistration"); return Err(Status::failed_precondition("internal error")); }; let second_message = in_stream.next().await; let server_login = registration::handle_registration_upload_and_credential_request( second_message, tx.clone(), &client, ®istration_state, pake_state, ) .await?; let third_message = in_stream.next().await; registration::handle_credential_finalization( third_message, tx, &client, ®istration_state, server_login, ) .await?; Ok(()) }); let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::RegisterUserStream )) } type LoginUserStream = Pin> + Send + 'static>>; #[instrument(skip(self))] async fn login_user( &self, request: Request>, ) -> Result, Status> { let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let client = self.client.clone(); tokio::spawn(async move { let first_message = in_stream.next().await; let login_state = login::handle_login_request(first_message, tx.clone(), &client).await?; // login_state will be None if user is logging in with a wallet if let Some(state) = login_state { let second_message = in_stream.next().await; login::handle_credential_finalization( second_message, tx, &client, state, ) .await?; } Ok::<(), Status>(()) }); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::LoginUserStream)) } #[instrument(skip(self))] async fn verify_user_token( &self, request: Request, ) -> Result, Status> { info!("Received VerifyUserToken request: {:?}", request); let message = request.into_inner(); let token_valid = match self .client .get_access_token_data(message.user_id, message.signing_public_key) .await { Ok(Some(access_token_data)) => constant_time_eq( access_token_data.access_token.as_bytes(), message.access_token.as_bytes(), ), Ok(None) => false, Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(VerifyUserTokenResponse { token_valid }); info!("Sending VerifyUserToken response: {:?}", response); Ok(response) } #[instrument(skip(self))] async fn get_user_id( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let auth_type = match ProtoAuthType::from_i32(message.auth_type) { Some(ProtoAuthType::Password) => AuthType::Password, Some(ProtoAuthType::Wallet) => AuthType::Wallet, None => { error!( "Unable to parse AuthType from message: {}", message.auth_type ); return Err(Status::invalid_argument("invalid message")); } }; let user_id = match self .client .get_user_id_from_user_info(message.user_info, auth_type) .await { Ok(Some(user_id)) => user_id, Ok(None) => return Err(Status::not_found("no user ID found")), Err(e) => return Err(handle_db_error(e)), }; let response = Response::new(GetUserIdResponse { user_id }); Ok(response) } #[instrument(skip(self))] async fn delete_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); match self.client.delete_user(message.user_id).await { Ok(_) => Ok(Response::new(DeleteUserResponse {})), Err(e) => Err(handle_db_error(e)), } } #[instrument(skip(self))] async fn compare_users( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let mut mysql_users_vec = message.users; let mut ddb_users_vec = match self.client.get_users().await { Ok(user_list) => user_list, Err(e) => return Err(handle_db_error(e)), }; // We use HashSets here for faster lookups let mysql_users_set = HashSet::::from_iter(mysql_users_vec.clone()); let ddb_users_set = HashSet::::from_iter(ddb_users_vec.clone()); ddb_users_vec.retain(|user| !mysql_users_set.contains(user)); mysql_users_vec.retain(|user| !ddb_users_set.contains(user)); Ok(Response::new(CompareUsersResponse { users_missing_from_keyserver: ddb_users_vec, users_missing_from_identity: mysql_users_vec, })) } #[instrument(skip(self))] async fn generate_nonce( &self, _request: Request, ) -> Result, Status> { let nonce_data = generate_nonce_data(&mut OsRng); match self .client .add_nonce_to_nonces_table(nonce_data.clone()) .await { Ok(_) => Ok(Response::new(GenerateNonceResponse { nonce: nonce_data.nonce, })), Err(e) => Err(handle_db_error(e)), } } #[instrument(skip(self))] async fn get_session_initialization_info( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); match self .client .get_session_initialization_info(&message.user_id) .await { Ok(Some(session_initialization_info)) => { let mut devices = HashMap::new(); for (device, info) in session_initialization_info { devices.insert(device, SessionInitializationInfo { info }); } Ok(Response::new(GetSessionInitializationInfoResponse { devices, })) } Ok(None) => return Err(Status::not_found("user not found")), Err(e) => Err(handle_db_error(e)), } } #[instrument(skip(self))] async fn update_user( &self, request: Request>, ) -> Result, Status> { let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY); let db_client = self.client.clone(); tokio::spawn(async move { update::handle_server_update_user_messages( request.into_inner(), db_client, tx, ) .await; }); let out_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(out_stream) as Self::UpdateUserStream)) } type UpdateUserStream = Pin< Box> + Send + 'static>, >; } async fn put_token_helper( client: &DatabaseClient, auth_type: AuthType, user_id: &str, signing_public_key: &str, rng: &mut (impl Rng + CryptoRng), ) -> Result { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID \"{}\", signing public key \"{}\"", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } let access_token_data = AccessTokenData::new( user_id.to_string(), signing_public_key.to_string(), auth_type, rng, ); match client .put_access_token_data(access_token_data.clone()) .await { Ok(_) => Ok(access_token_data.access_token), Err(e) => Err(handle_db_error(e)), } } fn parse_and_verify_siwe_message( user_id: &str, signing_public_key: &str, siwe_message: &str, siwe_signature: &str, ) -> Result<(), Status> { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID {}, signing public key {}", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } let siwe_message: Message = match siwe_message.parse() { Ok(m) => m, Err(e) => { error!("Failed to parse SIWE message: {}", e); return Err(Status::invalid_argument("invalid message")); } }; let decoded_signature = hex::decode(siwe_signature.trim_start_matches("0x")) .map_err(|e| { error!("Failed to decode SIWE signature: {}", e); Status::invalid_argument("invalid signature") })?; match siwe_message.verify( match decoded_signature.try_into() { Ok(s) => s, Err(e) => { error!("Conversion to SIWE signature failed: {:?}", e); return Err(Status::invalid_argument("invalid message")); } }, None, None, Some(&Utc::now()), ) { Err(e) => { error!( "Signature verification failed for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Status::unauthenticated("message not authenticated")) } Ok(_) => Ok(()), } } async fn wallet_login_helper( client: &DatabaseClient, wallet_login_request: WalletLoginRequestStruct, rng: &mut (impl Rng + CryptoRng), ) -> Result { parse_and_verify_siwe_message( &wallet_login_request.user_id, &wallet_login_request.signing_public_key, &wallet_login_request.siwe_message, &wallet_login_request.siwe_signature, )?; client .update_users_table( wallet_login_request.user_id.clone(), Some(wallet_login_request.signing_public_key.clone()), None, None, Some( &wallet_login_request .session_initialization_info .ok_or_else(|| Status::invalid_argument("Invalid message"))? .info, ), ) .await .map_err(handle_db_error)?; Ok(LoginResponse { data: Some(WalletLoginResponse(WalletLoginResponseStruct { access_token: put_token_helper( client, AuthType::Wallet, &wallet_login_request.user_id, &wallet_login_request.signing_public_key, rng, ) .await?, })), }) } async fn pake_login_start( client: &DatabaseClient, user_id: &str, pake_credential_request: &[u8], ) -> Result { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration = match client.get_pake_registration(user_id.to_string()).await { Ok(Some(r)) => r, Ok(None) => { return Err(Status::not_found("user not found")); } Err(e) => return Err(handle_db_error(e)), }; let server_login_start_result = pake_grpc::server_login_start( &mut OsRng, server_registration, pake_credential_request, )?; let credential_response = server_login_start_result.message.serialize().map_err(|e| { error!("Failed to serialize PAKE message: {}", e); Status::failed_precondition("internal error") })?; Ok(LoginResponseAndPakeState { response: PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response)), }, pake_state: server_login_start_result.state, }) } async fn pake_login_finish( user_id: &str, signing_public_key: &str, client: &DatabaseClient, server_login: ServerLogin, pake_credential_finalization: &Vec, rng: &mut (impl Rng + CryptoRng), pake_workflow: PakeWorkflow, session_initialization_info: &HashMap, ) -> Result { if user_id.is_empty() || signing_public_key.is_empty() { error!( "Incomplete data: user ID {}, signing public key {}", user_id, signing_public_key ); return Err(Status::aborted("user not found")); } pake_grpc::server_login_finish(server_login, pake_credential_finalization)?; if matches!(pake_workflow, PakeWorkflow::Login) { client .update_users_table( user_id.to_string(), Some(signing_public_key.to_string()), None, None, Some(session_initialization_info), ) .await .map_err(handle_db_error)?; } Ok(PakeLoginResponseStruct { data: Some(AccessToken( put_token_helper( client, AuthType::Password, user_id, signing_public_key, rng, ) .await?, )), }) } async fn server_register_response( registration_request_bytes: &Vec, ) -> Result { let server_registration_start_result = pake_grpc::server_registration_start( &mut OsRng, registration_request_bytes, )?; Ok(RegistrationResponseAndPakeState { response: RegistrationResponse { data: Some(PakeRegistrationResponse( server_registration_start_result.message.serialize(), )), }, pake_state: server_registration_start_result.state, }) } async fn pake_registration_finish( user_id: &str, client: &DatabaseClient, registration_upload_bytes: &Vec, server_registration: ServerRegistration, username: &str, signing_public_key: &str, session_initialization_info: &HashMap, ) -> Result<(), Status> { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration_finish_result = pake_grpc::server_registration_finish( server_registration, registration_upload_bytes, )?; match client .add_user_to_users_table( user_id.to_string(), server_registration_finish_result, username.to_string(), signing_public_key.to_string(), session_initialization_info, ) .await { Ok(_) => Ok(()), Err(e) => Err(handle_db_error(e)), } } fn handle_db_error(db_error: DBError) -> Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { Status::unavailable("please retry") } e => { error!("Encountered an unexpected error: {}", e); Status::failed_precondition("unexpected error") } } } struct RegistrationResponseAndPakeState { response: RegistrationResponse, pake_state: ServerRegistration, } struct LoginResponseAndPakeState { response: PakeLoginResponseStruct, pake_state: ServerLogin, } + +async fn send_to_client( + tx: &tokio::sync::mpsc::Sender>, + response: Result, +) -> Result<(), Status> { + let transport_result = match response { + Ok(message) => tx.send(Ok(message)).await, + Err(status) => { + error!("{}", status.message()); + tx.send(Err(status)).await + } + }; + + transport_result.map_err(|_| Status::internal("disconnection")) +} diff --git a/services/identity/src/service/login.rs b/services/identity/src/service/login.rs index 298af11b6..cbb317a4e 100644 --- a/services/identity/src/service/login.rs +++ b/services/identity/src/service/login.rs @@ -1,99 +1,106 @@ use super::*; pub struct LoginState { user_id: String, signing_public_key: String, pake_state: ServerLogin, session_initialization_info: HashMap, } pub async fn handle_login_request( message: Option>, tx: mpsc::Sender>, client: &DatabaseClient, ) -> Result, Status> { match message { Some(Ok(LoginRequest { data: Some(WalletLoginRequest(req)), })) => { let wallet_login_result = wallet_login_helper(client, req, &mut OsRng).await; if let Err(e) = tx.send(wallet_login_result).await { error!("Response was dropped: {}", e); Err(Status::aborted("failure")) } else { Ok(None) } } Some(Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialRequestAndUserId( pake_credential_request_and_user_id, )), })), })) => { - let response_and_state = pake_login_start( + let response_and_state = match pake_login_start( client, &pake_credential_request_and_user_id.user_id, &pake_credential_request_and_user_id.pake_credential_request, ) - .await?; + .await + { + Ok(r) => r, + Err(e) => { + send_to_client(&tx, Err(e.clone())).await?; + return Err(e); + } + }; let login_response = LoginResponse { data: Some(PakeLoginResponse(response_and_state.response)), }; if let Err(e) = tx.send(Ok(login_response)).await { error!("Response was dropped: {}", e); return Err(Status::aborted("failure")); } Ok(Some(LoginState { user_id: pake_credential_request_and_user_id.user_id, signing_public_key: pake_credential_request_and_user_id .signing_public_key, pake_state: response_and_state.pake_state, session_initialization_info: pake_credential_request_and_user_id .session_initialization_info .ok_or_else(|| Status::invalid_argument("Invalid message"))? .info, })) } Some(_) | None => Err(Status::aborted("failure")), } } pub async fn handle_credential_finalization( message: Option>, tx: mpsc::Sender>, client: &DatabaseClient, login_state: LoginState, ) -> Result<(), Status> { match message { Some(Ok(LoginRequest { data: Some(PakeLoginRequest(PakeLoginRequestStruct { data: Some(PakeCredentialFinalization(pake_credential_finalization)), })), })) => { let login_finish_result = pake_login_finish( &login_state.user_id, &login_state.signing_public_key, client, login_state.pake_state, &pake_credential_finalization, &mut OsRng, PakeWorkflow::Login, &login_state.session_initialization_info, ) .await .map(|pake_login_response| LoginResponse { data: Some(PakeLoginResponse(pake_login_response)), }); if let Err(e) = tx.send(login_finish_result).await { error!("Response was dropped: {}", e); return Err(Status::aborted("failure")); } Ok(()) } Some(_) | None => Err(Status::aborted("failure")), } } diff --git a/services/identity/src/service/update.rs b/services/identity/src/service/update.rs index 97c326844..894370bff 100644 --- a/services/identity/src/service/update.rs +++ b/services/identity/src/service/update.rs @@ -1,296 +1,281 @@ use aws_sdk_dynamodb::output::GetItemOutput; use comm_opaque::Cipher; use opaque_ke::{ServerLogin, ServerRegistration}; use rand::rngs::OsRng; use tokio::sync::mpsc; use tokio_stream::StreamExt; use tonic::Streaming; use tracing::{debug, error, info}; use super::proto::{ pake_login_response::Data::AccessToken, update_user_request, update_user_response, update_user_response::Data::PakeLoginResponse, update_user_response::Data::PakeRegistrationResponse, PakeRegistrationRequestAndUserId, PakeRegistrationUploadAndCredentialRequest, }; use crate::service::PakeLoginResponseStruct; use crate::token::AuthType; use crate::{database::DatabaseClient, pake_grpc}; use super::{ - handle_db_error, pake_login_start, put_token_helper, Status, + handle_db_error, pake_login_start, put_token_helper, send_to_client, Status, UpdateUserRequest, UpdateUserResponse, }; -async fn send_to_client( - tx: &tokio::sync::mpsc::Sender>, - response: Result, -) -> Result<(), Status> { - let transport_result = match response { - Ok(message) => tx.send(Ok(message)).await, - Err(status) => { - error!("{}", status.message()); - tx.send(Err(status)).await - } - }; - - transport_result.map_err(|_| Status::internal("disconnection")) -} - pub(crate) async fn handle_server_update_user_messages( in_stream: Streaming, client: DatabaseClient, tx: tokio::sync::mpsc::Sender>, ) { match attempt_update_user(in_stream, &client, &tx).await { Ok(user_id) => info!("Successfully updated user {}", user_id), // Attempt to send client the failure to receive immediate feedback Err(e) => match send_to_client(&tx, Err(e)).await { Ok(_) => debug!("Attempted to inform user of failed update"), Err(_) => return, }, }; } async fn attempt_update_user( mut in_stream: Streaming, client: &DatabaseClient, tx: &tokio::sync::mpsc::Sender>, ) -> Result { let first_message = in_stream.next().await; let (request, registration_state) = handle_registration_request(first_message, client, tx).await?; let second_message = in_stream.next().await; let registration_upload = get_registration_upload(second_message)?; let server_login = handle_registration_upload_and_credential_request( registration_upload, tx, &client, &request, registration_state, ) .await?; let third_message = in_stream.next().await; let finalization_payload = get_finalization_message(third_message)?; handle_credential_finalization( finalization_payload, tx, client, &request, server_login, ) .await?; Ok(request.user_id) } pub async fn handle_registration_request( message: Option>, client: &DatabaseClient, tx: &mpsc::Sender>, ) -> Result< (PakeRegistrationRequestAndUserId, ServerRegistration), Status, > { let request = get_register_request(message)?; user_exists(&request.user_id, &client).await?; let server_registration_start_result = pake_grpc::server_registration_start( &mut OsRng, &request.pake_registration_request, )?; let server_start_payload = server_registration_start_result.message.serialize(); let server_start_response = UpdateUserResponse { data: Some(PakeRegistrationResponse(server_start_payload)), }; send_to_client(&tx, Ok(server_start_response)).await?; Ok((request, server_registration_start_result.state)) } fn get_register_request( message: Option>, ) -> Result { match message { Some(Ok(UpdateUserRequest { data: Some(update_user_request::Data::Request(request)), })) => Ok(request), e => { error!( "Expected to receive registration request, but instead received {:?}", e ); Err(Status::invalid_argument("server error")) } } } fn get_finalization_message( message: Option>, ) -> Result, Status> { match message { Some(Ok(UpdateUserRequest { data: Some(update_user_request::Data::PakeLoginFinalizationMessage(request)), })) => Ok(request), e => { error!( "Expected to receive login finalization message, but instead received {:?}", e); Err(Status::aborted("server error")) } } } async fn user_exists( user_id: &str, client: &DatabaseClient, ) -> Result { match client .get_item_from_users_table(&user_id) .await .map_err(handle_db_error) { Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), Ok(GetItemOutput { item: None, .. }) => { error!("Unable to find user: {}", user_id); return Err(Status::not_found("user not found")); } Err(e) => Err(e), } } pub async fn handle_registration_upload_and_credential_request( message: PakeRegistrationUploadAndCredentialRequest, tx: &mpsc::Sender>, client: &DatabaseClient, request_and_user_info: &PakeRegistrationRequestAndUserId, pake_state: ServerRegistration, ) -> Result, Status> { pake_registration_finish( &request_and_user_info.user_id, client, &message.pake_registration_upload, pake_state, ) .await?; let response_and_state = pake_login_start( client, &request_and_user_info.user_id, &message.pake_credential_request, ) .await?; let registration_response = UpdateUserResponse { data: Some(PakeLoginResponse(response_and_state.response)), }; send_to_client(tx, Ok(registration_response)).await?; Ok(response_and_state.pake_state) } async fn pake_registration_finish( user_id: &str, client: &DatabaseClient, registration_upload_bytes: &Vec, server_registration: ServerRegistration, ) -> Result<(), Status> { if user_id.is_empty() { error!("Incomplete data: user ID not provided"); return Err(Status::aborted("user not found")); } let server_registration_finish_result = pake_grpc::server_registration_finish( server_registration, registration_upload_bytes, )?; client .update_users_table( user_id.to_string(), None, Some(server_registration_finish_result), None, None, ) .await .map_err(handle_db_error)?; Ok(()) } fn get_registration_upload( message: Option>, ) -> Result< crate::service::proto::PakeRegistrationUploadAndCredentialRequest, Status, > { match message { Some(Ok(UpdateUserRequest { data: Some( update_user_request::Data::PakeRegistrationUploadAndCredentialRequest( upload, ), ), })) => Ok(upload), e => { error!( "Expected to receive registration upload, but instead received {:?}", e ); Err(Status::aborted("server error")) } } } pub async fn handle_credential_finalization( finalization_payload: Vec, tx: &mpsc::Sender>, client: &DatabaseClient, request_and_user_info: &PakeRegistrationRequestAndUserId, server_login: ServerLogin, ) -> Result<(), Status> { let login_finish_result = pake_login_finish( &request_and_user_info.user_id, &request_and_user_info.signing_public_key, client, server_login, &finalization_payload, ) .await?; let response = UpdateUserResponse { data: Some(update_user_response::Data::PakeLoginResponse( login_finish_result, )), }; send_to_client(tx, Ok(response)).await } async fn pake_login_finish( user_id: &str, signing_public_key: &str, client: &DatabaseClient, server_login: ServerLogin, pake_credential_finalization: &Vec, ) -> Result { if user_id.is_empty() { error!("Incomplete data: user ID {}", user_id); return Err(Status::aborted("user not found")); } pake_grpc::server_login_finish(server_login, pake_credential_finalization)?; let access_token = put_token_helper( client, AuthType::Password, user_id, signing_public_key, &mut OsRng, ) .await?; Ok(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), }) }