diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -62,7 +62,7 @@ let _first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await - .expect("Second keyserver keys request failed") + .expect("keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs --- a/services/identity/src/constants.rs +++ b/services/identity/src/constants.rs @@ -119,24 +119,21 @@ // migration-specific attrs pub const ATTR_CODE_VERSION: &str = "codeVersion"; pub const ATTR_LOGIN_TIME: &str = "loginTime"; + + // one-time key constants + pub const ATTR_CONTENT_OTK_COUNT: &str = "contentOTKCount"; + pub const ATTR_NOTIF_OTK_COUNT: &str = "notifOTKCount"; } // One time keys table, which need to exist in their own table to ensure // atomicity of additions and removals pub mod one_time_keys_table { - // The `PARTITION_KEY` will contain "notification_${deviceID}" or - // "content_${deviceID}" to allow for both key sets to coexist in the same table pub const NAME: &str = "identity-one-time-keys"; - pub const PARTITION_KEY: &str = "deviceID"; - pub const DEVICE_ID: &str = PARTITION_KEY; - pub const SORT_KEY: &str = "oneTimeKey"; - pub const ONE_TIME_KEY: &str = SORT_KEY; + pub const PARTITION_KEY: &str = "userID#deviceID#olmAccount"; + pub const SORT_KEY: &str = "timestamp#keyNumber"; + pub const ATTR_ONE_TIME_KEY: &str = "oneTimeKey"; } -// One-time key constants for device info map -pub const CONTENT_ONE_TIME_KEY: &str = "contentOneTimeKey"; -pub const NOTIF_ONE_TIME_KEY: &str = "notifOneTimeKey"; - // Tokio pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; @@ -235,3 +232,16 @@ pub const VALID_USERNAME_REGEX_STRING: &str = r"^[a-zA-Z0-9][a-zA-Z0-9-_]{0,190}$"; + +// Retry + +// TODO: Replace this with `ExponentialBackoffConfig` from `comm-lib` +pub mod retry { + pub const MAX_ATTEMPTS: usize = 8; + + pub const CONDITIONAL_CHECK_FAILED: &str = "ConditionalCheckFailed"; + pub const TRANSACTION_CONFLICT: &str = "TransactionConflict"; +} + +// One-time keys +pub const ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT: usize = 49; diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,29 +1,40 @@ -use comm_lib::aws::ddb::{ - operation::{ - delete_item::DeleteItemOutput, get_item::GetItemOutput, - put_item::PutItemOutput, query::QueryOutput, - }, - primitives::Blob, - types::{AttributeValue, PutRequest, ReturnConsumedCapacity, WriteRequest}, -}; +use comm_lib::aws::ddb::types::Delete; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; +use comm_lib::{ + aws::{ + ddb::{ + operation::{ + delete_item::DeleteItemOutput, get_item::GetItemOutput, + put_item::PutItemOutput, query::QueryOutput, + }, + primitives::Blob, + types::{ + AttributeValue, PutRequest, TransactWriteItem, Update, WriteRequest, + }, + }, + DynamoDBError, + }, + database::parse_int_attribute, +}; use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; +pub use crate::database::device_list::DeviceIDAttribute; +use crate::ddb_utils::into_one_time_update_requests; use crate::{ constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, ddb_utils::EthereumIdentity, reserved_users::UserDetail, }; use crate::{ ddb_utils::{ - create_one_time_key_partition_key, into_one_time_put_requests, Identifier, - OlmAccountType, + create_one_time_key_partition_key, into_one_time_put_requests, + is_transaction_retryable, Identifier, OlmAccountType, }, grpc_services::protos, }; @@ -44,7 +55,8 @@ ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, - RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, + ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, RESERVED_USERNAMES_TABLE, + RESERVED_USERNAMES_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, @@ -169,7 +181,6 @@ let device_key_upload = registration_state.flattened_device_key_upload; let user_id = self .add_user_to_users_table( - device_key_upload.clone(), Some((registration_state.username, Blob::new(password_file))), None, registration_state.user_id, @@ -180,12 +191,21 @@ self .add_device( &user_id, - device_key_upload, + device_key_upload.clone(), code_version, access_token_creation_time, ) .await?; + self + .append_one_time_prekeys( + &user_id, + &device_key_upload.device_id_key, + &device_key_upload.content_one_time_keys, + &device_key_upload.notif_one_time_keys, + ) + .await?; + Ok(user_id) } @@ -205,7 +225,6 @@ }; let user_id = self .add_user_to_users_table( - flattened_device_key_upload.clone(), None, Some(wallet_identity), user_id, @@ -216,18 +235,26 @@ self .add_device( &user_id, - flattened_device_key_upload, + flattened_device_key_upload.clone(), code_version, access_token_creation_time, ) .await?; + self + .append_one_time_prekeys( + &user_id, + &flattened_device_key_upload.device_id_key, + &flattened_device_key_upload.content_one_time_keys, + &flattened_device_key_upload.notif_one_time_keys, + ) + .await?; + Ok(user_id) } async fn add_user_to_users_table( &self, - flattened_device_key_upload: FlattenedDeviceKeyUpload, username_and_password_file: Option<(String, Blob)>, wallet_identity: Option, user_id: Option, @@ -280,14 +307,6 @@ .await .map_err(|e| Error::AwsSdk(e.into()))?; - self - .append_one_time_prekeys( - flattened_device_key_upload.device_id_key, - flattened_device_key_upload.content_one_time_keys, - flattened_device_key_upload.notif_one_time_keys, - ) - .await?; - Ok(user_id) } @@ -323,7 +342,7 @@ // add device to the new device list self .add_device( - user_id, + &user_id, flattened_device_key_upload, code_version, access_token_creation_time, @@ -332,9 +351,10 @@ self .append_one_time_prekeys( - device_id, - content_one_time_keys, - notif_one_time_keys, + &user_id, + &device_id, + &content_one_time_keys, + ¬if_one_time_keys, ) .await?; @@ -359,7 +379,7 @@ let user_id: String = user_info.take_attr(USERS_TABLE_PARTITION_KEY)?; let social_proof: Option = user_info.take_attr(USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME)?; - let user_devices = self.get_current_devices(user_id).await?; + let user_devices = self.get_current_devices(user_id.clone()).await?; let maybe_keyserver_device = user_devices .into_iter() .find(|device| device.device_type == GrpcDeviceType::Keyserver); @@ -373,10 +393,14 @@ ); let notif_one_time_key: Option = self - .get_one_time_key(&keyserver.device_id, OlmAccountType::Notification) + .get_one_time_key( + &user_id, + &keyserver.device_id, + OlmAccountType::Notification, + ) .await?; let content_one_time_key: Option = self - .get_one_time_key(&keyserver.device_id, OlmAccountType::Content) + .get_one_time_key(&user_id, &keyserver.device_id, OlmAccountType::Content) .await?; debug!( @@ -422,14 +446,19 @@ /// key pub async fn get_one_time_key( &self, + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result, Error> { + use crate::constants::devices_table; use crate::constants::one_time_keys_table as otk_table; + use crate::constants::retry; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; - let query_result = self.get_one_time_keys(device_id, account_type).await?; - let items = query_result.items(); + let attr_otk_count = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block @@ -442,124 +471,255 @@ }); } - // If no one-time keys exist, or if there aren't enough, request more. - // Additionally, if no one-time keys exist, return early. - let item_vec = if let Some(items_list) = items { - if items_list.len() < ONE_TIME_KEY_MINIMUM_THRESHOLD { + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let otk_count = + self.get_otk_count(user_id, device_id, account_type).await?; + if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD { spawn_refresh_keys_task(device_id); } - items_list - } else { - debug!("Unable to find {:?} one-time key", account_type); - spawn_refresh_keys_task(device_id); - return Ok(None); - }; + if otk_count < 1 { + return Ok(None); + } - let mut result = None; - // Attempt to delete the one-time keys individually, a successful delete - // mints the one-time key to the requester - for item in item_vec { - let pk: String = item.get_attr(otk_table::PARTITION_KEY)?; - let otk: String = item.get_attr(otk_table::SORT_KEY)?; - - let composite_key = HashMap::from([ - (otk_table::PARTITION_KEY.to_string(), AttributeValue::S(pk)), - ( - otk_table::SORT_KEY.to_string(), - AttributeValue::S(otk.clone()), - ), - ]); - - debug!("Attempting to delete a {:?} one time key", account_type); - match self - .client - .delete_item() - .set_key(Some(composite_key)) + let query_result = self + .get_one_time_keys(user_id, device_id, account_type) + .await?; + let mut items = query_result.items.unwrap_or_default(); + let mut item = items.pop().unwrap_or_default(); + let pk = item.take_attr(otk_table::PARTITION_KEY)?; + let sk = item.take_attr(otk_table::SORT_KEY)?; + let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; + + let delete_otk = Delete::builder() .table_name(otk_table::NAME) + .key(otk_table::PARTITION_KEY, AttributeValue::S(pk)) + .key(otk_table::SORT_KEY, AttributeValue::S(sk)) + .build(); + + let delete_otk_operation = + TransactWriteItem::builder().delete(delete_otk).build(); + + let update_otk_count = Update::builder() + .table_name(devices_table::NAME) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) + .expression_attribute_values( + ":decrement_val", + AttributeValue::N("-1".to_string()), + ) + .condition_expression(format!("{} = :old_val", attr_otk_count)) + .expression_attribute_values( + ":old_val", + AttributeValue::N(otk_count.to_string()), + ) + .build(); + + let update_otk_count_operation = TransactWriteItem::builder() + .update(update_otk_count) + .build(); + + let transaction = self + .client + .transact_write_items() + .set_transact_items(Some(vec![ + delete_otk_operation, + update_otk_count_operation, + ])) .send() - .await - { - Ok(_) => { - result = Some(otk); - break; - } - // This err should only happen if a delete occurred between the read - // above and this delete + .await; + + match transaction { + Ok(_) => return Ok(Some(otk)), Err(e) => { - debug!("Unable to delete key: {:?}", e); - continue; + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = + vec![retry::CONDITIONAL_CHECK_FAILED, retry::TRANSACTION_CONFLICT] + .into_iter() + .collect(); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while retrieving one-time key - retrying"); + } else { + error!( + "One-time key retrieval transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } } } } - - // Return deleted key - Ok(result) } pub async fn get_one_time_keys( &self, + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::one_time_keys_table::*; - // Add related prefix to partition key to grab the correct result set let partition_key = - create_one_time_key_partition_key(device_id, account_type); + create_one_time_key_partition_key(user_id, device_id, account_type); self .client .query() .table_name(NAME) - .key_condition_expression(format!("{} = :pk", PARTITION_KEY)) + .key_condition_expression("#pk = :pk") + .expression_attribute_names("#pk", PARTITION_KEY) .expression_attribute_values(":pk", AttributeValue::S(partition_key)) - .return_consumed_capacity(ReturnConsumedCapacity::Total) + .limit(1) .send() .await .map_err(|e| Error::AwsSdk(e.into())) - .map(|response| { - let capacity_units = response - .consumed_capacity() - .and_then(|it| it.capacity_units()); - debug!("OTK read consumed capacity: {:?}", capacity_units); - response - }) } pub async fn append_one_time_prekeys( &self, - device_id: String, - content_one_time_keys: Vec, - notif_one_time_keys: Vec, + user_id: &str, + device_id: &str, + content_one_time_keys: &Vec, + notif_one_time_keys: &Vec, ) -> Result<(), Error> { - use crate::constants::one_time_keys_table; + use crate::constants::retry; - let mut otk_requests = into_one_time_put_requests( + let num_content_keys = content_one_time_keys.len(); + let num_notif_keys = notif_one_time_keys.len(); + + if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + { + return Err(Error::OneTimeKeyUploadLimitExceeded); + } + + let current_time = chrono::Utc::now(); + + let content_otk_requests = into_one_time_put_requests( + &user_id, &device_id, content_one_time_keys, OlmAccountType::Content, + current_time, ); - let notif_otk_requests: Vec = into_one_time_put_requests( + let notif_otk_requests = into_one_time_put_requests( + &user_id, &device_id, notif_one_time_keys, OlmAccountType::Notification, + current_time, ); - otk_requests.extend(notif_otk_requests); - // BatchWriteItem has a hard limit of 25 writes per call - for requests in otk_requests.chunks(25) { - self + let update_otk_count_operation = into_one_time_update_requests( + &user_id, + &device_id, + num_content_keys, + num_notif_keys, + ); + + let mut operations = Vec::new(); + operations.extend_from_slice(&content_otk_requests); + operations.extend_from_slice(¬if_otk_requests); + operations.push(update_otk_count_operation); + + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let transaction = self .client - .batch_write_item() - .request_items(one_time_keys_table::NAME, requests.to_vec()) + .transact_write_items() + .set_transact_items(Some(operations.clone())) .send() - .await - .map_err(|e| Error::AwsSdk(e.into()))?; + .await; + + match transaction { + Ok(_) => break, + Err(e) => { + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = + vec![retry::TRANSACTION_CONFLICT].into_iter().collect(); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while uploading one-time keys - retrying"); + } else { + error!( + "One-time key upload transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } + } + } } Ok(()) } + async fn get_otk_count( + &self, + user_id: &str, + device_id: &str, + account_type: OlmAccountType, + ) -> Result { + use crate::constants::devices_table; + + let attr_name = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; + + let response = self + .client + .get_item() + .table_name(devices_table::NAME) + .projection_expression(attr_name) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .send() + .await + .map_err(|e| { + error!("Failed to get user's OTK count: {:?}", e); + Error::AwsSdk(e.into()) + })?; + + let mut user_item = response.item.unwrap_or_default(); + match parse_int_attribute(attr_name, user_item.remove(attr_name)) { + Ok(num) => Ok(num), + Err(DBItemError { + attribute_error: DBItemAttributeError::Missing, + .. + }) => Ok(0), + Err(e) => Err(Error::Attribute(e)), + } + } + pub async fn update_user_password( &self, user_id: String, @@ -893,10 +1053,14 @@ if get_one_time_keys { for (device_id_key, device_keys) in devices_response.iter_mut() { device_keys.notif_one_time_key = self - .get_one_time_key(device_id_key, OlmAccountType::Notification) + .get_one_time_key( + user_id, + device_id_key, + OlmAccountType::Notification, + ) .await?; device_keys.content_one_time_key = self - .get_one_time_key(device_id_key, OlmAccountType::Content) + .get_one_time_key(user_id, device_id_key, OlmAccountType::Content) .await?; } } diff --git a/services/identity/src/database/device_list.rs b/services/identity/src/database/device_list.rs --- a/services/identity/src/database/device_list.rs +++ b/services/identity/src/database/device_list.rs @@ -30,6 +30,8 @@ use super::DatabaseClient; +// We omit the content and notif one-time key count attributes from this struct +// because they are internal helpers and are not provided by users #[derive(Clone, Debug)] pub struct DeviceRow { pub user_id: String, @@ -114,7 +116,7 @@ } // helper structs for converting to/from attribute values for sort key (a.k.a itemID) -struct DeviceIDAttribute(String); +pub struct DeviceIDAttribute(pub String); struct DeviceListKeyAttribute(DateTime); impl From for AttributeValue { @@ -686,8 +688,9 @@ ) -> Result<(), Error> { let content_one_time_keys = device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = device_key_upload.notif_one_time_keys.clone(); + let user_id_string = user_id.into(); let new_device = DeviceRow::from_device_key_upload( - user_id, + user_id_string.clone(), device_key_upload, code_version, login_time, @@ -708,9 +711,10 @@ self .append_one_time_prekeys( - device_id, - content_one_time_keys, - notif_one_time_keys, + &user_id_string, + &device_id, + &content_one_time_keys, + ¬if_one_time_keys, ) .await?; diff --git a/services/identity/src/ddb_utils.rs b/services/identity/src/ddb_utils.rs --- a/services/identity/src/ddb_utils.rs +++ b/services/identity/src/ddb_utils.rs @@ -1,14 +1,23 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use comm_lib::{ - aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}, + aws::{ + ddb::types::{ + error::TransactionCanceledException, AttributeValue, Put, + TransactWriteItem, Update, + }, + DynamoDBError, + }, database::{AttributeExtractor, AttributeMap}, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::iter::IntoIterator; -use crate::constants::{ - USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_USERNAME_ATTRIBUTE, - USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, +use crate::{ + constants::{ + USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_USERNAME_ATTRIBUTE, + USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, + }, + database::DeviceIDAttribute, }; #[derive(Copy, Clone, Debug)] @@ -17,55 +26,119 @@ Notification, } -// Prefix the one time keys with the olm account variant. This allows for a single -// DDB table to contain both notification and content keys for a device. pub fn create_one_time_key_partition_key( + user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> String { - match account_type { - OlmAccountType::Content => format!("content_{device_id}"), - OlmAccountType::Notification => format!("notification_{device_id}"), - } + let account_type = match account_type { + OlmAccountType::Content => "content", + OlmAccountType::Notification => "notif", + }; + format!("{user_id}#{device_id}#{account_type}") +} + +fn create_one_time_key_sort_key( + key_number: usize, + current_time: DateTime, +) -> String { + let timestamp = current_time.to_rfc3339(); + format!("{timestamp}#{:02}", key_number) } fn create_one_time_key_put_request( + user_id: &str, device_id: &str, one_time_key: String, + key_number: usize, account_type: OlmAccountType, -) -> WriteRequest { + current_time: DateTime, +) -> Put { use crate::constants::one_time_keys_table::*; let partition_key = - create_one_time_key_partition_key(device_id, account_type); - let builder = PutRequest::builder(); + create_one_time_key_partition_key(user_id, device_id, account_type); + let sort_key = create_one_time_key_sort_key(key_number, current_time); + + let builder = Put::builder(); let attrs = HashMap::from([ (PARTITION_KEY.to_string(), AttributeValue::S(partition_key)), - (SORT_KEY.to_string(), AttributeValue::S(one_time_key)), + (SORT_KEY.to_string(), AttributeValue::S(sort_key)), + ( + ATTR_ONE_TIME_KEY.to_string(), + AttributeValue::S(one_time_key), + ), ]); - let put_request = builder.set_item(Some(attrs)).build(); - - WriteRequest::builder().put_request(put_request).build() + builder.table_name(NAME).set_item(Some(attrs)).build() } pub fn into_one_time_put_requests( + user_id: &str, device_id: &str, one_time_keys: T, account_type: OlmAccountType, -) -> Vec + current_time: DateTime, +) -> Vec where T: IntoIterator, ::Item: ToString, { one_time_keys .into_iter() - .map(|otk| { - create_one_time_key_put_request(device_id, otk.to_string(), account_type) + .enumerate() + .map(|(index, otk)| { + create_one_time_key_put_request( + user_id, + device_id, + otk.to_string(), + index, + account_type, + current_time, + ) }) + .map(|put_request| TransactWriteItem::builder().put(put_request).build()) .collect() } +pub fn into_one_time_update_requests( + user_id: &str, + device_id: &str, + num_content_keys: usize, + num_notif_keys: usize, +) -> TransactWriteItem { + use crate::constants::devices_table; + + let update_otk_count = Update::builder() + .table_name(devices_table::NAME) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .update_expression(format!( + "ADD {} :num_content, {} :num_notif", + devices_table::ATTR_CONTENT_OTK_COUNT, + devices_table::ATTR_NOTIF_OTK_COUNT + )) + .expression_attribute_values( + ":num_content", + AttributeValue::N(num_content_keys.to_string()), + ) + .expression_attribute_values( + ":num_notif", + AttributeValue::N(num_notif_keys.to_string()), + ) + .build(); + + TransactWriteItem::builder() + .update(update_otk_count) + .build() +} + pub trait DateTimeExt { fn from_utc_timestamp_millis(timestamp: i64) -> Option>; } @@ -114,3 +187,63 @@ } } } + +pub fn is_transaction_retryable( + err: &DynamoDBError, + retryable_codes: &HashSet<&str>, +) -> bool { + match err { + DynamoDBError::TransactionCanceledException( + TransactionCanceledException { + cancellation_reasons: Some(reasons), + .. + }, + ) => reasons.iter().any(|reason| { + retryable_codes.contains(&reason.code().unwrap_or_default()) + }), + _ => false, + } +} + +#[cfg(test)] +mod tests { + use crate::constants::one_time_keys_table; + + use super::*; + + #[test] + fn test_into_one_time_put_requests() { + let otks = ["not", "real", "keys"]; + let current_time = Utc::now(); + + let requests = into_one_time_put_requests( + "abc", + "123", + otks, + OlmAccountType::Content, + current_time, + ); + + assert_eq!(requests.len(), 3); + + for (index, request) in requests.into_iter().enumerate() { + let mut item = request.put.unwrap().item.unwrap(); + assert_eq!( + item.remove(one_time_keys_table::PARTITION_KEY).unwrap(), + AttributeValue::S("abc#123#content".to_string()) + ); + assert_eq!( + item.remove(one_time_keys_table::SORT_KEY).unwrap(), + AttributeValue::S(format!( + "{}#{:02}", + current_time.to_rfc3339(), + index + )) + ); + assert_eq!( + item.remove(one_time_keys_table::ATTR_ONE_TIME_KEY).unwrap(), + AttributeValue::S(otks[index].to_string()) + ); + } + } +} diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -24,6 +24,10 @@ Serde(serde_json::Error), #[display(...)] CannotOverwrite, + #[display(...)] + OneTimeKeyUploadLimitExceeded, + #[display(...)] + MaxRetriesExceeded, } #[derive(Debug, derive_more::Display, derive_more::Error)] diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -234,9 +234,10 @@ self .db_client .append_one_time_prekeys( - device_id, - message.content_one_time_prekeys, - message.notif_one_time_prekeys, + &user_id, + &device_id, + &message.content_one_time_prekeys, + &message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; diff --git a/services/terraform/modules/shared/dynamodb.tf b/services/terraform/modules/shared/dynamodb.tf --- a/services/terraform/modules/shared/dynamodb.tf +++ b/services/terraform/modules/shared/dynamodb.tf @@ -238,17 +238,17 @@ resource "aws_dynamodb_table" "identity-one-time-keys" { name = "identity-one-time-keys" - hash_key = "deviceID" - range_key = "oneTimeKey" + hash_key = "userID#deviceID#olmAccount" + range_key = "timestamp#keyNumber" billing_mode = "PAY_PER_REQUEST" attribute { - name = "deviceID" + name = "userID#deviceID#olmAccount" type = "S" } attribute { - name = "oneTimeKey" + name = "timestamp#keyNumber" type = "S" } }