diff --git a/services/identity/src/client_service.rs b/services/identity/src/client_service.rs index ca33d547e..228c5626e 100644 --- a/services/identity/src/client_service.rs +++ b/services/identity/src/client_service.rs @@ -1,1348 +1,1348 @@ // Standard library imports use std::str::FromStr; // External crate imports use comm_lib::shared::reserved_users::RESERVED_USERNAME_SET; use comm_opaque2::grpc::protocol_error_to_grpc_status; use grpc_clients::identity::PlatformMetadata; use rand::rngs::OsRng; use serde::{Deserialize, Serialize}; use siwe::eip55; use tonic::Response; use tracing::{debug, error, info, warn}; +use crate::comm_service::tunnelbroker; // Workspace crate imports use crate::config::CONFIG; use crate::constants::{error_types, tonic_status_messages}; use crate::database::{ DBDeviceTypeInt, DatabaseClient, DeviceType, KeyPayload, UserInfoAndPasswordFile, }; use crate::ddb_utils::{Identifier, is_transaction_conflict}; use crate::device_list::SignedDeviceList; use crate::error::{DeviceListError, Error as DBError, consume_error}; use crate::grpc_services::authenticated::{DeletePasswordUserInfo, UpdatePasswordInfo}; use crate::grpc_services::protos::unauth::{ find_user_id_request, AddReservedUsernamesRequest, AuthResponse, Empty, ExistingDeviceLoginRequest, FindUserIdRequest, FindUserIdResponse, GenerateNonceResponse, OpaqueLoginFinishRequest, OpaqueLoginStartRequest, OpaqueLoginStartResponse, RegistrationFinishRequest, RegistrationStartRequest, RegistrationStartResponse, RemoveReservedUsernameRequest, ReservedRegistrationStartRequest, RestoreUserRequest, SecondaryDeviceKeysUploadRequest, VerifyUserAccessTokenRequest, VerifyUserAccessTokenResponse, WalletAuthRequest, GetFarcasterUsersRequest, GetFarcasterUsersResponse }; use crate::grpc_services::shared::get_platform_metadata; use crate::grpc_utils::{ DeviceKeyUploadActions, RegistrationActions, SignedNonce }; use crate::log::redact_sensitive_data; use crate::nonce::generate_nonce_data; use crate::reserved_users::{ validate_account_ownership_message_and_get_user_id, validate_add_reserved_usernames_message, validate_remove_reserved_username_message, }; use crate::siwe::{ is_valid_ethereum_address, parse_and_verify_siwe_message, SocialProof, }; use crate::token::{AccessTokenData, AuthType}; pub use crate::grpc_services::protos::unauth::identity_client_service_server::{ IdentityClientService, IdentityClientServiceServer, }; use crate::regex::is_valid_username; #[derive(Clone, Serialize, Deserialize)] pub enum WorkflowInProgress { Registration(Box), Login(Box), Update(Box), PasswordUserDeletion(Box), } #[derive(Clone, Serialize, Deserialize)] pub struct UserRegistrationInfo { pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub user_id: Option, pub farcaster_id: Option, pub initial_device_list: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct UserLoginInfo { pub user_id: String, pub username: String, pub flattened_device_key_upload: FlattenedDeviceKeyUpload, pub opaque_server_login: comm_opaque2::server::Login, pub device_to_remove: Option, } #[derive(Clone, Serialize, Deserialize)] pub struct FlattenedDeviceKeyUpload { pub device_id_key: String, pub key_payload: String, pub key_payload_signature: String, pub content_prekey: String, pub content_prekey_signature: String, pub content_one_time_keys: Vec, pub notif_prekey: String, pub notif_prekey_signature: String, pub notif_one_time_keys: Vec, pub device_type: DeviceType, } #[derive(derive_more::Constructor)] pub struct ClientService { client: DatabaseClient, } #[tonic::async_trait] impl IdentityClientService for ClientService { #[tracing::instrument(skip_all)] async fn register_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Received registration request for: {}", message.username); if !is_valid_username(&message.username) || is_valid_ethereum_address(&message.username) { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_USERNAME, )); } self.check_username_taken(&message.username).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&message.username) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } if RESERVED_USERNAME_SET.contains(&message.username.to_lowercase()) { return Err(tonic::Status::invalid_argument( tonic_status_messages::USERNAME_RESERVED, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let registration_state = construct_user_registration_info( &message, None, message.username.clone(), message.farcaster_id.clone(), )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_reserved_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); self.check_username_taken(&message.username).await?; let Some(original_username) = self .client .get_original_username_from_reserved_usernames_table(&message.username) .await? else { return Err(tonic::Status::permission_denied( tonic_status_messages::USERNAME_NOT_RESERVED, )); }; let user_id = validate_account_ownership_message_and_get_user_id( &message.username, &message.keyserver_message, &message.keyserver_signature, )?; let registration_state = construct_user_registration_info( &message, Some(user_id), original_username, None, )?; self .check_device_id_taken( ®istration_state.flattened_device_key_upload, None, ) .await?; let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, message.username.to_lowercase().as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let session_id = self .client .insert_workflow(WorkflowInProgress::Registration(Box::new( registration_state, ))) .await?; let response = RegistrationStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); if let Some(WorkflowInProgress::Registration(state)) = self.client.get_workflow(message.session_id).await? { let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; let login_time = chrono::Utc::now(); let device_id = state.flattened_device_key_upload.device_id_key.clone(); let username = state.username.clone(); let user_id = self .client .add_password_user_to_users_table( *state, password_file, platform_metadata, login_time, ) .await?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } else { Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )) } } #[tracing::instrument(skip_all)] async fn log_in_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Attempting to log in user: {:?}", &message.username); let user_id_and_password_file = self .client .get_user_info_and_password_file_from_username(&message.username) .await?; let UserInfoAndPasswordFile { user_id, original_username: username, password_file: password_file_bytes, } = if let Some(data) = user_id_and_password_file { data } else { // It's possible that the user attempting login is already registered // on Ashoat's keyserver. If they are, we should send back a gRPC status // code instructing them to get a signed message from Ashoat's keyserver // in order to claim their username and register with the Identity // service. let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&message.username) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::permission_denied( tonic_status_messages::NEED_KEYSERVER_MESSAGE_TO_CLAIM_USERNAME, )); } return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let maybe_device_to_remove = self .get_keyserver_device_to_remove( &user_id, &flattened_device_key_upload.device_id_key, message.force.unwrap_or(false), &flattened_device_key_upload.device_type, ) .await?; let mut server_login = comm_opaque2::server::Login::new(); let server_response = match server_login.start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, message.username.to_lowercase().as_bytes(), ) { Ok(response) => response, Err(_) => { // Retry with original username bytes if the first attempt fails server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)? } }; let login_state = construct_user_login_info( user_id, username, server_login, flattened_device_key_upload, maybe_device_to_remove, )?; let session_id = self .client .insert_workflow(WorkflowInProgress::Login(Box::new(login_state))) .await?; let response = Response::new(OpaqueLoginStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn log_in_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Login(state)) = self.client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; if let Some(device_to_remove) = state.device_to_remove { self .client .remove_device(state.user_id.clone(), device_to_remove) .await?; } let login_time = chrono::Utc::now(); self .client .add_user_device( state.user_id.clone(), state.flattened_device_key_upload.clone(), platform_metadata, login_time, ) .await?; // Create access token let token = AccessTokenData::with_created_time( state.user_id.clone(), state.flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id: state.user_id, access_token, username: state.username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); // WalletAuthRequest is used for both log_in_wallet_user and register_wallet_user if !message.initial_device_list.is_empty() { return Err(tonic::Status::invalid_argument( tonic_status_messages::UNEXPECTED_INITIAL_DEVICE_LIST, )); } let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, ) .await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let login_time = chrono::Utc::now(); let user_id = if let Some(user_id) = self .client .get_user_id_from_user_info(wallet_address.clone(), &AuthType::Wallet) .await? { self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_user_device( user_id.clone(), flattened_device_key_upload.clone(), platform_metadata, login_time, ) .await?; user_id } else { let Some(user_id) = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await? else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; // It's possible that the user attempting login is already registered // on Ashoat's keyserver. If they are, we should try to register them if // they're on a mobile device, otherwise we should send back a gRPC status // code instructing them to try logging in from a mobile device first. if platform_metadata.device_type.to_uppercase() != "ANDROID" && platform_metadata.device_type.to_uppercase() != "IOS" { return Err(tonic::Status::permission_denied( tonic_status_messages::RETRY_FROM_NATIVE, )); }; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, Some(user_id.clone()), platform_metadata, login_time, message.farcaster_id, None, ) .await?; user_id }; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn register_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let parsed_message = parse_and_verify_siwe_message( &message.siwe_message, &message.siwe_signature, ) .await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); self.check_wallet_address_taken(&wallet_address).await?; let username_in_reserved_usernames_table = self .client .get_user_id_from_reserved_usernames_table(&wallet_address) .await? .is_some(); if username_in_reserved_usernames_table { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } if let Some(fid) = &message.farcaster_id { self.check_farcaster_id_taken(fid).await?; } let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; self .check_device_id_taken(&flattened_device_key_upload, None) .await?; let login_time = chrono::Utc::now(); let initial_device_list = message.get_and_verify_initial_device_list()?; let social_proof = SocialProof::new(message.siwe_message, message.siwe_signature); let user_id = self .client .add_wallet_user_to_users_table( flattened_device_key_upload.clone(), wallet_address.clone(), social_proof, None, platform_metadata, login_time, message.farcaster_id, initial_device_list, ) .await?; // Create access token let token = AccessTokenData::with_created_time( user_id.clone(), flattened_device_key_upload.device_id_key, login_time, crate::token::AuthType::Wallet, &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username: wallet_address, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn restore_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); debug!( "Attempting to restore user: {}", redact_sensitive_data(&message.user_id) ); let user_identifier = self .client .get_user_identity(&message.user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })? .identifier; if matches!(user_identifier, Identifier::Username(_)) && (message.siwe_message.is_some() || message.siwe_signature.is_some()) { debug!("SIWE data present for password user!"); return Err(tonic::Status::invalid_argument( tonic_status_messages::PASSWORD_USER, )); } if let Identifier::WalletAddress(ref eth_identity) = &user_identifier { let (Some(siwe_message), Some(siwe_signature)) = (&message.siwe_message, &message.siwe_signature) else { debug!("SIWE data absent for wallet user!"); return Err(tonic::Status::invalid_argument( tonic_status_messages::WALLET_USER, )); }; let parsed_message = parse_and_verify_siwe_message(siwe_message, siwe_signature).await?; self.verify_and_remove_nonce(&parsed_message.nonce).await?; let wallet_address = eip55(&parsed_message.address); if wallet_address != eth_identity.wallet_address { debug!( "Wallet address mismatch: expected '{}' but got '{}' instead", ð_identity.wallet_address, &wallet_address ); return Err(tonic::Status::invalid_argument( tonic_status_messages::WALLET_ADDRESS_MISMATCH, )); } debug!("Updating social proof..."); let social_proof = SocialProof::new(siwe_message.to_string(), siwe_signature.to_string()); self .client .update_wallet_user_social_proof(&message.user_id, social_proof) .await?; } let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let access_token = self .restore_primary_device_and_get_csat( message.user_id.clone(), message.device_list, platform_metadata, flattened_device_key_upload, ) .await?; let response = AuthResponse { user_id: message.user_id, access_token, username: user_identifier.username().to_string(), }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn upload_keys_for_registered_device_and_log_in( &self, request: tonic::Request, ) -> Result, tonic::Status> { let platform_metadata = get_platform_metadata(&request)?; let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let flattened_device_key_upload = construct_flattened_device_key_upload(&message)?; let user_id = message.user_id; let device_id = flattened_device_key_upload.device_id_key.clone(); let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; self .check_device_id_taken(&flattened_device_key_upload, Some(&user_id)) .await?; let user_identity = self .client .get_user_identity(&user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(device_list) = self.client.get_current_device_list(&user_id).await? else { warn!("User {} does not have valid device list. Secondary device auth impossible.", redact_sensitive_data(&user_id)); return Err(tonic::Status::aborted( tonic_status_messages::DEVICE_LIST_ERROR, )); }; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; self .client .put_device_data( &user_id, flattened_device_key_upload, platform_metadata, login_time, ) .await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_in_existing_device( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let message = request.into_inner(); let challenge_response = SignedNonce::try_from(&message)?; let ExistingDeviceLoginRequest { user_id, device_id, .. } = message; let nonce = challenge_response.verify_and_get_nonce(&device_id)?; self.verify_and_remove_nonce(&nonce).await?; let (identity_response, device_list_response) = tokio::join!( self.client.get_user_identity(&user_id), self.client.get_current_device_list(&user_id) ); let user_identity = identity_response?.ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let device_list = device_list_response?.ok_or_else(|| { warn!( "User {} does not have a valid device list.", redact_sensitive_data(&user_id) ); tonic::Status::aborted(tonic_status_messages::DEVICE_LIST_ERROR) })?; if !device_list.device_ids.contains(&device_id) { return Err(tonic::Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } let login_time = chrono::Utc::now(); let identifier = user_identity.identifier; let username = identifier.username().to_string(); let token = AccessTokenData::with_created_time( user_id.clone(), device_id, login_time, identifier.into(), &mut OsRng, ); let access_token = token.access_token.clone(); self.client.put_access_token_data(token).await?; let response = AuthResponse { user_id, access_token, username, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn generate_nonce( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let nonce_data = generate_nonce_data(&mut OsRng); self .client .add_nonce_to_nonces_table(nonce_data.clone()) .await?; let response = GenerateNonceResponse { nonce: nonce_data.nonce, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn verify_user_access_token( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); debug!("Verifying device: {}", &message.device_id); let token_valid = self .client .verify_access_token( message.user_id, message.device_id.clone(), message.access_token, ) .await?; let response = Response::new(VerifyUserAccessTokenResponse { token_valid }); debug!( "device {} was verified: {}", &message.device_id, token_valid ); Ok(response) } #[tracing::instrument(skip_all)] async fn add_reserved_usernames( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_details = validate_add_reserved_usernames_message( &message.message, &message.signature, )?; let filtered_user_details = self.client.filter_out_taken_usernames(user_details).await?; self .client .add_usernames_to_reserved_usernames_table(filtered_user_details) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn remove_reserved_username( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let username = validate_remove_reserved_username_message( &message.message, &message.signature, )?; self .client .delete_username_from_reserved_usernames_table(username) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn ping( &self, _request: tonic::Request, ) -> Result, tonic::Status> { let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn find_user_id( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); use find_user_id_request::Identifier; let (user_ident, auth_type) = match message.identifier { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::NO_IDENTIFIER_PROVIDED, )) } Some(Identifier::Username(username)) => (username, AuthType::Password), Some(Identifier::WalletAddress(address)) => (address, AuthType::Wallet), }; let (get_user_id_from_reserved_usernames_table_result, user_id_result) = tokio::join!( self .client .get_user_id_from_reserved_usernames_table(&user_ident), self .client .get_user_id_from_user_info(user_ident.clone(), &auth_type), ); let is_reserved = get_user_id_from_reserved_usernames_table_result?.is_some(); let user_id = user_id_result?; Ok(Response::new(FindUserIdResponse { user_id, is_reserved, })) } #[tracing::instrument(skip_all)] async fn get_farcaster_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let farcaster_users = self .client .get_farcaster_users(message.farcaster_ids) .await? .into_iter() .map(|d| d.0) .collect(); Ok(Response::new(GetFarcasterUsersResponse { farcaster_users })) } } impl ClientService { async fn check_username_taken( &self, username: &str, ) -> Result<(), tonic::Status> { let username_taken = self.client.username_taken(username.to_string()).await?; if username_taken { return Err(tonic::Status::already_exists( tonic_status_messages::USERNAME_ALREADY_EXISTS, )); } Ok(()) } async fn check_wallet_address_taken( &self, wallet_address: &str, ) -> Result<(), tonic::Status> { let wallet_address_taken = self .client .wallet_address_taken(wallet_address.to_string()) .await?; if wallet_address_taken { return Err(tonic::Status::already_exists( tonic_status_messages::WALLET_ADDRESS_TAKEN, )); } Ok(()) } async fn check_farcaster_id_taken( &self, farcaster_id: &str, ) -> Result<(), tonic::Status> { let fid_already_registered = !self .client .get_farcaster_users(vec![farcaster_id.to_string()]) .await? .is_empty(); if fid_already_registered { return Err(tonic::Status::already_exists( tonic_status_messages::FID_TAKEN, )); } Ok(()) } async fn check_device_id_taken( &self, key_upload: &FlattenedDeviceKeyUpload, requesting_user_id: Option<&str>, ) -> Result<(), tonic::Status> { let device_id = key_upload.device_id_key.as_str(); let Some(existing_device_user_id) = self.client.find_user_id_for_device(device_id).await? else { // device ID doesn't exist return Ok(()); }; // allow already-existing device ID for the same user match requesting_user_id { Some(user_id) if user_id == existing_device_user_id => { debug!( "Found already-existing device {} for user {}", device_id, user_id ); Ok(()) } _ => { warn!("Device ID already exists: {device_id}"); Err(tonic::Status::already_exists( tonic_status_messages::DEVICE_ID_ALREADY_EXISTS, )) } } } async fn verify_and_remove_nonce( &self, nonce: &str, ) -> Result<(), tonic::Status> { match self.client.get_nonce_from_nonces_table(nonce).await? { None => { return Err(tonic::Status::invalid_argument( tonic_status_messages::INVALID_NONCE, )) } Some(nonce) if nonce.is_expired() => { // we don't need to remove the nonce from the table here // because the DynamoDB TTL will take care of it return Err(tonic::Status::aborted( tonic_status_messages::NONCE_EXPIRED, )); } Some(nonce_data) => { self .client .remove_nonce_from_nonces_table(&nonce_data.nonce) .await? } }; Ok(()) } async fn get_keyserver_device_to_remove( &self, user_id: &str, new_keyserver_device_id: &str, force: bool, device_type: &DeviceType, ) -> Result, tonic::Status> { if device_type != &DeviceType::Keyserver { return Ok(None); } let maybe_keyserver_device_id = self .client .get_keyserver_device_id_for_user(user_id) .await?; let Some(existing_keyserver_device_id) = maybe_keyserver_device_id else { return Ok(None); }; if new_keyserver_device_id == existing_keyserver_device_id { return Ok(None); } if force { info!( "keyserver {} will be removed from the device list", existing_keyserver_device_id ); Ok(Some(existing_keyserver_device_id)) } else { Err(tonic::Status::already_exists( tonic_status_messages::USER_ALREADY_HAS_KEYSERVER, )) } } /// This function is used in Backup Restore protocol. It: /// - Verifies singleton device list with both old and new primary signature /// - Performs device list update (updates with singleton payload) /// - Removes all CSATs, OTKs, Devices data for all devices /// - Closes Tunnelbroker connections with these devices /// - Registers the new primary device (Device Key Upload) /// - Issues a new CSAT for the new primary device and returns it async fn restore_primary_device_and_get_csat( &self, user_id: String, device_list_payload: String, platform_metadata: PlatformMetadata, device_key_upload: FlattenedDeviceKeyUpload, ) -> Result { debug!("Verifying device list..."); let new_primary_device_id = device_key_upload.device_id_key.clone(); let previous_primary_device_id = self .client .get_current_device_list(&user_id) .await? .and_then(|device_list| device_list.primary_device_id().cloned()) .ok_or_else(|| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User had missing or empty device list (before backup restore)!" ); tonic::Status::failed_precondition( tonic_status_messages::NO_DEVICE_LIST, ) })?; // Verify device list payload let signed_list: SignedDeviceList = device_list_payload.parse()?; let device_list_payload = crate::database::DeviceListUpdate::try_from(signed_list)?; crate::device_list::verify_singleton_device_list( &device_list_payload, &new_primary_device_id, Some(&previous_primary_device_id), )?; debug!(user_id, "Attempting to revoke user's old access tokens"); self.client.delete_all_tokens_for_user(&user_id).await?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's old one-time keys"); self .client .delete_otks_table_rows_for_user(&user_id) .await?; debug!(user_id, "Attempting to delete user's old devices"); let old_device_ids = self.client.delete_devices_data_for_user(&user_id).await?; spawn_force_close_tb_session_task(old_device_ids); // Reset device list (perform update) let login_time = chrono::Utc::now(); debug!(user_id, "Registering new primary device..."); self .client .register_primary_device( &user_id, device_key_upload, platform_metadata, login_time, device_list_payload, ) .await?; // Create new access token let token_data = AccessTokenData::with_created_time( user_id.clone(), new_primary_device_id, login_time, crate::token::AuthType::Password, &mut OsRng, ); let access_token = token_data.access_token.clone(); self.client.put_access_token_data(token_data).await?; Ok(access_token) } } #[tracing::instrument(skip_all)] pub fn handle_db_error(db_error: DBError) -> tonic::Status { use comm_lib::aws::DynamoDBError as DDBError; use tonic::Status; use tonic_status_messages as msg; use DBError as E; match db_error { E::AwsSdk(DDBError::InternalServerError(_)) | E::AwsSdk(DDBError::ProvisionedThroughputExceededException(_)) | E::AwsSdk(DDBError::RequestLimitExceeded(_)) => { Status::unavailable(msg::RETRY) } E::AwsSdk(ref err) if is_transaction_conflict(err) => { warn!("DB operation conflicted. Returning RETRY status."); Status::unavailable(msg::RETRY) } E::DeviceList(DeviceListError::InvalidDeviceListUpdate) => { Status::invalid_argument(msg::INVALID_DEVICE_LIST_UPDATE) } E::DeviceList(DeviceListError::InvalidSignature) => { Status::invalid_argument(msg::INVALID_DEVICE_LIST_SIGNATURE) } e => { error!( errorType = error_types::GENERIC_DB_LOG, "Encountered an unexpected error: {}", e ); Status::failed_precondition(msg::UNEXPECTED_ERROR) } } } impl From for tonic::Status { fn from(err: DBError) -> Self { handle_db_error(err) } } fn construct_user_registration_info( message: &(impl DeviceKeyUploadActions + RegistrationActions), user_id: Option, username: String, farcaster_id: Option, ) -> Result { Ok(UserRegistrationInfo { username, flattened_device_key_upload: construct_flattened_device_key_upload( message, )?, user_id, farcaster_id, initial_device_list: message.get_and_verify_initial_device_list()?, }) } fn construct_user_login_info( user_id: String, username: String, opaque_server_login: comm_opaque2::server::Login, flattened_device_key_upload: FlattenedDeviceKeyUpload, device_to_remove: Option, ) -> Result { Ok(UserLoginInfo { user_id, username, flattened_device_key_upload, opaque_server_login, device_to_remove, }) } fn construct_flattened_device_key_upload( message: &impl DeviceKeyUploadActions, ) -> Result { let key_info = KeyPayload::from_str(&message.payload()?).map_err(|_| { tonic::Status::invalid_argument(tonic_status_messages::MALFORMED_PAYLOAD) })?; let flattened_device_key_upload = FlattenedDeviceKeyUpload { device_id_key: key_info.primary_identity_public_keys.ed25519, key_payload: message.payload()?, key_payload_signature: message.payload_signature()?, content_prekey: message.content_prekey()?, content_prekey_signature: message.content_prekey_signature()?, content_one_time_keys: message.one_time_content_prekeys()?, notif_prekey: message.notif_prekey()?, notif_prekey_signature: message.notif_prekey_signature()?, notif_one_time_keys: message.one_time_notif_prekeys()?, device_type: DeviceType::try_from(DBDeviceTypeInt(message.device_type()?))?, }; Ok(flattened_device_key_upload) } fn spawn_force_close_tb_session_task(device_ids: Vec) { tokio::spawn(async move { debug!( "Attempting to terminate Tunnelbroker sessions for devices: {:?}", device_ids.as_slice() ); - let result = - crate::tunnelbroker::terminate_device_sessions(&device_ids).await; + let result = tunnelbroker::terminate_device_sessions(&device_ids).await; consume_error(result); }); } diff --git a/services/identity/src/backup.rs b/services/identity/src/comm_service/backup.rs similarity index 100% rename from services/identity/src/backup.rs rename to services/identity/src/comm_service/backup.rs diff --git a/services/identity/src/tunnelbroker.rs b/services/identity/src/comm_service/tunnelbroker.rs similarity index 100% rename from services/identity/src/tunnelbroker.rs rename to services/identity/src/comm_service/tunnelbroker.rs diff --git a/services/identity/src/database/device_list.rs b/services/identity/src/database/device_list.rs index f5d92d82f..b568a5e39 100644 --- a/services/identity/src/database/device_list.rs +++ b/services/identity/src/database/device_list.rs @@ -1,2276 +1,2276 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use comm_lib::{ aws::ddb::{ operation::{get_item::GetItemOutput, query::builders::QueryFluentBuilder}, types::{ error::TransactionCanceledException, AttributeValue, Delete, DeleteRequest, Put, TransactWriteItem, Update, WriteRequest, }, }, database::{ batch_operations::ExponentialBackoffConfig, AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, DynamoDBError, TryFromAttribute, }, }; use serde::Serialize; use tracing::{debug, error, trace, warn}; use crate::{ client_service::FlattenedDeviceKeyUpload, + comm_service::tunnelbroker, constants::{ devices_table::{self, *}, error_types, USERS_TABLE, USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, }, ddb_utils::is_transaction_conflict, error::{DeviceListError, Error}, grpc_services::{ protos::{self, unauth::DeviceType}, shared::PlatformMetadata, }, grpc_utils::DeviceKeysInfo, olm::is_valid_olm_key, }; use crate::{error::consume_error, log::redact_sensitive_data}; use super::DatabaseClient; // We omit the content and notif one-time key count attributes from this struct // because they are internal helpers and are not provided by users #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct DeviceRow { #[serde(skip)] pub user_id: String, #[serde(skip)] pub device_id: String, #[serde(rename = "identityKeyInfo")] pub device_key_info: IdentityKeyInfo, pub content_prekey: Prekey, pub notif_prekey: Prekey, /// Timestamp of last login (access token generation) #[serde(skip)] pub login_time: DateTime, #[serde(skip)] pub platform_details: PlatformDetails, } #[derive(Clone, Debug)] pub struct DeviceListRow { pub user_id: String, pub timestamp: DateTime, pub device_ids: Vec, /// Primary device signature. This is `None` for Identity-generated lists. pub current_primary_signature: Option, /// Last primary device signature, in case the primary device has changed /// since last device list update. pub last_primary_signature: Option, } #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct IdentityKeyInfo { pub key_payload: String, pub key_payload_signature: String, } #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct Prekey { pub prekey: String, pub prekey_signature: String, } #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct PlatformDetails { #[serde(serialize_with = "serialize_device_type")] device_type: DeviceType, code_version: u64, state_version: Option, major_desktop_version: Option, } fn serialize_device_type( device_type: &DeviceType, s: S, ) -> Result { let v = device_type.as_str_name().to_lowercase(); v.serialize(s) } /// A struct representing device list update payload /// issued by the primary device. /// For the JSON payload, see [`crate::device_list::SignedDeviceList`] pub struct DeviceListUpdate { pub devices: Vec, pub timestamp: DateTime, /// Primary device signature. This is `None` for Identity-generated lists. pub current_primary_signature: Option, /// Last primary device signature, in case the primary device has changed /// since last device list update. pub last_primary_signature: Option, /// Raw update payload to verify signatures pub raw_payload: String, } impl DeviceRow { #[tracing::instrument(skip_all)] pub fn from_device_key_upload( user_id: impl Into, upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, login_time: DateTime, ) -> Result { if !is_valid_olm_key(&upload.content_prekey) || !is_valid_olm_key(&upload.notif_prekey) { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Invalid prekey format" ); return Err(Error::InvalidFormat); } let key_upload_device_type = DeviceType::from_str_name(upload.device_type.as_str_name()) .expect("DeviceType conversion failed. Identity client and server protos mismatch"); let platform_details = PlatformDetails::new(platform_metadata, Some(key_upload_device_type))?; let device_row = Self { user_id: user_id.into(), device_id: upload.device_id_key, device_key_info: IdentityKeyInfo { key_payload: upload.key_payload, key_payload_signature: upload.key_payload_signature, }, content_prekey: Prekey { prekey: upload.content_prekey, prekey_signature: upload.content_prekey_signature, }, notif_prekey: Prekey { prekey: upload.notif_prekey, prekey_signature: upload.notif_prekey_signature, }, platform_details, login_time, }; Ok(device_row) } pub fn device_type(&self) -> &DeviceType { &self.platform_details.device_type } } impl DeviceListRow { /// Generates new device list row from given devices. /// Used only for Identity-generated (unsigned) device lists. fn new( user_id: impl Into, device_ids: Vec, update_info: &UpdateOperationInfo, ) -> Self { Self { user_id: user_id.into(), device_ids, timestamp: update_info.timestamp.unwrap_or_else(Utc::now), current_primary_signature: update_info.current_signature.clone(), last_primary_signature: update_info.last_signature.clone(), } } pub fn has_device(&self, device_id: &String) -> bool { self.device_ids.contains(device_id) } pub fn is_primary_device(&self, device_id: &String) -> bool { self .device_ids .first() .filter(|it| *it == device_id) .is_some() } pub fn has_secondary_device(&self, device_id: &String) -> bool { self.has_device(device_id) && !self.is_primary_device(device_id) } pub fn primary_device_id(&self) -> Option<&String> { self.device_ids.first() } } impl PlatformDetails { pub fn new( metadata: PlatformMetadata, key_upload_device_type: Option, ) -> Result { let PlatformMetadata { device_type, .. } = metadata; let metadata_device_type = DeviceType::from_str_name(&device_type.to_uppercase()); let device_type = match (metadata_device_type, key_upload_device_type) { (Some(metadata_value), None) => metadata_value, (Some(metadata_value), Some(key_upload_value)) => { if metadata_value != key_upload_value { warn!( "DeviceKeyUpload device type ({1}) mismatches request metadata platform ({2}). {0}", "Preferring value from key uplaod.", key_upload_value.as_str_name(), metadata_value.as_str_name() ); } key_upload_value } (None, Some(key_upload_value)) => key_upload_value, (None, None) => { warn!( "Received invalid device_type in request metadata: {}", device_type ); return Err(Error::InvalidFormat); } }; Ok(Self { device_type, code_version: metadata.code_version, state_version: metadata.state_version, major_desktop_version: metadata.major_desktop_version, }) } } impl From> for protos::auth::UserDevicesPlatformDetails { fn from(devices_map: HashMap) -> Self { let devices_platform_details = devices_map .into_iter() .map(|(device_id, platform_details)| (device_id, platform_details.into())) .collect(); Self { devices_platform_details, } } } // helper structs for converting to/from attribute values for sort key (a.k.a itemID) pub struct DeviceIDAttribute(pub String); struct DeviceListKeyAttribute(DateTime); impl DeviceIDAttribute { /// Retrieves the device ID string pub fn into_inner(self) -> String { self.0 } } impl From for AttributeValue { fn from(value: DeviceIDAttribute) -> Self { AttributeValue::S(format!("{DEVICE_ITEM_KEY_PREFIX}{}", value.0)) } } impl From for AttributeValue { fn from(value: DeviceListKeyAttribute) -> Self { AttributeValue::S(format!( "{DEVICE_LIST_KEY_PREFIX}{}", value.0.to_rfc3339() )) } } impl TryFrom> for DeviceIDAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device- prefix let device_id = item_id .strip_prefix(DEVICE_ITEM_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, })? .to_string(); Ok(Self(device_id)) } } impl TryFrom> for DeviceListKeyAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device-list- prefix, then parse the timestamp let timestamp: DateTime = item_id .strip_prefix(DEVICE_LIST_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, }) .and_then(|s| { s.parse().map_err(|e| { DBItemError::new( ATTR_ITEM_ID.to_string(), item_id.clone().into(), DBItemAttributeError::InvalidTimestamp(e), ) }) })?; Ok(Self(timestamp)) } } impl TryFrom for DeviceRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(ATTR_USER_ID)?; let DeviceIDAttribute(device_id) = attrs.remove(ATTR_ITEM_ID).try_into()?; let device_key_info = attrs .take_attr::(ATTR_DEVICE_KEY_INFO) .and_then(IdentityKeyInfo::try_from)?; let content_prekey = attrs .take_attr::(ATTR_CONTENT_PREKEY) .and_then(Prekey::try_from)?; let notif_prekey = attrs .take_attr::(ATTR_NOTIF_PREKEY) .and_then(Prekey::try_from)?; let login_time: DateTime = attrs.take_attr(ATTR_LOGIN_TIME)?; let platform_details = take_platform_details(&mut attrs)?; Ok(Self { user_id, device_id, device_key_info, content_prekey, notif_prekey, platform_details, login_time, }) } } impl From for AttributeMap { fn from(value: DeviceRow) -> Self { HashMap::from([ (ATTR_USER_ID.to_string(), AttributeValue::S(value.user_id)), ( ATTR_ITEM_ID.to_string(), DeviceIDAttribute(value.device_id).into(), ), ( ATTR_PLATFORM_DETAILS.to_string(), value.platform_details.into(), ), ( ATTR_DEVICE_KEY_INFO.to_string(), value.device_key_info.into(), ), (ATTR_CONTENT_PREKEY.to_string(), value.content_prekey.into()), (ATTR_NOTIF_PREKEY.to_string(), value.notif_prekey.into()), // migration attributes ( ATTR_LOGIN_TIME.to_string(), AttributeValue::S(value.login_time.to_rfc3339()), ), ]) } } impl From for protos::unauth::IdentityKeyInfo { fn from(value: IdentityKeyInfo) -> Self { Self { payload: value.key_payload, payload_signature: value.key_payload_signature, } } } impl From for AttributeValue { fn from(value: IdentityKeyInfo) -> Self { let attrs = HashMap::from([ ( ATTR_KEY_PAYLOAD.to_string(), AttributeValue::S(value.key_payload), ), ( ATTR_KEY_PAYLOAD_SIGNATURE.to_string(), AttributeValue::S(value.key_payload_signature), ), ]); AttributeValue::M(attrs) } } impl TryFrom for IdentityKeyInfo { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let key_payload = attrs.take_attr(ATTR_KEY_PAYLOAD)?; let key_payload_signature = attrs.take_attr(ATTR_KEY_PAYLOAD_SIGNATURE)?; Ok(Self { key_payload, key_payload_signature, }) } } impl From for AttributeValue { fn from(value: Prekey) -> Self { let attrs = HashMap::from([ (ATTR_PREKEY.to_string(), AttributeValue::S(value.prekey)), ( ATTR_PREKEY_SIGNATURE.to_string(), AttributeValue::S(value.prekey_signature), ), ]); AttributeValue::M(attrs) } } impl From for protos::unauth::Prekey { fn from(value: Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl From for Prekey { fn from(value: protos::unauth::Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl TryFrom for Prekey { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let prekey = attrs.take_attr(ATTR_PREKEY)?; let prekey_signature = attrs.take_attr(ATTR_PREKEY_SIGNATURE)?; Ok(Self { prekey, prekey_signature, }) } } impl From for AttributeValue { fn from(value: PlatformDetails) -> Self { let mut attrs = HashMap::from([ ( ATTR_DEVICE_TYPE.to_string(), AttributeValue::S(value.device_type.as_str_name().to_string()), ), ( ATTR_CODE_VERSION.to_string(), AttributeValue::N(value.code_version.to_string()), ), ]); if let Some(state_version) = value.state_version { attrs.insert( ATTR_STATE_VERSION.to_string(), AttributeValue::N(state_version.to_string()), ); } if let Some(major_desktop_version) = value.major_desktop_version { attrs.insert( ATTR_MAJOR_DESKTOP_VERSION.to_string(), AttributeValue::N(major_desktop_version.to_string()), ); } AttributeValue::M(attrs) } } impl TryFrom for PlatformDetails { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let raw_device_type: String = attrs.take_attr(ATTR_DEVICE_TYPE)?; let device_type = DeviceType::from_str_name(&raw_device_type).ok_or_else(|| { DBItemError::new( ATTR_DEVICE_TYPE.to_string(), raw_device_type.into(), DBItemAttributeError::InvalidValue, ) })?; let code_version = attrs .remove(ATTR_CODE_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .unwrap_or_default(); let state_version = attrs .remove(ATTR_STATE_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()); let major_desktop_version = attrs .remove(ATTR_MAJOR_DESKTOP_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()); Ok(Self { device_type, code_version, state_version, major_desktop_version, }) } } impl TryFromAttribute for PlatformDetails { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { AttributeMap::try_from_attr(attribute_name, attribute) .and_then(PlatformDetails::try_from) } } impl From for protos::auth::PlatformDetails { fn from(value: PlatformDetails) -> Self { Self { device_type: value.device_type.into(), code_version: value.code_version, state_version: value.state_version, major_desktop_version: value.major_desktop_version, } } } impl TryFrom for DeviceListRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id: String = attrs.take_attr(ATTR_USER_ID)?; let DeviceListKeyAttribute(timestamp) = attrs.remove(ATTR_ITEM_ID).try_into()?; // validate timestamps are in sync let timestamps_match = attrs .remove(ATTR_TIMESTAMP) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .filter(|val| *val == timestamp.timestamp_millis()) .is_some(); if !timestamps_match { warn!( "DeviceList timestamp mismatch for (userID={}, itemID={})", redact_sensitive_data(&user_id), timestamp.to_rfc3339() ); } let device_ids: Vec = attrs.take_attr(ATTR_DEVICE_IDS)?; let current_primary_signature = attrs.take_attr(ATTR_CURRENT_SIGNATURE)?; let last_primary_signature = attrs.take_attr(ATTR_LAST_SIGNATURE)?; Ok(Self { user_id, timestamp, device_ids, current_primary_signature, last_primary_signature, }) } } impl From for AttributeMap { fn from(device_list: DeviceListRow) -> Self { let mut attrs = HashMap::new(); attrs.insert( ATTR_USER_ID.to_string(), AttributeValue::S(device_list.user_id.clone()), ); attrs.insert( ATTR_ITEM_ID.to_string(), DeviceListKeyAttribute(device_list.timestamp).into(), ); attrs.insert( ATTR_TIMESTAMP.to_string(), AttributeValue::N(device_list.timestamp.timestamp_millis().to_string()), ); attrs.insert( ATTR_DEVICE_IDS.to_string(), AttributeValue::L( device_list .device_ids .into_iter() .map(AttributeValue::S) .collect(), ), ); if let Some(current_signature) = device_list.current_primary_signature { attrs.insert( ATTR_CURRENT_SIGNATURE.to_string(), AttributeValue::S(current_signature), ); } if let Some(last_signature) = device_list.last_primary_signature { attrs.insert( ATTR_CURRENT_SIGNATURE.to_string(), AttributeValue::S(last_signature), ); } attrs } } impl DatabaseClient { /// Retrieves user's current devices and their full data #[tracing::instrument(skip_all)] pub async fn get_current_devices( &self, user_id: impl Into, ) -> Result, Error> { let response = query_rows_with_prefix(self, user_id, DEVICE_ITEM_KEY_PREFIX) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to get current devices: {:?}", e ); Error::AwsSdk(e.into()) })?; let Some(rows) = response.items else { return Ok(Vec::new()); }; rows .into_iter() .map(DeviceRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Gets user's device list history #[tracing::instrument(skip_all)] pub async fn get_device_list_history( &self, user_id: impl Into, since: Option>, ) -> Result, Error> { let rows = if let Some(since) = since { // When timestamp is provided, it's better to query device lists by timestamp LSI self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id AND #timestamp > :since") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#timestamp", ATTR_TIMESTAMP) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .expression_attribute_values( ":since", AttributeValue::N(since.timestamp_millis().to_string()), ) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to query device list updates by index: {:?}", e ); Error::AwsSdk(e.into()) })? .items } else { // Query all device lists for user query_rows_with_prefix(self, user_id, DEVICE_LIST_KEY_PREFIX) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to query device list updates (all): {:?}", e ); Error::AwsSdk(e.into()) })? .items }; rows .unwrap_or_default() .into_iter() .map(DeviceListRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Returns all devices' keys for the given user. Response is in the same format /// as [DatabaseClient::get_keys_for_user] for compatibility reasons. #[tracing::instrument(skip_all)] pub async fn get_keys_for_user_devices( &self, user_id: impl Into, ) -> Result { let user_devices = self.get_current_devices(user_id).await?; let user_devices_keys = user_devices .into_iter() .map(|device| (device.device_id.clone(), DeviceKeysInfo::from(device))) .collect(); Ok(user_devices_keys) } /// Find owner's user ID for given device ID. Useful for finding /// devices table partition key. #[tracing::instrument(skip_all)] pub async fn find_user_id_for_device( &self, device_id: &str, ) -> Result, Error> { let response = self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::DEVICE_ID_INDEX_NAME) .key_condition_expression("#item_id = :device_id_attr") .expression_attribute_names("#item_id", devices_table::ATTR_ITEM_ID) .expression_attribute_values( ":device_id_attr", DeviceIDAttribute(device_id.to_string()).into(), ) .send() .await .map_err(|err| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to query for device ID: {:?}", err ); Error::AwsSdk(err.into()) })?; let Some(mut results) = response.items else { debug!("Query by deviceID returned empty response"); return Ok(None); }; if results.len() > 1 { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Devices table contains more than one device with ID: {}", device_id ); return Err(Error::IllegalState); } let user_id = results .pop() .map(|mut attrs| attrs.take_attr::(devices_table::ATTR_USER_ID)) .transpose()?; Ok(user_id) } #[tracing::instrument(skip_all)] pub async fn find_device_by_id( &self, device_id: &str, ) -> Result, Error> { let Some(user_id) = self.find_user_id_for_device(device_id).await? else { debug!("No device found with ID: {}", device_id); return Ok(None); }; self.get_device_data(user_id, device_id).await } #[tracing::instrument(skip_all)] pub async fn update_device_prekeys( &self, user_id: impl Into, device_id: impl Into, content_prekey: Prekey, notif_prekey: Prekey, ) -> Result<(), Error> { if !is_valid_olm_key(&content_prekey.prekey) || !is_valid_olm_key(¬if_prekey.prekey) { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Invalid prekey format" ); return Err(Error::InvalidFormat); } let db_operation = self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression( "SET #content_prekey = :content_prekey, #notif_prekey = :notif_prekey", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#content_prekey", ATTR_CONTENT_PREKEY) .expression_attribute_names("#notif_prekey", ATTR_NOTIF_PREKEY) .expression_attribute_values(":content_prekey", content_prekey.into()) .expression_attribute_values(":notif_prekey", notif_prekey.into()); let retry_config = ExponentialBackoffConfig::default(); let mut exponential_backoff = retry_config.new_counter(); loop { let result = db_operation.clone().send().await; match result { Ok(_) => return Ok(()), Err(err) => match DynamoDBError::from(err) { ref conflict_err if is_transaction_conflict(conflict_err) => { exponential_backoff.sleep_and_retry().await?; } error => { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to update device prekeys: {:?}", error ); return Err(error.into()); } }, } } } /// Checks if given device exists on user's current device list #[tracing::instrument(skip_all)] pub async fn device_exists( &self, user_id: impl Into, device_id: impl Into, ) -> Result { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) // only fetch the primary key, we don't need the rest .projection_expression(format!("{ATTR_USER_ID}, {ATTR_ITEM_ID}")) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to check if device exists: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(item.is_some()) } #[tracing::instrument(skip_all)] pub async fn get_device_data( &self, user_id: impl Into, device_id: impl Into, ) -> Result, Error> { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to fetch device data: {:?}", e ); Error::AwsSdk(e.into()) })?; let Some(attrs) = item else { return Ok(None); }; let device_data = DeviceRow::try_from(attrs)?; Ok(Some(device_data)) } /// Fails if the device list is empty #[tracing::instrument(skip_all)] pub async fn get_primary_device_data( &self, user_id: &str, ) -> Result { let device_list = self.get_current_device_list(user_id).await?; let Some(primary_device_id) = device_list .as_ref() .and_then(|list| list.device_ids.first()) else { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::DEVICE_LIST_DB_LOG, "Device list is empty. Cannot fetch primary device" ); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); }; self .get_device_data(user_id, primary_device_id) .await? .ok_or_else(|| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Corrupt database. Missing primary device data for user {}", user_id ); Error::MissingItem }) } /// Required only for migration purposes (determining primary device) #[tracing::instrument(skip_all)] pub async fn update_device_login_time( &self, user_id: impl Into, device_id: impl Into, login_time: DateTime, ) -> Result<(), Error> { self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression("SET #login_time = :login_time") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#login_time", ATTR_LOGIN_TIME) .expression_attribute_values( ":login_time", AttributeValue::S(login_time.to_rfc3339()), ) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to update device login time: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(()) } #[tracing::instrument(skip_all)] pub async fn update_device_platform_details( &self, user_id: impl Into, device_id: impl Into, platform_details: PlatformDetails, ) -> Result<(), Error> { self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression("SET #platform_details = :platform_details") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#platform_details", ATTR_PLATFORM_DETAILS) .expression_attribute_values(":platform_details", platform_details.into()) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to update device platform details: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(()) } #[tracing::instrument(skip_all)] pub async fn get_current_device_list( &self, user_id: impl Into, ) -> Result, Error> { self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id") // sort descending .scan_index_forward(false) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .limit(1) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to query device list updates by index: {:?}", e ); Error::AwsSdk(e.into()) })? .items .and_then(|mut items| items.pop()) .map(DeviceListRow::try_from) .transpose() .map_err(Error::from) } /// Fetches latest device lists for multiple users in batch. #[tracing::instrument(skip_all)] pub async fn get_current_device_lists( &self, user_ids: impl IntoIterator, ) -> Result, Error> { // 1a. Prepare primary keys for device list timestamps let primary_keys = user_ids .into_iter() .map(|user_id| { AttributeMap::from([( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), )]) }) .collect::>(); let projection_expression = Some( [ USERS_TABLE_PARTITION_KEY, USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, ] .join(", "), ); // 1b. Fetch latest device list timestamps let timestamps = comm_lib::database::batch_operations::batch_get( &self.client, USERS_TABLE, primary_keys, projection_expression, Default::default(), ) .await?; // 2a. Prepare primary keys for latest device lists let device_list_primary_keys: Vec = timestamps .into_iter() .filter_map(|mut attrs| { let user_id: String = attrs.take_attr(USERS_TABLE_PARTITION_KEY).ok()?; let Ok(timestamp) = attrs .take_attr::(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME) else { warn!( "{} attribute missing for userID={}. Skipping", USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, redact_sensitive_data(&user_id) ); return None; }; let pk = AttributeMap::from([ ( devices_table::ATTR_USER_ID.to_string(), AttributeValue::S(user_id), ), ( devices_table::ATTR_ITEM_ID.to_string(), AttributeValue::S(format!("{DEVICE_LIST_KEY_PREFIX}{}", timestamp)), ), ]); Some(pk) }) .collect(); // 2b. Fetch latest device lists trace!( "Finding device lists for {} users having valid timestamp", device_list_primary_keys.len() ); let device_list_results = comm_lib::database::batch_operations::batch_get( &self.client, devices_table::NAME, device_list_primary_keys, None, Default::default(), ) .await?; // 3. Prepare output format let device_lists = device_list_results .into_iter() .map(|attrs| { let user_id: String = attrs.get_attr(devices_table::ATTR_USER_ID)?; let device_list = DeviceListRow::try_from(attrs)?; Ok((user_id, device_list)) }) .collect::, DBItemError>>()?; Ok(device_lists) } /// Gets [`PlatformDetails`] for multiple users. /// Takes iterable collection of tuples `(UserID, DeviceID)`. /// Returns nested map: `Map>`. #[tracing::instrument(skip_all)] pub async fn get_devices_platform_details( &self, user_device_ids: impl IntoIterator, ) -> Result>, Error> { let primary_keys = user_device_ids .into_iter() .map(|(user_id, device_id)| { AttributeMap::from([ ( devices_table::ATTR_USER_ID.to_string(), AttributeValue::S(user_id), ), ( devices_table::ATTR_ITEM_ID.to_string(), DeviceIDAttribute(device_id).into(), ), ]) }) .collect::>(); let projection_expression = Some( [ devices_table::ATTR_USER_ID, devices_table::ATTR_ITEM_ID, devices_table::ATTR_PLATFORM_DETAILS, // we need these for legacy devices without ATTR_PLATFORM_DETAILS devices_table::OLD_ATTR_DEVICE_TYPE, devices_table::OLD_ATTR_CODE_VERSION, ] .join(", "), ); let fetched_results = comm_lib::database::batch_operations::batch_get( &self.client, devices_table::NAME, primary_keys, projection_expression, Default::default(), ) .await?; let mut users_devices_platform_details = HashMap::new(); for mut item in fetched_results { let user_id: String = item.take_attr(devices_table::ATTR_USER_ID)?; let device_id: DeviceIDAttribute = item.remove(devices_table::ATTR_ITEM_ID).try_into()?; let platform_details = take_platform_details(&mut item)?; let user_devices = users_devices_platform_details .entry(user_id) .or_insert_with(HashMap::new); user_devices.insert(device_id.into_inner(), platform_details); } Ok(users_devices_platform_details) } /// Adds device data to devices table. If the device already exists, its /// data is overwritten. This does not update the device list; the device ID /// should already be present in the device list. #[tracing::instrument(skip_all)] pub async fn put_device_data( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, login_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = device_key_upload.notif_one_time_keys.clone(); let user_id_string = user_id.into(); let new_device = DeviceRow::from_device_key_upload( user_id_string.clone(), device_key_upload, platform_metadata, login_time, )?; let device_id = new_device.device_id.clone(); self .client .put_item() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to put device data: {:?}", e ); Error::AwsSdk(e.into()) })?; self .append_one_time_prekeys( &user_id_string, &device_id, &content_one_time_keys, ¬if_one_time_keys, ) .await?; Ok(()) } /// Removes device data from devices table. If the device doesn't exist, /// it is a no-op. This does not update the device list; the device ID /// should be removed from the device list separately. #[tracing::instrument(skip_all)] pub async fn remove_device_data( &self, user_id: impl Into, device_id: impl Into, ) -> Result<(), Error> { let user_id = user_id.into(); let device_id = device_id.into(); self .client .delete_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id)) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id).into()) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to delete device data: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(()) } /// Registers primary device for user, stores its signed device list pub async fn register_primary_device( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, login_time: DateTime, singleton_device_list: DeviceListUpdate, ) -> Result<(), Error> { let user_id: String = user_id.into(); self .transact_update_devicelist(&user_id, |device_ids, devices_data| { // Allow replacing existing device list if last_primary_signature is present. // This is the case for backup restore protocol. let allow_device_list_overwrite = singleton_device_list.last_primary_signature.is_some(); if (!device_ids.is_empty() && !allow_device_list_overwrite) || !devices_data.is_empty() { warn!( "Tried creating singleton device list for already existing user (userID={})", redact_sensitive_data(&user_id), ); return Err(Error::DeviceList(DeviceListError::DeviceAlreadyExists)); } // Set device list *device_ids = singleton_device_list.devices.clone(); let primary_device = DeviceRow::from_device_key_upload( &user_id, device_key_upload, platform_metadata, login_time, )?; // Put device keys into DDB let put_device = Put::builder() .table_name(devices_table::NAME) .set_item(Some(primary_device.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build() .expect("table_name or item not set in Put builder"); let put_device_operation = TransactWriteItem::builder().put(put_device).build(); let update_info = UpdateOperationInfo::primary_device_issued(singleton_device_list) .with_ddb_operation(put_device_operation); Ok(update_info) }) .await?; Ok(()) } /// Adds new device to user's device list. If the device already exists, the /// operation fails. Transactionally generates new device list version. pub async fn add_device( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, login_time: DateTime, ) -> Result<(), Error> { let user_id: String = user_id.into(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let new_device = DeviceRow::from_device_key_upload( &user_id, device_key_upload, platform_metadata, login_time, )?; if device_ids.iter().any(|id| &new_device.device_id == id) { warn!( "Device already exists in user's device list \ (userID={}, deviceID={})", redact_sensitive_data(&user_id), redact_sensitive_data(&new_device.device_id) ); return Err(Error::DeviceList(DeviceListError::DeviceAlreadyExists)); } device_ids.push(new_device.device_id.clone()); // Reorder devices (determine primary device again) devices_data.push(new_device.clone()); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Put new device let put_device = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build() .expect("table_name or item not set in Put builder"); let put_device_operation = TransactWriteItem::builder().put(put_device).build(); let update_info = UpdateOperationInfo::identity_generated() .with_ddb_operation(put_device_operation); Ok(update_info) }) .await?; Ok(()) } /// Removes device from user's device list. If the device doesn't exist, the /// operation fails. Transactionally generates new device list version. pub async fn remove_device( &self, user_id: impl Into, device_id: impl AsRef, ) -> Result<(), Error> { let user_id: String = user_id.into(); let device_id = device_id.as_ref(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let device_exists = device_ids.iter().any(|id| id == device_id); if !device_exists { warn!( "Device doesn't exist in user's device list \ (userID={}, deviceID={})", redact_sensitive_data(&user_id), redact_sensitive_data(device_id) ); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); } device_ids.retain(|id| id != device_id); // Reorder devices (determine primary device again) devices_data.retain(|d| d.device_id != device_id); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Delete device DDB operation let delete_device = Delete::builder() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.clone())) .key( ATTR_ITEM_ID, DeviceIDAttribute(device_id.to_string()).into(), ) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build() .expect("table_name or key not set in Delete builder"); let operation = TransactWriteItem::builder().delete(delete_device).build(); let update_info = UpdateOperationInfo::identity_generated() .with_ddb_operation(operation); Ok(update_info) }) .await?; Ok(()) } /// applies updated device list received from primary device pub async fn apply_devicelist_update( &self, user_id: &str, update: DeviceListUpdate, // A function that receives previous and new device IDs and // returns boolean determining if the new device list is valid. validator_fn: Option, // Whether to remove device data when a device is removed from the list. remove_device_data: bool, ) -> Result where V: Fn(&[&str], &[&str]) -> bool, { use std::collections::HashSet; let new_list = update.devices.clone(); let mut devices_being_removed: Vec = Vec::new(); let update_result = self .transact_update_devicelist(user_id, |current_list, _| { crate::device_list::verify_device_list_signatures( current_list.first(), &update, )?; let previous_device_ids: Vec<&str> = current_list.iter().map(AsRef::as_ref).collect(); let new_device_ids: Vec<&str> = new_list.iter().map(AsRef::as_ref).collect(); if let Some(validate) = validator_fn { if !validate(&previous_device_ids, &new_device_ids) { warn!("Received invalid device list update"); return Err(Error::DeviceList( DeviceListError::InvalidDeviceListUpdate, )); } } // collect device IDs that were removed let previous_set: HashSet<&str> = previous_device_ids.into_iter().collect(); let new_set: HashSet<&str> = new_device_ids.into_iter().collect(); devices_being_removed .extend(previous_set.difference(&new_set).map(ToString::to_string)); debug!("Applying device list update"); *current_list = new_list; Ok(UpdateOperationInfo::primary_device_issued(update)) }) .await?; if !remove_device_data { return Ok(update_result); } // delete device data and invalidate CSAT for removed devices debug!( "{} devices have been removed from device list. Clearing data...", devices_being_removed.len() ); for device_id in devices_being_removed { trace!("Invalidating CSAT for device {}", device_id); self.delete_access_token_data(user_id, &device_id).await?; trace!("Clearing keys for device {}", device_id); self.remove_device_data(user_id, &device_id).await?; trace!("Pruning OTKs for device {}", device_id); self .delete_otks_table_rows_for_user_device(user_id, &device_id) .await?; let device_id = device_id.to_string(); tokio::spawn(async move { debug!( "Attempting to delete Tunnelbroker data for device: {}", &device_id ); - let result = - crate::tunnelbroker::delete_devices_data(&[device_id]).await; + let result = tunnelbroker::delete_devices_data(&[device_id]).await; consume_error(result); }); } Ok(update_result) } /// Performs a transactional update of the device list for the user. Afterwards /// generates a new device list and updates the timestamp in the users table. /// This is done in a transaction. Operation fails if the device list has been /// updated concurrently (timestamp mismatch). /// Returns the new device list row that has been saved to database. #[tracing::instrument(skip_all)] async fn transact_update_devicelist( &self, user_id: &str, // The closure performing a transactional update of the device list. // It receives two arguments: // 1. A mutable reference to the current device list (ordered device IDs). // 2. Details (full data) of the current devices (unordered). // The closure should return a [`UpdateOperationInfo`] object. action: impl FnOnce( &mut Vec, Vec, ) -> Result, ) -> Result { let previous_timestamp = get_current_devicelist_timestamp(self, user_id).await?; let current_devices_data = self.get_current_devices(user_id).await?; let mut device_ids = self .get_current_device_list(user_id) .await? .map(|device_list| device_list.device_ids) .unwrap_or_default(); // Perform the update action, then generate new device list let update_info = action(&mut device_ids, current_devices_data)?; crate::device_list::verify_device_list_timestamp( previous_timestamp.as_ref(), update_info.timestamp.as_ref(), )?; let new_device_list = DeviceListRow::new(user_id, device_ids, &update_info); // Update timestamp in users table let timestamp_update_operation = device_list_timestamp_update_operation( user_id, previous_timestamp, new_device_list.timestamp, ); // Put updated device list (a new version) let put_device_list = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device_list.clone().into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build() .expect("table_name or item not set in Put builder"); let put_device_list_operation = TransactWriteItem::builder().put(put_device_list).build(); let operations = if let Some(operation) = update_info.ddb_operation { vec![ operation, put_device_list_operation, timestamp_update_operation, ] } else { vec![put_device_list_operation, timestamp_update_operation] }; self .client .transact_write_items() .set_transact_items(Some(operations)) .send() .await .map_err(|e| match DynamoDBError::from(e) { DynamoDBError::TransactionCanceledException( TransactionCanceledException { cancellation_reasons: Some(reasons), .. }, ) if reasons .iter() .any(|reason| reason.code() == Some("ConditionalCheckFailed")) => { Error::DeviceList(DeviceListError::ConcurrentUpdateError) } other => { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Device list update transaction failed: {:?}", other ); Error::AwsSdk(other) } })?; Ok(new_device_list) } /// Deletes all device data for user. Keeps device list rows. /// Returns list of deleted device IDs #[tracing::instrument(skip_all)] pub async fn delete_devices_data_for_user( &self, user_id: impl Into, ) -> Result, Error> { let user_id: String = user_id.into(); // we project only the primary keys so we can pass these directly to delete requests let primary_keys = query_rows_with_prefix(self, &user_id, DEVICE_ITEM_KEY_PREFIX) .projection_expression(format!("{ATTR_USER_ID}, {ATTR_ITEM_ID}")) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to list user's devices' primary keys: {:?}", e ); Error::AwsSdk(e.into()) })? .items .unwrap_or_default(); let device_ids = primary_keys .iter() .map(|attrs| { let attr = attrs.get(devices_table::ATTR_ITEM_ID).cloned(); DeviceIDAttribute::try_from(attr).map(DeviceIDAttribute::into_inner) }) .collect::>()?; let delete_requests = primary_keys .into_iter() .map(|item| { let request = DeleteRequest::builder() .set_key(Some(item)) .build() .expect("key not set in DeleteRequest builder"); WriteRequest::builder().delete_request(request).build() }) .collect::>(); comm_lib::database::batch_operations::batch_write( &self.client, devices_table::NAME, delete_requests, Default::default(), ) .await?; Ok(device_ids) } /// Deletes all user data from devices table #[tracing::instrument(skip_all)] pub async fn delete_devices_table_rows_for_user( &self, user_id: impl Into, ) -> Result<(), Error> { // 1. get all rows // 2. batch write delete all // we project only the primary keys so we can pass these directly to delete requests let primary_keys = self .client .query() .table_name(devices_table::NAME) .projection_expression("#user_id, #item_id") .key_condition_expression("#user_id = :user_id") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .consistent_read(true) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to list user's items in devices table: {:?}", e ); Error::AwsSdk(e.into()) })? .items .unwrap_or_default(); let delete_requests = primary_keys .into_iter() .map(|item| { let request = DeleteRequest::builder() .set_key(Some(item)) .build() .expect("key not set in DeleteRequest builder"); WriteRequest::builder().delete_request(request).build() }) .collect::>(); comm_lib::database::batch_operations::batch_write( &self.client, devices_table::NAME, delete_requests, Default::default(), ) .await?; Ok(()) } } /// Gets timestamp of user's current device list. Returns None if the user /// doesn't have a device list yet. Storing the timestamp in the users table is /// required for consistency. It's used as a condition when updating the device /// list. #[tracing::instrument(skip_all)] async fn get_current_devicelist_timestamp( db: &crate::database::DatabaseClient, user_id: impl Into, ) -> Result>, Error> { let response = db .client .get_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .projection_expression(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME) .send() .await .map_err(|e| { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Failed to get user's device list timestamp: {:?}", e ); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); let raw_datetime = user_item.remove(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME); // existing records will not have this field when // updating device list for the first time if raw_datetime.is_none() { return Ok(None); } let timestamp = DateTime::::try_from_attr( USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, raw_datetime, )?; Ok(Some(timestamp)) } /// Generates update expression for current device list timestamp in users table. /// The previous timestamp is used as a condition to ensure that the value hasn't changed /// since we got it. This avoids race conditions when updating the device list. fn device_list_timestamp_update_operation( user_id: impl Into, previous_timestamp: Option>, new_timestamp: DateTime, ) -> TransactWriteItem { let update_builder = match previous_timestamp { Some(previous_timestamp) => Update::builder() .condition_expression("#device_list_timestamp = :previous_timestamp") .expression_attribute_values( ":previous_timestamp", AttributeValue::S(previous_timestamp.to_rfc3339()), ), // If there's no previous timestamp, the attribute shouldn't exist yet None => Update::builder() .condition_expression("attribute_not_exists(#device_list_timestamp)"), }; let update = update_builder .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .update_expression("SET #device_list_timestamp = :new_timestamp") .expression_attribute_names( "#device_list_timestamp", USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, ) .expression_attribute_values( ":new_timestamp", AttributeValue::S(new_timestamp.to_rfc3339()), ) .build() .expect("table_name, key or update_expression not set in Update builder"); TransactWriteItem::builder().update(update).build() } /// Helper function to query rows by given sort key prefix fn query_rows_with_prefix( db: &crate::database::DatabaseClient, user_id: impl Into, prefix: &'static str, ) -> QueryFluentBuilder { db.client .query() .table_name(devices_table::NAME) .key_condition_expression( "#user_id = :user_id AND begins_with(#item_id, :device_prefix)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values(":user_id", AttributeValue::S(user_id.into())) .expression_attribute_values( ":device_prefix", AttributeValue::S(prefix.to_string()), ) .consistent_read(true) } fn take_platform_details( device_attrs: &mut AttributeMap, ) -> Result { let platform_details_attr: Option = device_attrs.take_attr::>(ATTR_PLATFORM_DETAILS)?; // New schema contains PlatformDetails attribute while legacy schema // contains "deviceType" and "codeVersion" top-level attributes let platform_details = match platform_details_attr { Some(platform_details) => platform_details, None => { let raw_device_type: String = device_attrs.take_attr(OLD_ATTR_DEVICE_TYPE)?; let device_type = DeviceType::from_str_name(&raw_device_type) .ok_or_else(|| { DBItemError::new( OLD_ATTR_DEVICE_TYPE.to_string(), raw_device_type.into(), DBItemAttributeError::InvalidValue, ) })?; let code_version = device_attrs .remove(OLD_ATTR_CODE_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .unwrap_or_default(); PlatformDetails { device_type, code_version, state_version: None, major_desktop_version: None, } } }; Ok(platform_details) } /// [`transact_update_devicelist()`] closure result struct UpdateOperationInfo { /// (optional) transactional DDB operation to be performed /// when updating the device list. ddb_operation: Option, /// new device list timestamp. Defaults to `Utc::now()` /// for Identity-generated device lists. timestamp: Option>, current_signature: Option, last_signature: Option, } impl UpdateOperationInfo { fn identity_generated() -> Self { Self { ddb_operation: None, timestamp: None, current_signature: None, last_signature: None, } } fn primary_device_issued(source: DeviceListUpdate) -> Self { Self { ddb_operation: None, timestamp: Some(source.timestamp), current_signature: source.current_primary_signature, last_signature: source.last_primary_signature, } } fn with_ddb_operation(mut self, operation: TransactWriteItem) -> Self { self.ddb_operation = Some(operation); self } } // Helper module for "migration" code into new device list schema. // We can get rid of this when primary device takes over the responsibility // of managing the device list. mod migration { use std::{cmp::Ordering, collections::HashSet}; use tracing::{debug, error, info}; use super::*; #[tracing::instrument(skip_all)] pub(super) fn reorder_device_list( user_id: &str, list: &mut [String], devices_data: &[DeviceRow], ) { if !verify_device_list_match(list, devices_data) { info!( "Device list for user (userID={1}) does not match devices data. {0}", "Primary device will be selected basing only on devices with data.", redact_sensitive_data(user_id) ); return; } let Some(first_device) = list.first() else { debug!("Skipping device list rotation. Nothing to reorder."); return; }; let Some(primary_device) = determine_primary_device(devices_data) else { info!( "No valid primary device found for user (userID={}).\ Skipping device list reorder.", redact_sensitive_data(user_id) ); return; }; if first_device == &primary_device.device_id { debug!("Skipping device list reorder. Primary device is already first"); return; } // swap primary device with the first one let Some(primary_device_idx) = list.iter().position(|id| id == &primary_device.device_id) else { error!( errorType = error_types::DEVICE_LIST_DB_LOG, "Detected primary device not found in device list (userID={})", redact_sensitive_data(user_id) ); return; }; list.swap(0, primary_device_idx); info!( "Reordered device list for user (userID={})", redact_sensitive_data(user_id) ); } // checks if device list matches given devices data #[tracing::instrument(skip_all)] fn verify_device_list_match( list: &[String], devices_data: &[DeviceRow], ) -> bool { if list.len() != devices_data.len() { debug!( list_len = list.len(), data_len = devices_data.len(), "Device list length mismatch!" ); return false; } let actual_device_ids = devices_data .iter() .map(|device| &device.device_id) .collect::>(); let device_list_set = list.iter().collect::>(); // devices on device list but with no keys uploaded // this is normal in some flows if let Some(unknown_device_id) = device_list_set.difference(&actual_device_ids).next() { debug!( "Device list and data out of sync (unregistered deviceID={})", unknown_device_id ); return false; } // devices that have devices data (keys etc) but not on device list // this should never happen in any login flow and means we have corrupt state if let Some(unknown_device_id) = actual_device_ids.difference(&device_list_set).next() { warn!( "Device ID={} registered, but not on device list!", unknown_device_id ); return false; } true } /// Returns reference to primary device (if any) from given list of devices /// or None if there's no valid primary device. fn determine_primary_device(devices: &[DeviceRow]) -> Option<&DeviceRow> { // 1. Find mobile devices with valid token // 2. Prioritize these with latest code version // 3. If there's a tie, select the one with latest login time let mut mobile_devices = devices .iter() .filter(|device| { *device.device_type() == DeviceType::Ios || *device.device_type() == DeviceType::Android }) .collect::>(); mobile_devices.sort_by(|a, b| { let code_version_cmp = b .platform_details .code_version .cmp(&a.platform_details.code_version); if code_version_cmp == Ordering::Equal { b.login_time.cmp(&a.login_time) } else { code_version_cmp } }); mobile_devices.first().cloned() } #[cfg(test)] mod tests { use super::*; use chrono::Duration; #[test] fn reorder_skips_no_devices() { let mut list = vec![]; reorder_device_list("", &mut list, &[]); assert_eq!(list, Vec::::new()); } #[test] fn reorder_skips_single_device() { let mut list = vec!["test".into()]; let devices_data = vec![create_test_device("test", DeviceType::Web, 0, Utc::now())]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["test"]); } #[test] fn reorder_skips_for_valid_list() { let mut list = vec!["mobile".into(), "web".into()]; let devices_data = vec![ create_test_device("mobile", DeviceType::Android, 1, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn reorder_swaps_primary_device_when_possible() { let mut list = vec!["web".into(), "mobile".into()]; let devices_data = vec![ create_test_device("web", DeviceType::Web, 0, Utc::now()), create_test_device("mobile", DeviceType::Android, 1, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn determine_primary_device_returns_none_for_empty_list() { let devices = vec![]; assert!(determine_primary_device(&devices).is_none()); } #[test] fn determine_primary_device_returns_none_for_web_only() { let devices = vec![create_test_device("web", DeviceType::Web, 0, Utc::now())]; assert!( determine_primary_device(&devices).is_none(), "Primary device should be None for web-only devices" ); } #[test] fn determine_primary_device_prioritizes_mobile() { let devices = vec![ create_test_device("mobile", DeviceType::Android, 0, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile", "Primary device should be mobile" ); } #[test] fn determine_primary_device_prioritizes_latest_code_version() { let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, Utc::now()), create_test_device("mobile2", DeviceType::Android, 2, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2", "Primary device should be mobile with latest code version" ); } #[test] fn determine_primary_device_prioritizes_latest_login_time() { let devices = vec![ create_test_device("mobile1_today", DeviceType::Ios, 1, Utc::now()), create_test_device( "mobile2_yesterday", DeviceType::Android, 1, Utc::now() - Duration::days(1), ), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1_today", "Primary device should be mobile with latest login time" ); } #[test] fn determine_primary_device_keeps_deterministic_order() { // Given two identical devices, the first one should be selected as primary let today = Utc::now(); let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, today), create_test_device("mobile2", DeviceType::Android, 1, today), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1", "Primary device selection should be deterministic" ); } #[test] fn determine_primary_device_all_rules_together() { use DeviceType::{Android, Ios, Web}; let today = Utc::now(); let yesterday = today - Duration::days(1); let devices = vec![ create_test_device("mobile1_today", Android, 1, today), create_test_device("mobile2_today", Android, 2, today), create_test_device("mobile3_yesterday", Ios, 1, yesterday), create_test_device("mobile4_yesterday", Ios, 2, yesterday), create_test_device("web", Web, 5, today), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2_today", "Primary device should be mobile with latest code version and login time" ); } fn create_test_device( id: &str, platform: DeviceType, code_version: u64, login_time: DateTime, ) -> DeviceRow { DeviceRow { user_id: "test".into(), device_id: id.into(), device_key_info: IdentityKeyInfo { key_payload: "".into(), key_payload_signature: "".into(), }, content_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, notif_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, platform_details: PlatformDetails { device_type: platform, code_version, state_version: None, major_desktop_version: None, }, login_time, } } } } diff --git a/services/identity/src/database/one_time_keys.rs b/services/identity/src/database/one_time_keys.rs index 79b3a6d6b..e0cfccd58 100644 --- a/services/identity/src/database/one_time_keys.rs +++ b/services/identity/src/database/one_time_keys.rs @@ -1,504 +1,504 @@ use std::collections::HashSet; use comm_lib::{ aws::{ ddb::types::{ AttributeValue, Delete, DeleteRequest, TransactWriteItem, Update, WriteRequest, }, DynamoDBError, }, database::{ batch_operations::{batch_write, ExponentialBackoffConfig}, parse_int_attribute, AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, }, }; use tracing::{debug, error, info, warn}; use crate::{ + comm_service::tunnelbroker, constants::{ error_types, MAX_ONE_TIME_KEYS, ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, }, database::DeviceIDAttribute, ddb_utils::{ create_one_time_key_partition_key, into_one_time_put_requests, into_one_time_update_and_delete_requests, is_transaction_retryable, OlmAccountType, }, error::{consume_error, Error}, olm::is_valid_olm_key, }; use super::DatabaseClient; impl DatabaseClient { /// Gets the next one-time key for the account and then, in a transaction, /// deletes the key and updates the key count /// /// Returns the retrieved one-time key if it exists and a boolean indicating /// whether the `spawn_refresh_keys_task`` was called #[tracing::instrument(skip_all)] pub(super) async fn get_one_time_key( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, can_request_more_keys: bool, ) -> Result<(Option, bool), Error> { use crate::constants::devices_table; use crate::constants::retry; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; let attr_otk_count = match account_type { OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, }; fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block let device_id = device_id.to_string(); tokio::spawn(async move { debug!("Attempting to request more keys for device: {}", &device_id); - let result = - crate::tunnelbroker::send_refresh_keys_request(&device_id).await; + let result = tunnelbroker::send_refresh_keys_request(&device_id).await; consume_error(result); }); } // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` // in `comm-lib` to handle transactions with retries let retry_config = ExponentialBackoffConfig { max_attempts: retry::MAX_ATTEMPTS as u32, ..Default::default() }; // TODO: Introduce nanny task that handles calling `spawn_refresh_keys_task` let mut requested_more_keys = false; let mut exponential_backoff = retry_config.new_counter(); loop { let otk_count = self.get_otk_count(user_id, device_id, account_type).await?; if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD && can_request_more_keys { spawn_refresh_keys_task(device_id); requested_more_keys = true; } if otk_count < 1 { return Ok((None, requested_more_keys)); } let Some(otk_row) = self .get_one_time_keys(user_id, device_id, account_type, Some(1)) .await? .pop() else { return Ok((None, requested_more_keys)); }; let delete_otk_operation = otk_row.as_delete_request(); let update_otk_count = Update::builder() .table_name(devices_table::NAME) .key( devices_table::ATTR_USER_ID, AttributeValue::S(user_id.to_string()), ) .key( devices_table::ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into(), ) .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) .expression_attribute_values( ":decrement_val", AttributeValue::N("-1".to_string()), ) .condition_expression(format!("{} = :old_val", attr_otk_count)) .expression_attribute_values( ":old_val", AttributeValue::N(otk_count.to_string()), ) .build() .expect( "table_name, key or update_expression not set in Update builder", ); let update_otk_count_operation = TransactWriteItem::builder() .update(update_otk_count) .build(); let transaction = self .client .transact_write_items() .set_transact_items(Some(vec![ delete_otk_operation, update_otk_count_operation, ])) .send() .await; match transaction { Ok(_) => return Ok((Some(otk_row.otk), requested_more_keys)), Err(e) => { info!("Error retrieving one-time key: {:?}", e); let dynamo_db_error = DynamoDBError::from(e); let retryable_codes = HashSet::from([ retry::CONDITIONAL_CHECK_FAILED, retry::TRANSACTION_CONFLICT, ]); if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { info!("Encountered transaction conflict while retrieving one-time key - retrying"); exponential_backoff.sleep_and_retry().await?; } else { error!( errorType = error_types::OTK_DB_LOG, "One-time key retrieval transaction failed: {:?}", dynamo_db_error ); return Err(Error::AwsSdk(dynamo_db_error)); } } } } } #[tracing::instrument(skip_all)] async fn get_one_time_keys( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, num_keys: Option, ) -> Result, Error> { use crate::constants::one_time_keys_table::*; let partition_key = create_one_time_key_partition_key(user_id, device_id, account_type); let mut query = self .client .query() .table_name(NAME) .key_condition_expression("#pk = :pk") .expression_attribute_names("#pk", PARTITION_KEY) .expression_attribute_values(":pk", AttributeValue::S(partition_key)); if let Some(limit) = num_keys { // DynamoDB will reject the `query` request if `limit < 1` if limit < 1 { return Ok(Vec::new()); } query = query.limit(limit as i32); } let otk_rows = query .send() .await .map_err(|e| { error!( errorType = error_types::OTK_DB_LOG, "DDB client failed to query OTK rows: {:?}", e ); Error::AwsSdk(e.into()) })? .items .unwrap_or_default() .into_iter() .map(OTKRow::try_from) .collect::, _>>() .map_err(Error::from)?; if let Some(limit) = num_keys { if otk_rows.len() != limit { warn!("There are fewer one-time keys than the number requested"); } } Ok(otk_rows) } #[tracing::instrument(skip_all)] pub async fn append_one_time_prekeys( &self, user_id: &str, device_id: &str, content_one_time_keys: &Vec, notif_one_time_keys: &Vec, ) -> Result<(), Error> { use crate::constants::retry; let num_content_keys_to_append = content_one_time_keys.len(); let num_notif_keys_to_append = notif_one_time_keys.len(); if num_content_keys_to_append > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT || num_notif_keys_to_append > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT { return Err(Error::OneTimeKeyUploadLimitExceeded); } if content_one_time_keys .iter() .any(|otk| !is_valid_olm_key(otk)) || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk)) { debug!("Invalid one-time key format"); return Err(Error::InvalidFormat); } let current_time = chrono::Utc::now(); let content_otk_requests = into_one_time_put_requests( user_id, device_id, content_one_time_keys, OlmAccountType::Content, current_time, ); let notif_otk_requests = into_one_time_put_requests( user_id, device_id, notif_one_time_keys, OlmAccountType::Notification, current_time, ); let current_content_otk_count = self .get_otk_count(user_id, device_id, OlmAccountType::Content) .await?; let current_notif_otk_count = self .get_otk_count(user_id, device_id, OlmAccountType::Notification) .await?; let num_content_keys_to_delete = (num_content_keys_to_append + current_content_otk_count) .saturating_sub(MAX_ONE_TIME_KEYS); let num_notif_keys_to_delete = (num_notif_keys_to_append + current_notif_otk_count) .saturating_sub(MAX_ONE_TIME_KEYS); let content_keys_to_delete = self .get_one_time_keys( user_id, device_id, OlmAccountType::Content, Some(num_content_keys_to_delete), ) .await?; let notif_keys_to_delete = self .get_one_time_keys( user_id, device_id, OlmAccountType::Notification, Some(num_notif_keys_to_delete), ) .await?; let update_and_delete_otk_count_operation = into_one_time_update_and_delete_requests( user_id, device_id, num_content_keys_to_append, num_notif_keys_to_append, content_keys_to_delete, notif_keys_to_delete, ); let mut operations = Vec::new(); operations.extend_from_slice(&content_otk_requests); operations.extend_from_slice(¬if_otk_requests); operations.extend_from_slice(&update_and_delete_otk_count_operation); let retry_config = ExponentialBackoffConfig { max_attempts: retry::MAX_ATTEMPTS as u32, ..Default::default() }; let mut exponential_backoff = retry_config.new_counter(); loop { let transaction = self .client .transact_write_items() .set_transact_items(Some(operations.clone())) .send() .await; match transaction { Ok(_) => break, Err(e) => { let dynamo_db_error = DynamoDBError::from(e); let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]); if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { info!("Encountered transaction conflict while uploading one-time keys - retrying"); exponential_backoff.sleep_and_retry().await?; } else { error!( errorType = error_types::OTK_DB_LOG, "One-time key upload transaction failed: {:?}", dynamo_db_error ); return Err(Error::AwsSdk(dynamo_db_error)); } } } } Ok(()) } #[tracing::instrument(skip_all)] async fn get_otk_count( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::devices_table; let attr_name = match account_type { OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, }; let response = self .client .get_item() .table_name(devices_table::NAME) .projection_expression(attr_name) .key( devices_table::ATTR_USER_ID, AttributeValue::S(user_id.to_string()), ) .key( devices_table::ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into(), ) .send() .await .map_err(|e| { error!( errorType = error_types::OTK_DB_LOG, "Failed to get user's OTK count: {:?}", e ); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); match parse_int_attribute(attr_name, user_item.remove(attr_name)) { Ok(num) => Ok(num), Err(DBItemError { attribute_error: DBItemAttributeError::Missing, .. }) => Ok(0), Err(e) => Err(Error::Attribute(e)), } } /// Deletes all data for a user's device from one-time keys table pub async fn delete_otks_table_rows_for_user_device( &self, user_id: &str, device_id: &str, ) -> Result<(), Error> { use crate::constants::one_time_keys_table::*; let content_otk_primary_keys = self .get_one_time_keys(user_id, device_id, OlmAccountType::Content, None) .await?; let notif_otk_primary_keys = self .get_one_time_keys(user_id, device_id, OlmAccountType::Notification, None) .await?; let delete_requests = content_otk_primary_keys .into_iter() .chain(notif_otk_primary_keys) .map(|otk_row| { let request = DeleteRequest::builder() .key(PARTITION_KEY, AttributeValue::S(otk_row.partition_key)) .key(SORT_KEY, AttributeValue::S(otk_row.sort_key)) .build() .expect("no keys set in DeleteRequest builder"); WriteRequest::builder().delete_request(request).build() }) .collect::>(); batch_write( &self.client, NAME, delete_requests, ExponentialBackoffConfig::default(), ) .await .map_err(Error::from)?; Ok(()) } /// Deletes all data for a user from one-time keys table pub async fn delete_otks_table_rows_for_user( &self, user_id: &str, ) -> Result<(), Error> { let maybe_device_list_row = self.get_current_device_list(user_id).await?; let Some(device_list_row) = maybe_device_list_row else { info!("No devices associated with user. Skipping one-time key removal."); return Ok(()); }; for device_id in device_list_row.device_ids { self .delete_otks_table_rows_for_user_device(user_id, &device_id) .await?; } Ok(()) } } pub struct OTKRow { pub partition_key: String, pub sort_key: String, pub otk: String, } impl OTKRow { pub fn as_delete_request(&self) -> TransactWriteItem { use crate::constants::one_time_keys_table as otk_table; let delete_otk = Delete::builder() .table_name(otk_table::NAME) .key( otk_table::PARTITION_KEY, AttributeValue::S(self.partition_key.to_string()), ) .key( otk_table::SORT_KEY, AttributeValue::S(self.sort_key.to_string()), ) .condition_expression("attribute_exists(#otk)") .expression_attribute_names("#otk", otk_table::ATTR_ONE_TIME_KEY) .build() .expect("table_name or key not set in Delete builder"); TransactWriteItem::builder().delete(delete_otk).build() } } impl TryFrom for OTKRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { use crate::constants::one_time_keys_table as otk_table; let partition_key = attrs.take_attr(otk_table::PARTITION_KEY)?; let sort_key = attrs.take_attr(otk_table::SORT_KEY)?; let otk: String = attrs.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; Ok(Self { partition_key, sort_key, otk, }) } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index 42a9f8d8a..c53bd55ef 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,990 +1,990 @@ use std::collections::{HashMap, HashSet}; +use crate::comm_service::{backup, tunnelbroker}; use crate::config::CONFIG; use crate::database::{DeviceListUpdate, PlatformDetails}; use crate::device_list::validation::DeviceListValidator; use crate::device_list::SignedDeviceList; use crate::error::consume_error; use crate::log::redact_sensitive_data; use crate::{ client_service::{handle_db_error, WorkflowInProgress}, constants::{error_types, request_metadata, tonic_status_messages}, database::DatabaseClient, grpc_services::shared::{get_platform_metadata, get_value}, }; use chrono::DateTime; use comm_lib::auth::AuthService; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, trace}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, PrimaryDeviceLogoutRequest, PrivilegedDeleteUsersRequest, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserDevicesPlatformDetails, UserIdentitiesRequest, UserIdentitiesResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, comm_auth_service: AuthService, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { trace!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { trace!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::MISSING_CREDENTIALS) })?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle.block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) })?; if !valid_token { return Err(Status::aborted(tonic_status_messages::BAD_CREDENTIALS)); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::USER_ID_MISSING) })?; let device_id = get_value(request, request_metadata::DEVICE_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::DEVICE_ID_MISSING) })?; Ok((user_id, device_id)) } fn spawn_delete_tunnelbroker_data_task(device_ids: Vec) { tokio::spawn(async move { debug!( "Attempting to delete Tunnelbroker data for devices: {:?}", device_ids.as_slice() ); - let result = crate::tunnelbroker::delete_devices_data(&device_ids).await; + let result = tunnelbroker::delete_devices_data(&device_ids).await; consume_error(result); }); } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { #[tracing::instrument(skip_all)] async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_key = message.new_content_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_CONTENT_KEYS) })?; let notif_key = message.new_notif_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_NOTIF_KEYS) })?; self .db_client .update_device_prekeys( user_id, device_id, content_key.into(), notif_key.into(), ) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } #[tracing::instrument(skip_all)] async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identity(user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } #[tracing::instrument(skip_all)] async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identity(&message.user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await? else { return Err(Status::not_found( tonic_status_messages::KEYSERVER_NOT_FOUND, )); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } #[tracing::instrument(skip_all)] async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await?; Ok(tonic::Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let Some((username, password_file)) = self .db_client .get_username_and_password_file(&user_id) .await? else { return Err(tonic::Status::permission_denied( tonic_status_messages::WALLET_USER, )); }; let message = request.into_inner(); let mut server_login = comm_opaque2::server::Login::new(); let login_response = server_login .start( &CONFIG.server_setup, &password_file, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let registration_response = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdatePasswordInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(Box::new(update_state))) .await?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: registration_response, opaque_login_response: login_response, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self.db_client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(user_id, password_file) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self.db_client.remove_device(&user_id, &device_id).await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; let device_list = self .db_client .get_current_device_list(&user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition("no device list")); }; tokio::spawn(async move { debug!( "Sending device list updates to {:?}", device_list.device_ids ); let device_ids: Vec<&str> = device_list.device_ids.iter().map(AsRef::as_ref).collect(); - let result = - crate::tunnelbroker::send_device_list_update(&device_ids).await; + let result = tunnelbroker::send_device_list_update(&device_ids).await; consume_error(result); }); spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_primary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!( "Primary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; // Get and verify singleton device list let parsed_device_list: SignedDeviceList = message.signed_device_list.parse()?; let update_payload = DeviceListUpdate::try_from(parsed_device_list)?; crate::device_list::verify_singleton_device_list( &update_payload, &device_id, None, )?; self .db_client .apply_devicelist_update( &user_id, update_payload, // - We've already validated the list so no need to do it here. // - Need to pass the type because it cannot be inferred from None None::, // We don't want side effects - we'll take care of removing devices // on our own. (Side effect would skip the primary device). false, ) .await?; debug!(user_id, "Attempting to delete user's access tokens"); self.db_client.delete_all_tokens_for_user(&user_id).await?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self .db_client .delete_otks_table_rows_for_user(&user_id) .await?; debug!(user_id, "Attempting to delete user's devices"); let device_ids = self .db_client .delete_devices_data_for_user(&user_id) .await?; spawn_delete_tunnelbroker_data_task(device_ids); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_secondary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; debug!( "Secondary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Secondary, ) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); let user_is_password_authenticated = self .db_client .user_is_password_authenticated(&user_id) .await?; if user_is_password_authenticated { return Err(tonic::Status::permission_denied( tonic_status_messages::PASSWORD_USER, )); } self.delete_tunnelbroker_and_backup_data(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = DeletePasswordUserInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self.db_client.get_workflow(message.session_id).await? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; self.delete_tunnelbroker_and_backup_data(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn privileged_delete_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { const STAFF_USER_IDS: [&str; 1] = ["256"]; let (user_id, _) = get_user_and_device_id(&request)?; if !STAFF_USER_IDS.contains(&user_id.as_str()) { return Err(Status::permission_denied( tonic_status_messages::USER_IS_NOT_STAFF, )); } for user_id_to_delete in request.into_inner().user_ids { self .delete_tunnelbroker_and_backup_data(&user_id_to_delete) .await?; self.db_client.delete_user(user_id_to_delete).await?; } let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::from_timestamp_millis(timestamp).ok_or_else(|| { tonic::Status::invalid_argument( tonic_status_messages::INVALID_TIMESTAMP, ) }) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(SignedDeviceList::try_from) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } #[tracing::instrument(skip_all)] async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); let request_count = user_ids.len(); let user_ids: HashSet = user_ids.into_iter().collect(); debug!( "Requesting device lists and platform details for {} users ({} unique)", request_count, user_ids.len() ); // 1. Fetch device lists let device_lists = self.db_client.get_current_device_lists(user_ids).await?; trace!("Found device lists for {} users", device_lists.keys().len()); // 2. Fetch platform details let flattened_user_device_ids: Vec<(String, String)> = device_lists .iter() .flat_map(|(user_id, device_list)| { device_list .device_ids .iter() .map(|device_id| (user_id.clone(), device_id.clone())) .collect::>() }) .collect(); let platform_details = self .db_client .get_devices_platform_details(flattened_user_device_ids) .await?; trace!( "Found platform details for {} users", platform_details.keys().len() ); // 3. Prepare output format let users_device_lists: HashMap = device_lists .into_iter() .map(|(user_id, device_list_row)| { let signed_list = SignedDeviceList::try_from(device_list_row)?; let serialized_list = signed_list.as_json_string()?; Ok((user_id, serialized_list)) }) .collect::>()?; let users_devices_platform_details = platform_details .into_iter() .map(|(user_id, devices_map)| { (user_id, UserDevicesPlatformDetails::from(devices_map)) }) .collect(); let response = PeersDeviceListsResponse { users_device_lists, users_devices_platform_details, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; let validator = crate::device_list::validation::update_device_list_rpc_validator; self .db_client .apply_devicelist_update(&user_id, update, Some(validator), true) .await?; Ok(Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await?; if get_farcaster_users_response.len() > 1 { error!( errorType = error_types::GRPC_SERVICES_LOG, "multiple users associated with the same Farcaster ID" ); return Err(Status::failed_precondition( tonic_status_messages::CANNOT_LINK_FID, )); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists(tonic_status_messages::FID_TAKEN)); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self.db_client.remove_farcaster_id(user_id).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn find_user_identities( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_ids: HashSet = message.user_ids.into_iter().collect(); let users_table_results = self .db_client .find_db_user_identities(user_ids.clone()) .await?; // Look up only user IDs that haven't been found in users table let reserved_user_ids_to_query: Vec = user_ids .into_iter() .filter(|user_id| !users_table_results.contains_key(user_id)) .collect(); let reserved_user_identifiers = self .db_client .query_reserved_usernames_by_user_ids(reserved_user_ids_to_query) .await?; let identities = users_table_results .into_iter() .map(|(user_id, identifier)| (user_id, identifier.into())) .collect(); let response = UserIdentitiesResponse { identities, reserved_user_identifiers, }; return Ok(Response::new(response)); } #[tracing::instrument(skip_all)] async fn sync_platform_details( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let platform_metadata = get_platform_metadata(&request)?; let platform_details = PlatformDetails::new(platform_metadata, None) .map_err(|_| { Status::invalid_argument( tonic_status_messages::INVALID_PLATFORM_METADATA, ) })?; self .db_client .update_device_platform_details(user_id, device_id, platform_details) .await?; Ok(Response::new(Empty {})) } } #[allow(dead_code)] enum DeviceListItemKind { Any, Primary, Secondary, } impl AuthenticatedService { async fn verify_device_on_device_list( &self, user_id: &String, device_id: &String, device_kind: DeviceListItemKind, ) -> Result<(), tonic::Status> { let device_list = self .db_client .get_current_device_list(user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition( tonic_status_messages::NO_DEVICE_LIST, )); }; use DeviceListItemKind as DeviceKind; let device_on_list = match device_kind { DeviceKind::Any => device_list.has_device(device_id), DeviceKind::Primary => device_list.is_primary_device(device_id), DeviceKind::Secondary => device_list.has_secondary_device(device_id), }; if !device_on_list { debug!( "Device {} not in device list for user {}", device_id, user_id ); return Err(Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } Ok(()) } async fn delete_tunnelbroker_and_backup_data( &self, user_id: &str, ) -> Result<(), Status> { debug!("Attempting to delete Backup data for user: {}", &user_id); let (device_list_result, delete_backup_result) = tokio::join!( self.db_client.get_current_device_list(user_id), - crate::backup::delete_backup_user_data(user_id, &self.comm_auth_service) + backup::delete_backup_user_data(user_id, &self.comm_auth_service) ); let device_ids = device_list_result? .map(|list| list.device_ids) .unwrap_or_default(); delete_backup_result?; debug!( "Attempting to delete Tunnelbroker data for devices: {:?}", device_ids ); - crate::tunnelbroker::delete_devices_data(&device_ids).await?; + tunnelbroker::delete_devices_data(&device_ids).await?; Ok(()) } } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct DeletePasswordUserInfo { pub opaque_server_login: comm_opaque2::server::Login, } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct UpdatePasswordInfo { pub opaque_server_login: comm_opaque2::server::Login, } diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs index 3290d4dc5..a0df620f1 100644 --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -1,141 +1,144 @@ use comm_lib::auth::AuthService; use comm_lib::aws; use comm_lib::aws::config::timeout::TimeoutConfig; use comm_lib::aws::config::BehaviorVersion; use config::Command; use database::DatabaseClient; use tonic::transport::Server; use tonic_web::GrpcWebLayer; -mod backup; mod client_service; mod config; pub mod constants; mod cors; mod database; pub mod ddb_utils; mod device_list; pub mod error; mod grpc_services; mod grpc_utils; mod http; mod id; mod keygen; mod log; mod nonce; mod olm; mod regex; mod reserved_users; mod siwe; mod sync_identity_search; mod token; -mod tunnelbroker; mod websockets; +mod comm_service { + pub mod backup; + pub mod tunnelbroker; +} + use constants::{COMM_SERVICES_USE_JSON_LOGS, IDENTITY_SERVICE_SOCKET_ADDR}; use cors::cors_layer; use keygen::generate_and_persist_keypair; use std::env; use sync_identity_search::sync_index; use tokio::time::Duration; use tracing::{self, info, Level}; use tracing_subscriber::EnvFilter; use client_service::{ClientService, IdentityClientServiceServer}; use grpc_services::authenticated::AuthenticatedService; use grpc_services::protos::auth::identity_client_service_server::IdentityClientServiceServer as AuthServer; use websockets::errors::BoxedError; #[tokio::main] async fn main() -> Result<(), BoxedError> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(EnvFilter::DEFAULT_ENV) .from_env_lossy(); let use_json_logs: bool = env::var(COMM_SERVICES_USE_JSON_LOGS) .unwrap_or("false".to_string()) .parse() .unwrap_or_default(); if use_json_logs { let subscriber = tracing_subscriber::fmt() .json() .with_env_filter(filter) .finish(); tracing::subscriber::set_global_default(subscriber)?; } else { let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; } match config::parse_cli_command() { Command::Keygen { dir } => { generate_and_persist_keypair(dir)?; } Command::Server => { config::load_server_config(); let addr = IDENTITY_SERVICE_SOCKET_ADDR.parse()?; let aws_config = aws::config::defaults(BehaviorVersion::v2024_03_28()) .timeout_config( TimeoutConfig::builder() .connect_timeout(Duration::from_secs(60)) .build(), ) .region("us-east-2") .load() .await; let comm_auth_service = AuthService::new(&aws_config, "http://localhost:50054".to_string()); let database_client = DatabaseClient::new(&aws_config); let inner_client_service = ClientService::new(database_client.clone()); let client_service = IdentityClientServiceServer::with_interceptor( inner_client_service, grpc_services::shared::version_interceptor, ); let inner_auth_service = AuthenticatedService::new(database_client.clone(), comm_auth_service); let db_client = database_client.clone(); let auth_service = AuthServer::with_interceptor(inner_auth_service, move |req| { grpc_services::authenticated::auth_interceptor(req, &db_client) .and_then(grpc_services::shared::version_interceptor) }); info!("Listening to gRPC traffic on {}", addr); let grpc_server = Server::builder() .accept_http1(true) .layer(cors_layer()) .layer(GrpcWebLayer::new()) .trace_fn(|_| { tracing::info_span!( "grpc_request", request_id = uuid::Uuid::new_v4().to_string() ) }) .add_service(client_service) .add_service(auth_service) .serve(addr); let websocket_server = websockets::run_server(database_client); return tokio::select! { websocket_result = websocket_server => websocket_result, grpc_result = grpc_server => { grpc_result.map_err(|e| e.into()) }, }; } Command::SyncIdentitySearch => { let aws_config = aws::config::defaults(BehaviorVersion::v2024_03_28()) .region("us-east-2") .load() .await; let database_client = DatabaseClient::new(&aws_config); let sync_result = sync_index(&database_client).await; error::consume_error(sync_result); } } Ok(()) }