diff --git a/services/identity/src/client_service.rs b/services/identity/src/client_service.rs index 228c5626e..09b16c76d 100644 --- a/services/identity/src/client_service.rs +++ b/services/identity/src/client_service.rs @@ -1,1348 +1,1349 @@ // Standard library imports use std::str::FromStr; // External crate imports use comm_lib::shared::reserved_users::RESERVED_USERNAME_SET; use comm_opaque2::grpc::protocol_error_to_grpc_status; use grpc_clients::identity::PlatformMetadata; use rand::rngs::OsRng; use serde::{Deserialize, Serialize}; use siwe::eip55; use tonic::Response; use tracing::{debug, error, info, warn}; use crate::comm_service::tunnelbroker; // Workspace crate imports use crate::config::CONFIG; use crate::constants::{error_types, tonic_status_messages}; use crate::database::{ DBDeviceTypeInt, DatabaseClient, DeviceType, KeyPayload, UserInfoAndPasswordFile, }; use crate::ddb_utils::{Identifier, is_transaction_conflict}; use crate::device_list::SignedDeviceList; use crate::error::{DeviceListError, Error as DBError, consume_error}; -use crate::grpc_services::authenticated::{DeletePasswordUserInfo, UpdatePasswordInfo}; +use crate::grpc_services::authenticated::{DeletePasswordUserInfo, UpdatePasswordInfo, PrivilegedPasswordResetInfo}; use crate::grpc_services::protos::unauth::{ find_user_id_request, AddReservedUsernamesRequest, AuthResponse, Empty, ExistingDeviceLoginRequest, FindUserIdRequest, FindUserIdResponse, GenerateNonceResponse, OpaqueLoginFinishRequest, OpaqueLoginStartRequest, OpaqueLoginStartResponse, RegistrationFinishRequest, RegistrationStartRequest, RegistrationStartResponse, RemoveReservedUsernameRequest, ReservedRegistrationStartRequest, RestoreUserRequest, SecondaryDeviceKeysUploadRequest, VerifyUserAccessTokenRequest, VerifyUserAccessTokenResponse, WalletAuthRequest, GetFarcasterUsersRequest, GetFarcasterUsersResponse }; use crate::grpc_services::shared::get_platform_metadata; use crate::grpc_utils::{ DeviceKeyUploadActions, RegistrationActions, SignedNonce }; use crate::log::redact_sensitive_data; use crate::nonce::generate_nonce_data; use crate::reserved_users::{ validate_account_ownership_message_and_get_user_id, validate_add_reserved_usernames_message, validate_remove_reserved_username_message, }; use crate::siwe::{ is_valid_ethereum_address, parse_and_verify_siwe_message, SocialProof, }; use crate::token::{AccessTokenData, AuthType}; pub use crate::grpc_services::protos::unauth::identity_client_service_server::{ IdentityClientService, IdentityClientServiceServer, }; use crate::regex::is_valid_username; #[derive(Clone, Serialize, Deserialize)] pub enum WorkflowInProgress { Registration(Box), Login(Box), Update(Box), PasswordUserDeletion(Box), + PrivilegedPasswordReset(Box), } #[derive(Clone, Serialize, Deserialize)] pub struct UserRegistrationInfo { pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub user_id: Option, pub farcaster_id: Option, pub initial_device_list: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct UserLoginInfo { pub user_id: String, pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub opaque_server_login: comm_opaque2::server::Login, pub device_to_remove: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct FlattenedDeviceKeyUpload { pub device_id_key: String, pub key_payload: String, pub key_payload_signature: String, pub content_prekey: String, pub content_prekey_signature: String, pub content_one_time_keys: Vec, pub notif_prekey: String, pub notif_prekey_signature: String, pub notif_one_time_keys: Vec, pub device_type: DeviceType, } #[derive(derive_more::Constructor)] pub struct ClientService { client: DatabaseClient, } #[tonic::async_trait] impl IdentityClientService for ClientService { #[tracing::instrument(skip_all)] async fn register_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Received registration request for: {}", message.username); if !is_valid_username(&message.username) || is_valid_ethereum_address(&message.username) { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_USERNAME, )); } self.check_username_taken(&message.username).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&message.username) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } if RESERVED_USERNAME_SET.contains(&message.username.to_lowercase()) { return Err(tonic::Status::invalid_argument( tonic_status_messages::USERNAME_RESERVED, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let registration_state = construct_user_registration_info( &message, None, message.username.clone(), message.farcaster_id.clone(), )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_reserved_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); self.check_username_taken(&message.username).await?; let Some(original_username) = self .client .get_original_username_from_reserved_usernames_table(&message.username) .await? else { return Err(tonic::Status::permission_denied( tonic_status_messages::USERNAME_NOT_RESERVED, )); }; let user_id = validate_account_ownership_message_and_get_user_id( &message.username, &message.keyserver_message, &message.keyserver_signature, )?; let registration_state = construct_user_registration_info( &message, Some(user_id), original_username, None, )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); if let Some(WorkflowInProgress::Registration(state)) = self.client.get_workflow(message.session_id).await? { let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; let login_time = chrono::Utc::now(); let device_id = state.flattened_device_key_upload.device_id_key.clone(); let username = state.username.clone(); let user_id = self .client .add_password_user_to_users_table( *state, password_file, platform_metadata, login_time, ) .await?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } else { Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )) } } #[tracing::instrument(skip_all)] async fn log_in_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Attempting to log in user: {:?}", &message.username); let user_id_and_password_file = self .client .get_user_info_and_password_file_from_username(&message.username) .await?; let UserInfoAndPasswordFile { user_id, original_username: username, password_file: password_file_bytes, } = if let Some(data) = user_id_and_password_file { data } else { // It's possible that the user attempting login is already registered // on Ashoat's keyserver. If they are, we should send back a gRPC status // code instructing them to get a signed message from Ashoat's keyserver // in order to claim their username and register with the Identity // service. let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&message.username) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::permission_denied( tonic_status_messages::NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME, )); } return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let maybe_device_to_remove = self .get_keyserver_device_to_remove( &user_id, &flattened_device_key_upload.device_id_key, message.force.unwrap_or(false), &flattened_device_key_upload.device_type, ) .await?; let mut server_login = comm_opaque2::server::Login::new(); let server_response = match server_login.start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, message.username.to_lowercase().as_bytes(), ) { Ok(response) => response, Err(_) => { // Retry with original username bytes if the first attempt fails server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)? } }; let login_state = construct_user_login_info( user_id, username, server_login, flattened_device_key_upload, maybe_device_to_remove, )?; let session_id = self .client .insert_workflow(WorkflowInProgress::Login(Box::new(login_state))) .await?; let response = Response::new(OpaqueLoginStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn log_in_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Login(state)) = self.client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; if let Some(device_to_remove) = state.device_to_remove { self .client .remove_device(state.user_id.clone(), device_to_remove) .await?; } let login_time = chrono::Utc::now(); self .client .add_user_device( state.user_id.clone(), state.flattened_device_key_upload.clone(), platform_metadata, login_time, ) .await?; // Create access token let token = AccessTokenData::with_created_time( state.user_id.clone(), state.flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id: state.user_id, access_token, username: state.username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); // WalletAuthRequest is used for both log_in_wallet_user and register_wallet_user if !message.initial_device_list.is_empty() { return Err(tonic::Status::invalid_argument( tonic_status_messages::UNEXPECTED_INITIAL_DEVICE_LIST, )); } let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, ) .await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let login_time = chrono::Utc::now(); let user_id = if let Some(user_id) = self .client .get_user_id_from_user_info(wallet_address.clone(), &AuthType::Wallet) .await? { self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_user_device( user_id.clone(), flattened_device_key_upload.clone(), platform_metadata, login_time, ) .await?; user_id } else { let Some(user_id) = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await? else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; // It's possible that the user attempting login is already registered // on Ashoat's keyserver. If they are, we should try to register them if // they're on a mobile device, otherwise we should send back a gRPC status // code instructing them to try logging in from a mobile device first. if platform_metadata.device_type.to_uppercase() != "ANDROID" && platform_metadata.device_type.to_uppercase() != "IOS" { return Err(tonic::Status::permission_denied( tonic_status_messages::RETRY_FROM_NATIVE, )); }; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, Some(user_id.clone()), platform_metadata, login_time, message.farcaster_id, None, ) .await?; user_id }; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, ) .await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); self.check_wallet_address_taken(&wallet_address).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, None) .await?; let login_time = chrono::Utc::now(); let initial_device_list = message.get_and_verify_initial_device_list()?; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); let user_id = self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, None, platform_metadata, login_time, message.farcaster_id, initial_device_list, ) .await?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn restore_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); debug!( "Attempting to restore user: {}", redact_sensitive_data(&message.user_id) ); let user_identifier = self .client .get_user_identity(&message.user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })? .identifier; if matches!(user_identifier, Identifier::Username(_)) && (message.siwe_message.is_some() || message.siwe_signature.is_some()) { debug!("SIWE data present for password user!"); return Err(tonic::Status::invalid_argument( tonic_status_messages::PASSWORD_USER, )); } if let Identifier::WalletAddress(ref eth_identity) = &user_identifier { let (Some(siwe_message), Some(siwe_signature)) = (&message.siwe_message, &message.siwe_signature) else { debug!("SIWE data absent for wallet user!"); return Err(tonic::Status::invalid_argument( tonic_status_messages::WALLET_USER, )); }; let parsed_message = parse_and_verify_siwe_message(siwe_message, siwe_signature).await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); if wallet_address != eth_identity.wallet_address { debug!( "Wallet address mismatch: expected '{}' but got '{}' instead", ð_identity.wallet_address, &wallet_address ); return Err(tonic::Status::invalid_argument( tonic_status_messages::WALLET_ADDRESS_MISMATCH, )); } debug!("Updating social proof..."); let social_proof = SocialProof::new(siwe_message.to_string(), siwe_signature.to_string()); self .client .update_wallet_user_social_proof(&message.user_id, social_proof) .await?; } let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let access_token = self .restore_primary_device_and_get_csat( message.user_id.clone(), message.device_list, platform_metadata, flattened_device_key_upload, ) .await?; let response = AuthResponse { user_id: message.user_id, access_token, username: user_identifier.username().to_string(), }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn upload_keys_for_registered_device_and_log_in( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let user_id = message.user_id; let device_id = flattened_device_key_upload.device_id_key.clone(); let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let user_identity = self .client .get_user_identity(&user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(device_list) = self.client.get_current_device_list(&user_id).await? else { warn!("User {} does not have valid device list. Secondary device auth impossible.", redact_sensitive_data(&user_id)); return Err(tonic::Status::aborted( tonic_status_messages::DEVICE_LIST_ERROR, )); }; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; self .client .put_device_data( &user_id, flattened_device_key_upload, platform_metadata, login_time, ) .await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_existing_device( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let ExistingDeviceLoginRequest { user_id, device_id, .. } = message; let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; let (identity_response, device_list_response) = tokio::join!( self.client.get_user_identity(&user_id), self.client.get_current_device_list(&user_id) ); let user_identity = identity_response?.ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let device_list = device_list_response?.ok_or_else(|| { warn!( "User {} does not have a valid device list.", redact_sensitive_data(&user_id) ); tonic::Status::aborted(tonic_status_messages::DEVICE_LIST_ERROR) })?; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn generate_nonce( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let nonce_data = generate_nonce_data(&mut OsRng); self .client .add_nonce_to_nonces_table(nonce_data.clone()) .await?; let response = GenerateNonceResponse { nonce: nonce_data.nonce, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn verify_user_access_token( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Verifying device: {}", &message.device_id); let token_valid = self .client .verify_access_token( message.user_id, message.device_id.clone(), message.access_token, ) .await?; let response = Response::new(VerifyUserAccessTokenResponse { token_valid }); debug!( "device {} was verified: {}", &message.device_id, token_valid ); Ok(response) } #[tracing::instrument(skip_all)] async fn add_reserved_usernames( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_details = validate_add_reserved_usernames_message( &message.message, &message.signature, )?; let filtered_user_details = self.client.filter_out_taken_usernames(user_details).await?; self .client .add_usernames_to_reserved_usernames_table(filtered_user_details) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn remove_reserved_username( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let username = validate_remove_reserved_username_message( &message.message, &message.signature, )?; self .client .delete_username_from_reserved_usernames_table(username) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn ping( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn find_user_id( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); use find_user_id_request::Identifier; let (user_ident, auth_type) = match message.identifier { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::NO_IDENTIFIER_PROVIDED, )) } Some(Identifier::Username(username)) => (username, AuthType::Password), Some(Identifier::WalletAddress(address)) => (address, AuthType::Wallet), }; let (get_user_id_from_reserved_usernames_table_result, user_id_result) = tokio::join!( self .client .get_user_id_from_reserved_usernames_table(&user_ident), self .client .get_user_id_from_user_info(user_ident.clone(), &auth_type), ); let is_reserved = get_user_id_from_reserved_usernames_table_result?.is_some(); let user_id = user_id_result?; Ok(Response::new(FindUserIdResponse { user_id, is_reserved, })) } #[tracing::instrument(skip_all)] async fn get_farcaster_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let farcaster_users = self .client .get_farcaster_users(message.farcaster_ids) .await? .into_iter() .map(|d| d.0) .collect(); Ok(Response::new(GetFarcasterUsersResponse { farcaster_users })) } } impl ClientService { async fn check_username_taken( &self, username: &str, ) -> Result<(), tonic::Status> { let username_taken = self.client.username_taken(username.to_string()).await?; if username_taken { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } Ok(()) } async fn check_wallet_address_taken( &self, wallet_address: &str, ) -> Result<(), tonic::Status> { let wallet_address_taken = self .client .wallet_address_taken(wallet_address.to_string()) .await?; if wallet_address_taken { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } Ok(()) } async fn check_farcaster_id_taken( &self, farcaster_id: &str, ) -> Result<(), tonic::Status> { let fid_already_registered = !self .client .get_farcaster_users(vec![farcaster_id.to_string()]) .await? .is_empty(); if fid_already_registered { return Err(tonic::Status::already_exists( tonic_status_messages::FID_TAKEN, )); } Ok(()) } async fn check_device_id_taken( &self, key_upload: &FlattenedDeviceKeyUpload, requesting_user_id: Option<&str>, ) -> Result<(), tonic::Status> { let device_id = key_upload.device_id_key.as_str(); let Some(existing_device_user_id) = self.client.find_user_id_for_device(device_id).await? else { // device ID doesn't exist return Ok(()); }; // allow already-existing device ID for the same user match requesting_user_id { Some(user_id) if user_id == existing_device_user_id => { debug!( "Found already-existing device {} for user {}", device_id, user_id ); Ok(()) } _ => { warn!("Device ID already exists: {device_id}"); Err(tonic::Status::already_exists( tonic_status_messages::DEVICE_ID_ALREADY_EXISTS, )) } } } async fn verify_and_remove_nonce( &self, nonce: &str, ) -> Result<(), tonic::Status> { match self.client.get_nonce_from_nonces_table(nonce).await? { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_NONCE, )) } Some(nonce) if nonce.is_expired() => { // we don't need to remove the nonce from the table here // because the DynamoDB TTL will take care of it return Err(tonic::Status::aborted( tonic_status_messages::NONCE_EXPIRED, )); } Some(nonce_data) => { self .client .remove_nonce_from_nonces_table(&nonce_data.nonce) .await? } }; Ok(()) } async fn get_keyserver_device_to_remove( &self, user_id: &str, new_keyserver_device_id: &str, force: bool, device_type: &DeviceType, ) -> Result, tonic::Status> { if device_type != &DeviceType::Keyserver { return Ok(None); } let maybe_keyserver_device_id = self .client .get_keyserver_device_id_for_user(user_id) .await?; let Some(existing_keyserver_device_id) = maybe_keyserver_device_id else { return Ok(None); }; if new_keyserver_device_id == existing_keyserver_device_id { return Ok(None); } if force { info!( "keyserver {} will be removed from the device list", existing_keyserver_device_id ); Ok(Some(existing_keyserver_device_id)) } else { Err(tonic::Status::already_exists( tonic_status_messages::USER_ALREADY_HAS_KEYSERVER, )) } } /// This function is used in Backup Restore protocol. It: /// - Verifies singleton device list with both old and new primary signature /// - Performs device list update (updates with singleton payload) /// - Removes all CSATs, OTKs, Devices data for all devices /// - Closes Tunnelbroker connections with these devices /// - Registers the new primary device (Device Key Upload) /// - Issues a new CSAT for the new primary device and returns it async fn restore_primary_device_and_get_csat( &self, user_id: String, device_list_payload: String, platform_metadata: PlatformMetadata, device_key_upload: FlattenedDeviceKeyUpload, ) -> Result { debug!("Verifying device list..."); let new_primary_device_id = device_key_upload.device_id_key.clone(); let previous_primary_device_id = self .client .get_current_device_list(&user_id) .await? .and_then(|device_list| device_list.primary_device_id().cloned()) .ok_or_else(|| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User had missing or empty device list (before backup restore)!" ); tonic::Status::failed_precondition( tonic_status_messages::NO_DEVICE_LIST, ) })?; // Verify device list payload let signed_list: SignedDeviceList = device_list_payload.parse()?; let device_list_payload = crate::database::DeviceListUpdate::try_from(signed_list)?; crate::device_list::verify_singleton_device_list( &device_list_payload, &new_primary_device_id, Some(&previous_primary_device_id), )?; debug!(user_id, "Attempting to revoke user's old access tokens"); self.client.delete_all_tokens_for_user(&user_id).await?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's old one-time keys"); self .client .delete_otks_table_rows_for_user(&user_id) .await?; debug!(user_id, "Attempting to delete user's old devices"); let old_device_ids = self.client.delete_devices_data_for_user(&user_id).await?; spawn_force_close_tb_session_task(old_device_ids); // Reset device list (perform update) let login_time = chrono::Utc::now(); debug!(user_id, "Registering new primary device..."); self .client .register_primary_device( &user_id, device_key_upload, platform_metadata, login_time, device_list_payload, ) .await?; // Create new access token let token_data = AccessTokenData::with_created_time( user_id.clone(), new_primary_device_id, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token_data.access_token.clone(); self.client.put_access_token_data(token_data).await?; Ok(access_token) } } #[tracing::instrument(skip_all)] pub fn handle_db_error(db_error: DBError) -> tonic::Status { use comm_lib::aws::DynamoDBError as DDBError; use tonic::Status; use tonic_status_messages as msg; use DBError as E; match db_error { E::AwsSdk(DDBError::InternalServerError(_)) | E::AwsSdk(DDBError::ProvisionedThroughputExceededException(_)) | E::AwsSdk(DDBError::RequestLimitExceeded(_)) => { Status::unavailable(msg::RETRY) } E::AwsSdk(ref err) if is_transaction_conflict(err) => { warn!("DB operation conflicted. Returning RETRY status."); Status::unavailable(msg::RETRY) } E::DeviceList(DeviceListError::InvalidDeviceListUpdate) => { Status::invalid_argument(msg::INVALID_DEVICE_LIST_UPDATE) } E::DeviceList(DeviceListError::InvalidSignature) => { Status::invalid_argument(msg::INVALID_DEVICE_LIST_SIGNATURE) } e => { error!( errorType = error_types::GENERIC_DB_LOG, "Encountered an unexpected error: {}", e ); Status::failed_precondition(msg::UNEXPECTED_ERROR) } } } impl From for tonic::Status { fn from(err: DBError) -> Self { handle_db_error(err) } } fn construct_user_registration_info( message: &(impl DeviceKeyUploadActions + RegistrationActions), user_id: Option, username: String, farcaster_id: Option, ) -> Result { Ok(UserRegistrationInfo { username, flattened_device_key_upload: construct_flattened_device_key_upload( message, )?, user_id, farcaster_id, initial_device_list: message.get_and_verify_initial_device_list()?, }) } fn construct_user_login_info( user_id: String, username: String, opaque_server_login: comm_opaque2::server::Login, flattened_device_key_upload: FlattenedDeviceKeyUpload, device_to_remove: Option, ) -> Result { Ok(UserLoginInfo { user_id, username, flattened_device_key_upload, opaque_server_login, device_to_remove, }) } fn construct_flattened_device_key_upload( message: &impl DeviceKeyUploadActions, ) -> Result { let key_info = KeyPayload::from_str(&message.payload()?).map_err(|_| { tonic::Status::invalid_argument(tonic_status_messages::MALFORMED_PAYLOAD) })?; let flattened_device_key_upload = FlattenedDeviceKeyUpload { device_id_key: key_info.primary_identity_public_keys.ed25519, key_payload: message.payload()?, key_payload_signature: message.payload_signature()?, content_prekey: message.content_prekey()?, content_prekey_signature: message.content_prekey_signature()?, content_one_time_keys: message.one_time_content_prekeys()?, notif_prekey: message.notif_prekey()?, notif_prekey_signature: message.notif_prekey_signature()?, notif_one_time_keys: message.one_time_notif_prekeys()?, device_type: DeviceType::try_from(DBDeviceTypeInt(message.device_type()?))?, }; Ok(flattened_device_key_upload) } fn spawn_force_close_tb_session_task(device_ids: Vec) { tokio::spawn(async move { debug!( "Attempting to terminate Tunnelbroker sessions for devices: {:?}", device_ids.as_slice() ); let result = tunnelbroker::terminate_device_sessions(&device_ids).await; consume_error(result); }); } diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs index e9a55058c..46218e169 100644 --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -1,365 +1,370 @@ use tokio::time::Duration; // Secrets pub const SECRETS_DIRECTORY: &str = "secrets"; pub const SECRETS_SETUP_FILE: &str = "server_setup.txt"; // DynamoDB // User table information, supporting opaque_ke 2.0 and X3DH information // Users can sign in either through username+password or Eth wallet. // // This structure should be aligned with the messages defined in // shared/protos/identity_unauthenticated.proto // // Structure for a user should be: // { // userID: String, // opaqueRegistrationData: Option, // username: Option, // walletAddress: Option, // devices: HashMap // } // // A device is defined as: // { // deviceType: String, # client or keyserver // keyPayload: String, // keyPayloadSignature: String, // identityPreKey: String, // identityPreKeySignature: String, // identityOneTimeKeys: Vec, // notifPreKey: String, // notifPreKeySignature: String, // notifOneTimeKeys: Vec, // socialProof: Option // } // } // // Additional context: // "devices" uses the signing public identity key of the device as a key for the devices map // "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification) // if "deviceType" == "keyserver", then the device will not have any notif key information pub const USERS_TABLE: &str = "identity-users"; pub const USERS_TABLE_PARTITION_KEY: &str = "userID"; pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData"; pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username"; pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str = "deviceType"; pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress"; pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof"; pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str = "deviceListTimestamp"; pub const USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME: &str = "farcasterID"; pub const USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME: &str = "usernameLower"; pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index"; pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index"; pub const USERS_TABLE_FARCASTER_ID_INDEX: &str = "farcasterID-index"; pub const USERS_TABLE_USERNAME_LOWER_INDEX: &str = "usernameLower-index"; pub mod token_table { pub const NAME: &str = "identity-tokens"; pub const PARTITION_KEY: &str = "userID"; pub const SORT_KEY: &str = "signingPublicKey"; pub const ATTR_CREATED: &str = "created"; pub const ATTR_AUTH_TYPE: &str = "authType"; pub const ATTR_VALID: &str = "valid"; pub const ATTR_TOKEN: &str = "token"; } pub const NONCE_TABLE: &str = "identity-nonces"; pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce"; pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime"; pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; pub const WORKFLOWS_IN_PROGRESS_TABLE: &str = "identity-workflows-in-progress"; pub const WORKFLOWS_IN_PROGRESS_PARTITION_KEY: &str = "id"; pub const WORKFLOWS_IN_PROGRESS_WORKFLOW_ATTRIBUTE: &str = "workflow"; pub const WORKFLOWS_IN_PROGRESS_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; // Usernames reserved because they exist in Ashoat's keyserver already pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames"; pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username"; pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID"; pub const RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE: &str = "usernameLower"; pub const RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX: &str = "usernameLower-index"; pub const RESERVED_USERNAMES_TABLE_USER_ID_INDEX: &str = "userID-index"; // Users table social proof attribute pub const SOCIAL_PROOF_MESSAGE_ATTRIBUTE: &str = "siweMessage"; pub const SOCIAL_PROOF_SIGNATURE_ATTRIBUTE: &str = "siweSignature"; pub mod devices_table { /// table name pub const NAME: &str = "identity-devices"; pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index"; pub const DEVICE_ID_INDEX_NAME: &str = "deviceID-index"; /// partition key pub const ATTR_USER_ID: &str = "userID"; /// sort key pub const ATTR_ITEM_ID: &str = "itemID"; // itemID prefixes (one shouldn't be a prefix of the other) pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-"; pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-"; // device-specific attrs pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo"; pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey"; pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey"; pub const ATTR_PLATFORM_DETAILS: &str = "platformDetails"; pub const ATTR_LOGIN_TIME: &str = "loginTime"; // IdentityKeyInfo constants pub const ATTR_KEY_PAYLOAD: &str = "keyPayload"; pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature"; // PreKey constants pub const ATTR_PREKEY: &str = "preKey"; pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature"; // PlatformDetails constants pub const ATTR_DEVICE_TYPE: &str = "deviceType"; pub const ATTR_CODE_VERSION: &str = "codeVersion"; pub const ATTR_STATE_VERSION: &str = "stateVersion"; pub const ATTR_MAJOR_DESKTOP_VERSION: &str = "majorDesktopVersion"; // device-list-specific attrs pub const ATTR_TIMESTAMP: &str = "timestamp"; pub const ATTR_DEVICE_IDS: &str = "deviceIDs"; pub const ATTR_CURRENT_SIGNATURE: &str = "curPrimarySignature"; pub const ATTR_LAST_SIGNATURE: &str = "lastPrimarySignature"; // one-time key constants pub const ATTR_CONTENT_OTK_COUNT: &str = "contentOTKCount"; pub const ATTR_NOTIF_OTK_COUNT: &str = "notifOTKCount"; // deprecated attributes pub const OLD_ATTR_DEVICE_TYPE: &str = "deviceType"; pub const OLD_ATTR_CODE_VERSION: &str = "codeVersion"; } // One time keys table, which need to exist in their own table to ensure // atomicity of additions and removals pub mod one_time_keys_table { pub const NAME: &str = "identity-one-time-keys"; pub const PARTITION_KEY: &str = "userID#deviceID#olmAccount"; pub const SORT_KEY: &str = "timestamp#keyNumber"; pub const ATTR_ONE_TIME_KEY: &str = "oneTimeKey"; } // Tokio pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054"; pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004"; pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); // Token pub const ACCESS_TOKEN_LENGTH: usize = 512; // Temporary config pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN"; pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY"; // Nonce pub const NONCE_LENGTH: usize = 17; pub const NONCE_TTL_DURATION: Duration = Duration::from_secs(900); // seconds // Device list pub const DEVICE_LIST_TIMESTAMP_VALID_FOR: Duration = Duration::from_secs(300); // Workflows in progress pub const WORKFLOWS_IN_PROGRESS_TTL_DURATION: Duration = Duration::from_secs(120); // LocalStack pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; // OPAQUE Server Setup pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP"; // Identity Search pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT"; pub const DEFAULT_OPENSEARCH_ENDPOINT: &str = "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566"; pub const IDENTITY_SEARCH_INDEX: &str = "users"; pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20; // Log Error Types pub mod error_types { pub const GENERIC_DB_LOG: &str = "DB Error"; pub const OTK_DB_LOG: &str = "One-time Key DB Error"; pub const DEVICE_LIST_DB_LOG: &str = "Device List DB Error"; pub const TOKEN_DB_LOG: &str = "Token DB Error"; pub const FARCASTER_DB_LOG: &str = "Farcaster DB Error"; pub const SYNC_LOG: &str = "Sync Error"; pub const SEARCH_LOG: &str = "Search Error"; pub const SIWE_LOG: &str = "SIWE Error"; pub const GRPC_SERVICES_LOG: &str = "gRPC Services Error"; pub const TUNNELBROKER_LOG: &str = "Tunnelbroker Error"; pub const HTTP_LOG: &str = "HTTP Error"; } // Tonic Status Messages pub mod tonic_status_messages { pub const UNEXPECTED_MESSAGE_DATA: &str = "unexpected_message_data"; pub const SIGNATURE_INVALID: &str = "signature_invalid"; pub const MALFORMED_KEY: &str = "malformed_key"; pub const VERIFICATION_FAILED: &str = "verification_failed"; pub const MALFORMED_PAYLOAD: &str = "malformed_payload"; pub const INVALID_DEVICE_LIST_PAYLOAD: &str = "invalid_device_list_payload"; pub const USERNAME_ALREADY_EXISTS: &str = "username_already_exists"; pub const USERNAME_RESERVED: &str = "username_reserved"; pub const WALLET_ADDRESS_TAKEN: &str = "wallet_address_taken"; pub const WALLET_ADDRESS_NOT_RESERVED: &str = "wallet_address_not_reserved"; pub const WALLET_ADDRESS_MISMATCH: &str = "wallet_address_mismatch"; pub const DEVICE_ID_ALREADY_EXISTS: &str = "device_id_already_exists"; pub const USER_NOT_FOUND: &str = "user_not_found"; pub const INVALID_NONCE: &str = "invalid_nonce"; pub const NONCE_EXPIRED: &str = "nonce_expired"; pub const FID_TAKEN: &str = "fid_taken"; pub const CANNOT_LINK_FID: &str = "cannot_link_fid"; pub const INVALID_PLATFORM_METADATA: &str = "invalid_platform_metadata"; pub const MISSING_CREDENTIALS: &str = "missing_credentials"; pub const BAD_CREDENTIALS: &str = "bad_credentials"; pub const SESSION_NOT_FOUND: &str = "session_not_found"; pub const INVALID_TIMESTAMP: &str = "invalid_timestamp"; pub const INVALID_USERNAME: &str = "invalid_username"; pub const USERNAME_NOT_RESERVED: &str = "username_not_reserved"; pub const NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME: &str = "need_keyserver_message_to_claim_username"; pub const UNEXPECTED_INITIAL_DEVICE_LIST: &str = "unexpected_initial_device_list"; pub const DEVICE_LIST_ERROR: &str = "device_list_error"; pub const DEVICE_NOT_IN_DEVICE_LIST: &str = "device_not_in_device_list"; pub const NO_IDENTIFIER_PROVIDED: &str = "no_identifier_provided"; pub const USER_ALREADY_HAS_KEYSERVER: &str = "user_already_has_keyserver"; pub const RETRY: &str = "retry"; pub const INVALID_DEVICE_LIST_UPDATE: &str = "invalid_device_list_update"; pub const INVALID_DEVICE_LIST_SIGNATURE: &str = "invalid_device_list_signature"; pub const UNEXPECTED_ERROR: &str = "unexpected_error"; pub const NO_DEVICE_LIST: &str = "no_device_list"; pub const USER_ID_MISSING: &str = "user_id_missing"; pub const DEVICE_ID_MISSING: &str = "device_id_missing"; pub const MISSING_CONTENT_KEYS: &str = "missing_content_keys"; pub const MISSING_NOTIF_KEYS: &str = "missing_notif_keys"; pub const KEYSERVER_NOT_FOUND: &str = "keyserver_not_found"; pub const PASSWORD_USER: &str = "password_user"; pub const WALLET_USER: &str = "wallet_user"; pub const INVALID_MESSAGE: &str = "invalid_message"; pub const INVALID_MESSAGE_FORMAT: &str = "invalid_message_format"; pub const MISSING_PLATFORM_OR_CODE_VERSION_METADATA: &str = "missing_platform_or_code_version_metadata"; pub const MISSING_KEY: &str = "missing_key"; pub const MESSAGE_NOT_AUTHENTICATED: &str = "message_not_authenticated"; pub const RETRY_FROM_NATIVE: &str = "retry_from_native"; pub const USER_IS_NOT_STAFF: &str = "user_is_not_staff"; } // Tunnelbroker pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT"; pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051"; // Backup pub const BACKUP_SERVICE_URL: &str = "BACKUP_SERVICE_URL"; pub const DEFAULT_BACKUP_SERVICE_URL: &str = "http://localhost:50052"; // Blob pub const BLOB_SERVICE_URL: &str = "BLOB_SERVICE_URL"; pub const DEFAULT_BLOB_SERVICE_URL: &str = "http://localhost:50053"; // X3DH key management // Threshold for requesting more one_time keys pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5; // Number of keys to be refreshed when below the threshold pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5; // Minimum supported code versions pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270; // Request metadata pub mod request_metadata { pub const CODE_VERSION: &str = "code_version"; pub const STATE_VERSION: &str = "state_version"; pub const MAJOR_DESKTOP_VERSION: &str = "major_desktop_version"; pub const DEVICE_TYPE: &str = "device_type"; pub const USER_ID: &str = "user_id"; pub const DEVICE_ID: &str = "device_id"; pub const ACCESS_TOKEN: &str = "access_token"; } // CORS pub mod cors { use std::time::Duration; pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"]; pub const DEFAULT_ALLOW_HEADERS: [&str; 12] = [ "x-grpc-web", "content-type", "x-user-agent", "grpc-timeout", "authorization", super::request_metadata::CODE_VERSION, super::request_metadata::STATE_VERSION, super::request_metadata::MAJOR_DESKTOP_VERSION, super::request_metadata::DEVICE_TYPE, super::request_metadata::USER_ID, super::request_metadata::DEVICE_ID, super::request_metadata::ACCESS_TOKEN, ]; pub const ALLOW_ORIGIN_LIST: &str = "ALLOW_ORIGIN_LIST"; pub const PROD_ORIGIN_HOST_STR: &str = "web.comm.app"; } // Tracing pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; pub const REDACT_SENSITIVE_DATA: &str = "REDACT_SENSITIVE_DATA"; // Regex pub const VALID_USERNAME_REGEX_STRING: &str = r"^[a-zA-Z0-9][a-zA-Z0-9-_]{0,190}$"; // Retry // TODO: Replace this with `ExponentialBackoffConfig` from `comm-lib` pub mod retry { pub const MAX_ATTEMPTS: usize = 8; pub const CONDITIONAL_CHECK_FAILED: &str = "ConditionalCheckFailed"; pub const TRANSACTION_CONFLICT: &str = "TransactionConflict"; } // One-time keys pub const ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT: usize = 24; pub const ONE_TIME_KEY_SIZE: usize = 43; // as defined in olm pub const MAX_ONE_TIME_KEYS: usize = 100; // as defined in olm + +// Comm staff +pub mod staff { + pub const STAFF_USER_IDS: [&str; 1] = ["256"]; +} diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index a45862f03..40811b17d 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,1053 +1,1129 @@ use std::collections::{HashMap, HashSet}; use crate::comm_service::{backup, blob, tunnelbroker}; use crate::config::CONFIG; use crate::database::{DeviceListUpdate, PlatformDetails}; use crate::device_list::validation::DeviceListValidator; use crate::device_list::SignedDeviceList; use crate::error::consume_error; use crate::log::redact_sensitive_data; use crate::{ client_service::{handle_db_error, WorkflowInProgress}, - constants::{error_types, request_metadata, tonic_status_messages}, + constants::{error_types, request_metadata, staff, tonic_status_messages}, database::DatabaseClient, grpc_services::shared::{get_platform_metadata, get_value}, }; use chrono::DateTime; use comm_lib::auth::AuthService; use comm_lib::blob::client::BlobServiceClient; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, trace}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, PrimaryDeviceLogoutRequest, PrivilegedDeleteUsersRequest, PrivilegedResetUserPasswordFinishRequest, PrivilegedResetUserPasswordStartRequest, PrivilegedResetUserPasswordStartResponse, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserDevicesPlatformDetails, UserIdentitiesRequest, UserIdentitiesResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, blob_client: BlobServiceClient, comm_auth_service: AuthService, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { trace!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { trace!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::MISSING_CREDENTIALS) })?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle.block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) })?; if !valid_token { return Err(Status::aborted(tonic_status_messages::BAD_CREDENTIALS)); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::USER_ID_MISSING) })?; let device_id = get_value(request, request_metadata::DEVICE_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::DEVICE_ID_MISSING) })?; Ok((user_id, device_id)) } fn spawn_delete_devices_services_data_task( blob_client: &BlobServiceClient, device_ids: Vec, ) { let blob_client = blob_client.clone(); tokio::spawn(async move { debug!( "Attempting to delete Tunnelbroker data for devices: {:?}", device_ids.as_slice() ); let (tunnelbroker_result, blob_result) = tokio::join!( tunnelbroker::delete_devices_data(&device_ids), blob::remove_holders_for_devices(&blob_client, &device_ids) ); consume_error(tunnelbroker_result); consume_error(blob_result); }); } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { #[tracing::instrument(skip_all)] async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_key = message.new_content_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_CONTENT_KEYS) })?; let notif_key = message.new_notif_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_NOTIF_KEYS) })?; self .db_client .update_device_prekeys( user_id, device_id, content_key.into(), notif_key.into(), ) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } #[tracing::instrument(skip_all)] async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identity(user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } #[tracing::instrument(skip_all)] async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identity(&message.user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await? else { return Err(Status::not_found( tonic_status_messages::KEYSERVER_NOT_FOUND, )); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } #[tracing::instrument(skip_all)] async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await?; Ok(tonic::Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let Some((username, password_file)) = self .db_client .get_username_and_password_file(&user_id) .await? else { return Err(tonic::Status::permission_denied( tonic_status_messages::WALLET_USER, )); }; let message = request.into_inner(); let mut server_login = comm_opaque2::server::Login::new(); let login_response = server_login .start( &CONFIG.server_setup, &password_file, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let registration_response = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdatePasswordInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(Box::new(update_state))) .await?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: registration_response, opaque_login_response: login_response, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self.db_client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(user_id, password_file) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self.db_client.remove_device(&user_id, &device_id).await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; let device_list = self .db_client .get_current_device_list(&user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition("no device list")); }; tokio::spawn(async move { debug!( "Sending device list updates to {:?}", device_list.device_ids ); let device_ids: Vec<&str> = device_list.device_ids.iter().map(AsRef::as_ref).collect(); let result = tunnelbroker::send_device_list_update(&device_ids).await; consume_error(result); }); let blob_client = self.authenticated_blob_client().await?; spawn_delete_devices_services_data_task(&blob_client, [device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_primary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!( "Primary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; // Get and verify singleton device list let parsed_device_list: SignedDeviceList = message.signed_device_list.parse()?; let update_payload = DeviceListUpdate::try_from(parsed_device_list)?; crate::device_list::verify_singleton_device_list( &update_payload, &device_id, None, )?; self .db_client .apply_devicelist_update( &user_id, update_payload, // - We've already validated the list so no need to do it here. // - Need to pass the type because it cannot be inferred from None None::, // We don't want side effects - we'll take care of removing devices // on our own. (Side effect would skip the primary device). false, ) .await?; debug!(user_id, "Attempting to delete user's access tokens"); self.db_client.delete_all_tokens_for_user(&user_id).await?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self .db_client .delete_otks_table_rows_for_user(&user_id) .await?; debug!(user_id, "Attempting to delete user's devices"); let device_ids = self .db_client .delete_devices_data_for_user(&user_id) .await?; let blob_client = self.authenticated_blob_client().await?; spawn_delete_devices_services_data_task(&blob_client, device_ids); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_secondary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; debug!( "Secondary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Secondary, ) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; let blob_client = self.authenticated_blob_client().await?; spawn_delete_devices_services_data_task(&blob_client, [device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); let user_is_password_authenticated = self .db_client .user_is_password_authenticated(&user_id) .await?; if user_is_password_authenticated { return Err(tonic::Status::permission_denied( tonic_status_messages::PASSWORD_USER, )); } self.delete_services_data_for_user(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = DeletePasswordUserInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self.db_client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; self.delete_services_data_for_user(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn privileged_delete_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { - const STAFF_USER_IDS: [&str; 1] = ["256"]; - let (user_id, _) = get_user_and_device_id(&request)?; - if !STAFF_USER_IDS.contains(&user_id.as_str()) { + if !staff::STAFF_USER_IDS.contains(&user_id.as_str()) { return Err(Status::permission_denied( tonic_status_messages::USER_IS_NOT_STAFF, )); } for user_id_to_delete in request.into_inner().user_ids { self .delete_services_data_for_user(&user_id_to_delete) .await?; self.db_client.delete_user(user_id_to_delete).await?; } let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn privileged_reset_user_password_start( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result< tonic::Response, tonic::Status, > { - unimplemented!() + let (staff_user_id, _) = get_user_and_device_id(&request)?; + if !staff::STAFF_USER_IDS.contains(&staff_user_id.as_str()) { + return Err(Status::permission_denied( + tonic_status_messages::USER_IS_NOT_STAFF, + )); + } + + let message = request.into_inner(); + debug!( + "Attempting to start resetting password for user: {:?}", + &message.username + ); + + let user_id_and_password_file = self + .db_client + .get_user_info_and_password_file_from_username(&message.username) + .await? + .ok_or(tonic::Status::not_found( + tonic_status_messages::USER_NOT_FOUND, + ))?; + + let server_registration = comm_opaque2::server::Registration::new(); + let registration_response = server_registration + .start( + &CONFIG.server_setup, + &message.opaque_registration_request, + &message.username.to_lowercase().as_bytes(), + ) + .map_err(protocol_error_to_grpc_status)?; + + let reset_state = + PrivilegedPasswordResetInfo::new(user_id_and_password_file.user_id); + let session_id = self + .db_client + .insert_workflow(WorkflowInProgress::PrivilegedPasswordReset(Box::new( + reset_state, + ))) + .await?; + + let response = PrivilegedResetUserPasswordStartResponse { + session_id, + opaque_registration_response: registration_response, + }; + Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn privileged_reset_user_password_finish( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - unimplemented!() + let (staff_user_id, _) = get_user_and_device_id(&request)?; + if !staff::STAFF_USER_IDS.contains(&staff_user_id.as_str()) { + return Err(Status::permission_denied( + tonic_status_messages::USER_IS_NOT_STAFF, + )); + } + + let message = request.into_inner(); + + let Some(WorkflowInProgress::PrivilegedPasswordReset(state)) = + self.db_client.get_workflow(message.session_id).await? + else { + return Err(tonic::Status::not_found( + tonic_status_messages::SESSION_NOT_FOUND, + )); + }; + + let server_registration = comm_opaque2::server::Registration::new(); + let password_file = server_registration + .finish(&message.opaque_registration_upload) + .map_err(protocol_error_to_grpc_status)?; + + self + .db_client + .update_user_password(state.user_id, password_file) + .await?; + + let response = Empty {}; + Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::from_timestamp_millis(timestamp).ok_or_else(|| { tonic::Status::invalid_argument( tonic_status_messages::INVALID_TIMESTAMP, ) }) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(SignedDeviceList::try_from) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } #[tracing::instrument(skip_all)] async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); let request_count = user_ids.len(); let user_ids: HashSet = user_ids.into_iter().collect(); debug!( "Requesting device lists and platform details for {} users ({} unique)", request_count, user_ids.len() ); // 1. Fetch device lists let device_lists = self.db_client.get_current_device_lists(user_ids).await?; trace!("Found device lists for {} users", device_lists.keys().len()); // 2. Fetch platform details let flattened_user_device_ids: Vec<(String, String)> = device_lists .iter() .flat_map(|(user_id, device_list)| { device_list .device_ids .iter() .map(|device_id| (user_id.clone(), device_id.clone())) .collect::>() }) .collect(); let platform_details = self .db_client .get_devices_platform_details(flattened_user_device_ids) .await?; trace!( "Found platform details for {} users", platform_details.keys().len() ); // 3. Prepare output format let users_device_lists: HashMap = device_lists .into_iter() .map(|(user_id, device_list_row)| { let signed_list = SignedDeviceList::try_from(device_list_row)?; let serialized_list = signed_list.as_json_string()?; Ok((user_id, serialized_list)) }) .collect::>()?; let users_devices_platform_details = platform_details .into_iter() .map(|(user_id, devices_map)| { (user_id, UserDevicesPlatformDetails::from(devices_map)) }) .collect(); let response = PeersDeviceListsResponse { users_device_lists, users_devices_platform_details, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; let validator = crate::device_list::validation::update_device_list_rpc_validator; self .db_client .apply_devicelist_update(&user_id, update, Some(validator), true) .await?; Ok(Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await?; if get_farcaster_users_response.len() > 1 { error!( errorType = error_types::GRPC_SERVICES_LOG, "multiple users associated with the same Farcaster ID" ); return Err(Status::failed_precondition( tonic_status_messages::CANNOT_LINK_FID, )); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists(tonic_status_messages::FID_TAKEN)); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self.db_client.remove_farcaster_id(user_id).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn find_user_identities( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_ids: HashSet = message.user_ids.into_iter().collect(); let users_table_results = self .db_client .find_db_user_identities(user_ids.clone()) .await?; // Look up only user IDs that haven't been found in users table let reserved_user_ids_to_query: Vec = user_ids .into_iter() .filter(|user_id| !users_table_results.contains_key(user_id)) .collect(); let reserved_user_identifiers = self .db_client .query_reserved_usernames_by_user_ids(reserved_user_ids_to_query) .await?; let identities = users_table_results .into_iter() .map(|(user_id, identifier)| (user_id, identifier.into())) .collect(); let response = UserIdentitiesResponse { identities, reserved_user_identifiers, }; return Ok(Response::new(response)); } #[tracing::instrument(skip_all)] async fn sync_platform_details( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let platform_metadata = get_platform_metadata(&request)?; let platform_details = PlatformDetails::new(platform_metadata, None) .map_err(|_| { Status::invalid_argument( tonic_status_messages::INVALID_PLATFORM_METADATA, ) })?; self .db_client .update_device_platform_details(user_id, device_id, platform_details) .await?; Ok(Response::new(Empty {})) } } #[allow(dead_code)] enum DeviceListItemKind { Any, Primary, Secondary, } impl AuthenticatedService { async fn verify_device_on_device_list( &self, user_id: &String, device_id: &String, device_kind: DeviceListItemKind, ) -> Result<(), tonic::Status> { let device_list = self .db_client .get_current_device_list(user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition( tonic_status_messages::NO_DEVICE_LIST, )); }; use DeviceListItemKind as DeviceKind; let device_on_list = match device_kind { DeviceKind::Any => device_list.has_device(device_id), DeviceKind::Primary => device_list.is_primary_device(device_id), DeviceKind::Secondary => device_list.has_secondary_device(device_id), }; if !device_on_list { debug!( "Device {} not in device list for user {}", device_id, user_id ); return Err(Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } Ok(()) } async fn delete_services_data_for_user( &self, user_id: &str, ) -> Result<(), Status> { debug!("Attempting to delete Backup data for user: {}", &user_id); let (device_list_result, delete_backup_result) = tokio::join!( self.db_client.get_current_device_list(user_id), backup::delete_backup_user_data(user_id, &self.comm_auth_service) ); let device_ids = device_list_result? .map(|list| list.device_ids) .unwrap_or_default(); delete_backup_result?; debug!( "Attempting to delete Blob holders and Tunnelbroker data for devices: {:?}", device_ids ); let (tunnelbroker_result, blob_client_result) = tokio::join!( tunnelbroker::delete_devices_data(&device_ids), self.authenticated_blob_client() ); tunnelbroker_result?; let blob_client = blob_client_result?; blob::remove_holders_for_devices(&blob_client, &device_ids).await?; Ok(()) } /// Retrieves [`BlobServiceClient`] authenticated with a service-to-service token async fn authenticated_blob_client( &self, ) -> Result { let s2s_token = self .comm_auth_service .get_services_token() .await .map_err(|err| { tracing::error!( errorType = error_types::HTTP_LOG, "Failed to retrieve service-to-service token: {err:?}", ); tonic::Status::aborted(tonic_status_messages::UNEXPECTED_ERROR) })?; let blob_client = self.blob_client.with_authentication(s2s_token.into()); Ok(blob_client) } } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct DeletePasswordUserInfo { pub opaque_server_login: comm_opaque2::server::Login, } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct UpdatePasswordInfo { pub opaque_server_login: comm_opaque2::server::Login, } + +#[derive( + Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, +)] +pub struct PrivilegedPasswordResetInfo { + pub user_id: String, +}