diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,48 +1,31 @@ -use comm_lib::aws::ddb::types::Delete; +use comm_lib::aws::ddb::{ + operation::{ + delete_item::DeleteItemOutput, get_item::GetItemOutput, + put_item::PutItemOutput, query::QueryOutput, + }, + primitives::Blob, + types::{AttributeValue, PutRequest, WriteRequest}, +}; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; -use comm_lib::{ - aws::{ - ddb::{ - operation::{ - delete_item::DeleteItemOutput, get_item::GetItemOutput, - put_item::PutItemOutput, query::QueryOutput, - }, - primitives::Blob, - types::{ - AttributeValue, PutRequest, TransactWriteItem, Update, WriteRequest, - }, - }, - DynamoDBError, - }, - database::parse_int_attribute, -}; use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; pub use crate::database::device_list::DeviceIDAttribute; -use crate::ddb_utils::into_one_time_update_requests; -use crate::olm::is_valid_olm_key; use crate::{ constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME, ddb_utils::EthereumIdentity, reserved_users::UserDetail, siwe::SocialProof, }; use crate::{ - ddb_utils::{ - create_one_time_key_partition_key, into_one_time_put_requests, - is_transaction_retryable, Identifier, OlmAccountType, - }, + ddb_utils::{Identifier, OlmAccountType}, grpc_services::protos, }; -use crate::{ - error::{consume_error, Error}, - grpc_utils::DeviceKeysInfo, -}; +use crate::{error::Error, grpc_utils::DeviceKeysInfo}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; @@ -56,8 +39,7 @@ ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, 
NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, - ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, RESERVED_USERNAMES_TABLE, - RESERVED_USERNAMES_TABLE_PARTITION_KEY, + RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, @@ -72,6 +54,7 @@ mod device_list; mod farcaster; +mod one_time_keys; mod workflows; pub use device_list::{DeviceListRow, DeviceListUpdate, DeviceRow}; @@ -428,292 +411,6 @@ Ok(maybe_keyserver_device_id) } - /// Will "mint" a single one-time key by attempting to successfully delete a - /// key - pub async fn get_one_time_key( - &self, - user_id: &str, - device_id: &str, - account_type: OlmAccountType, - ) -> Result, Error> { - use crate::constants::devices_table; - use crate::constants::one_time_keys_table as otk_table; - use crate::constants::retry; - use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; - - let attr_otk_count = match account_type { - OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, - OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, - }; - - fn spawn_refresh_keys_task(device_id: &str) { - // Clone the string slice to move into the async block - let device_id = device_id.to_string(); - tokio::spawn(async move { - debug!("Attempting to request more keys for device: {}", &device_id); - let result = - crate::tunnelbroker::send_refresh_keys_request(&device_id).await; - consume_error(result); - }); - } - - // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` - // in `comm-lib` to handle transactions with retries - let mut attempt = 0; - - loop { - attempt += 1; - if attempt > retry::MAX_ATTEMPTS { - return Err(Error::MaxRetriesExceeded); - } - - let otk_count = - self.get_otk_count(user_id, device_id, account_type).await?; - if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD { - 
spawn_refresh_keys_task(device_id); - } - if otk_count < 1 { - return Ok(None); - } - - let query_result = self - .get_one_time_keys(user_id, device_id, account_type) - .await?; - let mut items = query_result.items.unwrap_or_default(); - let mut item = items.pop().unwrap_or_default(); - let pk = item.take_attr(otk_table::PARTITION_KEY)?; - let sk = item.take_attr(otk_table::SORT_KEY)?; - let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; - - let delete_otk = Delete::builder() - .table_name(otk_table::NAME) - .key(otk_table::PARTITION_KEY, AttributeValue::S(pk)) - .key(otk_table::SORT_KEY, AttributeValue::S(sk)) - .build(); - - let delete_otk_operation = - TransactWriteItem::builder().delete(delete_otk).build(); - - let update_otk_count = Update::builder() - .table_name(devices_table::NAME) - .key( - devices_table::ATTR_USER_ID, - AttributeValue::S(user_id.to_string()), - ) - .key( - devices_table::ATTR_ITEM_ID, - DeviceIDAttribute(device_id.into()).into(), - ) - .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) - .expression_attribute_values( - ":decrement_val", - AttributeValue::N("-1".to_string()), - ) - .condition_expression(format!("{} = :old_val", attr_otk_count)) - .expression_attribute_values( - ":old_val", - AttributeValue::N(otk_count.to_string()), - ) - .build(); - - let update_otk_count_operation = TransactWriteItem::builder() - .update(update_otk_count) - .build(); - - let transaction = self - .client - .transact_write_items() - .set_transact_items(Some(vec![ - delete_otk_operation, - update_otk_count_operation, - ])) - .send() - .await; - - match transaction { - Ok(_) => return Ok(Some(otk)), - Err(e) => { - let dynamo_db_error = DynamoDBError::from(e); - let retryable_codes = HashSet::from([ - retry::CONDITIONAL_CHECK_FAILED, - retry::TRANSACTION_CONFLICT, - ]); - if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { - info!("Encountered transaction conflict while retrieving one-time key - retrying"); - } 
else { - error!( - "One-time key retrieval transaction failed: {:?}", - dynamo_db_error - ); - return Err(Error::AwsSdk(dynamo_db_error)); - } - } - } - } - } - - pub async fn get_one_time_keys( - &self, - user_id: &str, - device_id: &str, - account_type: OlmAccountType, - ) -> Result { - use crate::constants::one_time_keys_table::*; - - let partition_key = - create_one_time_key_partition_key(user_id, device_id, account_type); - - self - .client - .query() - .table_name(NAME) - .key_condition_expression("#pk = :pk") - .expression_attribute_names("#pk", PARTITION_KEY) - .expression_attribute_values(":pk", AttributeValue::S(partition_key)) - .limit(1) - .send() - .await - .map_err(|e| Error::AwsSdk(e.into())) - } - - pub async fn append_one_time_prekeys( - &self, - user_id: &str, - device_id: &str, - content_one_time_keys: &Vec, - notif_one_time_keys: &Vec, - ) -> Result<(), Error> { - use crate::constants::retry; - - let num_content_keys = content_one_time_keys.len(); - let num_notif_keys = notif_one_time_keys.len(); - - if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT - || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT - { - return Err(Error::OneTimeKeyUploadLimitExceeded); - } - - if content_one_time_keys - .iter() - .any(|otk| !is_valid_olm_key(otk)) - || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk)) - { - debug!("Invalid one-time key format"); - return Err(Error::InvalidFormat); - } - - let current_time = chrono::Utc::now(); - - let content_otk_requests = into_one_time_put_requests( - user_id, - device_id, - content_one_time_keys, - OlmAccountType::Content, - current_time, - ); - let notif_otk_requests = into_one_time_put_requests( - user_id, - device_id, - notif_one_time_keys, - OlmAccountType::Notification, - current_time, - ); - - let update_otk_count_operation = into_one_time_update_requests( - user_id, - device_id, - num_content_keys, - num_notif_keys, - ); - - let mut operations = Vec::new(); - 
operations.extend_from_slice(&content_otk_requests); - operations.extend_from_slice(&notif_otk_requests); - operations.push(update_otk_count_operation); - - // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` - // in `comm-lib` to handle transactions with retries - let mut attempt = 0; - - loop { - attempt += 1; - if attempt > retry::MAX_ATTEMPTS { - return Err(Error::MaxRetriesExceeded); - } - - let transaction = self - .client - .transact_write_items() - .set_transact_items(Some(operations.clone())) - .send() - .await; - - match transaction { - Ok(_) => break, - Err(e) => { - let dynamo_db_error = DynamoDBError::from(e); - let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]); - if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { - info!("Encountered transaction conflict while uploading one-time keys - retrying"); - } else { - error!( - "One-time key upload transaction failed: {:?}", - dynamo_db_error - ); - return Err(Error::AwsSdk(dynamo_db_error)); - } - } - } - } - - Ok(()) - } - - async fn get_otk_count( - &self, - user_id: &str, - device_id: &str, - account_type: OlmAccountType, - ) -> Result { - use crate::constants::devices_table; - - let attr_name = match account_type { - OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, - OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, - }; - - let response = self - .client - .get_item() - .table_name(devices_table::NAME) - .projection_expression(attr_name) - .key( - devices_table::ATTR_USER_ID, - AttributeValue::S(user_id.to_string()), - ) - .key( - devices_table::ATTR_ITEM_ID, - DeviceIDAttribute(device_id.into()).into(), - ) - .send() - .await - .map_err(|e| { - error!("Failed to get user's OTK count: {:?}", e); - Error::AwsSdk(e.into()) - })?; - - let mut user_item = response.item.unwrap_or_default(); - match parse_int_attribute(attr_name, user_item.remove(attr_name)) { - Ok(num) => Ok(num), - Err(DBItemError { - attribute_error: 
DBItemAttributeError::Missing, - .. - }) => Ok(0), - Err(e) => Err(Error::Attribute(e)), - } - } - pub async fn update_user_password( &self, user_id: String, diff --git a/services/identity/src/database/one_time_keys.rs b/services/identity/src/database/one_time_keys.rs new file mode 100644 --- /dev/null +++ b/services/identity/src/database/one_time_keys.rs @@ -0,0 +1,316 @@ +use std::collections::HashSet; + +use comm_lib::{ + aws::{ + ddb::{ + operation::query::QueryOutput, + types::{AttributeValue, Delete, TransactWriteItem, Update}, + }, + DynamoDBError, + }, + database::{ + parse_int_attribute, AttributeExtractor, DBItemAttributeError, DBItemError, + }, +}; +use tracing::{debug, error, info}; + +use crate::{ + constants::ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, + database::DeviceIDAttribute, + ddb_utils::{ + create_one_time_key_partition_key, into_one_time_put_requests, + into_one_time_update_requests, is_transaction_retryable, OlmAccountType, + }, + error::{consume_error, Error}, + olm::is_valid_olm_key, +}; + +use super::DatabaseClient; + +impl DatabaseClient { + /// Will "mint" a single one-time key by attempting to successfully delete a + /// key + pub(super) async fn get_one_time_key( + &self, + user_id: &str, + device_id: &str, + account_type: OlmAccountType, + ) -> Result, Error> { + use crate::constants::devices_table; + use crate::constants::one_time_keys_table as otk_table; + use crate::constants::retry; + use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; + + let attr_otk_count = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; + + fn spawn_refresh_keys_task(device_id: &str) { + // Clone the string slice to move into the async block + let device_id = device_id.to_string(); + tokio::spawn(async move { + debug!("Attempting to request more keys for device: {}", &device_id); + let result = + 
crate::tunnelbroker::send_refresh_keys_request(&device_id).await; + consume_error(result); + }); + } + + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let otk_count = + self.get_otk_count(user_id, device_id, account_type).await?; + if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD { + spawn_refresh_keys_task(device_id); + } + if otk_count < 1 { + return Ok(None); + } + + let query_result = self + .get_one_time_keys(user_id, device_id, account_type) + .await?; + let mut items = query_result.items.unwrap_or_default(); + let mut item = items.pop().unwrap_or_default(); + let pk = item.take_attr(otk_table::PARTITION_KEY)?; + let sk = item.take_attr(otk_table::SORT_KEY)?; + let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; + + let delete_otk = Delete::builder() + .table_name(otk_table::NAME) + .key(otk_table::PARTITION_KEY, AttributeValue::S(pk)) + .key(otk_table::SORT_KEY, AttributeValue::S(sk)) + .build(); + + let delete_otk_operation = + TransactWriteItem::builder().delete(delete_otk).build(); + + let update_otk_count = Update::builder() + .table_name(devices_table::NAME) + .key( + devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) + .expression_attribute_values( + ":decrement_val", + AttributeValue::N("-1".to_string()), + ) + .condition_expression(format!("{} = :old_val", attr_otk_count)) + .expression_attribute_values( + ":old_val", + AttributeValue::N(otk_count.to_string()), + ) + .build(); + + let update_otk_count_operation = TransactWriteItem::builder() + .update(update_otk_count) + .build(); + + let transaction = self + .client + .transact_write_items() + 
.set_transact_items(Some(vec![ + delete_otk_operation, + update_otk_count_operation, + ])) + .send() + .await; + + match transaction { + Ok(_) => return Ok(Some(otk)), + Err(e) => { + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = HashSet::from([ + retry::CONDITIONAL_CHECK_FAILED, + retry::TRANSACTION_CONFLICT, + ]); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while retrieving one-time key - retrying"); + } else { + error!( + "One-time key retrieval transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } + } + } + } + } + + async fn get_one_time_keys( + &self, + user_id: &str, + device_id: &str, + account_type: OlmAccountType, + ) -> Result { + use crate::constants::one_time_keys_table::*; + + let partition_key = + create_one_time_key_partition_key(user_id, device_id, account_type); + + self + .client + .query() + .table_name(NAME) + .key_condition_expression("#pk = :pk") + .expression_attribute_names("#pk", PARTITION_KEY) + .expression_attribute_values(":pk", AttributeValue::S(partition_key)) + .limit(1) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into())) + } + + pub async fn append_one_time_prekeys( + &self, + user_id: &str, + device_id: &str, + content_one_time_keys: &Vec, + notif_one_time_keys: &Vec, + ) -> Result<(), Error> { + use crate::constants::retry; + + let num_content_keys = content_one_time_keys.len(); + let num_notif_keys = notif_one_time_keys.len(); + + if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT + { + return Err(Error::OneTimeKeyUploadLimitExceeded); + } + + if content_one_time_keys + .iter() + .any(|otk| !is_valid_olm_key(otk)) + || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk)) + { + debug!("Invalid one-time key format"); + return Err(Error::InvalidFormat); + } + + let current_time = chrono::Utc::now(); + + let 
content_otk_requests = into_one_time_put_requests( + user_id, + device_id, + content_one_time_keys, + OlmAccountType::Content, + current_time, + ); + let notif_otk_requests = into_one_time_put_requests( + user_id, + device_id, + notif_one_time_keys, + OlmAccountType::Notification, + current_time, + ); + + let update_otk_count_operation = into_one_time_update_requests( + user_id, + device_id, + num_content_keys, + num_notif_keys, + ); + + let mut operations = Vec::new(); + operations.extend_from_slice(&content_otk_requests); + operations.extend_from_slice(&notif_otk_requests); + operations.push(update_otk_count_operation); + + // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` + // in `comm-lib` to handle transactions with retries + let mut attempt = 0; + + loop { + attempt += 1; + if attempt > retry::MAX_ATTEMPTS { + return Err(Error::MaxRetriesExceeded); + } + + let transaction = self + .client + .transact_write_items() + .set_transact_items(Some(operations.clone())) + .send() + .await; + + match transaction { + Ok(_) => break, + Err(e) => { + let dynamo_db_error = DynamoDBError::from(e); + let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]); + if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { + info!("Encountered transaction conflict while uploading one-time keys - retrying"); + } else { + error!( + "One-time key upload transaction failed: {:?}", + dynamo_db_error + ); + return Err(Error::AwsSdk(dynamo_db_error)); + } + } + } + } + + Ok(()) + } + + async fn get_otk_count( + &self, + user_id: &str, + device_id: &str, + account_type: OlmAccountType, + ) -> Result { + use crate::constants::devices_table; + + let attr_name = match account_type { + OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, + OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, + }; + + let response = self + .client + .get_item() + .table_name(devices_table::NAME) + .projection_expression(attr_name) + .key( + 
devices_table::ATTR_USER_ID, + AttributeValue::S(user_id.to_string()), + ) + .key( + devices_table::ATTR_ITEM_ID, + DeviceIDAttribute(device_id.into()).into(), + ) + .send() + .await + .map_err(|e| { + error!("Failed to get user's OTK count: {:?}", e); + Error::AwsSdk(e.into()) + })?; + + let mut user_item = response.item.unwrap_or_default(); + match parse_int_attribute(attr_name, user_item.remove(attr_name)) { + Ok(num) => Ok(num), + Err(DBItemError { + attribute_error: DBItemAttributeError::Missing, + .. + }) => Ok(0), + Err(e) => Err(Error::Attribute(e)), + } + } +}