diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs index 346572580..cb0100d28 100644 --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,88 +1,88 @@ use commtest::identity::device::{ register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use commtest::tunnelbroker::socket::{create_socket, receive_message}; use futures_util::StreamExt; use grpc_clients::identity::get_auth_client; use grpc_clients::identity::protos::authenticated::{ OutboundKeysForUserRequest, UploadOneTimeKeysRequest, }; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn test_tunnelbroker_invalid_auth() { let mut device_info = register_user_device(None, None).await; device_info.access_token = "".to_string(); let socket = create_socket(&device_info).await; assert!(matches!(socket, Result::Err(_))) } #[tokio::test] async fn test_tunnelbroker_valid_auth() { let device_info = register_user_device(None, None).await; let mut socket = create_socket(&device_info).await.unwrap(); socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { let identity_grpc_endpoint = service_addr::IDENTITY_GRPC.to_string(); let device_info = register_user_device(None, None).await; // Request outbound keys, which should trigger identity service to ask for more keys let mut client = get_auth_client( &identity_grpc_endpoint, device_info.user_id.clone(), device_info.device_id, device_info.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let upload_request = UploadOneTimeKeysRequest { content_one_time_prekeys: vec!["content1".to_string()], notif_one_time_prekeys: vec!["notif1".to_string()], }; client.upload_one_time_keys(upload_request).await.unwrap(); let keyserver_request = OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); let _first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await - .expect("Second keyserver keys request failed") + .expect("keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The current threshold is 5, but we only upload two. Should receive request // from Tunnelbroker to refresh keys // Create session as a keyserver let device_info = register_user_device(None, None).await; let mut socket = create_socket(&device_info).await.unwrap(); for _ in 0..2 { let response = receive_message(&mut socket).await.unwrap(); let serialized_response: RefreshKeyRequest = serde_json::from_str(&response).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; assert_eq!(serialized_response, expected_response); } } diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs index 5c2da0c78..23c9db034 100644 --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -1,241 +1,251 @@ use tokio::time::Duration; // Secrets pub const SECRETS_DIRECTORY: &str = "secrets"; pub const SECRETS_SETUP_FILE: &str = "server_setup.txt"; // DynamoDB // User table information, supporting opaque_ke 2.0 and X3DH information // Users can sign in either through username+password or Eth wallet. // // This structure should be aligned with the messages defined in // shared/protos/identity_unauthenticated.proto // // Structure for a user should be: // { // userID: String, // opaqueRegistrationData: Option, // username: Option, // walletAddress: Option, // devices: HashMap // } // // A device is defined as: // { // deviceType: String, # client or keyserver // keyPayload: String, // keyPayloadSignature: String, // identityPreKey: String, // identityPreKeySignature: String, // identityOneTimeKeys: Vec, // notifPreKey: String, // notifPreKeySignature: String, // notifOneTimeKeys: Vec, // socialProof: Option // } // } // // Additional context: // "devices" uses the signing public identity key of the device as a key for the devices map // "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification) // if "deviceType" == "keyserver", then the device will not have any notif key information pub const USERS_TABLE: &str = "identity-users"; pub const USERS_TABLE_PARTITION_KEY: &str = "userID"; pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData"; pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username"; pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str = "deviceType"; pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress"; pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof"; pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str = "deviceListTimestamp"; pub const USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME: &str = "farcasterID"; pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index"; pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index"; pub const USERS_TABLE_FARCASTER_ID_INDEX: &str = "farcasterID-index"; pub const ACCESS_TOKEN_TABLE: &str = "identity-tokens"; pub const ACCESS_TOKEN_TABLE_PARTITION_KEY: &str = "userID"; pub const ACCESS_TOKEN_SORT_KEY: &str = "signingPublicKey"; pub const ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE: &str = "authType"; pub const ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE: &str = "valid"; pub const ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE: &str = "token"; pub const NONCE_TABLE: &str = "identity-nonces"; pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce"; pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime"; pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; pub const WORKFLOWS_IN_PROGRESS_TABLE: &str = "identity-workflows-in-progress"; pub const WORKFLOWS_IN_PROGRESS_PARTITION_KEY: &str = "id"; pub const WORKFLOWS_IN_PROGRESS_WORKFLOW_ATTRIBUTE: &str = "workflow"; pub const WORKFLOWS_IN_PROGRESS_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; // Usernames reserved because they exist in Ashoat's keyserver already pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames"; pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username"; pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID"; // Users table social proof attribute pub const SOCIAL_PROOF_MESSAGE_ATTRIBUTE: &str = "siweMessage"; pub const SOCIAL_PROOF_SIGNATURE_ATTRIBUTE: &str = "siweSignature"; pub mod devices_table { /// table name pub const NAME: &str = "identity-devices"; pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index"; /// partition key pub const ATTR_USER_ID: &str = "userID"; /// sort key pub const ATTR_ITEM_ID: &str = "itemID"; // itemID prefixes (one shouldn't be a prefix of the other) pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-"; pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-"; // device-specific attrs pub const ATTR_DEVICE_TYPE: &str = "deviceType"; pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo"; pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey"; pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey"; // IdentityKeyInfo constants pub const ATTR_KEY_PAYLOAD: &str = "keyPayload"; pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature"; // PreKey constants pub const ATTR_PREKEY: &str = "preKey"; pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature"; // device-list-specific attrs pub const ATTR_TIMESTAMP: &str = "timestamp"; pub const ATTR_DEVICE_IDS: &str = "deviceIDs"; // migration-specific attrs pub const ATTR_CODE_VERSION: &str = "codeVersion"; pub const ATTR_LOGIN_TIME: &str = "loginTime"; + + // one-time key constants + pub const ATTR_CONTENT_OTK_COUNT: &str = "contentOTKCount"; + pub const ATTR_NOTIF_OTK_COUNT: &str = "notifOTKCount"; } // One time keys table, which need to exist in their own table to ensure // atomicity of additions and removals pub mod one_time_keys_table { - // The `PARTITION_KEY` will contain "notification_${deviceID}" or - // "content_${deviceID}" to allow for both key sets to coexist in the same table pub const NAME: &str = "identity-one-time-keys"; - pub const PARTITION_KEY: &str = "deviceID"; - pub const DEVICE_ID: &str = PARTITION_KEY; - pub const SORT_KEY: &str = "oneTimeKey"; - pub const ONE_TIME_KEY: &str = SORT_KEY; + pub const PARTITION_KEY: &str = "userID#deviceID#olmAccount"; + pub const SORT_KEY: &str = "timestamp#keyNumber"; + pub const ATTR_ONE_TIME_KEY: &str = "oneTimeKey"; } -// One-time key constants for device info map -pub const CONTENT_ONE_TIME_KEY: &str = "contentOneTimeKey"; -pub const NOTIF_ONE_TIME_KEY: &str = "notifOneTimeKey"; - // Tokio pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054"; pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004"; pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); // Token pub const ACCESS_TOKEN_LENGTH: usize = 512; // Temporary config pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN"; pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY"; // Nonce pub const NONCE_LENGTH: usize = 17; pub const NONCE_TTL_DURATION: Duration = Duration::from_secs(120); // seconds // Workflows in progress pub const WORKFLOWS_IN_PROGRESS_TTL_DURATION: Duration = Duration::from_secs(120); // Identity pub const DEFAULT_IDENTITY_ENDPOINT: &str = "http://localhost:50054"; // LocalStack pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; // OPAQUE Server Setup pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP"; // Identity Search pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT"; pub const DEFAULT_OPENSEARCH_ENDPOINT: &str = "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566"; pub const IDENTITY_SEARCH_INDEX: &str = "users"; pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20; // Tunnelbroker pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT"; pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051"; // X3DH key management // Threshold for requesting more one_time keys pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5; // Number of keys to be refreshed when below the threshold pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5; // Minimum supported code versions pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270; // Request metadata pub mod request_metadata { pub const CODE_VERSION: &str = "code_version"; pub const DEVICE_TYPE: &str = "device_type"; pub const USER_ID: &str = "user_id"; pub const DEVICE_ID: &str = "device_id"; pub const ACCESS_TOKEN: &str = "access_token"; } // CORS pub mod cors { use std::time::Duration; pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"]; pub const DEFAULT_ALLOW_HEADERS: [&str; 9] = [ "x-grpc-web", "content-type", "x-user-agent", "grpc-timeout", super::request_metadata::CODE_VERSION, super::request_metadata::DEVICE_TYPE, super::request_metadata::USER_ID, super::request_metadata::DEVICE_ID, super::request_metadata::ACCESS_TOKEN, ]; pub const ALLOW_ORIGIN_LIST: &str = "ALLOW_ORIGIN_LIST"; } // Regex pub const VALID_USERNAME_REGEX_STRING: &str = r"^[a-zA-Z0-9][a-zA-Z0-9-_]{0,190}$"; + +// Retry + +// TODO: Replace this with `ExponentialBackoffConfig` from `comm-lib` +pub mod retry { + pub const MAX_ATTEMPTS: usize = 8; + + pub const CONDITIONAL_CHECK_FAILED: &str = "ConditionalCheckFailed"; + pub const TRANSACTION_CONFLICT: &str = "TransactionConflict"; +} + +// One-time keys +pub const ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT: usize = 49; diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index fefbfc9e2..5517bb945 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1432 +1,1595 @@ -use comm_lib::aws::ddb::{ - operation::{ - delete_item::DeleteItemOutput, get_item::GetItemOutput, - put_item::PutItemOutput, query::QueryOutput, - }, - primitives::Blob, - types::{AttributeValue, PutRequest, ReturnConsumedCapacity, WriteRequest}, -}; +use comm_lib::aws::ddb::types::Delete; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; +use comm_lib::{ + aws::{ + ddb::{ + operation::{ + delete_item::DeleteItemOutput, get_item::GetItemOutput, + put_item::PutItemOutput, query::QueryOutput, + }, + primitives::Blob, + types::{ + AttributeValue, PutRequest, TransactWriteItem, Update, WriteRequest, + }, + }, + DynamoDBError, + }, + database::parse_int_attribute, +}; use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; +pub use crate::database::device_list::DeviceIDAttribute; +use crate::ddb_utils::into_one_time_update_requests; use crate::{ constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, ddb_utils::EthereumIdentity, reserved_users::UserDetail, siwe::SocialProof, }; use crate::{ ddb_utils::{ - create_one_time_key_partition_key, into_one_time_put_requests, Identifier, - OlmAccountType, + create_one_time_key_partition_key, into_one_time_put_requests, + is_transaction_retryable, Identifier, OlmAccountType, }, grpc_services::protos, }; use crate::{ error::{consume_error, Error}, grpc_utils::DeviceKeysInfo, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, - RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, + ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, RESERVED_USERNAMES_TABLE, + RESERVED_USERNAMES_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::{AccessTokenData, AuthType}; pub use grpc_clients::identity::DeviceType; mod device_list; mod farcaster; mod workflows; pub use device_list::{DeviceListRow, DeviceListUpdate, DeviceRow}; use self::device_list::Prekey; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } pub struct DBDeviceTypeInt(pub i32); impl TryFrom for DeviceType { type Error = crate::error::Error; fn try_from(value: DBDeviceTypeInt) -> Result { let device_result = DeviceType::try_from(value.0); device_result.map_err(|_| { Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), attribute_value: Some(AttributeValue::N(value.0.to_string())).into(), attribute_error: DBItemAttributeError::InvalidValue, }) }) } } pub struct OutboundKeys { pub key_payload: String, pub key_payload_signature: String, pub content_prekey: Prekey, pub notif_prekey: Prekey, pub content_one_time_key: Option, pub notif_one_time_key: Option, } impl From for protos::auth::OutboundKeyInfo { fn from(db_keys: OutboundKeys) -> Self { use protos::unauth::IdentityKeyInfo; Self { identity_info: Some(IdentityKeyInfo { payload: db_keys.key_payload, payload_signature: db_keys.key_payload_signature, }), content_prekey: Some(db_keys.content_prekey.into()), notif_prekey: Some(db_keys.notif_prekey.into()), one_time_content_prekey: db_keys.content_one_time_key, one_time_notif_prekey: db_keys.notif_one_time_key, } } } #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { let client = match &CONFIG.localstack_endpoint { Some(endpoint) => { info!( "Configuring DynamoDB client to use LocalStack endpoint: {}", endpoint ); let ddb_config_builder = comm_lib::aws::ddb::config::Builder::from(aws_config) .endpoint_url(endpoint); DynamoDBClient::from_conf(ddb_config_builder.build()) } None => DynamoDBClient::new(aws_config), }; DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, code_version: u64, access_token_creation_time: DateTime, ) -> Result { let device_key_upload = registration_state.flattened_device_key_upload; let user_id = self .add_user_to_users_table( - device_key_upload.clone(), Some((registration_state.username, Blob::new(password_file))), None, registration_state.user_id, registration_state.farcaster_id, ) .await?; self .add_device( &user_id, - device_key_upload, + device_key_upload.clone(), code_version, access_token_creation_time, ) .await?; + self + .append_one_time_prekeys( + &user_id, + &device_key_upload.device_id_key, + &device_key_upload.content_one_time_keys, + &device_key_upload.notif_one_time_keys, + ) + .await?; + Ok(user_id) } pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: SocialProof, user_id: Option, code_version: u64, access_token_creation_time: DateTime, farcaster_id: Option, ) -> Result { let wallet_identity = EthereumIdentity { wallet_address, social_proof, }; let user_id = self .add_user_to_users_table( - flattened_device_key_upload.clone(), None, Some(wallet_identity), user_id, farcaster_id, ) .await?; self .add_device( &user_id, - flattened_device_key_upload, + flattened_device_key_upload.clone(), code_version, access_token_creation_time, ) .await?; + self + .append_one_time_prekeys( + &user_id, + &flattened_device_key_upload.device_id_key, + &flattened_device_key_upload.content_one_time_keys, + &flattened_device_key_upload.notif_one_time_keys, + ) + .await?; + Ok(user_id) } async fn add_user_to_users_table( &self, - flattened_device_key_upload: FlattenedDeviceKeyUpload, username_and_password_file: Option<(String, Blob)>, wallet_identity: Option, user_id: Option, farcaster_id: Option, ) -> Result { let user_id = user_id.unwrap_or_else(generate_uuid); let mut user = HashMap::from([( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), )]); if let Some((username, password_file)) = username_and_password_file { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); } if let Some(eth_identity) = wallet_identity { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(eth_identity.wallet_address), ); user.insert( USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), eth_identity.social_proof.into(), ); } if let Some(fid) = farcaster_id { user.insert( USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME.to_string(), AttributeValue::S(fid), ); } self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(user)) // make sure we don't accidentaly overwrite existing row .condition_expression("attribute_not_exists(#pk)") .expression_attribute_names("#pk", USERS_TABLE_PARTITION_KEY) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; - self - .append_one_time_prekeys( - flattened_device_key_upload.device_id_key, - flattened_device_key_upload.content_one_time_keys, - flattened_device_key_upload.notif_one_time_keys, - ) - .await?; - Ok(user_id) } pub async fn add_user_device( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, code_version: u64, access_token_creation_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = flattened_device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_one_time_keys.clone(); // add device to the device list if not exists let device_id = flattened_device_key_upload.device_id_key.clone(); let device_exists = self .device_exists(user_id.clone(), device_id.clone()) .await?; if device_exists { self .update_device_login_time( user_id.clone(), device_id, access_token_creation_time, ) .await?; return Ok(()); } // add device to the new device list self .add_device( - user_id, + &user_id, flattened_device_key_upload, code_version, access_token_creation_time, ) .await?; self .append_one_time_prekeys( - device_id, - content_one_time_keys, - notif_one_time_keys, + &user_id, + &device_id, + &content_one_time_keys, + ¬if_one_time_keys, ) .await?; Ok(()) } pub async fn get_keyserver_keys_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device = user_devices .into_iter() .find(|device| device.device_type == GrpcDeviceType::Keyserver); let Some(keyserver) = maybe_keyserver_device else { return Ok(None); }; debug!( "Found keyserver in devices table (ID={})", &keyserver.device_id ); let notif_one_time_key: Option = self - .get_one_time_key(&keyserver.device_id, OlmAccountType::Notification) + .get_one_time_key( + user_id, + &keyserver.device_id, + OlmAccountType::Notification, + ) .await?; let content_one_time_key: Option = self - .get_one_time_key(&keyserver.device_id, OlmAccountType::Content) + .get_one_time_key(user_id, &keyserver.device_id, OlmAccountType::Content) .await?; debug!( "Able to get notif one-time key for keyserver {}: {}", &keyserver.device_id, notif_one_time_key.is_some() ); debug!( "Able to get content one-time key for keyserver {}: {}", &keyserver.device_id, content_one_time_key.is_some() ); let outbound_payload = OutboundKeys { key_payload: keyserver.device_key_info.key_payload, key_payload_signature: keyserver.device_key_info.key_payload_signature, content_prekey: keyserver.content_prekey, notif_prekey: keyserver.notif_prekey, content_one_time_key, notif_one_time_key, }; Ok(Some(outbound_payload)) } pub async fn get_keyserver_device_id_for_user( &self, user_id: &str, ) -> Result, Error> { use crate::grpc_services::protos::unauth::DeviceType as GrpcDeviceType; let user_devices = self.get_current_devices(user_id).await?; let maybe_keyserver_device_id = user_devices .into_iter() .find(|device| device.device_type == GrpcDeviceType::Keyserver) .map(|device| device.device_id); Ok(maybe_keyserver_device_id) } /// Will "mint" a single one-time key by attempting to successfully delete a /// key pub async fn get_one_time_key( &self, + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result, Error> { + use crate::constants::devices_table; use crate::constants::one_time_keys_table as otk_table; + use crate::constants::retry; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; - let query_result = self.get_one_time_keys(device_id, account_type).await?; - let items = query_result.items(); + let attr_otk_count = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block let device_id = device_id.to_string(); tokio::spawn(async move { debug!("Attempting to request more keys for device: {}", &device_id); let result = crate::tunnelbroker::send_refresh_keys_request(&device_id).await; consume_error(result); }); } - // If no one-time keys exist, or if there aren't enough, request more. - // Additionally, if no one-time keys exist, return early. - let item_vec = if let Some(items_list) = items { - if items_list.len() < ONE_TIME_KEY_MINIMUM_THRESHOLD { + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let otk_count = + self.get_otk_count(user_id, device_id, account_type).await?; + if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD { spawn_refresh_keys_task(device_id); } - items_list - } else { - debug!("Unable to find {:?} one-time key", account_type); - spawn_refresh_keys_task(device_id); - return Ok(None); - }; + if otk_count < 1 { + return Ok(None); + } - let mut result = None; - // Attempt to delete the one-time keys individually, a successful delete - // mints the one-time key to the requester - for item in item_vec { - let pk: String = item.get_attr(otk_table::PARTITION_KEY)?; - let otk: String = item.get_attr(otk_table::SORT_KEY)?; - - let composite_key = HashMap::from([ - (otk_table::PARTITION_KEY.to_string(), AttributeValue::S(pk)), - ( - otk_table::SORT_KEY.to_string(), - AttributeValue::S(otk.clone()), - ), - ]); - - debug!("Attempting to delete a {:?} one time key", account_type); - match self - .client - .delete_item() - .set_key(Some(composite_key)) + let query_result = self + .get_one_time_keys(user_id, device_id, account_type) + .await?; + let mut items = query_result.items.unwrap_or_default(); + let mut item = items.pop().unwrap_or_default(); + let pk = item.take_attr(otk_table::PARTITION_KEY)?; + let sk = item.take_attr(otk_table::SORT_KEY)?; + let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; + + let delete_otk = Delete::builder() .table_name(otk_table::NAME) + .key(otk_table::PARTITION_KEY, AttributeValue::S(pk)) + .key(otk_table::SORT_KEY, AttributeValue::S(sk)) + .build(); + + let delete_otk_operation = + TransactWriteItem::builder().delete(delete_otk).build(); + + let update_otk_count = Update::builder() + .table_name(devices_table::NAME) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) + .expression_attribute_values( + ":decrement_val", + AttributeValue::N("-1".to_string()), + ) + .condition_expression(format!("{} = :old_val", attr_otk_count)) + .expression_attribute_values( + ":old_val", + AttributeValue::N(otk_count.to_string()), + ) + .build(); + + let update_otk_count_operation = TransactWriteItem::builder() + .update(update_otk_count) + .build(); + + let transaction = self + .client + .transact_write_items() + .set_transact_items(Some(vec![ + delete_otk_operation, + update_otk_count_operation, + ])) .send() - .await - { - Ok(_) => { - result = Some(otk); - break; - } - // This err should only happen if a delete occurred between the read - // above and this delete + .await; + + match transaction { + Ok(_) => return Ok(Some(otk)), Err(e) => { - debug!("Unable to delete key: {:?}", e); - continue; + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = HashSet::from([ + retry::CONDITIONAL_CHECK_FAILED, + retry::TRANSACTION_CONFLICT, + ]); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while retrieving one-time key - retrying"); + } else { + error!( + "One-time key retrieval transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } } } } - - // Return deleted key - Ok(result) } pub async fn get_one_time_keys( &self, + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::one_time_keys_table::*; - // Add related prefix to partition key to grab the correct result set let partition_key = - create_one_time_key_partition_key(device_id, account_type); + create_one_time_key_partition_key(user_id, device_id, account_type); self .client .query() .table_name(NAME) - .key_condition_expression(format!("{} = :pk", PARTITION_KEY)) + .key_condition_expression("#pk = :pk") + .expression_attribute_names("#pk", PARTITION_KEY) .expression_attribute_values(":pk", AttributeValue::S(partition_key)) - .return_consumed_capacity(ReturnConsumedCapacity::Total) + .limit(1) .send() .await .map_err(|e| Error::AwsSdk(e.into())) - .map(|response| { - let capacity_units = response - .consumed_capacity() - .and_then(|it| it.capacity_units()); - debug!("OTK read consumed capacity: {:?}", capacity_units); - response - }) } pub async fn append_one_time_prekeys( &self, - device_id: String, - content_one_time_keys: Vec, - notif_one_time_keys: Vec, + user_id: &str, + device_id: &str, + content_one_time_keys: &Vec, + notif_one_time_keys: &Vec, ) -> Result<(), Error> { - use crate::constants::one_time_keys_table; + use crate::constants::retry; - let mut otk_requests = into_one_time_put_requests( - &device_id, + let num_content_keys = content_one_time_keys.len(); + let num_notif_keys = notif_one_time_keys.len(); + + if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + { + return Err(Error::OneTimeKeyUploadLimitExceeded); + } + + let current_time = chrono::Utc::now(); + + let content_otk_requests = into_one_time_put_requests( + user_id, + device_id, content_one_time_keys, OlmAccountType::Content, + current_time, ); - let notif_otk_requests: Vec = into_one_time_put_requests( - &device_id, + let notif_otk_requests = into_one_time_put_requests( + user_id, + device_id, notif_one_time_keys, OlmAccountType::Notification, + current_time, ); - otk_requests.extend(notif_otk_requests); - // BatchWriteItem has a hard limit of 25 writes per call - for requests in otk_requests.chunks(25) { - self + let update_otk_count_operation = into_one_time_update_requests( + user_id, + device_id, + num_content_keys, + num_notif_keys, + ); + + let mut operations = Vec::new(); + operations.extend_from_slice(&content_otk_requests); + operations.extend_from_slice(¬if_otk_requests); + operations.push(update_otk_count_operation); + + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let transaction = self .client - .batch_write_item() - .request_items(one_time_keys_table::NAME, requests.to_vec()) + .transact_write_items() + .set_transact_items(Some(operations.clone())) .send() - .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .await; + + match transaction { + Ok(_) => break, + Err(e) => { + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while uploading one-time keys - retrying"); + } else { + error!( + "One-time key upload transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } + } + } } Ok(()) } + async fn get_otk_count( + &self, + user_id: &str, + device_id: &str, + account_type: OlmAccountType, + ) -> Result { + use crate::constants::devices_table; + + let attr_name = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; + + let response = self + .client + .get_item() + .table_name(devices_table::NAME) + .projection_expression(attr_name) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .send() + .await + .map_err(|e| { + error!("Failed to get user's OTK count: {:?}", e); + Error::AwsSdk(e.into()) + })?; + + let mut user_item = response.item.unwrap_or_default(); + match parse_int_attribute(attr_name, user_item.remove(attr_name)) { + Ok(num) => Ok(num), + Err(DBItemError { + attribute_error: DBItemAttributeError::Missing, + .. + }) => Ok(0), + Err(e) => Err(Error::Attribute(e)), + } + } + pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn delete_user( &self, user_id: String, ) -> Result { debug!(user_id, "Attempting to delete user's devices"); self.delete_devices_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user"); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete user {}", user_id); Err(Error::AwsSdk(e.into())) } } } pub async fn get_access_token_data( &self, user_id: String, signing_public_key: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), signing_public_key.clone(), ), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = DateTime::::try_from_attr( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, signing_public_key, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and signing public key {} in token table", user_id, signing_public_key ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn verify_access_token( &self, user_id: String, signing_public_key: String, access_token_to_verify: String, ) -> Result { let is_valid = self .get_access_token_data(user_id, signing_public_key) .await? .map(|access_token_data| { constant_time_eq( access_token_data.access_token.as_bytes(), access_token_to_verify.as_bytes(), ) && access_token_data.is_valid() }) .unwrap_or(false); Ok(is_valid) } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.signing_public_key), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn delete_access_token_data( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { self .client .delete_item() .table_name(ACCESS_TOKEN_TABLE) .key( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ) .key( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(device_id_key), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn wallet_address_taken( &self, wallet_address: String, ) -> Result { let result = self .get_user_id_from_user_info(wallet_address, &AuthType::Wallet) .await?; Ok(result.is_some()) } pub async fn username_taken(&self, username: String) -> Result { let result = self .get_user_id_from_user_info(username, &AuthType::Password) .await?; Ok(result.is_some()) } pub async fn filter_out_taken_usernames( &self, user_details: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; let db_usernames_set: HashSet = db_usernames.into_iter().collect(); let available_user_details: Vec = user_details .into_iter() .filter(|user_detail| !db_usernames_set.contains(&user_detail.username)) .collect(); Ok(available_user_details) } async fn get_user_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result>, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: first_item .get(USERS_TABLE_PARTITION_KEY) .cloned() .into(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_keys_for_user( &self, user_id: &str, get_one_time_keys: bool, ) -> Result, Error> { let mut devices_response = self.get_keys_for_user_devices(user_id).await?; if devices_response.is_empty() { debug!("No devices found for user {}", user_id); return Ok(None); } if get_one_time_keys { for (device_id_key, device_keys) in devices_response.iter_mut() { device_keys.notif_one_time_key = self - .get_one_time_key(device_id_key, OlmAccountType::Notification) + .get_one_time_key( + user_id, + device_id_key, + OlmAccountType::Notification, + ) .await?; device_keys.content_one_time_key = self - .get_one_time_key(device_id_key, OlmAccountType::Content) + .get_one_time_key(user_id, device_id_key, OlmAccountType::Content) .await?; } } Ok(Some(devices_response)) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => user .take_attr(USERS_TABLE_PARTITION_KEY) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } pub async fn get_user_id_and_password_file_from_username( &self, username: &str, ) -> Result)>, Error> { match self .get_user_from_user_info(username.to_string(), &AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = user.take_attr(USERS_TABLE_PARTITION_KEY)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((user_id, password_file))) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } /// Retrieves username for password users or wallet address for wallet users /// Returns `None` if user not found pub async fn get_user_identifier( &self, user_id: &str, ) -> Result, Error> { self .get_item_from_users_table(user_id) .await? .item .map(Identifier::try_from) .transpose() .map_err(|e| { error!(user_id, "Database item is missing an identifier"); e }) } async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let Ok(username) = attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE) { result.push(username); } } } Ok(result) } pub async fn get_all_user_details(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(format!( "{USERS_TABLE_USERNAME_ATTRIBUTE}, {USERS_TABLE_PARTITION_KEY}" )) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let (Ok(username), Ok(user_id)) = ( attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), attribute.take_attr(USERS_TABLE_PARTITION_KEY), ) { result.push(UserDetail { username, user_id }); } } } Ok(result) } pub async fn get_all_reserved_user_details( &self, ) -> Result, Error> { let scan_output = self .client .scan() .table_name(RESERVED_USERNAMES_TABLE) .projection_expression(format!( "{RESERVED_USERNAMES_TABLE_PARTITION_KEY},\ {RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE}" )) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let (Ok(username), Ok(user_id)) = ( attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE), attribute.take_attr(RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE), ) { result.push(UserDetail { username, user_id }); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = item.take_attr(NONCE_TABLE_PARTITION_KEY)?; let created = DateTime::::try_from_attr( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(NONCE_TABLE_CREATED_ATTRIBUTE), )?; let expiration_time = DateTime::::try_from_attr( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, user_details: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for user_chunk in user_details.chunks(25) { let write_requests = user_chunk .iter() .map(|user_detail| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(user_detail.username.to_string()), ) .item( RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, AttributeValue::S(user_detail.user_id.to_string()), ) .build(); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn username_in_reserved_usernames_table( &self, username: &str, ) -> Result { match self .client .get_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(username.to_string()), ) .consistent_read(true) .send() .await { Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), Ok(_) => Ok(false), Err(e) => Err(Error::AwsSdk(e.into())), } } } type AttributeName = String; type Devices = HashMap; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } #[deprecated(note = "Use `comm_lib` counterpart instead")] #[allow(dead_code)] fn parse_map_attribute( attribute_name: &str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", "Unexpected attribute type when parsing map attribute" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::Missing, )) } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } #[test] fn test_int_to_device_type() { let valid_result = DeviceType::try_from(3); assert!(valid_result.is_ok()); assert_eq!(valid_result.unwrap(), DeviceType::Android); let invalid_result = DeviceType::try_from(6); assert!(invalid_result.is_err()); } } diff --git a/services/identity/src/database/device_list.rs b/services/identity/src/database/device_list.rs index 2bac00d32..0b4947a75 100644 --- a/services/identity/src/database/device_list.rs +++ b/services/identity/src/database/device_list.rs @@ -1,1392 +1,1396 @@ use std::collections::{HashMap, HashSet}; use chrono::{DateTime, Utc}; use comm_lib::{ aws::ddb::{ operation::{get_item::GetItemOutput, query::builders::QueryFluentBuilder}, types::{ error::TransactionCanceledException, AttributeValue, Delete, DeleteRequest, Put, TransactWriteItem, Update, WriteRequest, }, }, database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, DynamoDBError, TryFromAttribute, }, }; use tracing::{debug, error, warn}; use crate::{ client_service::FlattenedDeviceKeyUpload, constants::{ devices_table::{self, *}, USERS_TABLE, USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, }, error::{DeviceListError, Error}, grpc_services::protos::{self, unauth::DeviceType}, grpc_utils::DeviceKeysInfo, }; use super::DatabaseClient; +// We omit the content and notif one-time key count attributes from this struct +// because they are internal helpers and are not provided by users #[derive(Clone, Debug)] pub struct DeviceRow { pub user_id: String, pub device_id: String, pub device_type: DeviceType, pub device_key_info: IdentityKeyInfo, pub content_prekey: Prekey, pub notif_prekey: Prekey, // migration-related data pub code_version: u64, /// Timestamp of last login (access token generation) pub login_time: DateTime, } #[derive(Clone, Debug)] pub struct DeviceListRow { pub user_id: String, pub timestamp: DateTime, pub device_ids: Vec, } #[derive(Clone, Debug)] pub struct IdentityKeyInfo { pub key_payload: String, pub key_payload_signature: String, } #[derive(Clone, Debug)] pub struct Prekey { pub prekey: String, pub prekey_signature: String, } /// A struct representing device list update request /// payload; issued by the primary device #[derive(derive_more::Constructor)] pub struct DeviceListUpdate { pub devices: Vec, pub timestamp: DateTime, } impl DeviceRow { pub fn from_device_key_upload( user_id: impl Into, upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Self { Self { user_id: user_id.into(), device_id: upload.device_id_key, device_type: DeviceType::from_str_name(upload.device_type.as_str_name()) .expect("DeviceType conversion failed. Identity client and server protos mismatch"), device_key_info: IdentityKeyInfo { key_payload: upload.key_payload, key_payload_signature: upload.key_payload_signature, }, content_prekey: Prekey { prekey: upload.content_prekey, prekey_signature: upload.content_prekey_signature, }, notif_prekey: Prekey { prekey: upload.notif_prekey, prekey_signature: upload.notif_prekey_signature, }, code_version, login_time, } } } impl DeviceListRow { /// Generates new device list row from given devices fn new(user_id: impl Into, device_ids: Vec) -> Self { Self { user_id: user_id.into(), device_ids, timestamp: Utc::now(), } } } // helper structs for converting to/from attribute values for sort key (a.k.a itemID) -struct DeviceIDAttribute(String); +pub struct DeviceIDAttribute(pub String); struct DeviceListKeyAttribute(DateTime); impl From for AttributeValue { fn from(value: DeviceIDAttribute) -> Self { AttributeValue::S(format!("{DEVICE_ITEM_KEY_PREFIX}{}", value.0)) } } impl From for AttributeValue { fn from(value: DeviceListKeyAttribute) -> Self { AttributeValue::S(format!( "{DEVICE_LIST_KEY_PREFIX}{}", value.0.to_rfc3339() )) } } impl TryFrom> for DeviceIDAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device- prefix let device_id = item_id .strip_prefix(DEVICE_ITEM_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, })? .to_string(); Ok(Self(device_id)) } } impl TryFrom> for DeviceListKeyAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device-list- prefix, then parse the timestamp let timestamp: DateTime = item_id .strip_prefix(DEVICE_LIST_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, }) .and_then(|s| { s.parse().map_err(|e| { DBItemError::new( ATTR_ITEM_ID.to_string(), item_id.clone().into(), DBItemAttributeError::InvalidTimestamp(e), ) }) })?; Ok(Self(timestamp)) } } impl TryFrom for DeviceRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(ATTR_USER_ID)?; let DeviceIDAttribute(device_id) = attrs.remove(ATTR_ITEM_ID).try_into()?; let raw_device_type: String = attrs.take_attr(ATTR_DEVICE_TYPE)?; let device_type = DeviceType::from_str_name(&raw_device_type).ok_or_else(|| { DBItemError::new( ATTR_DEVICE_TYPE.to_string(), raw_device_type.into(), DBItemAttributeError::InvalidValue, ) })?; let device_key_info = attrs .take_attr::(ATTR_DEVICE_KEY_INFO) .and_then(IdentityKeyInfo::try_from)?; let content_prekey = attrs .take_attr::(ATTR_CONTENT_PREKEY) .and_then(Prekey::try_from)?; let notif_prekey = attrs .take_attr::(ATTR_NOTIF_PREKEY) .and_then(Prekey::try_from)?; let code_version = attrs .remove(ATTR_CODE_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .unwrap_or_default(); let login_time: DateTime = attrs.take_attr(ATTR_LOGIN_TIME)?; Ok(Self { user_id, device_id, device_type, device_key_info, content_prekey, notif_prekey, code_version, login_time, }) } } impl From for AttributeMap { fn from(value: DeviceRow) -> Self { HashMap::from([ (ATTR_USER_ID.to_string(), AttributeValue::S(value.user_id)), ( ATTR_ITEM_ID.to_string(), DeviceIDAttribute(value.device_id).into(), ), ( ATTR_DEVICE_TYPE.to_string(), AttributeValue::S(value.device_type.as_str_name().to_string()), ), ( ATTR_DEVICE_KEY_INFO.to_string(), value.device_key_info.into(), ), (ATTR_CONTENT_PREKEY.to_string(), value.content_prekey.into()), (ATTR_NOTIF_PREKEY.to_string(), value.notif_prekey.into()), // migration attributes ( ATTR_CODE_VERSION.to_string(), AttributeValue::N(value.code_version.to_string()), ), ( ATTR_LOGIN_TIME.to_string(), AttributeValue::S(value.login_time.to_rfc3339()), ), ]) } } impl From for protos::unauth::IdentityKeyInfo { fn from(value: IdentityKeyInfo) -> Self { Self { payload: value.key_payload, payload_signature: value.key_payload_signature, } } } impl From for AttributeValue { fn from(value: IdentityKeyInfo) -> Self { let attrs = HashMap::from([ ( ATTR_KEY_PAYLOAD.to_string(), AttributeValue::S(value.key_payload), ), ( ATTR_KEY_PAYLOAD_SIGNATURE.to_string(), AttributeValue::S(value.key_payload_signature), ), ]); AttributeValue::M(attrs) } } impl TryFrom for IdentityKeyInfo { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let key_payload = attrs.take_attr(ATTR_KEY_PAYLOAD)?; let key_payload_signature = attrs.take_attr(ATTR_KEY_PAYLOAD_SIGNATURE)?; Ok(Self { key_payload, key_payload_signature, }) } } impl From for AttributeValue { fn from(value: Prekey) -> Self { let attrs = HashMap::from([ (ATTR_PREKEY.to_string(), AttributeValue::S(value.prekey)), ( ATTR_PREKEY_SIGNATURE.to_string(), AttributeValue::S(value.prekey_signature), ), ]); AttributeValue::M(attrs) } } impl From for protos::unauth::Prekey { fn from(value: Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl From for Prekey { fn from(value: protos::unauth::Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl TryFrom for Prekey { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let prekey = attrs.take_attr(ATTR_PREKEY)?; let prekey_signature = attrs.take_attr(ATTR_PREKEY_SIGNATURE)?; Ok(Self { prekey, prekey_signature, }) } } impl TryFrom for DeviceListRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(ATTR_USER_ID)?; let DeviceListKeyAttribute(timestamp) = attrs.remove(ATTR_ITEM_ID).try_into()?; // validate timestamps are in sync let timestamps_match = attrs .remove(ATTR_TIMESTAMP) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .filter(|val| *val == timestamp.timestamp_millis()) .is_some(); if !timestamps_match { warn!( "DeviceList timestamp mismatch for (userID={}, itemID={})", &user_id, timestamp.to_rfc3339() ); } let device_ids: Vec = attrs.take_attr(ATTR_DEVICE_IDS)?; Ok(Self { user_id, timestamp, device_ids, }) } } impl From for AttributeMap { fn from(device_list: DeviceListRow) -> Self { let mut attrs = HashMap::new(); attrs.insert( ATTR_USER_ID.to_string(), AttributeValue::S(device_list.user_id.clone()), ); attrs.insert( ATTR_ITEM_ID.to_string(), DeviceListKeyAttribute(device_list.timestamp).into(), ); attrs.insert( ATTR_TIMESTAMP.to_string(), AttributeValue::N(device_list.timestamp.timestamp_millis().to_string()), ); attrs.insert( ATTR_DEVICE_IDS.to_string(), AttributeValue::L( device_list .device_ids .into_iter() .map(AttributeValue::S) .collect(), ), ); attrs } } impl DatabaseClient { /// Retrieves user's current devices and their full data pub async fn get_current_devices( &self, user_id: impl Into, ) -> Result, Error> { let response = query_rows_with_prefix(self, user_id, DEVICE_ITEM_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to get current devices: {:?}", e); Error::AwsSdk(e.into()) })?; let Some(rows) = response.items else { return Ok(Vec::new()); }; rows .into_iter() .map(DeviceRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Gets user's device list history pub async fn get_device_list_history( &self, user_id: impl Into, since: Option>, ) -> Result, Error> { let rows = if let Some(since) = since { // When timestamp is provided, it's better to query device lists by timestamp LSI self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id AND #timestamp > :since") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#timestamp", ATTR_TIMESTAMP) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .expression_attribute_values( ":since", AttributeValue::N(since.timestamp_millis().to_string()), ) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items } else { // Query all device lists for user query_rows_with_prefix(self, user_id, DEVICE_LIST_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to query device list updates (all): {:?}", e); Error::AwsSdk(e.into()) })? .items }; rows .unwrap_or_default() .into_iter() .map(DeviceListRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Returns all devices' keys for the given user. Response is in the same format /// as [DatabaseClient::get_keys_for_user] for compatibility reasons. pub async fn get_keys_for_user_devices( &self, user_id: impl Into, ) -> Result { let user_devices = self.get_current_devices(user_id).await?; let user_devices_keys = user_devices .into_iter() .map(|device| (device.device_id.clone(), DeviceKeysInfo::from(device))) .collect(); Ok(user_devices_keys) } pub async fn update_device_prekeys( &self, user_id: impl Into, device_id: impl Into, content_prekey: Prekey, notif_prekey: Prekey, ) -> Result<(), Error> { self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression( "SET #content_prekey = :content_prekey, #notif_prekey = :notif_prekey", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#content_prekey", ATTR_CONTENT_PREKEY) .expression_attribute_names("#notif_prekey", ATTR_NOTIF_PREKEY) .expression_attribute_values(":content_prekey", content_prekey.into()) .expression_attribute_values(":notif_prekey", notif_prekey.into()) .send() .await .map_err(|e| { error!("Failed to update device prekeys: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(()) } /// Checks if given device exists on user's current device list pub async fn device_exists( &self, user_id: impl Into, device_id: impl Into, ) -> Result { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) // only fetch the primary key, we don't need the rest .projection_expression(format!("{ATTR_USER_ID}, {ATTR_ITEM_ID}")) .send() .await .map_err(|e| { error!("Failed to check if device exists: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(item.is_some()) } pub async fn get_device_data( &self, user_id: impl Into, device_id: impl Into, ) -> Result, Error> { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .send() .await .map_err(|e| { error!("Failed to fetch device data: {:?}", e); Error::AwsSdk(e.into()) })?; let Some(attrs) = item else { return Ok(None); }; let device_data = DeviceRow::try_from(attrs)?; Ok(Some(device_data)) } /// Fails if the device list is empty pub async fn get_primary_device_data( &self, user_id: &str, ) -> Result { let device_list = self.get_current_device_list(user_id).await?; let Some(primary_device_id) = device_list .as_ref() .and_then(|list| list.device_ids.first()) else { error!(user_id, "Device list is empty. Cannot fetch primary device"); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); }; self .get_device_data(user_id, primary_device_id) .await? .ok_or_else(|| { error!( "Corrupt database. Missing primary device data for user {}", user_id ); Error::MissingItem }) } /// Required only for migration purposes (determining primary device) pub async fn update_device_login_time( &self, user_id: impl Into, device_id: impl Into, login_time: DateTime, ) -> Result<(), Error> { self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression("SET #login_time = :login_time") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#login_time", ATTR_LOGIN_TIME) .expression_attribute_values( ":login_time", AttributeValue::S(login_time.to_rfc3339()), ) .send() .await .map_err(|e| { error!("Failed to update device login time: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn get_current_device_list( &self, user_id: impl Into, ) -> Result, Error> { self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id") // sort descending .scan_index_forward(false) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .limit(1) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items .and_then(|mut items| items.pop()) .map(DeviceListRow::try_from) .transpose() .map_err(Error::from) } /// Adds device data to devices table. If the device already exists, its /// data is overwritten. This does not update the device list; the device ID /// should already be present in the device list. pub async fn put_device_data( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = device_key_upload.notif_one_time_keys.clone(); + let user_id_string = user_id.into(); let new_device = DeviceRow::from_device_key_upload( - user_id, + user_id_string.clone(), device_key_upload, code_version, login_time, ); let device_id = new_device.device_id.clone(); self .client .put_item() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .send() .await .map_err(|e| { error!("Failed to put device data: {:?}", e); Error::AwsSdk(e.into()) })?; self .append_one_time_prekeys( - device_id, - content_one_time_keys, - notif_one_time_keys, + &user_id_string, + &device_id, + &content_one_time_keys, + ¬if_one_time_keys, ) .await?; Ok(()) } /// Adds new device to user's device list. If the device already exists, the /// operation fails. Transactionally generates new device list version. pub async fn add_device( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Result<(), Error> { let user_id: String = user_id.into(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let new_device = DeviceRow::from_device_key_upload( &user_id, device_key_upload, code_version, login_time, ); if device_ids.iter().any(|id| &new_device.device_id == id) { warn!( "Device already exists in user's device list \ (userID={}, deviceID={})", &user_id, &new_device.device_id ); return Err(Error::DeviceList(DeviceListError::DeviceAlreadyExists)); } device_ids.push(new_device.device_id.clone()); // Reorder devices (determine primary device again) devices_data.push(new_device.clone()); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Put new device let put_device = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_operation = TransactWriteItem::builder().put(put_device).build(); Ok(Some(put_device_operation)) }) .await?; Ok(()) } /// Removes device from user's device list. If the device doesn't exist, the /// operation fails. Transactionally generates new device list version. pub async fn remove_device( &self, user_id: impl Into, device_id: impl AsRef, ) -> Result<(), Error> { let user_id: String = user_id.into(); let device_id = device_id.as_ref(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let device_exists = device_ids.iter().any(|id| id == device_id); if !device_exists { warn!( "Device doesn't exist in user's device list \ (userID={}, deviceID={})", &user_id, device_id ); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); } device_ids.retain(|id| id != device_id); // Reorder devices (determine primary device again) devices_data.retain(|d| d.device_id != device_id); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Delete device DDB operation let delete_device = Delete::builder() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.clone())) .key( ATTR_ITEM_ID, DeviceIDAttribute(device_id.to_string()).into(), ) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let operation = TransactWriteItem::builder().delete(delete_device).build(); Ok(Some(operation)) }) .await?; Ok(()) } /// applies updated device list received from primary device pub async fn apply_devicelist_update( &self, user_id: &str, update: DeviceListUpdate, ) -> Result { let DeviceListUpdate { devices: new_list, .. } = update; self .transact_update_devicelist(user_id, |current_list, _| { // TODO: Add proper validation according to the whitepaper // currently only adding new device is supported (new.len - old.len = 1) let new_set: HashSet<_> = new_list.iter().collect(); let current_set: HashSet<_> = current_list.iter().collect(); // difference is A - B (only new devices) let difference: HashSet<_> = new_set.difference(¤t_set).collect(); if difference.len() != 1 { warn!("Received invalid device list update"); return Err(Error::DeviceList( DeviceListError::InvalidDeviceListUpdate, )); } debug!("Applying device list update. Difference: {:?}", difference); *current_list = new_list; Ok(None) }) .await } /// Performs a transactional update of the device list for the user. Afterwards /// generates a new device list and updates the timestamp in the users table. /// This is done in a transaction. Operation fails if the device list has been /// updated concurrently (timestamp mismatch). /// Returns the new device list row that has been saved to database. async fn transact_update_devicelist( &self, user_id: &str, // The closure performing a transactional update of the device list. // It receives two arguments: // 1. A mutable reference to the current device list (ordered device IDs). // 2. Details (full data) of the current devices (unordered). // The closure should return a transactional DDB // operation to be performed when updating the device list. action: impl FnOnce( &mut Vec, Vec, ) -> Result, Error>, ) -> Result { let previous_timestamp = get_current_devicelist_timestamp(self, user_id).await?; let current_devices_data = self.get_current_devices(user_id).await?; let mut device_ids = self .get_current_device_list(user_id) .await? .map(|device_list| device_list.device_ids) .unwrap_or_default(); // Perform the update action, then generate new device list let operation = action(&mut device_ids, current_devices_data)?; let new_device_list = DeviceListRow::new(user_id, device_ids); // Update timestamp in users table let timestamp_update_operation = device_list_timestamp_update_operation( user_id, previous_timestamp, new_device_list.timestamp, ); // Put updated device list (a new version) let put_device_list = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device_list.clone().into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_list_operation = TransactWriteItem::builder().put(put_device_list).build(); let operations = if let Some(operation) = operation { vec![ operation, put_device_list_operation, timestamp_update_operation, ] } else { vec![put_device_list_operation, timestamp_update_operation] }; self .client .transact_write_items() .set_transact_items(Some(operations)) .send() .await .map_err(|e| match DynamoDBError::from(e) { DynamoDBError::TransactionCanceledException( TransactionCanceledException { cancellation_reasons: Some(reasons), .. }, ) if reasons .iter() .any(|reason| reason.code() == Some("ConditionalCheckFailed")) => { Error::DeviceList(DeviceListError::ConcurrentUpdateError) } other => { error!("Device list update transaction failed: {:?}", other); Error::AwsSdk(other) } })?; Ok(new_device_list) } /// Deletes all user data from devices table pub async fn delete_devices_table_rows_for_user( &self, user_id: impl Into, ) -> Result<(), Error> { // 1. get all rows // 2. batch write delete all // we project only the primary keys so we can pass these directly to delete requests let primary_keys = self .client .query() .table_name(devices_table::NAME) .projection_expression("#user_id, #item_id") .key_condition_expression("#user_id = :user_id") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .consistent_read(true) .send() .await .map_err(|e| { error!("Failed to list user's items in devices table: {:?}", e); Error::AwsSdk(e.into()) })? .items .unwrap_or_default(); let delete_requests = primary_keys .into_iter() .map(|item| { let request = DeleteRequest::builder().set_key(Some(item)).build(); WriteRequest::builder().delete_request(request).build() }) .collect::>(); // TODO: We can use the batch write helper from comm-services-lib when integrated for batch in delete_requests.chunks(25) { self .client .batch_write_item() .request_items(devices_table::NAME, batch.to_vec()) .send() .await .map_err(|e| { error!("Failed to batch delete items from devices table: {:?}", e); Error::AwsSdk(e.into()) })?; } Ok(()) } } /// Gets timestamp of user's current device list. Returns None if the user /// doesn't have a device list yet. Storing the timestamp in the users table is /// required for consistency. It's used as a condition when updating the device /// list. async fn get_current_devicelist_timestamp( db: &crate::database::DatabaseClient, user_id: impl Into, ) -> Result>, Error> { let response = db .client .get_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .projection_expression(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME) .send() .await .map_err(|e| { error!("Failed to get user's device list timestamp: {:?}", e); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); let raw_datetime = user_item.remove(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME); // existing records will not have this field when // updating device list for the first time if raw_datetime.is_none() { return Ok(None); } let timestamp = DateTime::::try_from_attr( USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, raw_datetime, )?; Ok(Some(timestamp)) } /// Generates update expression for current device list timestamp in users table. /// The previous timestamp is used as a condition to ensure that the value hasn't changed /// since we got it. This avoids race conditions when updating the device list. fn device_list_timestamp_update_operation( user_id: impl Into, previous_timestamp: Option>, new_timestamp: DateTime, ) -> TransactWriteItem { let update_builder = match previous_timestamp { Some(previous_timestamp) => Update::builder() .condition_expression("#device_list_timestamp = :previous_timestamp") .expression_attribute_values( ":previous_timestamp", AttributeValue::S(previous_timestamp.to_rfc3339()), ), // If there's no previous timestamp, the attribute shouldn't exist yet None => Update::builder() .condition_expression("attribute_not_exists(#device_list_timestamp)"), }; let update = update_builder .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .update_expression("SET #device_list_timestamp = :new_timestamp") .expression_attribute_names( "#device_list_timestamp", USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, ) .expression_attribute_values( ":new_timestamp", AttributeValue::S(new_timestamp.to_rfc3339()), ) .build(); TransactWriteItem::builder().update(update).build() } /// Helper function to query rows by given sort key prefix fn query_rows_with_prefix( db: &crate::database::DatabaseClient, user_id: impl Into, prefix: &'static str, ) -> QueryFluentBuilder { db.client .query() .table_name(devices_table::NAME) .key_condition_expression( "#user_id = :user_id AND begins_with(#item_id, :device_prefix)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values(":user_id", AttributeValue::S(user_id.into())) .expression_attribute_values( ":device_prefix", AttributeValue::S(prefix.to_string()), ) .consistent_read(true) } // Helper module for "migration" code into new device list schema. // We can get rid of this when primary device takes over the responsibility // of managing the device list. mod migration { use std::{cmp::Ordering, collections::HashSet}; use tracing::{debug, error, info}; use super::*; pub(super) fn reorder_device_list( user_id: &str, list: &mut [String], devices_data: &[DeviceRow], ) { if !verify_device_list_match(list, devices_data) { error!("Found corrupt device list for user (userID={})!", user_id); return; } let Some(first_device) = list.first() else { debug!("Skipping device list rotation. Nothing to reorder."); return; }; let Some(primary_device) = determine_primary_device(devices_data) else { info!( "No valid primary device found for user (userID={}).\ Skipping device list reorder.", user_id ); return; }; if first_device == &primary_device.device_id { debug!("Skipping device list reorder. Primary device is already first"); return; } // swap primary device with the first one let Some(primary_device_idx) = list.iter().position(|id| id == &primary_device.device_id) else { error!( "Primary device not found in device list (userID={})", user_id ); return; }; list.swap(0, primary_device_idx); info!("Reordered device list for user (userID={})", user_id); } // checks if device list matches given devices data fn verify_device_list_match( list: &[String], devices_data: &[DeviceRow], ) -> bool { if list.len() != devices_data.len() { error!("Device list length mismatch!"); return false; } let actual_device_ids = devices_data .iter() .map(|device| &device.device_id) .collect::>(); let device_list_set = list.iter().collect::>(); if let Some(corrupt_device_id) = device_list_set .symmetric_difference(&actual_device_ids) .next() { error!( "Device list is corrupt (unknown deviceID={})", corrupt_device_id ); return false; } true } /// Returns reference to primary device (if any) from given list of devices /// or None if there's no valid primary device. fn determine_primary_device(devices: &[DeviceRow]) -> Option<&DeviceRow> { // 1. Find mobile devices with valid token // 2. Prioritize these with latest code version // 3. If there's a tie, select the one with latest login time let mut mobile_devices = devices .iter() .filter(|device| { device.device_type == DeviceType::Ios || device.device_type == DeviceType::Android }) .collect::>(); mobile_devices.sort_by(|a, b| { let code_version_cmp = b.code_version.cmp(&a.code_version); if code_version_cmp == Ordering::Equal { b.login_time.cmp(&a.login_time) } else { code_version_cmp } }); mobile_devices.first().cloned() } #[cfg(test)] mod tests { use super::*; use chrono::Duration; #[test] fn reorder_skips_no_devices() { let mut list = vec![]; reorder_device_list("", &mut list, &[]); assert_eq!(list, Vec::::new()); } #[test] fn reorder_skips_single_device() { let mut list = vec!["test".into()]; let devices_data = vec![create_test_device("test", DeviceType::Web, 0, Utc::now())]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["test"]); } #[test] fn reorder_skips_for_valid_list() { let mut list = vec!["mobile".into(), "web".into()]; let devices_data = vec![ create_test_device("mobile", DeviceType::Android, 1, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn reorder_swaps_primary_device_when_possible() { let mut list = vec!["web".into(), "mobile".into()]; let devices_data = vec![ create_test_device("web", DeviceType::Web, 0, Utc::now()), create_test_device("mobile", DeviceType::Android, 1, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn determine_primary_device_returns_none_for_empty_list() { let devices = vec![]; assert!(determine_primary_device(&devices).is_none()); } #[test] fn determine_primary_device_returns_none_for_web_only() { let devices = vec![create_test_device("web", DeviceType::Web, 0, Utc::now())]; assert!( determine_primary_device(&devices).is_none(), "Primary device should be None for web-only devices" ); } #[test] fn determine_primary_device_prioritizes_mobile() { let devices = vec![ create_test_device("mobile", DeviceType::Android, 0, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile", "Primary device should be mobile" ); } #[test] fn determine_primary_device_prioritizes_latest_code_version() { let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, Utc::now()), create_test_device("mobile2", DeviceType::Android, 2, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2", "Primary device should be mobile with latest code version" ); } #[test] fn determine_primary_device_prioritizes_latest_login_time() { let devices = vec![ create_test_device("mobile1_today", DeviceType::Ios, 1, Utc::now()), create_test_device( "mobile2_yesterday", DeviceType::Android, 1, Utc::now() - Duration::days(1), ), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1_today", "Primary device should be mobile with latest login time" ); } #[test] fn determine_primary_device_keeps_deterministic_order() { // Given two identical devices, the first one should be selected as primary let today = Utc::now(); let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, today), create_test_device("mobile2", DeviceType::Android, 1, today), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1", "Primary device selection should be deterministic" ); } #[test] fn determine_primary_device_all_rules_together() { use DeviceType::{Android, Ios, Web}; let today = Utc::now(); let yesterday = today - Duration::days(1); let devices = vec![ create_test_device("mobile1_today", Android, 1, today), create_test_device("mobile2_today", Android, 2, today), create_test_device("mobile3_yesterday", Ios, 1, yesterday), create_test_device("mobile4_yesterday", Ios, 2, yesterday), create_test_device("web", Web, 5, today), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2_today", "Primary device should be mobile with latest code version and login time" ); } fn create_test_device( id: &str, platform: DeviceType, code_version: u64, login_time: DateTime, ) -> DeviceRow { DeviceRow { user_id: "test".into(), device_id: id.into(), device_type: platform, device_key_info: IdentityKeyInfo { key_payload: "".into(), key_payload_signature: "".into(), }, content_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, notif_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, code_version, login_time, } } } } diff --git a/services/identity/src/ddb_utils.rs b/services/identity/src/ddb_utils.rs index 7b349e7b5..cc9a65e01 100644 --- a/services/identity/src/ddb_utils.rs +++ b/services/identity/src/ddb_utils.rs @@ -1,119 +1,250 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use comm_lib::{ - aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}, + aws::{ + ddb::types::{ + error::TransactionCanceledException, AttributeValue, Put, + TransactWriteItem, Update, + }, + DynamoDBError, + }, database::{AttributeExtractor, AttributeMap}, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::iter::IntoIterator; use crate::{ constants::{ USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, }, + database::DeviceIDAttribute, siwe::SocialProof, }; #[derive(Copy, Clone, Debug)] pub enum OlmAccountType { Content, Notification, } -// Prefix the one time keys with the olm account variant. This allows for a single -// DDB table to contain both notification and content keys for a device. pub fn create_one_time_key_partition_key( + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> String { - match account_type { - OlmAccountType::Content => format!("content_{device_id}"), - OlmAccountType::Notification => format!("notification_{device_id}"), - } + let account_type = match account_type { + OlmAccountType::Content => "content", + OlmAccountType::Notification => "notif", + }; + format!("{user_id}#{device_id}#{account_type}") +} + +fn create_one_time_key_sort_key( + key_number: usize, + current_time: DateTime, +) -> String { + let timestamp = current_time.to_rfc3339(); + format!("{timestamp}#{:02}", key_number) } fn create_one_time_key_put_request( + user_id: &str, device_id: &str, one_time_key: String, + key_number: usize, account_type: OlmAccountType, -) -> WriteRequest { + current_time: DateTime, +) -> Put { use crate::constants::one_time_keys_table::*; let partition_key = - create_one_time_key_partition_key(device_id, account_type); - let builder = PutRequest::builder(); + create_one_time_key_partition_key(user_id, device_id, account_type); + let sort_key = create_one_time_key_sort_key(key_number, current_time); + + let builder = Put::builder(); let attrs = HashMap::from([ (PARTITION_KEY.to_string(), AttributeValue::S(partition_key)), - (SORT_KEY.to_string(), AttributeValue::S(one_time_key)), + (SORT_KEY.to_string(), AttributeValue::S(sort_key)), + ( + ATTR_ONE_TIME_KEY.to_string(), + AttributeValue::S(one_time_key), + ), ]); - let put_request = builder.set_item(Some(attrs)).build(); - - WriteRequest::builder().put_request(put_request).build() + builder.table_name(NAME).set_item(Some(attrs)).build() } pub fn into_one_time_put_requests( + user_id: &str, device_id: &str, one_time_keys: T, account_type: OlmAccountType, -) -> Vec + current_time: DateTime, +) -> Vec where T: IntoIterator, ::Item: ToString, { one_time_keys .into_iter() - .map(|otk| { - create_one_time_key_put_request(device_id, otk.to_string(), account_type) + .enumerate() + .map(|(index, otk)| { + create_one_time_key_put_request( + user_id, + device_id, + otk.to_string(), + index, + account_type, + current_time, + ) }) + .map(|put_request| TransactWriteItem::builder().put(put_request).build()) .collect() } +pub fn into_one_time_update_requests( + user_id: &str, + device_id: &str, + num_content_keys: usize, + num_notif_keys: usize, +) -> TransactWriteItem { + use crate::constants::devices_table; + + let update_otk_count = Update::builder() + .table_name(devices_table::NAME) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .update_expression(format!( + "ADD {} :num_content, {} :num_notif", + devices_table::ATTR_CONTENT_OTK_COUNT, + devices_table::ATTR_NOTIF_OTK_COUNT + )) + .expression_attribute_values( + ":num_content", + AttributeValue::N(num_content_keys.to_string()), + ) + .expression_attribute_values( + ":num_notif", + AttributeValue::N(num_notif_keys.to_string()), + ) + .build(); + + TransactWriteItem::builder() + .update(update_otk_count) + .build() +} + pub trait DateTimeExt { fn from_utc_timestamp_millis(timestamp: i64) -> Option>; } impl DateTimeExt for DateTime { fn from_utc_timestamp_millis(timestamp: i64) -> Option { let naive = NaiveDateTime::from_timestamp_millis(timestamp)?; Some(Self::from_naive_utc_and_offset(naive, Utc)) } } pub enum Identifier { Username(String), WalletAddress(EthereumIdentity), } pub struct EthereumIdentity { pub wallet_address: String, pub social_proof: SocialProof, } impl TryFrom for Identifier { type Error = crate::error::Error; fn try_from(mut value: AttributeMap) -> Result { let username_result = value.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE); if let Ok(username) = username_result { return Ok(Identifier::Username(username)); } let wallet_address_result = value.take_attr(USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE); let social_proof_result = value.take_attr(USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME); if let (Ok(wallet_address), Ok(social_proof)) = (wallet_address_result, social_proof_result) { Ok(Identifier::WalletAddress(EthereumIdentity { wallet_address, social_proof, })) } else { Err(Self::Error::MalformedItem) } } } + +pub fn is_transaction_retryable( + err: &DynamoDBError, + retryable_codes: &HashSet<&str>, +) -> bool { + match err { + DynamoDBError::TransactionCanceledException( + TransactionCanceledException { + cancellation_reasons: Some(reasons), + .. + }, + ) => reasons.iter().any(|reason| { + retryable_codes.contains(&reason.code().unwrap_or_default()) + }), + _ => false, + } +} + +#[cfg(test)] +mod tests { + use crate::constants::one_time_keys_table; + + use super::*; + + #[test] + fn test_into_one_time_put_requests() { + let otks = ["not", "real", "keys"]; + let current_time = Utc::now(); + + let requests = into_one_time_put_requests( + "abc", + "123", + otks, + OlmAccountType::Content, + current_time, + ); + + assert_eq!(requests.len(), 3); + + for (index, request) in requests.into_iter().enumerate() { + let mut item = request.put.unwrap().item.unwrap(); + assert_eq!( + item.remove(one_time_keys_table::PARTITION_KEY).unwrap(), + AttributeValue::S("abc#123#content".to_string()) + ); + assert_eq!( + item.remove(one_time_keys_table::SORT_KEY).unwrap(), + AttributeValue::S(format!( + "{}#{:02}", + current_time.to_rfc3339(), + index + )) + ); + assert_eq!( + item.remove(one_time_keys_table::ATTR_ONE_TIME_KEY).unwrap(), + AttributeValue::S(otks[index].to_string()) + ); + } + } +} diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs index ab2318ee1..21a5c8e50 100644 --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -1,44 +1,48 @@ use comm_lib::aws::DynamoDBError; use comm_lib::database::DBItemError; use tracing::error; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(...)] Transport(tonic::transport::Error), #[display(...)] Status(tonic::Status), #[display(...)] MissingItem, #[display(...)] DeviceList(DeviceListError), #[display(...)] MalformedItem, #[display(...)] Serde(serde_json::Error), #[display(...)] CannotOverwrite, + #[display(...)] + OneTimeKeyUploadLimitExceeded, + #[display(...)] + MaxRetriesExceeded, } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DeviceListError { DeviceAlreadyExists, DeviceNotFound, ConcurrentUpdateError, InvalidDeviceListUpdate, } pub fn consume_error(result: Result) { match result { Ok(_) => (), Err(e) => { error!("{}", e); } } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index 8d0a80e99..efa2b1440 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,663 +1,664 @@ use std::collections::HashMap; use crate::config::CONFIG; use crate::database::{DeviceListRow, DeviceListUpdate}; use crate::{ client_service::{handle_db_error, UpdateState, WorkflowInProgress}, constants::request_metadata, database::DatabaseClient, ddb_utils::DateTimeExt, grpc_services::shared::get_value, }; use chrono::{DateTime, Utc}; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, warn}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, }; use super::protos::auth::{ PeersDeviceListsRequest, PeersDeviceListsResponse, UserIdentityRequest, UserIdentityResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { debug!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID) .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, request_metadata::DEVICE_ID) .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_prekeys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_prekeys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .update_device_prekeys( user_id, device_id, content_keys.into(), notif_keys.into(), ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identifier(user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? else { return Err(Status::not_found("keyserver not found")); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await .map_err(handle_db_error)?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( - device_id, - message.content_one_time_prekeys, - message.notif_one_time_prekeys, + &user_id, + &device_id, + &message.content_one_time_prekeys, + &message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, user_id.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdateState { user_id }; let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(update_state)) .await .map_err(handle_db_error)?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(state.user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_access_token_data(user_id, device_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn delete_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::::from_utc_timestamp_millis(timestamp) .ok_or_else(|| tonic::Status::invalid_argument("Invalid timestamp")) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(RawDeviceList::from) .map(SignedDeviceList::try_from_raw) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); // do all fetches concurrently let mut fetch_tasks = tokio::task::JoinSet::new(); let mut device_lists = HashMap::with_capacity(user_ids.len()); for user_id in user_ids { let db_client = self.db_client.clone(); fetch_tasks.spawn(async move { let result = db_client.get_current_device_list(&user_id).await; (user_id, result) }); } while let Some(task_result) = fetch_tasks.join_next().await { match task_result { Ok((user_id, Ok(Some(device_list_row)))) => { let raw_list = RawDeviceList::from(device_list_row); let signed_list = SignedDeviceList::try_from_raw(raw_list)?; let serialized_list = signed_list.as_json_string()?; device_lists.insert(user_id, serialized_list); } Ok((user_id, Ok(None))) => { warn!(user_id, "User has no device list, skipping!"); } Ok((user_id, Err(err))) => { error!(user_id, "Failed fetching device list: {err}"); // abort fetching other users fetch_tasks.abort_all(); return Err(handle_db_error(err)); } Err(join_error) => { error!("Failed to join device list task: {join_error}"); fetch_tasks.abort_all(); return Err(Status::aborted("unexpected error")); } } } let response = PeersDeviceListsResponse { users_device_lists: device_lists, }; Ok(Response::new(response)) } async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _device_id) = get_user_and_device_id(&request)?; // TODO: when we stop doing "primary device rotation" (migration procedure) // we should verify if this RPC is called by primary device only let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; self .db_client .apply_devicelist_update(&user_id, update) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await .map_err(handle_db_error)?; if get_farcaster_users_response.len() > 1 { error!("multiple users associated with the same Farcaster ID"); return Err(Status::failed_precondition("cannot link Farcaster ID")); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists( "farcaster ID already associated with different user", )); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .remove_farcaster_id(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn find_user_identity( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let response = UserIdentityResponse { identity: Some(identifier.into()), }; return Ok(Response::new(response)); } } // raw device list that can be serialized to JSON (and then signed in the future) #[derive(serde::Serialize, serde::Deserialize)] struct RawDeviceList { devices: Vec, timestamp: i64, } impl From for RawDeviceList { fn from(row: DeviceListRow) -> Self { Self { devices: row.device_ids, timestamp: row.timestamp.timestamp_millis(), } } } #[derive(serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] struct SignedDeviceList { /// JSON-stringified [`RawDeviceList`] raw_device_list: String, } impl SignedDeviceList { /// Serialize (and sign in the future) a [`RawDeviceList`] fn try_from_raw(raw: RawDeviceList) -> Result { let stringified_list = serde_json::to_string(&raw).map_err(|err| { error!("Failed to serialize raw device list: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Self { raw_device_list: stringified_list, }) } fn as_raw(&self) -> Result { // The device list payload is sent as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to deserialize serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) .map_err(|err| { warn!("Failed to deserialize raw device list: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } /// Serializes the signed device list to a JSON string fn as_json_string(&self) -> Result { serde_json::to_string(self).map_err(|err| { error!("Failed to serialize device list updates: {}", err); tonic::Status::failed_precondition("unexpected error") }) } } impl TryFrom for SignedDeviceList { type Error = tonic::Status; fn try_from(request: UpdateDeviceListRequest) -> Result { serde_json::from_str(&request.new_device_list).map_err(|err| { warn!("Failed to deserialize device list update: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } } impl TryFrom for DeviceListUpdate { type Error = tonic::Status; fn try_from(signed_list: SignedDeviceList) -> Result { let RawDeviceList { devices, timestamp: raw_timestamp, } = signed_list.as_raw()?; let timestamp = DateTime::::from_utc_timestamp_millis(raw_timestamp) .ok_or_else(|| { error!("Failed to parse RawDeviceList timestamp!"); tonic::Status::invalid_argument("invalid timestamp") })?; Ok(DeviceListUpdate::new(devices, timestamp)) } } #[cfg(test)] mod tests { use super::*; #[test] fn serialize_device_list_updates() { let raw_updates = vec![ RawDeviceList { devices: vec!["device1".into()], timestamp: 111111111, }, RawDeviceList { devices: vec!["device1".into(), "device2".into()], timestamp: 222222222, }, ]; let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_raw_list2 = r#"{"devices":["device1","device2"],"timestamp":222222222}"#; let signed_updates = raw_updates .into_iter() .map(SignedDeviceList::try_from_raw) .collect::, _>>() .expect("signing device list updates failed"); assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); let stringified_updates = signed_updates .iter() .map(serde_json::to_string) .collect::, _>>() .expect("serialize signed device lists failed"); let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; assert_eq!(stringified_updates[0], expected_stringified_list1); assert_eq!(stringified_updates[1], expected_stringified_list2); } #[test] fn deserialize_device_list_update() { let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; let request = UpdateDeviceListRequest { new_device_list: raw_payload.to_string(), }; let signed_list = SignedDeviceList::try_from(request) .expect("Failed to parse SignedDeviceList"); let update = DeviceListUpdate::try_from(signed_list) .expect("Failed to parse DeviceListUpdate from signed list"); let expected_timestamp = DateTime::::from_utc_timestamp_millis(123456789).unwrap(); assert_eq!(update.timestamp, expected_timestamp); assert_eq!( update.devices, vec!["device1".to_string(), "device2".to_string()] ); } } diff --git a/services/identity/src/siwe.rs b/services/identity/src/siwe.rs index 59929fb6d..e8afa6344 100644 --- a/services/identity/src/siwe.rs +++ b/services/identity/src/siwe.rs @@ -1,158 +1,157 @@ use std::collections::HashMap; use chrono::Utc; use comm_lib::{ aws::ddb::types::AttributeValue, database::{AttributeExtractor, AttributeMap, TryFromAttribute}, }; use regex::Regex; -use serde::{Deserialize, Serialize}; use siwe::Message; use tonic::Status; use tracing::error; use crate::constants::{ SOCIAL_PROOF_MESSAGE_ATTRIBUTE, SOCIAL_PROOF_SIGNATURE_ATTRIBUTE, }; pub fn parse_and_verify_siwe_message( siwe_message: &str, siwe_signature: &str, ) -> Result { let siwe_message: Message = siwe_message.parse().map_err(|e| { error!("Failed to parse SIWE message: {}", e); Status::invalid_argument("invalid message") })?; let decoded_signature = hex::decode(siwe_signature.trim_start_matches("0x")) .map_err(|e| { error!("Failed to decode SIWE signature: {}", e); Status::invalid_argument("invalid signature") })?; let signature = decoded_signature.try_into().map_err(|e| { error!("Conversion to SIWE signature failed: {:?}", e); Status::invalid_argument("invalid message") })?; siwe_message .verify(signature, None, None, Some(&Utc::now())) .map_err(|e| { error!("Signature verification failed: {}", e); Status::unauthenticated("message not authenticated") })?; Ok(siwe_message) } pub fn is_valid_ethereum_address(candidate: &str) -> bool { let ethereum_address_regex = Regex::new(r"^0x[a-fA-F0-9]{40}$").unwrap(); ethereum_address_regex.is_match(candidate) } #[derive(derive_more::Constructor)] pub struct SocialProof { pub message: String, pub signature: String, } impl From for AttributeValue { fn from(value: SocialProof) -> Self { AttributeValue::M(HashMap::from([ ( SOCIAL_PROOF_MESSAGE_ATTRIBUTE.to_string(), AttributeValue::S(value.message), ), ( SOCIAL_PROOF_SIGNATURE_ATTRIBUTE.to_string(), AttributeValue::S(value.signature), ), ])) } } impl TryFrom for SocialProof { type Error = comm_lib::database::DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let message = attrs.take_attr(SOCIAL_PROOF_MESSAGE_ATTRIBUTE)?; let signature = attrs.take_attr(SOCIAL_PROOF_SIGNATURE_ATTRIBUTE)?; Ok(Self { message, signature }) } } impl TryFromAttribute for SocialProof { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { AttributeMap::try_from_attr(attribute_name, attribute) .and_then(SocialProof::try_from) } } #[cfg(test)] mod tests { use crate::constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME; use super::*; #[test] fn test_valid_ethereum_address() { assert!(is_valid_ethereum_address( "0x1234567890123456789012345678901234567890" ),); assert!(is_valid_ethereum_address( "0xABCDEF123456789012345678901234567890ABCD" )); assert!(is_valid_ethereum_address( "0xabcdef123456789012345678901234567890abcd" )); } #[test] fn test_invalid_ethereum_address() { // Shorter than 42 characters assert_eq!( is_valid_ethereum_address("0x12345678901234567890123456789012345678"), false ); // Longer than 42 characters assert_eq!( is_valid_ethereum_address("0x123456789012345678901234567890123456789012"), false ); // Missing 0x prefix assert_eq!( is_valid_ethereum_address("1234567890123456789012345678901234567890"), false ); // Contains invalid characters assert_eq!( is_valid_ethereum_address("0x1234567890GHIJKL9012345678901234567890"), false ); // Empty string assert_eq!(is_valid_ethereum_address(""), false); } #[test] fn test_social_proof_ddb_format() { let message = "foo"; let signature = "bar"; let social_proof = SocialProof::new(message.to_string(), signature.to_string()); let mut user_item = AttributeMap::from([( USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), social_proof.into(), )]); let social_proof_from_attr: SocialProof = user_item .take_attr(USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME) .expect("social proof fetch failed"); assert_eq!(social_proof_from_attr.message, message); assert_eq!(social_proof_from_attr.signature, signature); } } diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf index 15caccf62..fe6b2aee3 100644 --- a/services/terraform/modules/shared/dynamodb.tf +++ b/services/terraform/modules/shared/dynamodb.tf @@ -1,282 +1,282 @@ resource "aws_dynamodb_table" "backup-service-backup" { name = "backup-service-backup" hash_key = "userID" range_key = "backupID" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } attribute { name = "backupID" type = "S" } attribute { name = "created" type = "S" } global_secondary_index { name = "userID-created-index" hash_key = "userID" range_key = "created" projection_type = "INCLUDE" non_key_attributes = ["userKeys"] } } resource "aws_dynamodb_table" "backup-service-log" { name = "backup-service-log" hash_key = "backupID" range_key = "logID" billing_mode = "PAY_PER_REQUEST" attribute { name = "backupID" type = "S" } attribute { name = "logID" type = "N" } } resource "aws_dynamodb_table" "blob-service-blobs" { name = "blob-service-blobs" hash_key = "blob_hash" range_key = "holder" billing_mode = "PAY_PER_REQUEST" attribute { name = "blob_hash" type = "S" } attribute { name = "holder" type = "S" } attribute { name = "last_modified" type = "N" } attribute { name = "unchecked" type = "S" } global_secondary_index { name = "unchecked-index" hash_key = "unchecked" range_key = "last_modified" projection_type = "KEYS_ONLY" } } resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" { name = "tunnelbroker-undelivered-messages" hash_key = "deviceID" range_key = "messageID" billing_mode = "PAY_PER_REQUEST" attribute { name = "deviceID" type = "S" } attribute { name = "messageID" type = "S" } } resource "aws_dynamodb_table" "identity-users" { name = "identity-users" hash_key = "userID" billing_mode = "PAY_PER_REQUEST" stream_enabled = true stream_view_type = "NEW_AND_OLD_IMAGES" attribute { name = "userID" type = "S" } attribute { name = "username" type = "S" } attribute { name = "walletAddress" type = "S" } attribute { name = "farcasterID" type = "S" } global_secondary_index { name = "username-index" hash_key = "username" projection_type = "KEYS_ONLY" } global_secondary_index { name = "walletAddress-index" hash_key = "walletAddress" projection_type = "KEYS_ONLY" } global_secondary_index { name = "farcasterID-index" hash_key = "farcasterID" projection_type = "INCLUDE" non_key_attributes = ["walletAddress", "username"] } } resource "aws_dynamodb_table" "identity-devices" { name = "identity-devices" hash_key = "userID" range_key = "itemID" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } # this is either device ID or device list datetime attribute { name = "itemID" type = "S" } # only for sorting device lists attribute { name = "timestamp" type = "N" } # sparse index allowing to sort device list updates by timestamp local_secondary_index { name = "deviceList-timestamp-index" range_key = "timestamp" projection_type = "ALL" } } resource "aws_dynamodb_table" "identity-tokens" { name = "identity-tokens" hash_key = "userID" range_key = "signingPublicKey" billing_mode = "PAY_PER_REQUEST" attribute { name = "userID" type = "S" } attribute { name = "signingPublicKey" type = "S" } } resource "aws_dynamodb_table" "identity-nonces" { name = "identity-nonces" hash_key = "nonce" billing_mode = "PAY_PER_REQUEST" attribute { name = "nonce" type = "S" } ttl { attribute_name = "expirationTimeUnix" enabled = true } } resource "aws_dynamodb_table" "identity-workflows-in-progress" { name = "identity-workflows-in-progress" hash_key = "id" billing_mode = "PAY_PER_REQUEST" attribute { name = "id" type = "S" } ttl { attribute_name = "expirationTimeUnix" enabled = true } } resource "aws_dynamodb_table" "identity-reserved-usernames" { name = "identity-reserved-usernames" hash_key = "username" billing_mode = "PAY_PER_REQUEST" stream_enabled = true stream_view_type = "NEW_AND_OLD_IMAGES" attribute { name = "username" type = "S" } } resource "aws_dynamodb_table" "identity-one-time-keys" { name = "identity-one-time-keys" - hash_key = "deviceID" - range_key = "oneTimeKey" + hash_key = "userID#deviceID#olmAccount" + range_key = "timestamp#keyNumber" billing_mode = "PAY_PER_REQUEST" attribute { - name = "deviceID" + name = "userID#deviceID#olmAccount" type = "S" } attribute { - name = "oneTimeKey" + name = "timestamp#keyNumber" type = "S" } } resource "aws_dynamodb_table" "feature-flags" { name = "feature-flags" hash_key = "platform" range_key = "feature" billing_mode = "PAY_PER_REQUEST" attribute { name = "platform" type = "S" } attribute { name = "feature" type = "S" } } resource "aws_dynamodb_table" "reports-service-reports" { name = "reports-service-reports" hash_key = "reportID" billing_mode = "PAY_PER_REQUEST" attribute { name = "reportID" type = "S" } }