diff --git a/services/commtest/tests/identity_device_list_tests.rs b/services/commtest/tests/identity_device_list_tests.rs index fb22862a2..86d4fdeaa 100644 --- a/services/commtest/tests/identity_device_list_tests.rs +++ b/services/commtest/tests/identity_device_list_tests.rs @@ -1,352 +1,354 @@ use std::collections::{HashMap, HashSet}; use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; use commtest::identity::device::{ login_user_device, logout_user_device, register_user_device, DEVICE_TYPE, PLACEHOLDER_CODE_VERSION, }; use commtest::service_addr; use grpc_clients::identity::authenticated::ChainedInterceptedAuthClient; use grpc_clients::identity::get_auth_client; use grpc_clients::identity::protos::auth::{ PeersDeviceListsRequest, UpdateDeviceListRequest, }; use grpc_clients::identity::protos::authenticated::GetDeviceListRequest; use grpc_clients::identity::DeviceType; use serde::Deserialize; use serde_json::json; // 1. register user with android device // 2. register a web device // 3. remove android device // 4. register ios device // 5. get device list - should have 4 updates: // - [android] // - [android, web] // - [web] // - [ios, web] - mobile should be first #[tokio::test] async fn test_device_list_rotation() { use commtest::identity::olm_account_infos::{ DEFAULT_CLIENT_KEYS as DEVICE_KEYS_ANDROID, MOCK_CLIENT_KEYS_1 as DEVICE_KEYS_WEB, MOCK_CLIENT_KEYS_2 as DEVICE_KEYS_IOS, }; // Create viewer (user that doesn't change devices) let viewer = register_user_device(None, None).await; let mut auth_client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), viewer.user_id.clone(), viewer.device_id, viewer.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let android_device_id = &DEVICE_KEYS_ANDROID.primary_identity_public_keys.ed25519; let web_device_id = &DEVICE_KEYS_WEB.primary_identity_public_keys.ed25519; let ios_device_id = &DEVICE_KEYS_IOS.primary_identity_public_keys.ed25519; // 1. Register user with primary Android device let android = register_user_device(Some(&DEVICE_KEYS_ANDROID), Some(DeviceType::Android)) .await; let user_id = android.user_id.clone(); let username = android.username.clone(); // 2. Log in a web device let _web = login_user_device( &username, Some(&DEVICE_KEYS_WEB), Some(DeviceType::Web), false, ) .await; // 3. Remove android device logout_user_device(android).await; // 4. Log in an iOS device let _ios = login_user_device( &username, Some(&DEVICE_KEYS_IOS), Some(DeviceType::Ios), false, ) .await; // Get device list updates for the user let device_lists_response: Vec> = get_device_list_history(&mut auth_client, &user_id) .await .into_iter() .map(|device_list| device_list.devices) .collect(); let expected_device_list: Vec> = vec![ vec![android_device_id.into()], vec![android_device_id.into(), web_device_id.into()], vec![web_device_id.into()], vec![ios_device_id.into(), web_device_id.into()], ]; assert_eq!(device_lists_response, expected_device_list); } #[tokio::test] async fn test_update_device_list_rpc() { // Register user with primary device let primary_device = register_user_device(None, None).await; let mut auth_client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), primary_device.user_id.clone(), primary_device.device_id, primary_device.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); // Initial device list check let initial_device_list = get_device_list_history(&mut auth_client, &primary_device.user_id) .await .into_iter() .map(|device_list| device_list.devices) .next() .expect("Expected to get single device list update"); assert!(initial_device_list.len() == 1, "Expected single device"); let primary_device_id = initial_device_list[0].clone(); // perform update by adding a new device + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let raw_update_payload = json!({ "devices": [primary_device_id, "device2"], - "timestamp": 123456789, + "timestamp": now.as_millis(), }); let update_payload = json!({ "rawDeviceList": serde_json::to_string(&raw_update_payload).unwrap(), }); let update_request = UpdateDeviceListRequest { new_device_list: serde_json::to_string(&update_payload) .expect("failed to serialize payload"), }; auth_client .update_device_list(update_request) .await .expect("Update device list RPC failed"); // get device list again let last_device_list = get_device_list_history(&mut auth_client, &primary_device.user_id).await; let last_device_list = last_device_list .last() .expect("Failed to get last device list update"); // check that the device list is properly updated assert_eq!( last_device_list.devices, vec![primary_device_id, "device2".into()] ); } #[tokio::test] async fn test_keyserver_force_login() { use commtest::identity::olm_account_infos::{ DEFAULT_CLIENT_KEYS as DEVICE_KEYS_ANDROID, MOCK_CLIENT_KEYS_1 as DEVICE_KEYS_KEYSERVER_1, MOCK_CLIENT_KEYS_2 as DEVICE_KEYS_KEYSERVER_2, }; // Create viewer (user that doesn't change devices) let viewer = register_user_device(None, None).await; let mut auth_client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), viewer.user_id.clone(), viewer.device_id, viewer.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); let android_device_id = &DEVICE_KEYS_ANDROID.primary_identity_public_keys.ed25519; let keyserver_1_device_id = &DEVICE_KEYS_KEYSERVER_1.primary_identity_public_keys.ed25519; let keyserver_2_device_id = &DEVICE_KEYS_KEYSERVER_2.primary_identity_public_keys.ed25519; // 1. Register user with primary Android device let android = register_user_device(Some(&DEVICE_KEYS_ANDROID), Some(DeviceType::Android)) .await; let user_id = android.user_id.clone(); let username = android.username.clone(); // 2. Log in on keyserver 1 let _keyserver_1 = login_user_device( &username, Some(&DEVICE_KEYS_KEYSERVER_1), Some(DeviceType::Keyserver), false, ) .await; // 3. Log in on keyserver 2 with force = true let _keyserver_2 = login_user_device( &username, Some(&DEVICE_KEYS_KEYSERVER_2), Some(DeviceType::Keyserver), true, ) .await; // Get device list updates for the user let device_lists_response: Vec> = get_device_list_history(&mut auth_client, &user_id) .await .into_iter() .map(|device_list| device_list.devices) .collect(); let expected_device_list: Vec> = vec![ vec![android_device_id.into()], vec![android_device_id.into(), keyserver_1_device_id.into()], vec![android_device_id.into()], vec![android_device_id.into(), keyserver_2_device_id.into()], ]; assert_eq!(device_lists_response, expected_device_list); } #[tokio::test] async fn test_device_list_multifetch() { // Create viewer (user that only auths request) let viewer = register_user_device(None, None).await; let mut auth_client = get_auth_client( &service_addr::IDENTITY_GRPC.to_string(), viewer.user_id.clone(), viewer.device_id, viewer.access_token, PLACEHOLDER_CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .expect("Couldn't connect to identity service"); // Register users and prepare expected device lists let mut expected_device_lists = HashMap::new(); for _ in 0..5 { let user = register_user_device(None, None).await; expected_device_lists.insert(user.user_id, vec![user.device_id]); } // Fetch device lists from server let user_ids: Vec<_> = expected_device_lists.keys().cloned().collect(); let request = PeersDeviceListsRequest { user_ids }; let response_device_lists = auth_client .get_device_lists_for_users(request) .await .expect("GetDeviceListsForUser RPC failed") .into_inner() .users_device_lists; // verify if response has the same user IDs as request let expected_user_ids: HashSet = expected_device_lists.keys().cloned().collect(); let response_user_ids: HashSet = response_device_lists.keys().cloned().collect(); let difference: HashSet<_> = expected_user_ids .symmetric_difference(&response_user_ids) .collect(); assert!(difference.is_empty(), "User IDs differ: {:?}", difference); // verify device list for each user for (user_id, expected_devices) in expected_device_lists { let response_payload = response_device_lists.get(&user_id).unwrap(); let returned_devices = SignedDeviceList::from_str(response_payload) .expect("failed to deserialize signed device list") .into_raw() .devices; assert_eq!( returned_devices, expected_devices, "Device list differs for user: {}, Expected {:?}, but got {:?}", user_id, expected_devices, returned_devices ); } } // See GetDeviceListResponse in identity_authenticated.proto // for details on the response format. #[derive(Deserialize)] #[serde(rename_all = "camelCase")] #[allow(unused)] struct RawDeviceList { devices: Vec, timestamp: i64, } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct SignedDeviceList { raw_device_list: String, } impl SignedDeviceList { fn into_raw(self) -> RawDeviceList { self .raw_device_list .parse() .expect("Failed to parse raw device list") } } impl FromStr for SignedDeviceList { type Err = serde_json::Error; fn from_str(s: &str) -> Result { serde_json::from_str(s) } } impl FromStr for RawDeviceList { type Err = serde_json::Error; fn from_str(s: &str) -> Result { // The device list payload is sent as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to deserialize serde_json::from_str(&s.replace(r#"\""#, r#"""#)) } } async fn get_device_list_history( client: &mut ChainedInterceptedAuthClient, user_id: &str, ) -> Vec { let request = GetDeviceListRequest { user_id: user_id.to_string(), since_timestamp: None, }; let response = client .get_device_list_for_user(request) .await .expect("Get device list request failed") .into_inner(); response .device_list_updates .into_iter() .map(|update| { SignedDeviceList::from_str(&update) .expect("Failed to parse device list update") .into_raw() }) .collect() } diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs index 355167825..90db6011e 100644 --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -1,253 +1,257 @@ use tokio::time::Duration; // Secrets pub const SECRETS_DIRECTORY: &str = "secrets"; pub const SECRETS_SETUP_FILE: &str = "server_setup.txt"; // DynamoDB // User table information, supporting opaque_ke 2.0 and X3DH information // Users can sign in either through username+password or Eth wallet. // // This structure should be aligned with the messages defined in // shared/protos/identity_unauthenticated.proto // // Structure for a user should be: // { // userID: String, // opaqueRegistrationData: Option, // username: Option, // walletAddress: Option, // devices: HashMap // } // // A device is defined as: // { // deviceType: String, # client or keyserver // keyPayload: String, // keyPayloadSignature: String, // identityPreKey: String, // identityPreKeySignature: String, // identityOneTimeKeys: Vec, // notifPreKey: String, // notifPreKeySignature: String, // notifOneTimeKeys: Vec, // socialProof: Option // } // } // // Additional context: // "devices" uses the signing public identity key of the device as a key for the devices map // "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification) // if "deviceType" == "keyserver", then the device will not have any notif key information pub const USERS_TABLE: &str = "identity-users"; pub const USERS_TABLE_PARTITION_KEY: &str = "userID"; pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData"; pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username"; pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str = "deviceType"; pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress"; pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof"; pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str = "deviceListTimestamp"; pub const USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME: &str = "farcasterID"; pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index"; pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index"; pub const USERS_TABLE_FARCASTER_ID_INDEX: &str = "farcasterID-index"; pub const ACCESS_TOKEN_TABLE: &str = "identity-tokens"; pub const ACCESS_TOKEN_TABLE_PARTITION_KEY: &str = "userID"; pub const ACCESS_TOKEN_SORT_KEY: &str = "signingPublicKey"; pub const ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE: &str = "authType"; pub const ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE: &str = "valid"; pub const ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE: &str = "token"; pub const NONCE_TABLE: &str = "identity-nonces"; pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce"; pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created"; pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime"; pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; pub const WORKFLOWS_IN_PROGRESS_TABLE: &str = "identity-workflows-in-progress"; pub const WORKFLOWS_IN_PROGRESS_PARTITION_KEY: &str = "id"; pub const WORKFLOWS_IN_PROGRESS_WORKFLOW_ATTRIBUTE: &str = "workflow"; pub const WORKFLOWS_IN_PROGRESS_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str = "expirationTimeUnix"; // Usernames reserved because they exist in Ashoat's keyserver already pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames"; pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username"; pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID"; // Users table social proof attribute pub const SOCIAL_PROOF_MESSAGE_ATTRIBUTE: &str = "siweMessage"; pub const SOCIAL_PROOF_SIGNATURE_ATTRIBUTE: &str = "siweSignature"; pub mod devices_table { /// table name pub const NAME: &str = "identity-devices"; pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index"; /// partition key pub const ATTR_USER_ID: &str = "userID"; /// sort key pub const ATTR_ITEM_ID: &str = "itemID"; // itemID prefixes (one shouldn't be a prefix of the other) pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-"; pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-"; // device-specific attrs pub const ATTR_DEVICE_TYPE: &str = "deviceType"; pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo"; pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey"; pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey"; // IdentityKeyInfo constants pub const ATTR_KEY_PAYLOAD: &str = "keyPayload"; pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature"; // PreKey constants pub const ATTR_PREKEY: &str = "preKey"; pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature"; // device-list-specific attrs pub const ATTR_TIMESTAMP: &str = "timestamp"; pub const ATTR_DEVICE_IDS: &str = "deviceIDs"; // migration-specific attrs pub const ATTR_CODE_VERSION: &str = "codeVersion"; pub const ATTR_LOGIN_TIME: &str = "loginTime"; // one-time key constants pub const ATTR_CONTENT_OTK_COUNT: &str = "contentOTKCount"; pub const ATTR_NOTIF_OTK_COUNT: &str = "notifOTKCount"; } // One time keys table, which need to exist in their own table to ensure // atomicity of additions and removals pub mod one_time_keys_table { pub const NAME: &str = "identity-one-time-keys"; pub const PARTITION_KEY: &str = "userID#deviceID#olmAccount"; pub const SORT_KEY: &str = "timestamp#keyNumber"; pub const ATTR_ONE_TIME_KEY: &str = "oneTimeKey"; } // Tokio pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054"; pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004"; pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); // Token pub const ACCESS_TOKEN_LENGTH: usize = 512; // Temporary config pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN"; pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY"; // Nonce pub const NONCE_LENGTH: usize = 17; pub const NONCE_TTL_DURATION: Duration = Duration::from_secs(120); // seconds +// Device list + +pub const DEVICE_LIST_TIMESTAMP_VALID_FOR: Duration = Duration::from_secs(300); + // Workflows in progress pub const WORKFLOWS_IN_PROGRESS_TTL_DURATION: Duration = Duration::from_secs(120); // Identity pub const DEFAULT_IDENTITY_ENDPOINT: &str = "http://localhost:50054"; // LocalStack pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; // OPAQUE Server Setup pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP"; // Identity Search pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT"; pub const DEFAULT_OPENSEARCH_ENDPOINT: &str = "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566"; pub const IDENTITY_SEARCH_INDEX: &str = "users"; pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20; // Tunnelbroker pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT"; pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051"; // X3DH key management // Threshold for requesting more one_time keys pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5; // Number of keys to be refreshed when below the threshold pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5; // Minimum supported code versions pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270; // Request metadata pub mod request_metadata { pub const CODE_VERSION: &str = "code_version"; pub const DEVICE_TYPE: &str = "device_type"; pub const USER_ID: &str = "user_id"; pub const DEVICE_ID: &str = "device_id"; pub const ACCESS_TOKEN: &str = "access_token"; } // CORS pub mod cors { use std::time::Duration; pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60); pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"]; pub const DEFAULT_ALLOW_HEADERS: [&str; 9] = [ "x-grpc-web", "content-type", "x-user-agent", "grpc-timeout", super::request_metadata::CODE_VERSION, super::request_metadata::DEVICE_TYPE, super::request_metadata::USER_ID, super::request_metadata::DEVICE_ID, super::request_metadata::ACCESS_TOKEN, ]; pub const ALLOW_ORIGIN_LIST: &str = "ALLOW_ORIGIN_LIST"; } // Regex pub const VALID_USERNAME_REGEX_STRING: &str = r"^[a-zA-Z0-9][a-zA-Z0-9-_]{0,190}$"; // Retry // TODO: Replace this with `ExponentialBackoffConfig` from `comm-lib` pub mod retry { pub const MAX_ATTEMPTS: usize = 8; pub const CONDITIONAL_CHECK_FAILED: &str = "ConditionalCheckFailed"; pub const TRANSACTION_CONFLICT: &str = "TransactionConflict"; } // One-time keys pub const ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT: usize = 24; pub const ONE_TIME_KEY_SIZE: usize = 43; // as defined in olm pub const MAX_ONE_TIME_KEYS: usize = 100; // as defined in olm diff --git a/services/identity/src/database/device_list.rs b/services/identity/src/database/device_list.rs index 8e45da128..9db84c4a7 100644 --- a/services/identity/src/database/device_list.rs +++ b/services/identity/src/database/device_list.rs @@ -1,1410 +1,1425 @@ use std::collections::{HashMap, HashSet}; use chrono::{DateTime, Utc}; use comm_lib::{ aws::ddb::{ operation::{get_item::GetItemOutput, query::builders::QueryFluentBuilder}, types::{ error::TransactionCanceledException, AttributeValue, Delete, DeleteRequest, Put, TransactWriteItem, Update, WriteRequest, }, }, database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, DynamoDBError, TryFromAttribute, }, }; use tracing::{debug, error, warn}; use crate::{ client_service::FlattenedDeviceKeyUpload, constants::{ devices_table::{self, *}, USERS_TABLE, USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, }, error::{DeviceListError, Error}, grpc_services::protos::{self, unauth::DeviceType}, grpc_utils::DeviceKeysInfo, olm::is_valid_olm_key, }; use super::DatabaseClient; // We omit the content and notif one-time key count attributes from this struct // because they are internal helpers and are not provided by users #[derive(Clone, Debug)] pub struct DeviceRow { pub user_id: String, pub device_id: String, pub device_type: DeviceType, pub device_key_info: IdentityKeyInfo, pub content_prekey: Prekey, pub notif_prekey: Prekey, // migration-related data pub code_version: u64, /// Timestamp of last login (access token generation) pub login_time: DateTime, } #[derive(Clone, Debug)] pub struct DeviceListRow { pub user_id: String, pub timestamp: DateTime, pub device_ids: Vec, } #[derive(Clone, Debug)] pub struct IdentityKeyInfo { pub key_payload: String, pub key_payload_signature: String, } #[derive(Clone, Debug)] pub struct Prekey { pub prekey: String, pub prekey_signature: String, } /// A struct representing device list update request /// payload; issued by the primary device #[derive(derive_more::Constructor)] pub struct DeviceListUpdate { pub devices: Vec, pub timestamp: DateTime, } impl DeviceRow { pub fn from_device_key_upload( user_id: impl Into, upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Result { if !is_valid_olm_key(&upload.content_prekey) || !is_valid_olm_key(&upload.notif_prekey) { error!("Invalid prekey format"); return Err(Error::InvalidFormat); } let device_row = Self { user_id: user_id.into(), device_id: upload.device_id_key, device_type: DeviceType::from_str_name(upload.device_type.as_str_name()) .expect("DeviceType conversion failed. Identity client and server protos mismatch"), device_key_info: IdentityKeyInfo { key_payload: upload.key_payload, key_payload_signature: upload.key_payload_signature, }, content_prekey: Prekey { prekey: upload.content_prekey, prekey_signature: upload.content_prekey_signature, }, notif_prekey: Prekey { prekey: upload.notif_prekey, prekey_signature: upload.notif_prekey_signature, }, code_version, login_time, }; Ok(device_row) } } impl DeviceListRow { /// Generates new device list row from given devices - fn new(user_id: impl Into, device_ids: Vec) -> Self { + fn new( + user_id: impl Into, + device_ids: Vec, + timestamp: Option>, + ) -> Self { Self { user_id: user_id.into(), device_ids, - timestamp: Utc::now(), + timestamp: timestamp.unwrap_or_else(Utc::now), } } } // helper structs for converting to/from attribute values for sort key (a.k.a itemID) pub struct DeviceIDAttribute(pub String); struct DeviceListKeyAttribute(DateTime); impl From for AttributeValue { fn from(value: DeviceIDAttribute) -> Self { AttributeValue::S(format!("{DEVICE_ITEM_KEY_PREFIX}{}", value.0)) } } impl From for AttributeValue { fn from(value: DeviceListKeyAttribute) -> Self { AttributeValue::S(format!( "{DEVICE_LIST_KEY_PREFIX}{}", value.0.to_rfc3339() )) } } impl TryFrom> for DeviceIDAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device- prefix let device_id = item_id .strip_prefix(DEVICE_ITEM_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, })? .to_string(); Ok(Self(device_id)) } } impl TryFrom> for DeviceListKeyAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = String::try_from_attr(ATTR_ITEM_ID, value)?; // remove the device-list- prefix, then parse the timestamp let timestamp: DateTime = item_id .strip_prefix(DEVICE_LIST_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, }) .and_then(|s| { s.parse().map_err(|e| { DBItemError::new( ATTR_ITEM_ID.to_string(), item_id.clone().into(), DBItemAttributeError::InvalidTimestamp(e), ) }) })?; Ok(Self(timestamp)) } } impl TryFrom for DeviceRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(ATTR_USER_ID)?; let DeviceIDAttribute(device_id) = attrs.remove(ATTR_ITEM_ID).try_into()?; let raw_device_type: String = attrs.take_attr(ATTR_DEVICE_TYPE)?; let device_type = DeviceType::from_str_name(&raw_device_type).ok_or_else(|| { DBItemError::new( ATTR_DEVICE_TYPE.to_string(), raw_device_type.into(), DBItemAttributeError::InvalidValue, ) })?; let device_key_info = attrs .take_attr::(ATTR_DEVICE_KEY_INFO) .and_then(IdentityKeyInfo::try_from)?; let content_prekey = attrs .take_attr::(ATTR_CONTENT_PREKEY) .and_then(Prekey::try_from)?; let notif_prekey = attrs .take_attr::(ATTR_NOTIF_PREKEY) .and_then(Prekey::try_from)?; let code_version = attrs .remove(ATTR_CODE_VERSION) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .unwrap_or_default(); let login_time: DateTime = attrs.take_attr(ATTR_LOGIN_TIME)?; Ok(Self { user_id, device_id, device_type, device_key_info, content_prekey, notif_prekey, code_version, login_time, }) } } impl From for AttributeMap { fn from(value: DeviceRow) -> Self { HashMap::from([ (ATTR_USER_ID.to_string(), AttributeValue::S(value.user_id)), ( ATTR_ITEM_ID.to_string(), DeviceIDAttribute(value.device_id).into(), ), ( ATTR_DEVICE_TYPE.to_string(), AttributeValue::S(value.device_type.as_str_name().to_string()), ), ( ATTR_DEVICE_KEY_INFO.to_string(), value.device_key_info.into(), ), (ATTR_CONTENT_PREKEY.to_string(), value.content_prekey.into()), (ATTR_NOTIF_PREKEY.to_string(), value.notif_prekey.into()), // migration attributes ( ATTR_CODE_VERSION.to_string(), AttributeValue::N(value.code_version.to_string()), ), ( ATTR_LOGIN_TIME.to_string(), AttributeValue::S(value.login_time.to_rfc3339()), ), ]) } } impl From for protos::unauth::IdentityKeyInfo { fn from(value: IdentityKeyInfo) -> Self { Self { payload: value.key_payload, payload_signature: value.key_payload_signature, } } } impl From for AttributeValue { fn from(value: IdentityKeyInfo) -> Self { let attrs = HashMap::from([ ( ATTR_KEY_PAYLOAD.to_string(), AttributeValue::S(value.key_payload), ), ( ATTR_KEY_PAYLOAD_SIGNATURE.to_string(), AttributeValue::S(value.key_payload_signature), ), ]); AttributeValue::M(attrs) } } impl TryFrom for IdentityKeyInfo { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let key_payload = attrs.take_attr(ATTR_KEY_PAYLOAD)?; let key_payload_signature = attrs.take_attr(ATTR_KEY_PAYLOAD_SIGNATURE)?; Ok(Self { key_payload, key_payload_signature, }) } } impl From for AttributeValue { fn from(value: Prekey) -> Self { let attrs = HashMap::from([ (ATTR_PREKEY.to_string(), AttributeValue::S(value.prekey)), ( ATTR_PREKEY_SIGNATURE.to_string(), AttributeValue::S(value.prekey_signature), ), ]); AttributeValue::M(attrs) } } impl From for protos::unauth::Prekey { fn from(value: Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl From for Prekey { fn from(value: protos::unauth::Prekey) -> Self { Self { prekey: value.prekey, prekey_signature: value.prekey_signature, } } } impl TryFrom for Prekey { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let prekey = attrs.take_attr(ATTR_PREKEY)?; let prekey_signature = attrs.take_attr(ATTR_PREKEY_SIGNATURE)?; Ok(Self { prekey, prekey_signature, }) } } impl TryFrom for DeviceListRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(ATTR_USER_ID)?; let DeviceListKeyAttribute(timestamp) = attrs.remove(ATTR_ITEM_ID).try_into()?; // validate timestamps are in sync let timestamps_match = attrs .remove(ATTR_TIMESTAMP) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .filter(|val| *val == timestamp.timestamp_millis()) .is_some(); if !timestamps_match { warn!( "DeviceList timestamp mismatch for (userID={}, itemID={})", &user_id, timestamp.to_rfc3339() ); } let device_ids: Vec = attrs.take_attr(ATTR_DEVICE_IDS)?; Ok(Self { user_id, timestamp, device_ids, }) } } impl From for AttributeMap { fn from(device_list: DeviceListRow) -> Self { let mut attrs = HashMap::new(); attrs.insert( ATTR_USER_ID.to_string(), AttributeValue::S(device_list.user_id.clone()), ); attrs.insert( ATTR_ITEM_ID.to_string(), DeviceListKeyAttribute(device_list.timestamp).into(), ); attrs.insert( ATTR_TIMESTAMP.to_string(), AttributeValue::N(device_list.timestamp.timestamp_millis().to_string()), ); attrs.insert( ATTR_DEVICE_IDS.to_string(), AttributeValue::L( device_list .device_ids .into_iter() .map(AttributeValue::S) .collect(), ), ); attrs } } impl DatabaseClient { /// Retrieves user's current devices and their full data pub async fn get_current_devices( &self, user_id: impl Into, ) -> Result, Error> { let response = query_rows_with_prefix(self, user_id, DEVICE_ITEM_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to get current devices: {:?}", e); Error::AwsSdk(e.into()) })?; let Some(rows) = response.items else { return Ok(Vec::new()); }; rows .into_iter() .map(DeviceRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Gets user's device list history pub async fn get_device_list_history( &self, user_id: impl Into, since: Option>, ) -> Result, Error> { let rows = if let Some(since) = since { // When timestamp is provided, it's better to query device lists by timestamp LSI self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id AND #timestamp > :since") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#timestamp", ATTR_TIMESTAMP) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .expression_attribute_values( ":since", AttributeValue::N(since.timestamp_millis().to_string()), ) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items } else { // Query all device lists for user query_rows_with_prefix(self, user_id, DEVICE_LIST_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to query device list updates (all): {:?}", e); Error::AwsSdk(e.into()) })? .items }; rows .unwrap_or_default() .into_iter() .map(DeviceListRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Returns all devices' keys for the given user. Response is in the same format /// as [DatabaseClient::get_keys_for_user] for compatibility reasons. pub async fn get_keys_for_user_devices( &self, user_id: impl Into, ) -> Result { let user_devices = self.get_current_devices(user_id).await?; let user_devices_keys = user_devices .into_iter() .map(|device| (device.device_id.clone(), DeviceKeysInfo::from(device))) .collect(); Ok(user_devices_keys) } pub async fn update_device_prekeys( &self, user_id: impl Into, device_id: impl Into, content_prekey: Prekey, notif_prekey: Prekey, ) -> Result<(), Error> { if !is_valid_olm_key(&content_prekey.prekey) || !is_valid_olm_key(¬if_prekey.prekey) { error!("Invalid prekey format"); return Err(Error::InvalidFormat); } self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression( "SET #content_prekey = :content_prekey, #notif_prekey = :notif_prekey", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#content_prekey", ATTR_CONTENT_PREKEY) .expression_attribute_names("#notif_prekey", ATTR_NOTIF_PREKEY) .expression_attribute_values(":content_prekey", content_prekey.into()) .expression_attribute_values(":notif_prekey", notif_prekey.into()) .send() .await .map_err(|e| { error!("Failed to update device prekeys: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(()) } /// Checks if given device exists on user's current device list pub async fn device_exists( &self, user_id: impl Into, device_id: impl Into, ) -> Result { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) // only fetch the primary key, we don't need the rest .projection_expression(format!("{ATTR_USER_ID}, {ATTR_ITEM_ID}")) .send() .await .map_err(|e| { error!("Failed to check if device exists: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(item.is_some()) } pub async fn get_device_data( &self, user_id: impl Into, device_id: impl Into, ) -> Result, Error> { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .send() .await .map_err(|e| { error!("Failed to fetch device data: {:?}", e); Error::AwsSdk(e.into()) })?; let Some(attrs) = item else { return Ok(None); }; let device_data = DeviceRow::try_from(attrs)?; Ok(Some(device_data)) } /// Fails if the device list is empty pub async fn get_primary_device_data( &self, user_id: &str, ) -> Result { let device_list = self.get_current_device_list(user_id).await?; let Some(primary_device_id) = device_list .as_ref() .and_then(|list| list.device_ids.first()) else { error!(user_id, "Device list is empty. Cannot fetch primary device"); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); }; self .get_device_data(user_id, primary_device_id) .await? .ok_or_else(|| { error!( "Corrupt database. Missing primary device data for user {}", user_id ); Error::MissingItem }) } /// Required only for migration purposes (determining primary device) pub async fn update_device_login_time( &self, user_id: impl Into, device_id: impl Into, login_time: DateTime, ) -> Result<(), Error> { self .client .update_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .update_expression("SET #login_time = :login_time") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_names("#login_time", ATTR_LOGIN_TIME) .expression_attribute_values( ":login_time", AttributeValue::S(login_time.to_rfc3339()), ) .send() .await .map_err(|e| { error!("Failed to update device login time: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(()) } pub async fn get_current_device_list( &self, user_id: impl Into, ) -> Result, Error> { self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id") // sort descending .scan_index_forward(false) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .limit(1) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items .and_then(|mut items| items.pop()) .map(DeviceListRow::try_from) .transpose() .map_err(Error::from) } /// Adds device data to devices table. If the device already exists, its /// data is overwritten. This does not update the device list; the device ID /// should already be present in the device list. pub async fn put_device_data( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Result<(), Error> { let content_one_time_keys = device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = device_key_upload.notif_one_time_keys.clone(); let user_id_string = user_id.into(); let new_device = DeviceRow::from_device_key_upload( user_id_string.clone(), device_key_upload, code_version, login_time, )?; let device_id = new_device.device_id.clone(); self .client .put_item() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .send() .await .map_err(|e| { error!("Failed to put device data: {:?}", e); Error::AwsSdk(e.into()) })?; self .append_one_time_prekeys( &user_id_string, &device_id, &content_one_time_keys, ¬if_one_time_keys, ) .await?; Ok(()) } /// Adds new device to user's device list. If the device already exists, the /// operation fails. Transactionally generates new device list version. pub async fn add_device( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, code_version: u64, login_time: DateTime, ) -> Result<(), Error> { let user_id: String = user_id.into(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let new_device = DeviceRow::from_device_key_upload( &user_id, device_key_upload, code_version, login_time, )?; if device_ids.iter().any(|id| &new_device.device_id == id) { warn!( "Device already exists in user's device list \ (userID={}, deviceID={})", &user_id, &new_device.device_id ); return Err(Error::DeviceList(DeviceListError::DeviceAlreadyExists)); } device_ids.push(new_device.device_id.clone()); // Reorder devices (determine primary device again) devices_data.push(new_device.clone()); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Put new device let put_device = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_operation = TransactWriteItem::builder().put(put_device).build(); - Ok(Some(put_device_operation)) + Ok((Some(put_device_operation), None)) }) .await?; Ok(()) } /// Removes device from user's device list. If the device doesn't exist, the /// operation fails. Transactionally generates new device list version. pub async fn remove_device( &self, user_id: impl Into, device_id: impl AsRef, ) -> Result<(), Error> { let user_id: String = user_id.into(); let device_id = device_id.as_ref(); self .transact_update_devicelist(&user_id, |device_ids, mut devices_data| { let device_exists = device_ids.iter().any(|id| id == device_id); if !device_exists { warn!( "Device doesn't exist in user's device list \ (userID={}, deviceID={})", &user_id, device_id ); return Err(Error::DeviceList(DeviceListError::DeviceNotFound)); } device_ids.retain(|id| id != device_id); // Reorder devices (determine primary device again) devices_data.retain(|d| d.device_id != device_id); migration::reorder_device_list(&user_id, device_ids, &devices_data); // Delete device DDB operation let delete_device = Delete::builder() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.clone())) .key( ATTR_ITEM_ID, DeviceIDAttribute(device_id.to_string()).into(), ) .condition_expression( "attribute_exists(#user_id) AND attribute_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let operation = TransactWriteItem::builder().delete(delete_device).build(); - Ok(Some(operation)) + Ok((Some(operation), None)) }) .await?; Ok(()) } /// applies updated device list received from primary device pub async fn apply_devicelist_update( &self, user_id: &str, update: DeviceListUpdate, ) -> Result { let DeviceListUpdate { - devices: new_list, .. + devices: new_list, + timestamp, } = update; self .transact_update_devicelist(user_id, |current_list, _| { // TODO: Add proper validation according to the whitepaper // currently only adding new device is supported (new.len - old.len = 1) let new_set: HashSet<_> = new_list.iter().collect(); let current_set: HashSet<_> = current_list.iter().collect(); // difference is A - B (only new devices) let difference: HashSet<_> = new_set.difference(¤t_set).collect(); if difference.len() != 1 { warn!("Received invalid device list update"); return Err(Error::DeviceList( DeviceListError::InvalidDeviceListUpdate, )); } debug!("Applying device list update. Difference: {:?}", difference); *current_list = new_list; - Ok(None) + Ok((None, Some(timestamp))) }) .await } /// Performs a transactional update of the device list for the user. Afterwards /// generates a new device list and updates the timestamp in the users table. /// This is done in a transaction. Operation fails if the device list has been /// updated concurrently (timestamp mismatch). /// Returns the new device list row that has been saved to database. async fn transact_update_devicelist( &self, user_id: &str, // The closure performing a transactional update of the device list. // It receives two arguments: // 1. A mutable reference to the current device list (ordered device IDs). // 2. Details (full data) of the current devices (unordered). - // The closure should return a transactional DDB - // operation to be performed when updating the device list. + // The closure should return a two-element tuple: + // - (optional) transactional DDB operation to be performed + // when updating the device list. + // - (optional) new device list timestamp. Defaults to `Utc::now()`. action: impl FnOnce( &mut Vec, Vec, - ) -> Result, Error>, + ) -> Result< + (Option, Option>), + Error, + >, ) -> Result { let previous_timestamp = get_current_devicelist_timestamp(self, user_id).await?; let current_devices_data = self.get_current_devices(user_id).await?; let mut device_ids = self .get_current_device_list(user_id) .await? .map(|device_list| device_list.device_ids) .unwrap_or_default(); // Perform the update action, then generate new device list - let operation = action(&mut device_ids, current_devices_data)?; - let new_device_list = DeviceListRow::new(user_id, device_ids); + let (operation, timestamp) = action(&mut device_ids, current_devices_data)?; + + crate::device_list::verify_device_list_timestamp( + previous_timestamp.as_ref(), + timestamp.as_ref(), + )?; + let new_device_list = DeviceListRow::new(user_id, device_ids, timestamp); // Update timestamp in users table let timestamp_update_operation = device_list_timestamp_update_operation( user_id, previous_timestamp, new_device_list.timestamp, ); // Put updated device list (a new version) let put_device_list = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device_list.clone().into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_list_operation = TransactWriteItem::builder().put(put_device_list).build(); let operations = if let Some(operation) = operation { vec![ operation, put_device_list_operation, timestamp_update_operation, ] } else { vec![put_device_list_operation, timestamp_update_operation] }; self .client .transact_write_items() .set_transact_items(Some(operations)) .send() .await .map_err(|e| match DynamoDBError::from(e) { DynamoDBError::TransactionCanceledException( TransactionCanceledException { cancellation_reasons: Some(reasons), .. }, ) if reasons .iter() .any(|reason| reason.code() == Some("ConditionalCheckFailed")) => { Error::DeviceList(DeviceListError::ConcurrentUpdateError) } other => { error!("Device list update transaction failed: {:?}", other); Error::AwsSdk(other) } })?; Ok(new_device_list) } /// Deletes all user data from devices table pub async fn delete_devices_table_rows_for_user( &self, user_id: impl Into, ) -> Result<(), Error> { // 1. get all rows // 2. batch write delete all // we project only the primary keys so we can pass these directly to delete requests let primary_keys = self .client .query() .table_name(devices_table::NAME) .projection_expression("#user_id, #item_id") .key_condition_expression("#user_id = :user_id") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .consistent_read(true) .send() .await .map_err(|e| { error!("Failed to list user's items in devices table: {:?}", e); Error::AwsSdk(e.into()) })? .items .unwrap_or_default(); let delete_requests = primary_keys .into_iter() .map(|item| { let request = DeleteRequest::builder().set_key(Some(item)).build(); WriteRequest::builder().delete_request(request).build() }) .collect::>(); // TODO: We can use the batch write helper from comm-services-lib when integrated for batch in delete_requests.chunks(25) { self .client .batch_write_item() .request_items(devices_table::NAME, batch.to_vec()) .send() .await .map_err(|e| { error!("Failed to batch delete items from devices table: {:?}", e); Error::AwsSdk(e.into()) })?; } Ok(()) } } /// Gets timestamp of user's current device list. Returns None if the user /// doesn't have a device list yet. Storing the timestamp in the users table is /// required for consistency. It's used as a condition when updating the device /// list. async fn get_current_devicelist_timestamp( db: &crate::database::DatabaseClient, user_id: impl Into, ) -> Result>, Error> { let response = db .client .get_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .projection_expression(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME) .send() .await .map_err(|e| { error!("Failed to get user's device list timestamp: {:?}", e); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); let raw_datetime = user_item.remove(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME); // existing records will not have this field when // updating device list for the first time if raw_datetime.is_none() { return Ok(None); } let timestamp = DateTime::::try_from_attr( USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, raw_datetime, )?; Ok(Some(timestamp)) } /// Generates update expression for current device list timestamp in users table. /// The previous timestamp is used as a condition to ensure that the value hasn't changed /// since we got it. This avoids race conditions when updating the device list. fn device_list_timestamp_update_operation( user_id: impl Into, previous_timestamp: Option>, new_timestamp: DateTime, ) -> TransactWriteItem { let update_builder = match previous_timestamp { Some(previous_timestamp) => Update::builder() .condition_expression("#device_list_timestamp = :previous_timestamp") .expression_attribute_values( ":previous_timestamp", AttributeValue::S(previous_timestamp.to_rfc3339()), ), // If there's no previous timestamp, the attribute shouldn't exist yet None => Update::builder() .condition_expression("attribute_not_exists(#device_list_timestamp)"), }; let update = update_builder .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .update_expression("SET #device_list_timestamp = :new_timestamp") .expression_attribute_names( "#device_list_timestamp", USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, ) .expression_attribute_values( ":new_timestamp", AttributeValue::S(new_timestamp.to_rfc3339()), ) .build(); TransactWriteItem::builder().update(update).build() } /// Helper function to query rows by given sort key prefix fn query_rows_with_prefix( db: &crate::database::DatabaseClient, user_id: impl Into, prefix: &'static str, ) -> QueryFluentBuilder { db.client .query() .table_name(devices_table::NAME) .key_condition_expression( "#user_id = :user_id AND begins_with(#item_id, :device_prefix)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values(":user_id", AttributeValue::S(user_id.into())) .expression_attribute_values( ":device_prefix", AttributeValue::S(prefix.to_string()), ) .consistent_read(true) } // Helper module for "migration" code into new device list schema. // We can get rid of this when primary device takes over the responsibility // of managing the device list. mod migration { use std::{cmp::Ordering, collections::HashSet}; use tracing::{debug, error, info}; use super::*; pub(super) fn reorder_device_list( user_id: &str, list: &mut [String], devices_data: &[DeviceRow], ) { if !verify_device_list_match(list, devices_data) { error!("Found corrupt device list for user (userID={})!", user_id); return; } let Some(first_device) = list.first() else { debug!("Skipping device list rotation. Nothing to reorder."); return; }; let Some(primary_device) = determine_primary_device(devices_data) else { info!( "No valid primary device found for user (userID={}).\ Skipping device list reorder.", user_id ); return; }; if first_device == &primary_device.device_id { debug!("Skipping device list reorder. Primary device is already first"); return; } // swap primary device with the first one let Some(primary_device_idx) = list.iter().position(|id| id == &primary_device.device_id) else { error!( "Primary device not found in device list (userID={})", user_id ); return; }; list.swap(0, primary_device_idx); info!("Reordered device list for user (userID={})", user_id); } // checks if device list matches given devices data fn verify_device_list_match( list: &[String], devices_data: &[DeviceRow], ) -> bool { if list.len() != devices_data.len() { error!("Device list length mismatch!"); return false; } let actual_device_ids = devices_data .iter() .map(|device| &device.device_id) .collect::>(); let device_list_set = list.iter().collect::>(); if let Some(corrupt_device_id) = device_list_set .symmetric_difference(&actual_device_ids) .next() { error!( "Device list is corrupt (unknown deviceID={})", corrupt_device_id ); return false; } true } /// Returns reference to primary device (if any) from given list of devices /// or None if there's no valid primary device. fn determine_primary_device(devices: &[DeviceRow]) -> Option<&DeviceRow> { // 1. Find mobile devices with valid token // 2. Prioritize these with latest code version // 3. If there's a tie, select the one with latest login time let mut mobile_devices = devices .iter() .filter(|device| { device.device_type == DeviceType::Ios || device.device_type == DeviceType::Android }) .collect::>(); mobile_devices.sort_by(|a, b| { let code_version_cmp = b.code_version.cmp(&a.code_version); if code_version_cmp == Ordering::Equal { b.login_time.cmp(&a.login_time) } else { code_version_cmp } }); mobile_devices.first().cloned() } #[cfg(test)] mod tests { use super::*; use chrono::Duration; #[test] fn reorder_skips_no_devices() { let mut list = vec![]; reorder_device_list("", &mut list, &[]); assert_eq!(list, Vec::::new()); } #[test] fn reorder_skips_single_device() { let mut list = vec!["test".into()]; let devices_data = vec![create_test_device("test", DeviceType::Web, 0, Utc::now())]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["test"]); } #[test] fn reorder_skips_for_valid_list() { let mut list = vec!["mobile".into(), "web".into()]; let devices_data = vec![ create_test_device("mobile", DeviceType::Android, 1, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn reorder_swaps_primary_device_when_possible() { let mut list = vec!["web".into(), "mobile".into()]; let devices_data = vec![ create_test_device("web", DeviceType::Web, 0, Utc::now()), create_test_device("mobile", DeviceType::Android, 1, Utc::now()), ]; reorder_device_list("", &mut list, &devices_data); assert_eq!(list, vec!["mobile", "web"]); } #[test] fn determine_primary_device_returns_none_for_empty_list() { let devices = vec![]; assert!(determine_primary_device(&devices).is_none()); } #[test] fn determine_primary_device_returns_none_for_web_only() { let devices = vec![create_test_device("web", DeviceType::Web, 0, Utc::now())]; assert!( determine_primary_device(&devices).is_none(), "Primary device should be None for web-only devices" ); } #[test] fn determine_primary_device_prioritizes_mobile() { let devices = vec![ create_test_device("mobile", DeviceType::Android, 0, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile", "Primary device should be mobile" ); } #[test] fn determine_primary_device_prioritizes_latest_code_version() { let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, Utc::now()), create_test_device("mobile2", DeviceType::Android, 2, Utc::now()), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2", "Primary device should be mobile with latest code version" ); } #[test] fn determine_primary_device_prioritizes_latest_login_time() { let devices = vec![ create_test_device("mobile1_today", DeviceType::Ios, 1, Utc::now()), create_test_device( "mobile2_yesterday", DeviceType::Android, 1, Utc::now() - Duration::days(1), ), create_test_device("web", DeviceType::Web, 0, Utc::now()), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1_today", "Primary device should be mobile with latest login time" ); } #[test] fn determine_primary_device_keeps_deterministic_order() { // Given two identical devices, the first one should be selected as primary let today = Utc::now(); let devices_with_latest_code_version = vec![ create_test_device("mobile1", DeviceType::Android, 1, today), create_test_device("mobile2", DeviceType::Android, 1, today), ]; let primary_device = determine_primary_device(&devices_with_latest_code_version) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile1", "Primary device selection should be deterministic" ); } #[test] fn determine_primary_device_all_rules_together() { use DeviceType::{Android, Ios, Web}; let today = Utc::now(); let yesterday = today - Duration::days(1); let devices = vec![ create_test_device("mobile1_today", Android, 1, today), create_test_device("mobile2_today", Android, 2, today), create_test_device("mobile3_yesterday", Ios, 1, yesterday), create_test_device("mobile4_yesterday", Ios, 2, yesterday), create_test_device("web", Web, 5, today), ]; let primary_device = determine_primary_device(&devices) .expect("Primary device should be present"); assert_eq!( primary_device.device_id, "mobile2_today", "Primary device should be mobile with latest code version and login time" ); } fn create_test_device( id: &str, platform: DeviceType, code_version: u64, login_time: DateTime, ) -> DeviceRow { DeviceRow { user_id: "test".into(), device_id: id.into(), device_type: platform, device_key_info: IdentityKeyInfo { key_payload: "".into(), key_payload_signature: "".into(), }, content_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, notif_prekey: Prekey { prekey: "".into(), prekey_signature: "".into(), }, code_version, login_time, } } } } diff --git a/services/identity/src/device_list.rs b/services/identity/src/device_list.rs new file mode 100644 index 000000000..32caf1959 --- /dev/null +++ b/services/identity/src/device_list.rs @@ -0,0 +1,88 @@ +use chrono::{DateTime, Duration, Utc}; + +use crate::{ + constants::DEVICE_LIST_TIMESTAMP_VALID_FOR, error::DeviceListError, +}; + +/// Returns `true` if given timestamp is valid. The timestamp is considered +/// valid under the following condition: +/// - `new_timestamp` is greater than `previous_timestamp` (if provided) +/// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] +/// +/// Note: For Identity-managed device lists, the timestamp can be `None`. +/// Verification is then skipped +fn is_new_timestamp_valid( + previous_timestamp: Option<&DateTime>, + new_timestamp: Option<&DateTime>, +) -> bool { + let Some(new_timestamp) = new_timestamp else { + return true; + }; + + if let Some(previous_timestamp) = previous_timestamp { + if new_timestamp < previous_timestamp { + return false; + } + } + + let timestamp_valid_duration = + Duration::from_std(DEVICE_LIST_TIMESTAMP_VALID_FOR) + .expect("FATAL - Invalid duration constant provided"); + + Utc::now().signed_duration_since(new_timestamp) < timestamp_valid_duration +} + +/// Returns error if new timestamp is invalid. The timestamp is considered +/// valid under the following condition: +/// - `new_timestamp` is greater than `previous_timestamp` (if provided) +/// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] +/// +/// Note: For Identity-managed device lists, the timestamp can be `None`. +/// Verification is then skipped +pub fn verify_device_list_timestamp( + previous_timestamp: Option<&DateTime>, + new_timestamp: Option<&DateTime>, +) -> Result<(), DeviceListError> { + if !is_new_timestamp_valid(previous_timestamp, new_timestamp) { + return Err(DeviceListError::InvalidDeviceListUpdate); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp_validation() { + let valid_timestamp = Utc::now() - Duration::milliseconds(100); + let previous_timestamp = Utc::now() - Duration::seconds(10); + let too_old_timestamp = previous_timestamp - Duration::seconds(1); + let expired_timestamp = Utc::now() - Duration::minutes(20); + + assert!( + verify_device_list_timestamp( + Some(&previous_timestamp), + Some(&valid_timestamp) + ) + .is_ok(), + "Valid timestamp should pass verification" + ); + assert!( + verify_device_list_timestamp( + Some(&previous_timestamp), + Some(&too_old_timestamp) + ) + .is_err(), + "Timestamp older than previous, should fail verification" + ); + assert!( + verify_device_list_timestamp(None, Some(&expired_timestamp)).is_err(), + "Expired timestamp should fail verification" + ); + assert!( + verify_device_list_timestamp(None, None).is_ok(), + "No provided timestamp should pass" + ); + } +} diff --git a/services/identity/src/main.rs b/services/identity/src/main.rs index 35bdc1a27..bd0429db0 100644 --- a/services/identity/src/main.rs +++ b/services/identity/src/main.rs @@ -1,99 +1,100 @@ use comm_lib::aws; use config::Command; use database::DatabaseClient; use tonic::transport::Server; use tonic_web::GrpcWebLayer; mod client_service; mod config; pub mod constants; mod cors; mod database; pub mod ddb_utils; +mod device_list; pub mod error; mod grpc_services; mod grpc_utils; mod id; mod keygen; mod nonce; mod olm; mod regex; mod reserved_users; mod siwe; mod sync_identity_search; mod token; mod tunnelbroker; mod websockets; use constants::IDENTITY_SERVICE_SOCKET_ADDR; use cors::cors_layer; use keygen::generate_and_persist_keypair; use sync_identity_search::sync_index; use tracing::{self, info, Level}; use tracing_subscriber::EnvFilter; use client_service::{ClientService, IdentityClientServiceServer}; use grpc_services::authenticated::AuthenticatedService; use grpc_services::protos::auth::identity_client_service_server::IdentityClientServiceServer as AuthServer; use websockets::errors::BoxedError; #[tokio::main] async fn main() -> Result<(), BoxedError> { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(EnvFilter::DEFAULT_ENV) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; match config::parse_cli_command() { Command::Keygen { dir } => { generate_and_persist_keypair(dir)?; } Command::Server => { config::load_server_config(); let addr = IDENTITY_SERVICE_SOCKET_ADDR.parse()?; let aws_config = aws::config::from_env().region("us-east-2").load().await; let database_client = DatabaseClient::new(&aws_config); let inner_client_service = ClientService::new(database_client.clone()); let client_service = IdentityClientServiceServer::with_interceptor( inner_client_service, grpc_services::shared::version_interceptor, ); let inner_auth_service = AuthenticatedService::new(database_client.clone()); let auth_service = AuthServer::with_interceptor(inner_auth_service, move |req| { grpc_services::authenticated::auth_interceptor(req, &database_client) .and_then(grpc_services::shared::version_interceptor) }); info!("Listening to gRPC traffic on {}", addr); let grpc_server = Server::builder() .accept_http1(true) .layer(cors_layer()) .layer(GrpcWebLayer::new()) .add_service(client_service) .add_service(auth_service) .serve(addr); let websocket_server = websockets::run_server(); return tokio::select! { websocket_result = websocket_server => websocket_result, grpc_result = grpc_server => { grpc_result.map_err(|e| e.into()) }, }; } Command::SyncIdentitySearch => { let aws_config = aws::config::from_env().region("us-east-2").load().await; let database_client = DatabaseClient::new(&aws_config); let sync_result = sync_index(&database_client).await; error::consume_error(sync_result); } } Ok(()) }