diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index 1530a8441..44d11d99e 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1557 +1,1543 @@ use comm_lib::aws::ddb::{ operation::{ delete_item::DeleteItemOutput, get_item::GetItemOutput, put_item::PutItemOutput, query::QueryOutput, }, primitives::Blob, types::{ AttributeValue, Delete, Put, PutRequest, TransactWriteItem, WriteRequest, }, }; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; use comm_lib::tools::IntoChunks; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; pub use crate::database::device_list::DeviceIDAttribute; pub use crate::database::one_time_keys::OTKRow; use crate::{ constants::{tonic_status_messages, RESERVED_USERNAMES_TABLE_USER_ID_INDEX}, ddb_utils::EthereumIdentity, device_list::SignedDeviceList, grpc_services::shared::PlatformMetadata, log::redact_sensitive_data, reserved_users::UserDetail, siwe::SocialProof, }; use crate::{ ddb_utils::{DBIdentity, OlmAccountType}, grpc_services::protos, }; use crate::{error::Error, grpc_utils::DeviceKeysInfo}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn, Instrument}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; -use crate::config::CONFIG; use crate::constants::{ error_types, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, USERS_TABLE_USERNAME_LOWER_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::AuthType; pub use grpc_clients::identity::DeviceType; mod device_list; mod farcaster; mod one_time_keys; mod token; mod workflows; pub use device_list::{ DeviceListRow, DeviceListUpdate, DeviceRow, PlatformDetails, }; use self::device_list::Prekey; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. 
// Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } pub struct DBDeviceTypeInt(pub i32); impl TryFrom for DeviceType { type Error = crate::error::Error; fn try_from(value: DBDeviceTypeInt) -> Result { let device_result = DeviceType::try_from(value.0); device_result.map_err(|_| { Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), attribute_value: Some(AttributeValue::N(value.0.to_string())).into(), attribute_error: DBItemAttributeError::InvalidValue, }) }) } } pub struct OutboundKeys { pub key_payload: String, pub key_payload_signature: String, pub content_prekey: Prekey, pub notif_prekey: Prekey, pub content_one_time_key: Option, pub notif_one_time_key: Option, } impl From for protos::auth::OutboundKeyInfo { fn from(db_keys: OutboundKeys) -> Self { use protos::unauth::IdentityKeyInfo; Self { identity_info: Some(IdentityKeyInfo { payload: db_keys.key_payload, payload_signature: db_keys.key_payload_signature, }), content_prekey: Some(db_keys.content_prekey.into()), notif_prekey: Some(db_keys.notif_prekey.into()), one_time_content_prekey: db_keys.content_one_time_key, one_time_notif_prekey: db_keys.notif_one_time_key, } } } pub struct UserInfoAndPasswordFile { pub user_id: String, pub original_username: String, pub password_file: Vec, } #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { - let client = match &CONFIG.localstack_endpoint { - Some(endpoint) => { - info!( - "Configuring DynamoDB client to use LocalStack endpoint: {}", - endpoint - ); - let ddb_config_builder = - comm_lib::aws::ddb::config::Builder::from(aws_config) - .endpoint_url(endpoint); - DynamoDBClient::from_conf(ddb_config_builder.build()) - } - None => DynamoDBClient::new(aws_config), - }; - + let client = DynamoDBClient::new(aws_config); DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, platform_details: PlatformMetadata, access_token_creation_time: DateTime, ) -> Result { let device_key_upload = registration_state.flattened_device_key_upload; let user_id = self .add_user_to_users_table( Some((registration_state.username, Blob::new(password_file))), None, registration_state.user_id, registration_state.farcaster_id, ) .await?; // When initial device list is present, we should apply it // instead of auto-creating one. 
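// Note on the `DatabaseClient::new` change above: with the LocalStack branch removed,
// any custom endpoint is expected to be baked into the `AwsConfig` that callers pass in,
// rather than resolved here. A minimal sketch of that caller-side pattern, assuming the
// same `comm_lib::aws::config` re-exports used by the new `load_aws_config` in
// services/identity/src/main.rs at the end of this diff:
//
//   use comm_lib::aws::config::BehaviorVersion;
//
//   let mut loader =
//     comm_lib::aws::config::defaults(BehaviorVersion::v2024_03_28());
//   if let Some(endpoint) = &crate::config::CONFIG.localstack_endpoint {
//     loader = loader.endpoint_url(endpoint);
//   }
//   let aws_config = loader.load().await;
//   let db_client = DatabaseClient::new(&aws_config);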
if let Some(device_list) = registration_state.initial_device_list { let initial_device_list = DeviceListUpdate::try_from(device_list)?; self .register_primary_device( &user_id, device_key_upload.clone(), platform_details, access_token_creation_time, initial_device_list, ) .await?; } else { self .add_device( &user_id, device_key_upload.clone(), platform_details, access_token_creation_time, ) .await?; } self .append_one_time_prekeys( &user_id, &device_key_upload.device_id_key, &device_key_upload.content_one_time_keys, &device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } #[allow(clippy::too_many_arguments)] pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: SocialProof, user_id: Option, platform_metadata: PlatformMetadata, access_token_creation_time: DateTime, farcaster_id: Option, initial_device_list: Option, ) -> Result { let wallet_identity = EthereumIdentity { wallet_address: wallet_address.clone(), social_proof, }; let user_id = self .add_user_to_users_table( None, Some(wallet_identity), user_id, farcaster_id, ) .await?; // When initial device list is present, we should apply it // instead of auto-creating one. if let Some(device_list) = initial_device_list { let initial_device_list = DeviceListUpdate::try_from(device_list)?; self .register_primary_device( &user_id, flattened_device_key_upload.clone(), platform_metadata, access_token_creation_time, initial_device_list, ) .await?; } else { self .add_device( &user_id, flattened_device_key_upload.clone(), platform_metadata, access_token_creation_time, ) .await?; } self .append_one_time_prekeys( &user_id, &flattened_device_key_upload.device_id_key, &flattened_device_key_upload.content_one_time_keys, &flattened_device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } async fn add_user_to_users_table( &self, username_and_password_file: Option<(String, Blob)>, wallet_identity: Option, user_id: Option, farcaster_id: Option, ) -> Result { let user_id = user_id.unwrap_or_else(generate_uuid); let mut user = HashMap::from([( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), )]); if let Some((username, password_file)) = username_and_password_file.clone() { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username.clone()), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); user.insert( USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME.to_string(), AttributeValue::S(username.to_lowercase()), ); } if let Some(eth_identity) = wallet_identity.clone() { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(eth_identity.wallet_address), ); user.insert( USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), eth_identity.social_proof.into(), ); } if let Some(fid) = farcaster_id { user.insert( USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME.to_string(), AttributeValue::S(fid), ); } let put_user = Put::builder() .table_name(USERS_TABLE) .set_item(Some(user)) // make sure we don't accidentally overwrite existing row .condition_expression("attribute_not_exists(#pk)") .expression_attribute_names("#pk", USERS_TABLE_PARTITION_KEY) .build() .expect("key, update_expression or table_name not set in Update builder"); let put_user_operation = TransactWriteItem::builder().put(put_user).build(); let partition_key_value = match (username_and_password_file, wallet_identity) { (Some((username, _)), _) => username, (_, Some(ethereum_identity)) => 
ethereum_identity.wallet_address, _ => return Err(Error::MalformedItem), }; // We make sure to delete the user from the reserved usernames table when we // add them to the users table let delete_user_from_reserved_usernames = Delete::builder() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(partition_key_value), ) .build() .expect("key or table_name not set in Delete builder"); let delete_user_from_reserved_usernames_operation = TransactWriteItem::builder() .delete(delete_user_from_reserved_usernames) .build(); self .client .transact_write_items() .set_transact_items(Some(vec![ put_user_operation, delete_user_from_reserved_usernames_operation, ])) .send() .await .map_err(|e| { error!( errorType = error_types::GENERIC_DB_LOG, "Add user transaction failed: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(user_id) } pub async fn add_user_device( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, platform_metadata: PlatformMetadata, access_token_creation_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = flattened_device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_one_time_keys.clone(); // add device to the device list if not exists let device_id = flattened_device_key_upload.device_id_key.clone(); let device_exists = self .device_exists(user_id.clone(), device_id.clone()) .await?; if device_exists { self .update_device_login_time( user_id.clone(), device_id, access_token_creation_time, ) .await?; return Ok(()); } // add device to the new device list self .add_device( &user_id, flattened_device_key_upload, platform_metadata, access_token_creation_time, ) .await?; self .append_one_time_prekeys( &user_id, &device_id, &content_one_time_keys, ¬if_one_time_keys, ) .await?; Ok(()) } pub async fn update_wallet_user_social_proof( &self, user_id: &str, social_proof: SocialProof, ) -> Result<(), Error> { self .client .update_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.to_string()), ) .update_expression("SET #social_proof = :v") .condition_expression("attribute_exists(#social_proof)") .expression_attribute_names( "#social_proof", USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, ) .expression_attribute_values(":v", social_proof.into()) .send() .await .map_err(|e| { // ConditionalCheckFailedException means we're updating // non-wallet user (DB item without social proof) error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to update social proof: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn get_keyserver_keys_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device = user_devices .into_iter() .find(|device| *device.device_type() == GrpcDeviceType::Keyserver); let Some(keyserver) = maybe_keyserver_device else { return Ok(None); }; debug!( "Found keyserver in devices table (ID={})", &keyserver.device_id ); let (notif_one_time_key, requested_more_keys) = self .get_one_time_key( user_id, &keyserver.device_id, OlmAccountType::Notification, true, ) .await .unwrap_or_else(|e| { error!( errorType = error_types::OTK_DB_LOG, "Error retrieving notification one-time key: {:?}", e ); (None, true) }); let (content_one_time_key, _) = self .get_one_time_key( user_id, &keyserver.device_id, OlmAccountType::Content, !requested_more_keys, ) .await 
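// `get_one_time_key` is used here as if it returns a pair of
// (maybe a one-time key, whether more keys were already requested from the device).
// On a lookup error the code falls back to `(None, true)` so the response can still be
// built, and this second (content) fetch passes `!requested_more_keys` so a device is
// not asked to upload more keys twice for a single request.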
.unwrap_or_else(|e| { error!( errorType = error_types::OTK_DB_LOG, "Error retrieving content one-time key: {:?}", e ); (None, true) }); debug!( "Able to get notif one-time key for keyserver {}: {}", &keyserver.device_id, notif_one_time_key.is_some() ); debug!( "Able to get content one-time key for keyserver {}: {}", &keyserver.device_id, content_one_time_key.is_some() ); let outbound_payload = OutboundKeys { key_payload: keyserver.device_key_info.key_payload, key_payload_signature: keyserver.device_key_info.key_payload_signature, content_prekey: keyserver.content_prekey, notif_prekey: keyserver.notif_prekey, content_one_time_key, notif_one_time_key, }; Ok(Some(outbound_payload)) } pub async fn get_keyserver_device_id_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device_id = user_devices .into_iter() .find(|device| *device.device_type() == GrpcDeviceType::Keyserver) .map(|device| device.device_id); Ok(maybe_keyserver_device_id) } pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } /// Deletes all user data from DynamoDB. Returns device IDs /// from user's device list. 
#[tracing::instrument(skip_all)] pub async fn delete_user(&self, user_id: String) -> Result<(), Error> { // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self.delete_otks_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user's devices"); self.delete_devices_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user's access tokens"); self.delete_all_tokens_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user"); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to delete user {}", user_id ); Err(Error::AwsSdk(e.into())) } }?; Ok(()) } pub async fn wallet_address_taken( &self, wallet_address: String, ) -> Result { let result = self .get_user_id_from_user_info(wallet_address, &AuthType::Wallet) .await?; Ok(result.is_some()) } pub async fn username_taken(&self, username: String) -> Result { let username_lower = username.to_lowercase(); let request = self .client .query() .table_name(USERS_TABLE) .index_name(USERS_TABLE_USERNAME_LOWER_INDEX) .key_condition_expression("#username_lower = :username_lower") .expression_attribute_names( "#username_lower", USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, ) .expression_attribute_values( ":username_lower", AttributeValue::S(username_lower), ); let response = request.send().await.map_err(|e| { error!( errorType = error_types::GENERIC_DB_LOG, "Failed to query lowercase usernames by index: {:?}", e ); Error::AwsSdk(e.into()) })?; let username_available = response.items().is_empty(); Ok(!username_available) } pub async fn filter_out_taken_usernames( &self, user_details: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; let db_usernames_set: HashSet = db_usernames .into_iter() .map(|username| username.to_lowercase()) .collect(); let available_user_details: Vec = user_details .into_iter() .filter(|user_detail| { !db_usernames_set.contains(&user_detail.username.to_lowercase()) }) .collect(); Ok(available_user_details) } #[tracing::instrument(skip_all)] async fn get_user_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result>, Error> { let (index, attribute_name, attribute_value) = match auth_type { AuthType::Password => ( USERS_TABLE_USERNAME_LOWER_INDEX, USERS_TABLE_USERNAME_LOWER_ATTRIBUTE_NAME, user_info.to_lowercase(), ), AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, user_info.clone(), ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(attribute_value)) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? 
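// Lookup strategy recap for `get_user_from_user_info`: password users are resolved
// through the lowercase-username GSI (so lookups are case-insensitive), while wallet
// users are resolved through the wallet-address GSI with the value used verbatim.
// If more than one item matches, the code logs a warning and proceeds with the first
// result rather than failing the request.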
.as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: first_item .get(USERS_TABLE_PARTITION_KEY) .cloned() .into(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_keys_for_user( &self, user_id: &str, get_one_time_keys: bool, ) -> Result, Error> { let mut devices_response = self.get_keys_for_user_devices(user_id).await?; if devices_response.is_empty() { debug!("No devices found for user {}", user_id); return Ok(None); } if get_one_time_keys { for (device_id_key, device_keys) in devices_response.iter_mut() { let requested_more_keys; (device_keys.notif_one_time_key, requested_more_keys) = self .get_one_time_key( user_id, device_id_key, OlmAccountType::Notification, true, ) .await .unwrap_or_else(|e| { error!( errorType = error_types::OTK_DB_LOG, "Error retrieving notification one-time key: {:?}", e ); (None, true) }); (device_keys.content_one_time_key, _) = self .get_one_time_key( user_id, device_id_key, OlmAccountType::Content, !requested_more_keys, ) .await .unwrap_or_else(|e| { error!( errorType = error_types::OTK_DB_LOG, "Error retrieving content one-time key: {:?}", e ); (None, true) }); } } Ok(Some(devices_response)) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => user .take_attr(USERS_TABLE_PARTITION_KEY) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } #[tracing::instrument(skip_all)] pub async fn get_user_info_and_password_file_from_username( &self, username: &str, ) -> Result, Error> { match self .get_user_from_user_info(username.to_string(), &AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = user.take_attr(USERS_TABLE_PARTITION_KEY)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; let original_username = user.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE)?; Ok(Some(UserInfoAndPasswordFile { user_id, original_username, password_file, })) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_username_and_password_file( &self, user_id: &str, ) -> Result)>, Error> { let Some(mut user) = self.get_item_from_users_table(user_id).await?.item else { return Ok(None); }; let username = user.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((username, password_file))) } /// Returns an error if `user_id` does not exist in users table pub async fn user_is_password_authenticated( &self, user_id: &str, ) -> Result { let Some(user_item) = self.get_item_from_users_table(user_id).await?.item else { error!(errorType = error_types::GENERIC_DB_LOG, "user not found"); return Err(Error::MissingItem); }; Ok(user_item.contains_key(USERS_TABLE_REGISTRATION_ATTRIBUTE)) } async fn 
get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn find_db_user_identities( &self, user_ids: impl IntoIterator, ) -> Result, Error> { use comm_lib::database::batch_operations::{ batch_get, ExponentialBackoffConfig, }; let primary_keys = user_ids.into_iter().map(|user_id| { create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id, )) }); let projection_expression = [ USERS_TABLE_PARTITION_KEY, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, ] .join(", "); debug!( num_requests = primary_keys.size_hint().0, "Attempting to batch get user identifiers" ); let responses = batch_get( &self.client, USERS_TABLE, primary_keys, Some(projection_expression), ExponentialBackoffConfig::default(), ) .await .map_err(Error::from)?; debug!("Found {} matching user identifiers in DDB", responses.len()); let mut results = HashMap::with_capacity(responses.len()); for response in responses { let user_id = response.get_attr(USERS_TABLE_PARTITION_KEY)?; // if this fails, it means that projection expression didnt have all attrs it needed let identity = DBIdentity::try_from(response)?; results.insert(user_id, identity); } Ok(results) } /// Retrieves username for password users or wallet address for wallet users /// Returns `None` if user not found #[tracing::instrument(skip_all)] pub async fn get_user_identity( &self, user_id: &str, ) -> Result, Error> { self .get_item_from_users_table(user_id) .await? 
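// `find_db_user_identities` above relies on `batch_get` returning only the attributes
// named in its projection expression; `DBIdentity::try_from` can only succeed if every
// attribute it needs (username, wallet address, social proof, Farcaster ID) is part of
// that projection, which is why the expression is built from the same table constants
// used elsewhere in this file.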
.item .map(DBIdentity::try_from) .transpose() .map_err(|e| { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GENERIC_DB_LOG, "Database item is missing an identifier" ); e }) } /// Returns all usernames and wallet addresses from `identity-users` table async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression("#username, #walletAddress") .expression_attribute_names("#username", USERS_TABLE_USERNAME_ATTRIBUTE) .expression_attribute_names( "#walletAddress", USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(items) = scan_output.items { for mut item in items { if let Ok(username) = item.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE) { result.push(username); } else if let Ok(wallet_address) = item.take_attr(USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE) { result.push(wallet_address); } } } Ok(result) } pub async fn get_all_user_details(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression("#userID, #username, #walletAddress") .expression_attribute_names("#userID", USERS_TABLE_PARTITION_KEY) .expression_attribute_names("#username", USERS_TABLE_USERNAME_ATTRIBUTE) .expression_attribute_names( "#walletAddress", USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); let Some(items) = scan_output.items else { return Ok(result); }; for mut item in items { let Ok(user_id) = item.take_attr(USERS_TABLE_PARTITION_KEY) else { error!( errorType = error_types::GENERIC_DB_LOG, "Partition key missing for item" ); continue; }; if let Ok(username) = item.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE) { result.push(UserDetail { username, user_id }); } else if let Ok(wallet_address) = item.take_attr(USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE) { result.push(UserDetail { username: wallet_address, user_id, }) } } Ok(result) } pub async fn get_all_reserved_user_details( &self, ) -> Result, Error> { let scan_output = self .client .scan() .table_name(RESERVED_USERNAMES_TABLE) .projection_expression(format!( "{RESERVED_USERNAMES_TABLE_PARTITION_KEY},\ {RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE}" )) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let (Ok(username), Ok(user_id)) = ( attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), attribute.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE), ) { result.push(UserDetail { username, user_id }); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( 
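// The nonce row written above stores the expiration twice: an RFC3339 string that the
// service reads back, and a unix-seconds number
// (`NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE`), which is the shape DynamoDB's TTL
// feature expects if that attribute is configured as the table's TTL attribute (an
// assumption about the table setup, not something this diff shows). A sketch of the
// conversion, using the same chrono types as above:
//
//   let expiration_time: chrono::DateTime<chrono::Utc> = nonce_data.expiration_time;
//   let ttl_attr = AttributeValue::N(expiration_time.timestamp().to_string());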
NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = item.take_attr(NONCE_TABLE_PARTITION_KEY)?; let created = DateTime::::try_from_attr( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(NONCE_TABLE_CREATED_ATTRIBUTE), )?; let expiration_time = DateTime::::try_from_attr( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, user_details: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for user_chunk in user_details.chunks(25) { let write_requests = user_chunk .iter() .map(|user_detail| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(user_detail.username.to_string()), ) .item( RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, AttributeValue::S(user_detail.user_id.to_string()), ) .item( RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, AttributeValue::S(user_detail.username.to_lowercase()), ) .build() .expect("no items set in PutRequest builder"); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } #[tracing::instrument(skip_all)] pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!(errorType = error_types::GENERIC_DB_LOG, "DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn get_user_id_from_reserved_usernames_table( &self, username: &str, ) -> Result, Error> { self .query_reserved_usernames_table( username, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, ) .await } pub async fn get_original_username_from_reserved_usernames_table( &self, username: &str, ) -> Result, Error> { self .query_reserved_usernames_table( username, RESERVED_USERNAMES_TABLE_PARTITION_KEY, ) .await } async fn query_reserved_usernames_table( &self, username: &str, attribute: &str, ) -> Result, Error> { let username_lower = username.to_lowercase(); self .query_reserved_usernames_table_index( &username_lower, ( RESERVED_USERNAMES_TABLE_USERNAME_LOWER_INDEX, RESERVED_USERNAMES_TABLE_USERNAME_LOWER_ATTRIBUTE, ), attribute, ) .await } #[tracing::instrument(skip_all)] pub async fn query_reserved_usernames_by_user_ids( &self, user_ids: Vec, ) -> Result, Error> { debug!("Querying for {} reserved usernames", user_ids.len()); const NUM_CONCURRENT_TASKS: usize = 16; let mut tasks = tokio::task::JoinSet::new(); let mut 
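// `query_reserved_usernames_by_user_ids` fans the lookups out across
// `NUM_CONCURRENT_TASKS` chunks: `into_n_chunks` splits the requested user IDs into up
// to 16 groups, each group is queried sequentially inside its own task on a
// `tokio::task::JoinSet`, and the per-task maps are merged below. Any query or join
// error aborts the remaining tasks and fails the whole call, so partial results are
// never returned.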
results = HashMap::with_capacity(user_ids.len()); for local_user_ids in user_ids.into_n_chunks(NUM_CONCURRENT_TASKS) { let db = self.clone(); let task = async move { let mut local_results = HashMap::new(); for user_id in local_user_ids { let query_result = db .query_reserved_usernames_table_index( &user_id, ( RESERVED_USERNAMES_TABLE_USER_ID_INDEX, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, ), RESERVED_USERNAMES_TABLE_PARTITION_KEY, ) .await?; if let Some(username) = query_result { local_results.insert(user_id, username); } } Ok::<_, Error>(local_results) }; tasks.spawn(task.in_current_span()); } while let Some(result) = tasks.join_next().await { match result { Ok(Ok(task_result)) => { results.extend(task_result); } Ok(Err(query_error)) => { error!( errorType = error_types::GENERIC_DB_LOG, "Failed to query reserved usernames by userID: {:?}", query_error ); tasks.abort_all(); return Err(query_error); } Err(join_error) => { error!( errorType = error_types::GENERIC_DB_LOG, "Failed to join task: {:?}", join_error ); tasks.abort_all(); return Err(Error::Status(tonic::Status::aborted( tonic_status_messages::UNEXPECTED_ERROR, ))); } } } Ok(results) } async fn query_reserved_usernames_table_index( &self, key_value: impl Into, // tuple of (index name, key attribute) index_and_key: (&'static str, &'static str), attribute: &str, ) -> Result, Error> { let (index, key_attr) = index_and_key; let response = self .client .query() .table_name(RESERVED_USERNAMES_TABLE) .index_name(index) .key_condition_expression("#key_name = :key_value") .expression_attribute_names("#key_name", key_attr) .expression_attribute_values( ":key_value", AttributeValue::S(key_value.into()), ) .limit(1) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let QueryOutput { items: Some(mut results), .. 
} = response else { return Ok(None); }; let result = results .pop() .map(|mut attrs| attrs.take_attr::(attribute)) .transpose()?; Ok(result) } } type AttributeName = String; type Devices = HashMap; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } #[deprecated(note = "Use `comm_lib` counterpart instead")] #[allow(dead_code)] fn parse_map_attribute( attribute_name: &str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", errorType = error_types::GENERIC_DB_LOG, "Unexpected attribute type when parsing map attribute" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", errorType = error_types::GENERIC_DB_LOG, "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::Missing, )) } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = 
r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } #[test] fn test_int_to_device_type() { let valid_result = DeviceType::try_from(3); assert!(valid_result.is_ok()); assert_eq!(valid_result.unwrap(), DeviceType::Android); let invalid_result = DeviceType::try_from(6); assert!(invalid_result.is_err()); } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index c53bd55ef..750851d00 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,990 +1,992 @@ use std::collections::{HashMap, HashSet}; use crate::comm_service::{backup, tunnelbroker}; use crate::config::CONFIG; use crate::database::{DeviceListUpdate, PlatformDetails}; use crate::device_list::validation::DeviceListValidator; use crate::device_list::SignedDeviceList; use crate::error::consume_error; use crate::log::redact_sensitive_data; use crate::{ client_service::{handle_db_error, WorkflowInProgress}, constants::{error_types, request_metadata, tonic_status_messages}, database::DatabaseClient, grpc_services::shared::{get_platform_metadata, get_value}, }; use chrono::DateTime; use comm_lib::auth::AuthService; +use comm_lib::blob::client::BlobServiceClient; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, trace}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, PrimaryDeviceLogoutRequest, PrivilegedDeleteUsersRequest, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserDevicesPlatformDetails, UserIdentitiesRequest, UserIdentitiesResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, + blob_client: BlobServiceClient, comm_auth_service: AuthService, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { trace!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { trace!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::MISSING_CREDENTIALS) })?; let handle = 
tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle.block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) })?; if !valid_token { return Err(Status::aborted(tonic_status_messages::BAD_CREDENTIALS)); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::USER_ID_MISSING) })?; let device_id = get_value(request, request_metadata::DEVICE_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::DEVICE_ID_MISSING) })?; Ok((user_id, device_id)) } fn spawn_delete_tunnelbroker_data_task(device_ids: Vec) { tokio::spawn(async move { debug!( "Attempting to delete Tunnelbroker data for devices: {:?}", device_ids.as_slice() ); let result = tunnelbroker::delete_devices_data(&device_ids).await; consume_error(result); }); } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { #[tracing::instrument(skip_all)] async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_key = message.new_content_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_CONTENT_KEYS) })?; let notif_key = message.new_notif_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_NOTIF_KEYS) })?; self .db_client .update_device_prekeys( user_id, device_id, content_key.into(), notif_key.into(), ) .await?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } #[tracing::instrument(skip_all)] async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identity(user_id) .await? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } #[tracing::instrument(skip_all)] async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identity(&message.user_id) .await? 
.ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await? else { return Err(Status::not_found( tonic_status_messages::KEYSERVER_NOT_FOUND, )); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } #[tracing::instrument(skip_all)] async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await?; Ok(tonic::Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let Some((username, password_file)) = self .db_client .get_username_and_password_file(&user_id) .await? else { return Err(tonic::Status::permission_denied( tonic_status_messages::WALLET_USER, )); }; let message = request.into_inner(); let mut server_login = comm_opaque2::server::Login::new(); let login_response = server_login .start( &CONFIG.server_setup, &password_file, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let registration_response = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdatePasswordInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(Box::new(update_state))) .await?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: registration_response, opaque_login_response: login_response, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self.db_client.get_workflow(message.session_id).await? 
else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(user_id, password_file) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self.db_client.remove_device(&user_id, &device_id).await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; let device_list = self .db_client .get_current_device_list(&user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition("no device list")); }; tokio::spawn(async move { debug!( "Sending device list updates to {:?}", device_list.device_ids ); let device_ids: Vec<&str> = device_list.device_ids.iter().map(AsRef::as_ref).collect(); let result = tunnelbroker::send_device_list_update(&device_ids).await; consume_error(result); }); spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_primary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!( "Primary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; // Get and verify singleton device list let parsed_device_list: SignedDeviceList = message.signed_device_list.parse()?; let update_payload = DeviceListUpdate::try_from(parsed_device_list)?; crate::device_list::verify_singleton_device_list( &update_payload, &device_id, None, )?; self .db_client .apply_devicelist_update( &user_id, update_payload, // - We've already validated the list so no need to do it here. // - Need to pass the type because it cannot be inferred from None None::, // We don't want side effects - we'll take care of removing devices // on our own. (Side effect would skip the primary device). 
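// Primary-device logout path recap: the client submits a signed, single-entry device
// list containing only the primary device; `verify_singleton_device_list` checks that
// shape before the update is applied with no extra validator and with side effects
// disabled (`false` below), because this handler removes the remaining device data
// itself in the steps that follow.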
false, ) .await?; debug!(user_id, "Attempting to delete user's access tokens"); self.db_client.delete_all_tokens_for_user(&user_id).await?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self .db_client .delete_otks_table_rows_for_user(&user_id) .await?; debug!(user_id, "Attempting to delete user's devices"); let device_ids = self .db_client .delete_devices_data_for_user(&user_id) .await?; spawn_delete_tunnelbroker_data_task(device_ids); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_secondary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; debug!( "Secondary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Secondary, ) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await?; spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); let user_is_password_authenticated = self .db_client .user_is_password_authenticated(&user_id) .await?; if user_is_password_authenticated { return Err(tonic::Status::permission_denied( tonic_status_messages::PASSWORD_USER, )); } self.delete_tunnelbroker_and_backup_data(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = DeletePasswordUserInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self.db_client.get_workflow(message.session_id).await? 
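// Password-user deletion mirrors the OPAQUE login flow: `delete_password_user_start`
// runs the server side of an OPAQUE login against the stored password file and stashes
// the in-progress state as a `WorkflowInProgress::PasswordUserDeletion` keyed by a
// session ID; `delete_password_user_finish` looks that state up, completes the login
// proof, and only then deletes backup/Tunnelbroker data and the user record.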
else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; self.delete_tunnelbroker_and_backup_data(&user_id).await?; self.db_client.delete_user(user_id.clone()).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn privileged_delete_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { const STAFF_USER_IDS: [&str; 1] = ["256"]; let (user_id, _) = get_user_and_device_id(&request)?; if !STAFF_USER_IDS.contains(&user_id.as_str()) { return Err(Status::permission_denied( tonic_status_messages::USER_IS_NOT_STAFF, )); } for user_id_to_delete in request.into_inner().user_ids { self .delete_tunnelbroker_and_backup_data(&user_id_to_delete) .await?; self.db_client.delete_user(user_id_to_delete).await?; } let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::from_timestamp_millis(timestamp).ok_or_else(|| { tonic::Status::invalid_argument( tonic_status_messages::INVALID_TIMESTAMP, ) }) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(SignedDeviceList::try_from) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } #[tracing::instrument(skip_all)] async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); let request_count = user_ids.len(); let user_ids: HashSet = user_ids.into_iter().collect(); debug!( "Requesting device lists and platform details for {} users ({} unique)", request_count, user_ids.len() ); // 1. Fetch device lists let device_lists = self.db_client.get_current_device_lists(user_ids).await?; trace!("Found device lists for {} users", device_lists.keys().len()); // 2. Fetch platform details let flattened_user_device_ids: Vec<(String, String)> = device_lists .iter() .flat_map(|(user_id, device_list)| { device_list .device_ids .iter() .map(|device_id| (user_id.clone(), device_id.clone())) .collect::>() }) .collect(); let platform_details = self .db_client .get_devices_platform_details(flattened_user_device_ids) .await?; trace!( "Found platform details for {} users", platform_details.keys().len() ); // 3. 
Prepare output format let users_device_lists: HashMap = device_lists .into_iter() .map(|(user_id, device_list_row)| { let signed_list = SignedDeviceList::try_from(device_list_row)?; let serialized_list = signed_list.as_json_string()?; Ok((user_id, serialized_list)) }) .collect::>()?; let users_devices_platform_details = platform_details .into_iter() .map(|(user_id, devices_map)| { (user_id, UserDevicesPlatformDetails::from(devices_map)) }) .collect(); let response = PeersDeviceListsResponse { users_device_lists, users_devices_platform_details, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; let validator = crate::device_list::validation::update_device_list_rpc_validator; self .db_client .apply_devicelist_update(&user_id, update, Some(validator), true) .await?; Ok(Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await?; if get_farcaster_users_response.len() > 1 { error!( errorType = error_types::GRPC_SERVICES_LOG, "multiple users associated with the same Farcaster ID" ); return Err(Status::failed_precondition( tonic_status_messages::CANNOT_LINK_FID, )); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists(tonic_status_messages::FID_TAKEN)); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self.db_client.remove_farcaster_id(user_id).await?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn find_user_identities( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_ids: HashSet = message.user_ids.into_iter().collect(); let users_table_results = self .db_client .find_db_user_identities(user_ids.clone()) .await?; // Look up only user IDs that haven't been found in users table let reserved_user_ids_to_query: Vec = user_ids .into_iter() .filter(|user_id| !users_table_results.contains_key(user_id)) .collect(); let reserved_user_identifiers = self .db_client .query_reserved_usernames_by_user_ids(reserved_user_ids_to_query) .await?; let identities = users_table_results .into_iter() .map(|(user_id, identifier)| (user_id, identifier.into())) .collect(); let response = UserIdentitiesResponse { identities, reserved_user_identifiers, }; return Ok(Response::new(response)); } #[tracing::instrument(skip_all)] async fn sync_platform_details( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let platform_metadata = get_platform_metadata(&request)?; let platform_details = 
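// `find_user_identities` resolves identities in two passes: user IDs present in the
// users table come back from `find_db_user_identities`, and only the IDs not found
// there are then looked up in the reserved-usernames table, so callers can distinguish
// registered identities from merely reserved identifiers in the response.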
  #[tracing::instrument(skip_all)]
  async fn sync_platform_details(
    &self,
    request: tonic::Request<Empty>,
  ) -> Result<Response<Empty>, tonic::Status> {
    let (user_id, device_id) = get_user_and_device_id(&request)?;
    let platform_metadata = get_platform_metadata(&request)?;
    let platform_details =
      PlatformDetails::new(platform_metadata, None).map_err(|_| {
        Status::invalid_argument(
          tonic_status_messages::INVALID_PLATFORM_METADATA,
        )
      })?;

    self
      .db_client
      .update_device_platform_details(user_id, device_id, platform_details)
      .await?;

    Ok(Response::new(Empty {}))
  }
}

#[allow(dead_code)]
enum DeviceListItemKind {
  Any,
  Primary,
  Secondary,
}

impl AuthenticatedService {
  async fn verify_device_on_device_list(
    &self,
    user_id: &String,
    device_id: &String,
    device_kind: DeviceListItemKind,
  ) -> Result<(), tonic::Status> {
    let device_list = self
      .db_client
      .get_current_device_list(user_id)
      .await
      .map_err(|err| {
        error!(
          user_id = redact_sensitive_data(user_id),
          errorType = error_types::GRPC_SERVICES_LOG,
          "Failed fetching device list: {err}"
        );
        handle_db_error(err)
      })?;

    let Some(device_list) = device_list else {
      error!(
        user_id = redact_sensitive_data(user_id),
        errorType = error_types::GRPC_SERVICES_LOG,
        "User has no device list!"
      );
      return Err(Status::failed_precondition(
        tonic_status_messages::NO_DEVICE_LIST,
      ));
    };

    use DeviceListItemKind as DeviceKind;
    let device_on_list = match device_kind {
      DeviceKind::Any => device_list.has_device(device_id),
      DeviceKind::Primary => device_list.is_primary_device(device_id),
      DeviceKind::Secondary => device_list.has_secondary_device(device_id),
    };

    if !device_on_list {
      debug!(
        "Device {} not in device list for user {}",
        device_id, user_id
      );
      return Err(Status::permission_denied(
        tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST,
      ));
    }

    Ok(())
  }
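  // Removes the data other services hold for this user: Backup data is
  // deleted while the current device list is fetched concurrently, then
  // Tunnelbroker state is dropped for every device on that list.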
  async fn delete_tunnelbroker_and_backup_data(
    &self,
    user_id: &str,
  ) -> Result<(), Status> {
    debug!("Attempting to delete Backup data for user: {}", &user_id);
    let (device_list_result, delete_backup_result) = tokio::join!(
      self.db_client.get_current_device_list(user_id),
      backup::delete_backup_user_data(user_id, &self.comm_auth_service)
    );
    let device_ids = device_list_result?
      .map(|list| list.device_ids)
      .unwrap_or_default();
    delete_backup_result?;

    debug!(
      "Attempting to delete Tunnelbroker data for devices: {:?}",
      device_ids
    );
    tunnelbroker::delete_devices_data(&device_ids).await?;

    Ok(())
  }
}

#[derive(
  Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor,
)]
pub struct DeletePasswordUserInfo {
  pub opaque_server_login: comm_opaque2::server::Login,
}

#[derive(
  Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor,
)]
pub struct UpdatePasswordInfo {
  pub opaque_server_login: comm_opaque2::server::Login,
}

diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs
index b6aa3ba54..45e664fc1 100644
--- a/services/identity/src/main.rs
+++ b/services/identity/src/main.rs
@@ -1,145 +1,160 @@
use comm_lib::auth::AuthService;
-use comm_lib::aws;
use comm_lib::aws::config::timeout::TimeoutConfig;
use comm_lib::aws::config::BehaviorVersion;
+use comm_lib::aws::{self, AwsConfig};
+use comm_lib::blob::client::BlobServiceClient;
use config::Command;
use database::DatabaseClient;
use tonic::transport::Server;
use tonic_web::GrpcWebLayer;

mod client_service;
mod config;
pub mod constants;
mod cors;
mod database;
pub mod ddb_utils;
mod device_list;
pub mod error;
mod grpc_services;
mod grpc_utils;
mod http;
mod id;
mod keygen;
mod log;
mod nonce;
mod olm;
mod regex;
mod reserved_users;
mod siwe;
mod sync_identity_search;
mod token;
mod websockets;

mod comm_service {
  pub mod backup;
  pub mod blob;
  pub mod tunnelbroker;
}

use constants::{COMM_SERVICES_USE_JSON_LOGS, IDENTITY_SERVICE_SOCKET_ADDR};
use cors::cors_layer;
use keygen::generate_and_persist_keypair;
use std::env;
use sync_identity_search::sync_index;
use tokio::time::Duration;
use tracing::{self, info, Level};
use tracing_subscriber::EnvFilter;

use client_service::{ClientService, IdentityClientServiceServer};
use grpc_services::authenticated::AuthenticatedService;
use grpc_services::protos::auth::identity_client_service_server::IdentityClientServiceServer as AuthServer;
use websockets::errors::BoxedError;

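+// Shared AWS SDK configuration for every client built in `main`. When a
+// LocalStack endpoint is configured, the endpoint URL override is applied
+// once here rather than inside individual client constructors such as
+// `DatabaseClient::new`.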
AWS endpoint URL: {}", endpoint); + config_builder = config_builder.endpoint_url(endpoint); + } + + config_builder.load().await +} + #[tokio::main] async fn main() -> Result<(), BoxedError> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(EnvFilter::DEFAULT_ENV) .from_env_lossy(); let use_json_logs: bool = env::var(COMM_SERVICES_USE_JSON_LOGS) .unwrap_or("false".to_string()) .parse() .unwrap_or_default(); if use_json_logs { let subscriber = tracing_subscriber::fmt() .json() .with_env_filter(filter) .finish(); tracing::subscriber::set_global_default(subscriber)?; } else { let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; } match config::parse_cli_command() { Command::Keygen { dir } => { generate_and_persist_keypair(dir)?; } Command::Server => { - config::load_server_config(); + let cfg = config::load_server_config(); let addr = IDENTITY_SERVICE_SOCKET_ADDR.parse()?; - let aws_config = aws::config::defaults(BehaviorVersion::v2024_03_28()) - .timeout_config( - TimeoutConfig::builder() - .connect_timeout(Duration::from_secs(60)) - .build(), - ) - .region("us-east-2") - .load() - .await; + let aws_config = load_aws_config().await; let comm_auth_service = AuthService::new(&aws_config, "http://localhost:50054".to_string()); + let blob_client = BlobServiceClient::new(cfg.blob_service_url.to_owned()); let database_client = DatabaseClient::new(&aws_config); let inner_client_service = ClientService::new(database_client.clone()); let client_service = IdentityClientServiceServer::with_interceptor( inner_client_service, grpc_services::shared::version_interceptor, ); - let inner_auth_service = - AuthenticatedService::new(database_client.clone(), comm_auth_service); + let inner_auth_service = AuthenticatedService::new( + database_client.clone(), + blob_client, + comm_auth_service, + ); let db_client = database_client.clone(); let auth_service = AuthServer::with_interceptor(inner_auth_service, move |req| { grpc_services::authenticated::auth_interceptor(req, &db_client) .and_then(grpc_services::shared::version_interceptor) }); info!("Listening to gRPC traffic on {}", addr); let grpc_server = Server::builder() .accept_http1(true) .layer(cors_layer()) .layer(GrpcWebLayer::new()) .trace_fn(|_| { tracing::info_span!( "grpc_request", request_id = uuid::Uuid::new_v4().to_string() ) }) .add_service(client_service) .add_service(auth_service) .serve(addr); let websocket_server = websockets::run_server(database_client); return tokio::select! { websocket_result = websocket_server => websocket_result, grpc_result = grpc_server => { grpc_result.map_err(|e| e.into()) }, }; } Command::SyncIdentitySearch => { let aws_config = aws::config::defaults(BehaviorVersion::v2024_03_28()) .region("us-east-2") .load() .await; let database_client = DatabaseClient::new(&aws_config); let sync_result = sync_index(&database_client).await; error::consume_error(sync_result); } } Ok(()) }