diff --git a/services/identity/src/client_service.rs b/services/identity/src/client_service.rs index 343e96d29..3c78a0e32 100644 --- a/services/identity/src/client_service.rs +++ b/services/identity/src/client_service.rs @@ -1,1224 +1,1237 @@ // Standard library imports use std::str::FromStr; // External crate imports use comm_lib::aws::DynamoDBError; use comm_lib::shared::reserved_users::RESERVED_USERNAME_SET; use comm_opaque2::grpc::protocol_error_to_grpc_status; use rand::rngs::OsRng; use serde::{Deserialize, Serialize}; use siwe::eip55; use tonic::Response; use tracing::{debug, error, info, warn}; // Workspace crate imports use crate::config::CONFIG; use crate::constants::{error_types, tonic_status_messages}; use crate::database::{ - DBDeviceTypeInt, DatabaseClient, DeviceType, KeyPayload + DBDeviceTypeInt, DatabaseClient, DeviceType, KeyPayload, UserInfoAndPasswordFile }; use crate::device_list::SignedDeviceList; use crate::error::{DeviceListError, Error as DBError}; use crate::grpc_services::authenticated::{DeletePasswordUserInfo, UpdatePasswordInfo}; use crate::grpc_services::protos::unauth::{ find_user_id_request, AddReservedUsernamesRequest, AuthResponse, Empty, ExistingDeviceLoginRequest, FindUserIdRequest, FindUserIdResponse, GenerateNonceResponse, OpaqueLoginFinishRequest, OpaqueLoginStartRequest, OpaqueLoginStartResponse, RegistrationFinishRequest, RegistrationStartRequest, RegistrationStartResponse, RemoveReservedUsernameRequest, ReservedRegistrationStartRequest, SecondaryDeviceKeysUploadRequest, VerifyUserAccessTokenRequest, VerifyUserAccessTokenResponse, WalletAuthRequest, GetFarcasterUsersRequest, GetFarcasterUsersResponse }; use crate::grpc_services::shared::get_platform_metadata; use crate::grpc_utils::{ DeviceKeyUploadActions, RegistrationActions, SignedNonce }; use crate::nonce::generate_nonce_data; use crate::reserved_users::{ validate_account_ownership_message_and_get_user_id, validate_add_reserved_usernames_message, validate_remove_reserved_username_message, }; use crate::siwe::{ is_valid_ethereum_address, parse_and_verify_siwe_message, SocialProof, }; use crate::token::{AccessTokenData, AuthType}; pub use crate::grpc_services::protos::unauth::identity_client_service_server::{ IdentityClientService, IdentityClientServiceServer, }; use crate::regex::is_valid_username; #[derive(Clone, Serialize, Deserialize)] pub enum WorkflowInProgress { Registration(Box), Login(Box), Update(Box), PasswordUserDeletion(Box), } #[derive(Clone, Serialize, Deserialize)] pub struct UserRegistrationInfo { pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub user_id: Option, pub farcaster_id: Option, pub initial_device_list: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct UserLoginInfo { pub user_id: String, pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub opaque_server_login: comm_opaque2::server::Login, pub device_to_remove: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct FlattenedDeviceKeyUpload { pub device_id_key: String, pub key_payload: String, pub key_payload_signature: String, pub content_prekey: String, pub content_prekey_signature: String, pub content_one_time_keys: Vec, pub notif_prekey: String, pub notif_prekey_signature: String, pub notif_one_time_keys: Vec, pub device_type: DeviceType, } #[derive(derive_more::Constructor)] pub struct ClientService { client: DatabaseClient, } #[tonic::async_trait] impl IdentityClientService for ClientService { #[tracing::instrument(skip_all)] async fn register_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Received registration request for: {}", message.username); if !is_valid_username(&message.username) || is_valid_ethereum_address(&message.username) { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_USERNAME, )); } self.check_username_taken(&message.username).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&message.username) .await .map_err(handle_db_error)? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } - if RESERVED_USERNAME_SET.contains(&message.username) { + if RESERVED_USERNAME_SET.contains(&message.username.to_lowercase()) { return Err(tonic::Status::invalid_argument( tonic_status_messages::USERNAME_RESERVED, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let registration_state = construct_user_registration_info( &message, None, message.username.clone(), message.farcaster_id.clone(), )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, - message.username.as_bytes(), + message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await .map_err(handle_db_error)?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_reserved_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); self.check_username_taken(&message.username).await?; - if RESERVED_USERNAME_SET.contains(&message.username) { + if RESERVED_USERNAME_SET.contains(&message.username.to_lowercase()) { return Err(tonic::Status::invalid_argument( tonic_status_messages::USERNAME_RESERVED, )); } - let username_in_reserved_usernames_table = self + let Some(original_username) = self .client - .get_user_id_from_reserved_usernames_table(&message.username) + .get_original_username_from_reserved_usernames_table(&message.username) .await .map_err(handle_db_error)? - .is_some(); - if !username_in_reserved_usernames_table { + else { return Err(tonic::Status::permission_denied( tonic_status_messages::USERNAME_NOT_RESERVED, )); - } + }; let user_id = validate_account_ownership_message_and_get_user_id( &message.username, &message.keyserver_message, &message.keyserver_signature, )?; let registration_state = construct_user_registration_info( &message, Some(user_id), - message.username.clone(), + original_username, None, )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, - message.username.as_bytes(), + message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await .map_err(handle_db_error)?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); if let Some(WorkflowInProgress::Registration(state)) = self .client .get_workflow(message.session_id) .await .map_err(handle_db_error)? { let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; let login_time = chrono::Utc::now(); let device_id = state.flattened_device_key_upload.device_id_key.clone(); let username = state.username.clone(); let user_id = self .client .add_password_user_to_users_table( *state, password_file, platform_metadata, login_time, ) .await .map_err(handle_db_error)?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } else { Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )) } } #[tracing::instrument(skip_all)] async fn log_in_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Attempting to log in user: {:?}", &message.username); let user_id_and_password_file = self .client - .get_user_id_and_password_file_from_username(&message.username) + .get_user_info_and_password_file_from_username(&message.username) .await .map_err(handle_db_error)?; - let (user_id, password_file_bytes) = - if let Some(data) = user_id_and_password_file { - data - } else { - // It's possible that the user attempting login is already registered - // on Ashoat's keyserver. If they are, we should send back a gRPC status - // code instructing them to get a signed message from Ashoat's keyserver - // in order to claim their username and register with the Identity - // service. - let username_in_reserved_usernames_table = self - .client - .get_user_id_from_reserved_usernames_table(&message.username) - .await - .map_err(handle_db_error)? - .is_some(); - - if username_in_reserved_usernames_table { - return Err(tonic::Status::permission_denied( - tonic_status_messages::NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME, - )); - } + let UserInfoAndPasswordFile { + user_id, + original_username: username, + password_file: password_file_bytes, + } = if let Some(data) = user_id_and_password_file { + data + } else { + // It's possible that the user attempting login is already registered + // on Ashoat's keyserver. If they are, we should send back a gRPC status + // code instructing them to get a signed message from Ashoat's keyserver + // in order to claim their username and register with the Identity + // service. + let username_in_reserved_usernames_table = self + .client + .get_user_id_from_reserved_usernames_table(&message.username) + .await + .map_err(handle_db_error)? + .is_some(); - return Err(tonic::Status::not_found( - tonic_status_messages::USER_NOT_FOUND, + if username_in_reserved_usernames_table { + return Err(tonic::Status::permission_denied( + tonic_status_messages::NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME, )); - }; + } + + return Err(tonic::Status::not_found( + tonic_status_messages::USER_NOT_FOUND, + )); + }; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let maybe_device_to_remove = self .get_keyserver_device_to_remove( &user_id, &flattened_device_key_upload.device_id_key, message.force.unwrap_or(false), &flattened_device_key_upload.device_type, ) .await?; let mut server_login = comm_opaque2::server::Login::new(); - let server_response = server_login - .start( - &CONFIG.server_setup, - &password_file_bytes, - &message.opaque_login_request, - message.username.as_bytes(), - ) - .map_err(protocol_error_to_grpc_status)?; + let server_response = match server_login.start( + &CONFIG.server_setup, + &password_file_bytes, + &message.opaque_login_request, + message.username.to_lowercase().as_bytes(), + ) { + Ok(response) => response, + Err(_) => { + // Retry with original username bytes if the first attempt fails + server_login + .start( + &CONFIG.server_setup, + &password_file_bytes, + &message.opaque_login_request, + username.as_bytes(), + ) + .map_err(protocol_error_to_grpc_status)? + } + }; let login_state = construct_user_login_info( user_id, - message.username, + username, server_login, flattened_device_key_upload, maybe_device_to_remove, )?; let session_id = self .client .insert_workflow(WorkflowInProgress::Login(Box::new(login_state))) .await .map_err(handle_db_error)?; let response = Response::new(OpaqueLoginStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn log_in_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Login(state)) = self .client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; if let Some(device_to_remove) = state.device_to_remove { self .client .remove_device(state.user_id.clone(), device_to_remove) .await .map_err(handle_db_error)?; } let login_time = chrono::Utc::now(); self .client .add_user_device( state.user_id.clone(), state.flattened_device_key_upload.clone(), platform_metadata, login_time, ) .await .map_err(handle_db_error)?; // Create access token let token = AccessTokenData::with_created_time( state.user_id.clone(), state.flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id: state.user_id, access_token, username: state.username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); // WalletAuthRequest is used for both log_in_wallet_user and register_wallet_user if !message.initial_device_list.is_empty() { return Err(tonic::Status::invalid_argument( tonic_status_messages::UNEXPECTED_INITIAL_DEVICE_LIST, )); } let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, )?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let login_time = chrono::Utc::now(); let user_id = if let Some(user_id) = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await .map_err(handle_db_error)? { // It's possible that the user attempting login is already registered // on Ashoat's keyserver. If they are, we should try to register them if // they're on a mobile device, otherwise we should send back a gRPC status // code instructing them to try logging in from a mobile device first. if platform_metadata.device_type.to_uppercase() != "ANDROID" && platform_metadata.device_type.to_uppercase() != "IOS" { return Err(tonic::Status::permission_denied( tonic_status_messages::RETRY_FROM_NATIVE, )); }; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, Some(user_id.clone()), platform_metadata, login_time, message.farcaster_id, None, ) .await .map_err(handle_db_error)?; user_id } else { let Some(user_id) = self .client .get_user_id_from_user_info(wallet_address.clone(), &AuthType::Wallet) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_user_device( user_id.clone(), flattened_device_key_upload.clone(), platform_metadata, chrono::Utc::now(), ) .await .map_err(handle_db_error)?; user_id }; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, )?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); self.check_wallet_address_taken(&wallet_address).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await .map_err(handle_db_error)? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, None) .await?; let login_time = chrono::Utc::now(); let initial_device_list = message.get_and_verify_initial_device_list()?; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); let user_id = self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, None, platform_metadata, login_time, message.farcaster_id, initial_device_list, ) .await .map_err(handle_db_error)?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn upload_keys_for_registered_device_and_log_in( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let user_id = message.user_id; let device_id = flattened_device_key_upload.device_id_key.clone(); let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let user_identity = self .client .get_user_identity(&user_id) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(device_list) = self .client .get_current_device_list(&user_id) .await .map_err(handle_db_error)? else { warn!("User {} does not have valid device list. Secondary device auth impossible.", user_id); return Err(tonic::Status::aborted( tonic_status_messages::DEVICE_LIST_ERROR, )); }; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; self .client .put_device_data( &user_id, flattened_device_key_upload, platform_metadata, login_time, ) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_existing_device( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let ExistingDeviceLoginRequest { user_id, device_id, .. } = message; let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; let (identity_response, device_list_response) = tokio::join!( self.client.get_user_identity(&user_id), self.client.get_current_device_list(&user_id) ); let user_identity = identity_response.map_err(handle_db_error)?.ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let device_list = device_list_response .map_err(handle_db_error)? .ok_or_else(|| { warn!("User {} does not have a valid device list.", user_id); tonic::Status::aborted(tonic_status_messages::DEVICE_LIST_ERROR) })?; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self .client .put_access_token_data(token) .await .map_err(handle_db_error)?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn generate_nonce( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let nonce_data = generate_nonce_data(&mut OsRng); match self .client .add_nonce_to_nonces_table(nonce_data.clone()) .await { Ok(_) => Ok(Response::new(GenerateNonceResponse { nonce: nonce_data.nonce, })), Err(e) => Err(handle_db_error(e)), } } #[tracing::instrument(skip_all)] async fn verify_user_access_token( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Verifying device: {}", &message.device_id); let token_valid = self .client .verify_access_token( message.user_id, message.device_id.clone(), message.access_token, ) .await .map_err(handle_db_error)?; let response = Response::new(VerifyUserAccessTokenResponse { token_valid }); debug!( "device {} was verified: {}", &message.device_id, token_valid ); Ok(response) } #[tracing::instrument(skip_all)] async fn add_reserved_usernames( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_details = validate_add_reserved_usernames_message( &message.message, &message.signature, )?; let filtered_user_details = self .client .filter_out_taken_usernames(user_details) .await .map_err(handle_db_error)?; self .client .add_usernames_to_reserved_usernames_table(filtered_user_details) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn remove_reserved_username( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let username = validate_remove_reserved_username_message( &message.message, &message.signature, )?; self .client .delete_username_from_reserved_usernames_table(username) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn ping( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn find_user_id( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); use find_user_id_request::Identifier; let (user_ident, auth_type) = match message.identifier { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::NO_IDENTIFIER_PROVIDED, )) } Some(Identifier::Username(username)) => (username, AuthType::Password), Some(Identifier::WalletAddress(address)) => (address, AuthType::Wallet), }; let (get_user_id_from_reserved_usernames_table_result, user_id_result) = tokio::join!( self .client .get_user_id_from_reserved_usernames_table(&user_ident), self .client .get_user_id_from_user_info(user_ident.clone(), &auth_type), ); let is_reserved = get_user_id_from_reserved_usernames_table_result .map_err(handle_db_error)? .is_some(); let user_id = user_id_result.map_err(handle_db_error)?; Ok(Response::new(FindUserIdResponse { user_id, is_reserved, })) } #[tracing::instrument(skip_all)] async fn get_farcaster_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let farcaster_users = self .client .get_farcaster_users(message.farcaster_ids) .await .map_err(handle_db_error)? .into_iter() .map(|d| d.0) .collect(); Ok(Response::new(GetFarcasterUsersResponse { farcaster_users })) } } impl ClientService { async fn check_username_taken( &self, username: &str, ) -> Result<(), tonic::Status> { let username_taken = self .client .username_taken(username.to_string()) .await .map_err(handle_db_error)?; if username_taken { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } Ok(()) } async fn check_wallet_address_taken( &self, wallet_address: &str, ) -> Result<(), tonic::Status> { let wallet_address_taken = self .client .wallet_address_taken(wallet_address.to_string()) .await .map_err(handle_db_error)?; if wallet_address_taken { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } Ok(()) } async fn check_farcaster_id_taken( &self, farcaster_id: &str, ) -> Result<(), tonic::Status> { let fid_already_registered = !self .client .get_farcaster_users(vec![farcaster_id.to_string()]) .await .map_err(handle_db_error)? .is_empty(); if fid_already_registered { return Err(tonic::Status::already_exists( tonic_status_messages::FID_TAKEN, )); } Ok(()) } async fn check_device_id_taken( &self, key_upload: &FlattenedDeviceKeyUpload, requesting_user_id: Option<&str>, ) -> Result<(), tonic::Status> { let device_id = key_upload.device_id_key.as_str(); let Some(existing_device_user_id) = self .client .find_user_id_for_device(device_id) .await .map_err(handle_db_error)? else { // device ID doesn't exist return Ok(()); }; // allow already-existing device ID for the same user match requesting_user_id { Some(user_id) if user_id == existing_device_user_id => { debug!( "Found already-existing device {} for user {}", device_id, user_id ); Ok(()) } _ => { warn!("Device ID already exists: {device_id}"); Err(tonic::Status::already_exists( tonic_status_messages::DEVICE_ID_ALREADY_EXISTS, )) } } } async fn verify_and_remove_nonce( &self, nonce: &str, ) -> Result<(), tonic::Status> { match self .client .get_nonce_from_nonces_table(nonce) .await .map_err(handle_db_error)? { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_NONCE, )) } Some(nonce) if nonce.is_expired() => { // we don't need to remove the nonce from the table here // because the DynamoDB TTL will take care of it return Err(tonic::Status::aborted( tonic_status_messages::NONCE_EXPIRED, )); } Some(nonce_data) => self .client .remove_nonce_from_nonces_table(&nonce_data.nonce) .await .map_err(handle_db_error)?, }; Ok(()) } async fn get_keyserver_device_to_remove( &self, user_id: &str, new_keyserver_device_id: &str, force: bool, device_type: &DeviceType, ) -> Result, tonic::Status> { if device_type != &DeviceType::Keyserver { return Ok(None); } let maybe_keyserver_device_id = self .client .get_keyserver_device_id_for_user(user_id) .await .map_err(handle_db_error)?; let Some(existing_keyserver_device_id) = maybe_keyserver_device_id else { return Ok(None); }; if new_keyserver_device_id == existing_keyserver_device_id { return Ok(None); } if force { info!( "keyserver {} will be removed from the device list", existing_keyserver_device_id ); Ok(Some(existing_keyserver_device_id)) } else { Err(tonic::Status::already_exists( tonic_status_messages::USER_ALREADY_HAS_KEYSERVER, )) } } } #[tracing::instrument(skip_all)] pub fn handle_db_error(db_error: DBError) -> tonic::Status { match db_error { DBError::AwsSdk(DynamoDBError::InternalServerError(_)) | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException( _, )) | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => { tonic::Status::unavailable(tonic_status_messages::RETRY) } DBError::DeviceList(DeviceListError::InvalidDeviceListUpdate) => { tonic::Status::invalid_argument( tonic_status_messages::INVALID_DEVICE_LIST_UPDATE, ) } DBError::DeviceList(DeviceListError::InvalidSignature) => { tonic::Status::invalid_argument( tonic_status_messages::INVALID_DEVICE_LIST_SIGNATURE, ) } e => { error!( errorType = error_types::GENERIC_DB_LOG, "Encountered an unexpected error: {}", e ); tonic::Status::failed_precondition( tonic_status_messages::UNEXPECTED_ERROR, ) } } } fn construct_user_registration_info( message: &(impl DeviceKeyUploadActions + RegistrationActions), user_id: Option, username: String, farcaster_id: Option, ) -> Result { Ok(UserRegistrationInfo { username, flattened_device_key_upload: construct_flattened_device_key_upload( message, )?, user_id, farcaster_id, initial_device_list: message.get_and_verify_initial_device_list()?, }) } fn construct_user_login_info( user_id: String, username: String, opaque_server_login: comm_opaque2::server::Login, flattened_device_key_upload: FlattenedDeviceKeyUpload, device_to_remove: Option, ) -> Result { Ok(UserLoginInfo { user_id, username, flattened_device_key_upload, opaque_server_login, device_to_remove, }) } fn construct_flattened_device_key_upload( message: &impl DeviceKeyUploadActions, ) -> Result { let key_info = KeyPayload::from_str(&message.payload()?).map_err(|_| { tonic::Status::invalid_argument(tonic_status_messages::MALFORMED_PAYLOAD) })?; let flattened_device_key_upload = FlattenedDeviceKeyUpload { device_id_key: key_info.primary_identity_public_keys.ed25519, key_payload: message.payload()?, key_payload_signature: message.payload_signature()?, content_prekey: message.content_prekey()?, content_prekey_signature: message.content_prekey_signature()?, content_one_time_keys: message.one_time_content_prekeys()?, notif_prekey: message.notif_prekey()?, notif_prekey_signature: message.notif_prekey_signature()?, notif_one_time_keys: message.one_time_notif_prekeys()?, device_type: DeviceType::try_from(DBDeviceTypeInt(message.device_type()?)) .map_err(handle_db_error)?, }; Ok(flattened_device_key_upload) } diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs index 1eda91ff9..eccc29f1f 100644 --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -1,346 +1,352 @@ use tokio::time::Duration; // Secrets pub const SECRETS_DIRECTORY: &str = "secrets"; pub const SECRETS_SETUP_FILE: &str = "server_setup.txt"; // DynamoDB // User table information, supporting opaque_ke 2.0 and X3DH information // Users can sign in either through username+password or Eth wallet. // // This structure should be aligned with the messages defined in // shared/protos/identity_unauthenticated.proto // // Structure for a user should be: // { // userID: String, // opaqueRegistrationData: Option, // username: Option, // walletAddress: Option, // devices: HashMap // } // // A device is defined as: // { // deviceType: String, # client or keyserver // keyPayload: String, // keyPayloadSignature: String, // identityPreKey: String, // identityPreKeySignature: String, // identityOneTimeKeys: Vec, // notifPreKey: String, // notifPreKeySignature: String, // notifOneTimeKeys: Vec, // socialProof: Option // } // } // // Additional context: // "devices" uses the signing public identity key of the device as a key for the devices map // "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification) // if "deviceType" == "keyserver", then the device will not have any notif key information pub const USERS_TABLE: &str = "identity-users"; pub const USERS_TABLE_PARTITION_KEY: &str = "userID"; pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData"; pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username"; pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str = "deviceType"; pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress"; pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof"; pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str = "deviceListTimestamp"; pub const USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME: &str = "farcasterID"; +pub const USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME: &str = "usernameLower"; pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index"; pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index"; pub const USERS_TABLE_FARCASTER_ID_INDEX: &str = "farcasterID-index"; +pub const USERS_TABLE_USERNAME_LOWER_INDEX: &str = "usernameLower-index"; pub mod token_table { pub const NAME: &str = "identity-tokens"; pub const PARTITION_KEY: &str = "userID"; pub const SORT_KEY: &str = "signingPublicKey"; pub const ATTR_CREATED: &str = "created"; pub const ATTR_AUTH_TYPE: &str = "authType"; pub const ATTR_VALID: &str = "valid"; pub const ATTR_TOKEN: &str = "token"; } pub const NONCE_TABLE: &str = "identity-nonces"; pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce"; pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime"; pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; pub const WORKFLOWS_IN_PROGRESS_TABLE: &str = "identity-workflows-in-progress"; pub const WORKFLOWS_IN_PROGRESS_PARTITION_KEY: &str = "id"; pub const WORKFLOWS_IN_PROGRESS_WORKFLOW_ATTRIBUTE: &str = "workflow"; pub const WORKFLOWS_IN_PROGRESS_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; // Usernames reserved because they exist in Ashoat's keyserver already pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames"; pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username"; pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID"; +pub const RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE: &str = + "usernameLower"; +pub const RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX: &str = + "usernameLower-index"; // Users table social proof attribute pub const SOCIAL_PROOF_MESSAGE_ATTRIBUTE: &str = "siweMessage"; pub const SOCIAL_PROOF_SIGNATURE_ATTRIBUTE: &str = "siweSignature"; pub mod devices_table { /// table name pub const NAME: &str = "identity-devices"; pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index"; pub const DEVICE_ID_INDEX_NAME: &str = "deviceID-index"; /// partition key pub const ATTR_USER_ID: &str = "userID"; /// sort key pub const ATTR_ITEM_ID: &str = "itemID"; // itemID prefixes (one shouldn't be a prefix of the other) pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-"; pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-"; // device-specific attrs pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo"; pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey"; pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey"; pub const ATTR_PLATFORM_DETAILS: &str = "platformDetails"; pub const ATTR_LOGIN_TIME: &str = "loginTime"; // IdentityKeyInfo constants pub const ATTR_KEY_PAYLOAD: &str = "keyPayload"; pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature"; // PreKey constants pub const ATTR_PREKEY: &str = "preKey"; pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature"; // PlatformDetails constants pub const ATTR_DEVICE_TYPE: &str = "deviceType"; pub const ATTR_CODE_VERSION: &str = "codeVersion"; pub const ATTR_STATE_VERSION: &str = "stateVersion"; pub const ATTR_MAJOR_DESKTOP_VERSION: &str = "majorDesktopVersion"; // device-list-specific attrs pub const ATTR_TIMESTAMP: &str = "timestamp"; pub const ATTR_DEVICE_IDS: &str = "deviceIDs"; pub const ATTR_CURRENT_SIGNATURE: &str = "curPrimarySignature"; pub const ATTR_LAST_SIGNATURE: &str = "lastPrimarySignature"; // one-time key constants pub const ATTR_CONTENT_OTK_COUNT: &str = "contentOTKCount"; pub const ATTR_NOTIF_OTK_COUNT: &str = "notifOTKCount"; // deprecated attributes pub const OLD_ATTR_DEVICE_TYPE: &str = "deviceType"; pub const OLD_ATTR_CODE_VERSION: &str = "codeVersion"; } // One time keys table, which need to exist in their own table to ensure // atomicity of additions and removals pub mod one_time_keys_table { pub const NAME: &str = "identity-one-time-keys"; pub const PARTITION_KEY: &str = "userID#deviceID#olmAccount"; pub const SORT_KEY: &str = "timestamp#keyNumber"; pub const ATTR_ONE_TIME_KEY: &str = "oneTimeKey"; } // Tokio pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054"; pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004"; pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); // Token pub const ACCESS_TOKEN_LENGTH: usize = 512; // Temporary config pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN"; pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY"; // Nonce pub const NONCE_LENGTH: usize = 17; pub const NONCE_TTL_DURATION: Duration = Duration::from_secs(120); // seconds // Device list pub const DEVICE_LIST_TIMESTAMP_VALID_FOR: Duration = Duration::from_secs(300); // Workflows in progress pub const WORKFLOWS_IN_PROGRESS_TTL_DURATION: Duration = Duration::from_secs(120); // LocalStack pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; // OPAQUE Server Setup pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP"; // Identity Search pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT"; pub const DEFAULT_OPENSEARCH_ENDPOINT: &str = "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566"; pub const IDENTITY_SEARCH_INDEX: &str = "users"; pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20; // Log Error Types pub mod error_types { pub const GENERIC_DB_LOG: &str = "DB Error"; pub const OTK_DB_LOG: &str = "One-time Key DB Error"; pub const DEVICE_LIST_DB_LOG: &str = "Device List DB Error"; pub const TOKEN_DB_LOG: &str = "Token DB Error"; pub const FARCASTER_DB_LOG: &str = "Farcaster DB Error"; pub const SYNC_LOG: &str = "Sync Error"; pub const SEARCH_LOG: &str = "Search Error"; pub const SIWE_LOG: &str = "SIWE Error"; pub const GRPC_SERVICES_LOG: &str = "gRPC Services Error"; pub const TUNNELBROKER_LOG: &str = "Tunnelbroker Error"; pub const HTTP_LOG: &str = "HTTP Error"; } // Tonic Status Messages pub mod tonic_status_messages { pub const UNEXPECTED_MESSAGE_DATA: &str = "unexpected_message_data"; pub const SIGNATURE_INVALID: &str = "signature_invalid"; pub const MALFORMED_KEY: &str = "malformed_key"; pub const VERIFICATION_FAILED: &str = "verification_failed"; pub const MALFORMED_PAYLOAD: &str = "malformed_payload"; pub const INVALID_DEVICE_LIST_PAYLOAD: &str = "invalid_device_list_payload"; pub const USERNAME_ALREADY_EXISTS: &str = "username_already_exists"; pub const USERNAME_RESERVED: &str = "username_reserved"; pub const WALLET_ADDRESS_TAKEN: &str = "wallet_address_taken"; pub const WALLET_ADDRESS_NOT_RESERVED: &str = "wallet_address_not_reserved"; pub const DEVICE_ID_ALREADY_EXISTS: &str = "device_id_already_exists"; pub const USER_NOT_FOUND: &str = "user_not_found"; pub const INVALID_NONCE: &str = "invalid_nonce"; pub const NONCE_EXPIRED: &str = "nonce_expired"; pub const FID_TAKEN: &str = "fid_taken"; pub const CANNOT_LINK_FID: &str = "cannot_link_fid"; pub const INVALID_PLATFORM_METADATA: &str = "invalid_platform_metadata"; pub const MISSING_CREDENTIALS: &str = "missing_credentials"; pub const BAD_CREDENTIALS: &str = "bad_credentials"; pub const SESSION_NOT_FOUND: &str = "session_not_found"; pub const INVALID_TIMESTAMP: &str = "invalid_timestamp"; pub const INVALID_USERNAME: &str = "invalid_username"; pub const USERNAME_NOT_RESERVED: &str = "username_not_reserved"; pub const NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME: &str = "need_keyserver_message_to_claim_username"; pub const UNEXPECTED_INITIAL_DEVICE_LIST: &str = "unexpected_initial_device_list"; pub const DEVICE_LIST_ERROR: &str = "device_list_error"; pub const DEVICE_NOT_IN_DEVICE_LIST: &str = "device_not_in_device_list"; pub const NO_IDENTIFIER_PROVIDED: &str = "no_identifier_provided"; pub const USER_ALREADY_HAS_KEYSERVER: &str = "user_already_has_keyserver"; pub const RETRY: &str = "retry"; pub const INVALID_DEVICE_LIST_UPDATE: &str = "invalid_device_list_update"; pub const INVALID_DEVICE_LIST_SIGNATURE: &str = "invalid_device_list_signature"; pub const UNEXPECTED_ERROR: &str = "unexpected_error"; pub const NO_DEVICE_LIST: &str = "no_device_list"; pub const USER_ID_MISSING: &str = "user_id_missing"; pub const DEVICE_ID_MISSING: &str = "device_id_missing"; pub const MISSING_CONTENT_KEYS: &str = "missing_content_keys"; pub const MISSING_NOTIF_KEYS: &str = "missing_notif_keys"; pub const KEYSERVER_NOT_FOUND: &str = "keyserver_not_found"; pub const PASSWORD_USER: &str = "password_user"; pub const WALLET_USER: &str = "wallet_user"; pub const INVALID_MESSAGE: &str = "invalid_message"; pub const INVALID_MESSAGE_FORMAT: &str = "invalid_message_format"; pub const MISSING_PLATFORM_OR_CODE_VERSION_METADATA: &str = "missing_platform_or_code_version_metadata"; pub const MISSING_KEY: &str = "missing_key"; pub const MESSAGE_NOT_AUTHENTICATED: &str = "message_not_authenticated"; pub const RETRY_FROM_NATIVE: &str = "retry_from_native"; } // Tunnelbroker pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT"; pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051"; // X3DH key management // Threshold for requesting more one_time keys pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5; // Number of keys to be refreshed when below the threshold pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5; // Minimum supported code versions pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270; // Request metadata pub mod request_metadata { pub const CODE_VERSION: &str = "code_version"; pub const STATE_VERSION: &str = "state_version"; pub const MAJOR_DESKTOP_VERSION: &str = "major_desktop_version"; pub const DEVICE_TYPE: &str = "device_type"; pub const USER_ID: &str = "user_id"; pub const DEVICE_ID: &str = "device_id"; pub const ACCESS_TOKEN: &str = "access_token"; } // CORS pub mod cors { use std::time::Duration; pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"]; pub const DEFAULT_ALLOW_HEADERS: [&str; 11] = [ "x-grpc-web", "content-type", "x-user-agent", "grpc-timeout", super::request_metadata::CODE_VERSION, super::request_metadata::STATE_VERSION, super::request_metadata::MAJOR_DESKTOP_VERSION, super::request_metadata::DEVICE_TYPE, super::request_metadata::USER_ID, super::request_metadata::DEVICE_ID, super::request_metadata::ACCESS_TOKEN, ]; pub const ALLOW_ORIGIN_LIST: &str = "ALLOW_ORIGIN_LIST"; pub const PROD_ORIGIN_HOST_STR: &str = "web.comm.app"; } // Tracing pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; // Regex pub const VALID_USERNAME_REGEX_STRING: &str = r"^[a-zA-Z0-9][a-zA-Z0-9-_]{0,190}$"; // Retry // TODO: Replace this with `ExponentialBackoffConfig` from `comm-lib` pub mod retry { pub const MAX_ATTEMPTS: usize = 8; pub const CONDITIONAL_CHECK_FAILED: &str = "ConditionalCheckFailed"; pub const TRANSACTION_CONFLICT: &str = "TransactionConflict"; } // One-time keys pub const ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT: usize = 24; pub const ONE_TIME_KEY_SIZE: usize = 43; // as defined in olm pub const MAX_ONE_TIME_KEYS: usize = 100; // as defined in olm diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index b6e365380..06c9d116a 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1289 +1,1379 @@ use comm_lib::aws::ddb::{ operation::{ delete_item::DeleteItemOutput, get_item::GetItemOutput, put_item::PutItemOutput, query::QueryOutput, }, primitives::Blob, types::{ AttributeValue, Delete, Put, PutRequest, TransactWriteItem, WriteRequest, }, }; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; pub use crate::database::device_list::DeviceIDAttribute; pub use crate::database::one_time_keys::OTKRow; use crate::{ - constants::{error_types, USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME}, - ddb_utils::EthereumIdentity, - device_list::SignedDeviceList, - grpc_services::shared::PlatformMetadata, - reserved_users::UserDetail, + ddb_utils::EthereumIdentity, device_list::SignedDeviceList, + grpc_services::shared::PlatformMetadata, reserved_users::UserDetail, siwe::SocialProof, }; use crate::{ ddb_utils::{DBIdentity, OlmAccountType}, grpc_services::protos, }; use crate::{error::Error, grpc_utils::DeviceKeysInfo}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; use crate::constants::{ - NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, + error_types, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, + RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, + RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, - USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, - USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, + USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, + USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, + USERS_TABLE_USERNAME_LOWER_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::AuthType; pub use grpc_clients::identity::DeviceType; mod device_list; mod farcaster; mod one_time_keys; mod token; mod workflows; pub use device_list::{ DeviceListRow, DeviceListUpdate, DeviceRow, PlatformDetails, }; use self::device_list::Prekey; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } pub struct DBDeviceTypeInt(pub i32); impl TryFrom for DeviceType { type Error = crate::error::Error; fn try_from(value: DBDeviceTypeInt) -> Result { let device_result = DeviceType::try_from(value.0); device_result.map_err(|_| { Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), attribute_value: Some(AttributeValue::N(value.0.to_string())).into(), attribute_error: DBItemAttributeError::InvalidValue, }) }) } } pub struct OutboundKeys { pub key_payload: String, pub key_payload_signature: String, pub content_prekey: Prekey, pub notif_prekey: Prekey, pub content_one_time_key: Option, pub notif_one_time_key: Option, } impl From for protos::auth::OutboundKeyInfo { fn from(db_keys: OutboundKeys) -> Self { use protos::unauth::IdentityKeyInfo; Self { identity_info: Some(IdentityKeyInfo { payload: db_keys.key_payload, payload_signature: db_keys.key_payload_signature, }), content_prekey: Some(db_keys.content_prekey.into()), notif_prekey: Some(db_keys.notif_prekey.into()), one_time_content_prekey: db_keys.content_one_time_key, one_time_notif_prekey: db_keys.notif_one_time_key, } } } +pub struct UserInfoAndPasswordFile { + pub user_id: String, + pub original_username: String, + pub password_file: Vec, +} + #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { let client = match &CONFIG.localstack_endpoint { Some(endpoint) => { info!( "Configuring DynamoDB client to use LocalStack endpoint: {}", endpoint ); let ddb_config_builder = comm_lib::aws::ddb::config::Builder::from(aws_config) .endpoint_url(endpoint); DynamoDBClient::from_conf(ddb_config_builder.build()) } None => DynamoDBClient::new(aws_config), }; DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, platform_details: PlatformMetadata, access_token_creation_time: DateTime, ) -> Result { let device_key_upload = registration_state.flattened_device_key_upload; let user_id = self .add_user_to_users_table( Some((registration_state.username, Blob::new(password_file))), None, registration_state.user_id, registration_state.farcaster_id, ) .await?; // When initial device list is present, we should apply it // instead of auto-creating one. if let Some(device_list) = registration_state.initial_device_list { let initial_device_list = DeviceListUpdate::try_from(device_list)?; self .register_primary_device( &user_id, device_key_upload.clone(), platform_details, access_token_creation_time, initial_device_list, ) .await?; } else { self .add_device( &user_id, device_key_upload.clone(), platform_details, access_token_creation_time, ) .await?; } self .append_one_time_prekeys( &user_id, &device_key_upload.device_id_key, &device_key_upload.content_one_time_keys, &device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } #[allow(clippy::too_many_arguments)] pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: SocialProof, user_id: Option, platform_metadata: PlatformMetadata, access_token_creation_time: DateTime, farcaster_id: Option, initial_device_list: Option, ) -> Result { let wallet_identity = EthereumIdentity { wallet_address: wallet_address.clone(), social_proof, }; let user_id = self .add_user_to_users_table( None, Some(wallet_identity), user_id, farcaster_id, ) .await?; // When initial device list is present, we should apply it // instead of auto-creating one. if let Some(device_list) = initial_device_list { let initial_device_list = DeviceListUpdate::try_from(device_list)?; self .register_primary_device( &user_id, flattened_device_key_upload.clone(), platform_metadata, access_token_creation_time, initial_device_list, ) .await?; } else { self .add_device( &user_id, flattened_device_key_upload.clone(), platform_metadata, access_token_creation_time, ) .await?; } self .append_one_time_prekeys( &user_id, &flattened_device_key_upload.device_id_key, &flattened_device_key_upload.content_one_time_keys, &flattened_device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } async fn add_user_to_users_table( &self, username_and_password_file: Option<(String, Blob)>, wallet_identity: Option, user_id: Option, farcaster_id: Option, ) -> Result { let user_id = user_id.unwrap_or_else(generate_uuid); let mut user = HashMap::from([( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), )]); if let Some((username, password_file)) = username_and_password_file.clone() { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), - AttributeValue::S(username), + AttributeValue::S(username.clone()), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); + user.insert( + USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME.to_string(), + AttributeValue::S(username.to_lowercase()), + ); } if let Some(eth_identity) = wallet_identity.clone() { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(eth_identity.wallet_address), ); user.insert( USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), eth_identity.social_proof.into(), ); } if let Some(fid) = farcaster_id { user.insert( USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME.to_string(), AttributeValue::S(fid), ); } let put_user = Put::builder() .table_name(USERS_TABLE) .set_item(Some(user)) // make sure we don't accidentally overwrite existing row .condition_expression("attribute_not_exists(#pk)") .expression_attribute_names("#pk", USERS_TABLE_PARTITION_KEY) .build(); let put_user_operation = TransactWriteItem::builder().put(put_user).build(); let partition_key_value = match (username_and_password_file, wallet_identity) { (Some((username, _)), _) => username, (_, Some(ethereum_identity)) => ethereum_identity.wallet_address, _ => return Err(Error::MalformedItem), }; // We make sure to delete the user from the reserved usernames table when we // add them to the users table let delete_user_from_reserved_usernames = Delete::builder() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(partition_key_value), ) .build(); let delete_user_from_reserved_usernames_operation = TransactWriteItem::builder() .delete(delete_user_from_reserved_usernames) .build(); self .client .transact_write_items() .set_transact_items(Some(vec![ put_user_operation, delete_user_from_reserved_usernames_operation, ])) .send() .await .map_err(|e| { error!( errorType = error_types::GENERIC_DB_LOG, "Add user transaction failed: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(user_id) } pub async fn add_user_device( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, access_token_creation_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = flattened_device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_one_time_keys.clone(); // add device to the device list if not exists let device_id = flattened_device_key_upload.device_id_key.clone(); let device_exists = self .device_exists(user_id.clone(), device_id.clone()) .await?; if device_exists { self .update_device_login_time( user_id.clone(), device_id, access_token_creation_time, ) .await?; return Ok(()); } // add device to the new device list self .add_device( &user_id, flattened_device_key_upload, platform_metadata, access_token_creation_time, ) .await?; self .append_one_time_prekeys( &user_id, &device_id, &content_one_time_keys, ¬if_one_time_keys, ) .await?; Ok(()) } pub async fn get_keyserver_keys_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device = user_devices .into_iter() .find(|device| *device.device_type() == GrpcDeviceType::Keyserver); let Some(keyserver) = maybe_keyserver_device else { return Ok(None); }; debug!( "Found keyserver in devices table (ID={})", &keyserver.device_id ); let (notif_one_time_key, requested_more_keys) = self .get_one_time_key( user_id, &keyserver.device_id, OlmAccountType::Notification, true, ) .await?; let (content_one_time_key, _) = self .get_one_time_key( user_id, &keyserver.device_id, OlmAccountType::Content, !requested_more_keys, ) .await?; debug!( "Able to get notif one-time key for keyserver {}: {}", &keyserver.device_id, notif_one_time_key.is_some() ); debug!( "Able to get content one-time key for keyserver {}: {}", &keyserver.device_id, content_one_time_key.is_some() ); let outbound_payload = OutboundKeys { key_payload: keyserver.device_key_info.key_payload, key_payload_signature: keyserver.device_key_info.key_payload_signature, content_prekey: keyserver.content_prekey, notif_prekey: keyserver.notif_prekey, content_one_time_key, notif_one_time_key, }; Ok(Some(outbound_payload)) } pub async fn get_keyserver_device_id_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device_id = user_devices .into_iter() .find(|device| *device.device_type() == GrpcDeviceType::Keyserver) .map(|device| device.device_id); Ok(maybe_keyserver_device_id) } pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } #[tracing::instrument(skip_all)] pub async fn delete_user( &self, user_id: String, ) -> Result { // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self.delete_otks_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user's devices"); self.delete_devices_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user's access tokens"); self.delete_all_tokens_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user"); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to delete user {}", user_id ); Err(Error::AwsSdk(e.into())) } } } pub async fn wallet_address_taken( &self, wallet_address: String, ) -> Result { let result = self .get_user_id_from_user_info(wallet_address, &AuthType::Wallet) .await?; Ok(result.is_some()) } pub async fn username_taken(&self, username: String) -> Result { - let result = self - .get_user_id_from_user_info(username, &AuthType::Password) - .await?; + let username_lower = username.to_lowercase(); - Ok(result.is_some()) + let request = self + .client + .query() + .table_name(USERS_TABLE) + .index_name(USERS_TABLE_USERNAME_LOWER_INDEX) + .key_condition_expression("#username_lower = :username_lower") + .expression_attribute_names( + "#username_lower", + USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, + ) + .expression_attribute_values( + ":username_lower", + AttributeValue::S(username_lower), + ); + + let response = request.send().await.map_err(|e| { + error!( + errorType = error_types::GENERIC_DB_LOG, + "Failed to query lowercase usernames by index: {:?}", e + ); + Error::AwsSdk(e.into()) + })?; + + if let Some(items) = response.items() { + if !items.is_empty() { + return Ok(true); + } + } + + Ok(false) } pub async fn filter_out_taken_usernames( &self, user_details: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; - let db_usernames_set: HashSet = db_usernames.into_iter().collect(); + let db_usernames_set: HashSet = db_usernames + .into_iter() + .map(|username| username.to_lowercase()) + .collect(); let available_user_details: Vec = user_details .into_iter() - .filter(|user_detail| !db_usernames_set.contains(&user_detail.username)) + .filter(|user_detail| { + !db_usernames_set.contains(&user_detail.username.to_lowercase()) + }) .collect(); Ok(available_user_details) } #[tracing::instrument(skip_all)] async fn get_user_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result>, Error> { - let (index, attribute_name) = match auth_type { - AuthType::Password => { - (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) - } + let (index, attribute_name, attribute_value) = match auth_type { + AuthType::Password => ( + USERS_TABLE_USERNAME_LOWER_INDEX, + USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, + user_info.to_lowercase(), + ), AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, + user_info.clone(), ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) - .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) + .expression_attribute_values(":u", AttributeValue::S(attribute_value)) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: first_item .get(USERS_TABLE_PARTITION_KEY) .cloned() .into(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_keys_for_user( &self, user_id: &str, get_one_time_keys: bool, ) -> Result, Error> { let mut devices_response = self.get_keys_for_user_devices(user_id).await?; if devices_response.is_empty() { debug!("No devices found for user {}", user_id); return Ok(None); } if get_one_time_keys { for (device_id_key, device_keys) in devices_response.iter_mut() { let requested_more_keys; (device_keys.notif_one_time_key, requested_more_keys) = self .get_one_time_key( user_id, device_id_key, OlmAccountType::Notification, true, ) .await?; (device_keys.content_one_time_key, _) = self .get_one_time_key( user_id, device_id_key, OlmAccountType::Content, !requested_more_keys, ) .await?; } } Ok(Some(devices_response)) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => user .take_attr(USERS_TABLE_PARTITION_KEY) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } #[tracing::instrument(skip_all)] - pub async fn get_user_id_and_password_file_from_username( + pub async fn get_user_info_and_password_file_from_username( &self, username: &str, - ) -> Result)>, Error> { + ) -> Result, Error> { match self .get_user_from_user_info(username.to_string(), &AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = user.take_attr(USERS_TABLE_PARTITION_KEY)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; + let original_username = + user.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE)?; - Ok(Some((user_id, password_file))) + Ok(Some(UserInfoAndPasswordFile { + user_id, + original_username, + password_file, + })) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_username_and_password_file( &self, user_id: &str, ) -> Result)>, Error> { let Some(mut user) = self.get_item_from_users_table(user_id).await?.item else { return Ok(None); }; let username = user.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((username, password_file))) } /// Returns an error if `user_id` does not exist in users table pub async fn user_is_password_authenticated( &self, user_id: &str, ) -> Result { let Some(user_item) = self.get_item_from_users_table(user_id).await?.item else { error!(errorType = error_types::GENERIC_DB_LOG, "user not found"); return Err(Error::MissingItem); }; Ok(user_item.contains_key(USERS_TABLE_REGISTRATION_ATTRIBUTE)) } async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn find_db_user_identities( &self, user_ids: impl IntoIterator, ) -> Result, Error> { use comm_lib::database::batch_operations::{ batch_get, ExponentialBackoffConfig, }; let primary_keys = user_ids.into_iter().map(|user_id| { create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id, )) }); let projection_expression = [ USERS_TABLE_PARTITION_KEY, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, ] .join(", "); debug!( num_requests = primary_keys.size_hint().0, "Attempting to batch get user identifiers" ); let responses = batch_get( &self.client, USERS_TABLE, primary_keys, Some(projection_expression), ExponentialBackoffConfig::default(), ) .await .map_err(Error::from)?; debug!("Found {} matching user identifiers in DDB", responses.len()); let mut results = HashMap::with_capacity(responses.len()); for response in responses { let user_id = response.get_attr(USERS_TABLE_PARTITION_KEY)?; // if this fails, it means that projection expression didnt have all attrs it needed let identity = DBIdentity::try_from(response)?; results.insert(user_id, identity); } Ok(results) } /// Retrieves username for password users or wallet address for wallet users /// Returns `None` if user not found #[tracing::instrument(skip_all)] pub async fn get_user_identity( &self, user_id: &str, ) -> Result, Error> { self .get_item_from_users_table(user_id) .await? .item .map(DBIdentity::try_from) .transpose() .map_err(|e| { error!( user_id, errorType = error_types::GENERIC_DB_LOG, "Database item is missing an identifier" ); e }) } async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let Ok(username) = attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE) { result.push(username); } } } Ok(result) } pub async fn get_all_user_details(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(format!( "{USERS_TABLE_USERNAME_ATTRIBUTE}, {USERS_TABLE_PARTITION_KEY}" )) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let (Ok(username), Ok(user_id)) = ( attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), attribute.take_attr(USERS_TABLE_PARTITION_KEY), ) { result.push(UserDetail { username, user_id }); } } } Ok(result) } pub async fn get_all_reserved_user_details( &self, ) -> Result, Error> { let scan_output = self .client .scan() .table_name(RESERVED_USERNAMES_TABLE) .projection_expression(format!( "{RESERVED_USERNAMES_TABLE_PARTITION_KEY},\ {RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE}" )) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let (Ok(username), Ok(user_id)) = ( attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), attribute.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE), ) { result.push(UserDetail { username, user_id }); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = item.take_attr(NONCE_TABLE_PARTITION_KEY)?; let created = DateTime::::try_from_attr( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(NONCE_TABLE_CREATED_ATTRIBUTE), )?; let expiration_time = DateTime::::try_from_attr( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, user_details: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for user_chunk in user_details.chunks(25) { let write_requests = user_chunk .iter() .map(|user_detail| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(user_detail.username.to_string()), ) .item( RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, AttributeValue::S(user_detail.user_id.to_string()), ) + .item( + RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, + AttributeValue::S(user_detail.username.to_lowercase()), + ) .build(); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } #[tracing::instrument(skip_all)] pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!(errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn get_user_id_from_reserved_usernames_table( &self, username: &str, ) -> Result, Error> { + self + .query_reserved_usernames_table( + username, + RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, + ) + .await + } + + pub async fn get_original_username_from_reserved_usernames_table( + &self, + username: &str, + ) -> Result, Error> { + self + .query_reserved_usernames_table( + username, + RESERVED_USERNAMES_TABLE_PARTITION_KEY, + ) + .await + } + + async fn query_reserved_usernames_table( + &self, + username: &str, + attribute: &str, + ) -> Result, Error> { + let username_lower = username.to_lowercase(); + let response = self .client - .get_item() + .query() .table_name(RESERVED_USERNAMES_TABLE) - .key( - RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(), - AttributeValue::S(username.to_string()), + .index_name(RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX) + .key_condition_expression("#username_lower = :username_lower") + .expression_attribute_names( + "#username_lower", + RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, + ) + .expression_attribute_values( + ":username_lower", + AttributeValue::S(username_lower), ) - .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; - let GetItemOutput { - item: Some(mut item), + let QueryOutput { + items: Some(mut results), .. } = response else { return Ok(None); }; - // We should not return `Ok(None)` if `userID` is missing from the item - let user_id = item.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE)?; - Ok(Some(user_id)) + let result = results + .pop() + .map(|mut attrs| attrs.take_attr::(attribute)) + .transpose()?; + + Ok(result) } } type AttributeName = String; type Devices = HashMap; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } #[deprecated(note = "Use `comm_lib` counterpart instead")] #[allow(dead_code)] fn parse_map_attribute( attribute_name: &str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", errorType = error_types::GENERIC_DB_LOG, "Unexpected attribute type when parsing map attribute" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", errorType = error_types::GENERIC_DB_LOG, "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::Missing, )) } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } #[test] fn test_int_to_device_type() { let valid_result = DeviceType::try_from(3); assert!(valid_result.is_ok()); assert_eq!(valid_result.unwrap(), DeviceType::Android); let invalid_result = DeviceType::try_from(6); assert!(invalid_result.is_err()); } } diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf index 5c8a8a99d..c46957cc1 100644 --- a/services/terraform/modules/shared/dynamodb.tf +++ b/services/terraform/modules/shared/dynamodb.tf @@ -1,318 +1,341 @@ resource "aws_dynamodb_table" "backup-service-backup" { name = "backup-service-backup" hash_key = "userID" range_key = "backupID" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } attribute { name = "backupID" type = "S" } attribute { name = "created" type = "S" } global_secondary_index { name = "userID-created-index" hash_key = "userID" range_key = "created" projection_type = "INCLUDE" non_key_attributes = ["userKeys", "siweBackupMsg"] } } resource "aws_dynamodb_table" "backup-service-log" { name = "backup-service-log" hash_key = "backupID" range_key = "logID" billing_mode = "PAY_PER_REQUEST" attribute { name = "backupID" type = "S" } attribute { name = "logID" type = "N" } } resource "aws_dynamodb_table" "blob-service-blobs" { name = "blob-service-blobs" hash_key = "blob_hash" range_key = "holder" billing_mode = "PAY_PER_REQUEST" attribute { name = "blob_hash" type = "S" } attribute { name = "holder" type = "S" } attribute { name = "last_modified" type = "N" } attribute { name = "unchecked" type = "S" } global_secondary_index { name = "unchecked-index" hash_key = "unchecked" range_key = "last_modified" projection_type = "KEYS_ONLY" } } resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" { name = "tunnelbroker-undelivered-messages" hash_key = "deviceID" range_key = "messageID" billing_mode = "PAY_PER_REQUEST" attribute { name = "deviceID" type = "S" } attribute { name = "messageID" type = "S" } } resource "aws_dynamodb_table" "tunnelbroker-device-tokens" { name = "tunnelbroker-device-tokens" hash_key = "deviceID" billing_mode = "PAY_PER_REQUEST" attribute { name = "deviceID" type = "S" } attribute { name = "deviceToken" type = "S" } global_secondary_index { name = "deviceToken-index" hash_key = "deviceToken" projection_type = "KEYS_ONLY" } } resource "aws_dynamodb_table" "identity-users" { name = "identity-users" hash_key = "userID" billing_mode = "PAY_PER_REQUEST" stream_enabled = true stream_view_type = "NEW_AND_OLD_IMAGES" attribute { name = "userID" type = "S" } attribute { name = "username" type = "S" } attribute { name = "walletAddress" type = "S" } attribute { name = "farcasterID" type = "S" } + attribute { + name = "usernameLower" + type = "S" + } + global_secondary_index { name = "username-index" hash_key = "username" projection_type = "KEYS_ONLY" } global_secondary_index { name = "walletAddress-index" hash_key = "walletAddress" projection_type = "KEYS_ONLY" } global_secondary_index { name = "farcasterID-index" hash_key = "farcasterID" projection_type = "INCLUDE" non_key_attributes = ["walletAddress", "username"] } + + global_secondary_index { + name = "usernameLower-index" + hash_key = "usernameLower" + projection_type = "KEYS_ONLY" + } } resource "aws_dynamodb_table" "identity-devices" { name = "identity-devices" hash_key = "userID" range_key = "itemID" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } # this is either device ID or device list datetime attribute { name = "itemID" type = "S" } # only for sorting device lists attribute { name = "timestamp" type = "N" } # used for deviceID-index attribute { name = "loginTime" type = "S" } # sparse index allowing to sort device list updates by timestamp local_secondary_index { name = "deviceList-timestamp-index" range_key = "timestamp" projection_type = "ALL" } # sort key used for sparse indexing (present only for devices) global_secondary_index { name = "deviceID-index" hash_key = "itemID" range_key = "loginTime" projection_type = "KEYS_ONLY" } } resource "aws_dynamodb_table" "identity-tokens" { name = "identity-tokens" hash_key = "userID" range_key = "signingPublicKey" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } attribute { name = "signingPublicKey" type = "S" } } resource "aws_dynamodb_table" "identity-nonces" { name = "identity-nonces" hash_key = "nonce" billing_mode = "PAY_PER_REQUEST" attribute { name = "nonce" type = "S" } ttl { attribute_name = "expirationTimeUnix" enabled = true } } resource "aws_dynamodb_table" "identity-workflows-in-progress" { name = "identity-workflows-in-progress" hash_key = "id" billing_mode = "PAY_PER_REQUEST" attribute { name = "id" type = "S" } ttl { attribute_name = "expirationTimeUnix" enabled = true } } resource "aws_dynamodb_table" "identity-reserved-usernames" { name = "identity-reserved-usernames" hash_key = "username" billing_mode = "PAY_PER_REQUEST" stream_enabled = true stream_view_type = "NEW_AND_OLD_IMAGES" attribute { name = "username" type = "S" } + + attribute { + name = "usernameLower" + type = "S" + } + + global_secondary_index { + name = "usernameLower-index" + hash_key = "usernameLower" + projection_type = "INCLUDE" + non_key_attributes = ["userID"] + } } resource "aws_dynamodb_table" "identity-one-time-keys" { name = "identity-one-time-keys" hash_key = "userID#deviceID#olmAccount" range_key = "timestamp#keyNumber" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID#deviceID#olmAccount" type = "S" } attribute { name = "timestamp#keyNumber" type = "S" } } resource "aws_dynamodb_table" "feature-flags" { name = "feature-flags" hash_key = "platform" range_key = "feature" billing_mode = "PAY_PER_REQUEST" attribute { name = "platform" type = "S" } attribute { name = "feature" type = "S" } } resource "aws_dynamodb_table" "reports-service-reports" { name = "reports-service-reports" hash_key = "reportID" billing_mode = "PAY_PER_REQUEST" attribute { name = "reportID" type = "S" } }