diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index 0863664d9..d94fb84bd 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1601 +1,1605 @@ use comm_lib::aws::ddb::{ operation::{ delete_item::DeleteItemOutput, get_item::GetItemOutput, put_item::PutItemOutput, query::QueryOutput, }, primitives::Blob, types::{AttributeValue, PutRequest, ReturnConsumedCapacity, WriteRequest}, }; use comm_lib::aws::{AwsConfig, DynamoDBClient}; +use comm_lib::database::{AttributeMap, DBItemAttributeError, DBItemError}; use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use crate::ddb_utils::{ create_one_time_key_partition_key, into_one_time_put_requests, OlmAccountType, }; -use crate::error::{consume_error, DBItemAttributeError, DBItemError, Error}; +use crate::error::{consume_error, Error}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, CONTENT_ONE_TIME_KEY, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, NOTIF_ONE_TIME_KEY, RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, USERS_TABLE, USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_CONTENT_ONE_TIME_KEYS_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_ONE_TIME_KEYS_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::error::{AttributeValueFromHashMap, FromAttributeValue}; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::{AccessTokenData, AuthType}; pub use grpc_clients::identity::DeviceType; mod device_list; pub use device_list::DeviceListRow; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } pub struct DBDeviceTypeInt(pub i32); impl TryFrom for DeviceType { type Error = crate::error::Error; fn try_from(value: DBDeviceTypeInt) -> Result { let device_result = DeviceType::try_from(value.0); device_result.map_err(|_| { Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), - attribute_value: Some(AttributeValue::N(value.0.to_string())), + attribute_value: Some(AttributeValue::N(value.0.to_string())).into(), attribute_error: DBItemAttributeError::InvalidValue, }) }) } } // This is very similar to the protobuf definitions, however, // coupling the protobuf schema to the database API should be avoided. pub struct PreKey { pub prekey: String, pub prekey_signature: String, } pub struct OutboundKeys { pub key_payload: String, pub key_payload_signature: String, pub social_proof: Option, pub content_prekey: PreKey, pub notif_prekey: PreKey, pub content_one_time_key: Option, pub notif_one_time_key: Option, } #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { let client = match &CONFIG.localstack_endpoint { Some(endpoint) => { info!( "Configuring DynamoDB client to use LocalStack endpoint: {}", endpoint ); let ddb_config_builder = comm_lib::aws::ddb::config::Builder::from(aws_config) .endpoint_url(endpoint); DynamoDBClient::from_conf(ddb_config_builder.build()) } None => DynamoDBClient::new(aws_config), }; DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, ) -> Result { self .add_user_to_users_table( registration_state.flattened_device_key_upload, Some((registration_state.username, Blob::new(password_file))), None, None, registration_state.user_id, ) .await } pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: String, user_id: Option, ) -> Result { self .add_user_to_users_table( flattened_device_key_upload, None, Some(wallet_address), Some(social_proof), user_id, ) .await } async fn add_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, username_and_password_file: Option<(String, Blob)>, wallet_address: Option, social_proof: Option, user_id: Option, ) -> Result { let user_id = user_id.unwrap_or_else(generate_uuid); let device_info = create_device_info(flattened_device_key_upload.clone(), social_proof); let devices = HashMap::from([( flattened_device_key_upload.device_id_key.clone(), AttributeValue::M(device_info), )]); let mut user = HashMap::from([ ( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), ), ( USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), AttributeValue::M(devices), ), ]); if let Some((username, password_file)) = username_and_password_file { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); } if let Some(address) = wallet_address { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(address), ); } self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(user)) // make sure we don't accidentaly overwrite existing row .condition_expression("attribute_not_exists(#pk)") .expression_attribute_names("#pk", USERS_TABLE_PARTITION_KEY) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( flattened_device_key_upload.device_id_key, flattened_device_key_upload.content_one_time_keys, flattened_device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } pub async fn add_password_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, ) -> Result<(), Error> { self .add_device_to_users_table(user_id, flattened_device_key_upload, None) .await } pub async fn add_wallet_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: String, ) -> Result<(), Error> { self .add_device_to_users_table( user_id, flattened_device_key_upload, Some(social_proof), ) .await } pub async fn get_keyserver_keys_for_user( &self, user_id: &str, ) -> Result, Error> { // DynamoDB doesn't have a way to "pop" a value from a list, so we must // first read in user info, then update one_time_keys with value we // gave to requester let user_info = self .get_item_from_users_table(user_id) .await? .item .ok_or(Error::MissingItem)?; let devices = user_info .get(USERS_TABLE_DEVICES_ATTRIBUTE) .ok_or(Error::MissingItem)? .to_hashmap(USERS_TABLE_DEVICES_ATTRIBUTE)?; let mut maybe_keyserver_id = None; for (device_id, device_info) in devices { let device_type = device_info .to_hashmap("device_id")? .get(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME) .ok_or(Error::MissingItem)? .to_string(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME)?; if device_type == "keyserver" { maybe_keyserver_id = Some(device_id); break; } } // Assert that the user has a keyserver, if they don't return None let keyserver_id = match maybe_keyserver_id { None => return Ok(None), Some(id) => id, }; let keyserver = devices.get_map(keyserver_id)?; let notif_one_time_key: Option = self .get_one_time_key(keyserver_id, OlmAccountType::Notification) .await?; let content_one_time_key: Option = self .get_one_time_key(keyserver_id, OlmAccountType::Content) .await?; debug!( "Able to get notif one-time key for keyserver {}: {}", keyserver_id, notif_one_time_key.is_some() ); debug!( "Able to get content one-time key for keyserver {}: {}", keyserver_id, content_one_time_key.is_some() ); let content_prekey = keyserver .get_string(USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME)?; let content_prekey_signature = keyserver.get_string( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?; let notif_prekey = keyserver .get_string(USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME)?; let notif_prekey_signature = keyserver.get_string( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?; let key_payload = keyserver .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME)? .to_string(); let key_payload_signature = keyserver .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME)? .to_string(); let social_proof = keyserver .get(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) .and_then(|s| { s.to_string(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) .ok() }) .map(|s| s.to_owned()); let full_content_prekey = PreKey { prekey: content_prekey.to_string(), prekey_signature: content_prekey_signature.to_string(), }; let full_notif_prekey = PreKey { prekey: notif_prekey.to_string(), prekey_signature: notif_prekey_signature.to_string(), }; let outbound_payload = OutboundKeys { key_payload, key_payload_signature, social_proof, content_prekey: full_content_prekey, notif_prekey: full_notif_prekey, content_one_time_key, notif_one_time_key, }; Ok(Some(outbound_payload)) } /// Will "mint" a single one-time key by attempting to successfully delete a /// key pub async fn get_one_time_key( &self, device_id: &str, account_type: OlmAccountType, ) -> Result, Error> { use crate::constants::one_time_keys_table as otk_table; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; let query_result = self.get_one_time_keys(device_id, account_type).await?; let items = query_result.items(); fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block let device_id = device_id.to_string(); tokio::spawn(async move { debug!("Attempting to request more keys for device: {}", &device_id); let result = crate::tunnelbroker::send_refresh_keys_request(&device_id).await; consume_error(result); }); } // If no one-time keys exist, or if there aren't enough, request more. // Additionally, if no one-time keys exist, return early. let item_vec = if let Some(items_list) = items { if items_list.len() < ONE_TIME_KEY_MINIMUM_THRESHOLD { spawn_refresh_keys_task(device_id); } items_list } else { debug!("Unable to find {:?} one-time key", account_type); spawn_refresh_keys_task(device_id); return Ok(None); }; let mut result = None; // Attempt to delete the one-time keys individually, a successful delete // mints the one-time key to the requester for item in item_vec { let pk = item.get_string(otk_table::PARTITION_KEY)?; let otk = item.get_string(otk_table::SORT_KEY)?; let composite_key = HashMap::from([ ( otk_table::PARTITION_KEY.to_string(), AttributeValue::S(pk.to_string()), ), ( otk_table::SORT_KEY.to_string(), AttributeValue::S(otk.to_string()), ), ]); debug!("Attempting to delete a {:?} one time key", account_type); match self .client .delete_item() .set_key(Some(composite_key)) .table_name(otk_table::NAME) .send() .await { Ok(_) => { result = Some(otk.to_string()); break; } // This err should only happen if a delete occurred between the read // above and this delete Err(e) => { debug!("Unable to delete key: {:?}", e); continue; } } } // Return deleted key Ok(result) } pub async fn get_one_time_keys( &self, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::one_time_keys_table::*; // Add related prefix to partition key to grab the correct result set let partition_key = create_one_time_key_partition_key(device_id, account_type); self .client .query() .table_name(NAME) .key_condition_expression(format!("{} = :pk", PARTITION_KEY)) .expression_attribute_values(":pk", AttributeValue::S(partition_key)) .return_consumed_capacity(ReturnConsumedCapacity::Total) .send() .await .map_err(|e| Error::AwsSdk(e.into())) .map(|response| { let capacity_units = response .consumed_capacity() .and_then(|it| it.capacity_units()); debug!("OTK read consumed capacity: {:?}", capacity_units); response }) } pub async fn set_prekey( &self, user_id: String, device_id: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, ) -> Result<(), Error> { let notif_prekey_av = AttributeValue::S(notif_prekey); let notif_prekey_signature_av = AttributeValue::S(notif_prekey_signature); let content_prekey_av = AttributeValue::S(content_prekey); let content_prekey_signature_av = AttributeValue::S(content_prekey_signature); let update_expression = format!("SET {0}.#{1}.{2} = :n, {0}.#{1}.{3} = :p, {0}.#{1}.{4} = :c, {0}.#{1}.{5} = :d", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID", USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, ); let expression_attribute_names = HashMap::from([ (format!("#{}", "deviceID"), device_id), ( "#user_id".to_string(), USERS_TABLE_PARTITION_KEY.to_string(), ), ]); let expression_attribute_values = HashMap::from([ (":n".to_string(), notif_prekey_av), (":p".to_string(), notif_prekey_signature_av), (":c".to_string(), content_prekey_av), (":d".to_string(), content_prekey_signature_av), ]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .condition_expression("attribute_exists(#user_id)") .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn append_one_time_prekeys( &self, device_id: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, ) -> Result<(), Error> { use crate::constants::one_time_keys_table; let mut otk_requests = into_one_time_put_requests( &device_id, content_one_time_keys, OlmAccountType::Content, ); let notif_otk_requests: Vec = into_one_time_put_requests( &device_id, notif_one_time_keys, OlmAccountType::Notification, ); otk_requests.extend(notif_otk_requests); // BatchWriteItem has a hard limit of 25 writes per call for requests in otk_requests.chunks(25) { self .client .batch_write_item() .request_items(one_time_keys_table::NAME, requests.to_vec()) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } Ok(()) } async fn add_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> Result<(), Error> { // Avoid borrowing from lifetime of flattened_device_key_upload let device_id = flattened_device_key_upload.device_id_key.clone(); let content_one_time_keys = flattened_device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_one_time_keys.clone(); let device_info = create_device_info(flattened_device_key_upload, social_proof); let update_expression = format!("SET {}.#{} = :v", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID",); let expression_attribute_names = HashMap::from([(format!("#{}", "deviceID"), device_id.clone())]); let expression_attribute_values = HashMap::from([(":v".to_string(), AttributeValue::M(device_info))]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( device_id, content_one_time_keys, notif_one_time_keys, ) .await?; Ok(()) } pub async fn remove_device_from_users_table( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { let update_expression = format!("REMOVE {}.{}", USERS_TABLE_DEVICES_ATTRIBUTE, "#deviceID"); let expression_attribute_names = HashMap::from([("#deviceID".to_string(), device_id_key)]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_names(Some(expression_attribute_names)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn delete_user( &self, user_id: String, ) -> Result { debug!(user_id, "Attempting to delete user's devices"); self.delete_devices_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user"); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete user {}", user_id); Err(Error::AwsSdk(e.into())) } } } pub async fn get_access_token_data( &self, user_id: String, signing_public_key: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), signing_public_key.clone(), ), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = parse_date_time_attribute( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, signing_public_key, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and signing public key {} in token table", user_id, signing_public_key ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn verify_access_token( &self, user_id: String, signing_public_key: String, access_token_to_verify: String, ) -> Result { let is_valid = self .get_access_token_data(user_id, signing_public_key) .await? .map(|access_token_data| { constant_time_eq( access_token_data.access_token.as_bytes(), access_token_to_verify.as_bytes(), ) && access_token_data.is_valid() }) .unwrap_or(false); Ok(is_valid) } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.signing_public_key), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn delete_access_token_data( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { self .client .delete_item() .table_name(ACCESS_TOKEN_TABLE) .key( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ) .key( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(device_id_key), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn wallet_address_taken( &self, wallet_address: String, ) -> Result { let result = self .get_user_id_from_user_info(wallet_address, &AuthType::Wallet) .await?; Ok(result.is_some()) } pub async fn username_taken(&self, username: String) -> Result { let result = self .get_user_id_from_user_info(username, &AuthType::Password) .await?; Ok(result.is_some()) } pub async fn filter_out_taken_usernames( &self, usernames: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; let db_usernames_set: HashSet = db_usernames.into_iter().collect(); let usernames_set: HashSet = usernames.into_iter().collect(); let available_usernames: Vec = usernames_set .difference(&db_usernames_set) .cloned() .collect(); Ok(available_usernames) } async fn get_user_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result>, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), - attribute_value: None, + attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), - attribute_value: first_item.get(USERS_TABLE_PARTITION_KEY).cloned(), + attribute_value: first_item + .get(USERS_TABLE_PARTITION_KEY) + .cloned() + .into(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_keys_for_user_id( &self, user_id: &str, get_one_time_keys: bool, ) -> Result, Error> { let Some(user) = self.get_item_from_users_table(user_id).await?.item else { return Ok(None); }; self.get_keys_for_user(user, get_one_time_keys).await } async fn get_keys_for_user( &self, - mut user: HashMap, + mut user: AttributeMap, get_one_time_keys: bool, ) -> Result, Error> { let devices = parse_map_attribute( USERS_TABLE_DEVICES_ATTRIBUTE, user.remove(USERS_TABLE_DEVICES_ATTRIBUTE), )?; let mut devices_response = HashMap::with_capacity(devices.len()); for (device_id_key, device_info) in devices { let device_info_map = parse_map_attribute(&device_id_key, Some(device_info))?; let mut device_info_string_map = HashMap::new(); for (attribute_name, attribute_value) in device_info_map { // Excluding one-time keys since we're moving them to a separate table if attribute_name == USERS_TABLE_DEVICES_MAP_NOTIF_ONE_TIME_KEYS_ATTRIBUTE_NAME || attribute_name == USERS_TABLE_DEVICES_MAP_CONTENT_ONE_TIME_KEYS_ATTRIBUTE_NAME { continue; } let attribute_value_str = parse_string_attribute(&attribute_name, Some(attribute_value))?; device_info_string_map.insert(attribute_name, attribute_value_str); } if get_one_time_keys { if let Some(notif_one_time_key) = self .get_one_time_key(&device_id_key, OlmAccountType::Notification) .await? { device_info_string_map .insert(NOTIF_ONE_TIME_KEY.to_string(), notif_one_time_key); } if let Some(content_one_time_key) = self .get_one_time_key(&device_id_key, OlmAccountType::Content) .await? { device_info_string_map .insert(CONTENT_ONE_TIME_KEY.to_string(), content_one_time_key); } } devices_response.insert(device_id_key, device_info_string_map); } Ok(Some(devices_response)) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => parse_string_attribute( USERS_TABLE_PARTITION_KEY, user.remove(USERS_TABLE_PARTITION_KEY), ) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } pub async fn get_user_id_and_password_file_from_username( &self, username: &str, ) -> Result)>, Error> { match self .get_user_from_user_info(username.to_string(), &AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = parse_string_attribute( USERS_TABLE_PARTITION_KEY, user.remove(USERS_TABLE_PARTITION_KEY), )?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((user_id, password_file))) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let Ok(username) = parse_string_attribute( USERS_TABLE_USERNAME_ATTRIBUTE, attribute.remove(USERS_TABLE_USERNAME_ATTRIBUTE), ) { result.push(username); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = parse_string_attribute( NONCE_TABLE_PARTITION_KEY, item.remove(&NONCE_TABLE_PARTITION_KEY.to_string()), )?; let created = parse_date_time_attribute( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(&NONCE_TABLE_CREATED_ATTRIBUTE.to_string()), )?; let expiration_time = parse_date_time_attribute( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(&NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string()), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, usernames: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for usernames_chunk in usernames.chunks(25) { let write_requests = usernames_chunk .iter() .map(|username| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.to_string()), ) .build(); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn username_in_reserved_usernames_table( &self, username: &str, ) -> Result { match self .client .get_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(username.to_string()), ) .consistent_read(true) .send() .await { Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), Ok(_) => Ok(false), Err(e) => Err(Error::AwsSdk(e.into())), } } } type AttributeName = String; pub type DeviceKeys = HashMap; type Devices = HashMap; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_date_time_attribute( attribute_name: &str, attribute: Option, ) -> Result, DBItemError> { if let Some(AttributeValue::S(created)) = &attribute { created.parse().map_err(|e| { DBItemError::new( attribute_name.to_string(), - attribute, + attribute.into(), DBItemAttributeError::InvalidTimestamp(e), ) }) } else { Err(DBItemError::new( attribute_name.to_string(), - attribute, + attribute.into(), DBItemAttributeError::Missing, )) } } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), - attribute, + attribute.into(), DBItemAttributeError::Missing, )), } } #[allow(dead_code)] fn parse_map_attribute( attribute_name: &str, attribute_value: Option, -) -> Result, DBItemError> { +) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", "Unexpected attribute type when parsing map attribute" ); Err(DBItemError::new( attribute_name.to_string(), - attribute_value, + attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), - attribute_value, + attribute_value.into(), DBItemAttributeError::Missing, )) } } } fn parse_string_attribute( attribute_name: &str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", "Unexpected attribute type when parsing string attribute" ); Err(DBItemError::new( attribute_name.to_string(), - attribute_value, + attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), - attribute_value, + attribute_value.into(), DBItemAttributeError::Missing, )) } } } fn create_device_info( flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, -) -> HashMap { +) -> AttributeMap { let mut device_info = HashMap::from([ ( USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.device_type.to_string()), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload_signature), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME .to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey_signature), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey_signature), ), ]); if let Some(social_proof) = social_proof { device_info.insert( USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), AttributeValue::S(social_proof), ); } device_info } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(&example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } #[test] fn test_int_to_device_type() { let valid_result = DeviceType::try_from(3); assert!(valid_result.is_ok()); assert_eq!(valid_result.unwrap(), DeviceType::Android); let invalid_result = DeviceType::try_from(6); assert!(invalid_result.is_err()); } } diff --git a/services/identity/src/database/device_list.rs b/services/identity/src/database/device_list.rs index 09a0492f0..6619bc4ca 100644 --- a/services/identity/src/database/device_list.rs +++ b/services/identity/src/database/device_list.rs @@ -1,795 +1,792 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use comm_lib::{ aws::ddb::{ operation::{get_item::GetItemOutput, query::builders::QueryFluentBuilder}, types::{ error::TransactionCanceledException, AttributeValue, DeleteRequest, Put, TransactWriteItem, Update, WriteRequest, }, }, - database::{AttributeMap, DynamoDBError}, + database::{AttributeMap, DBItemAttributeError, DBItemError, DynamoDBError}, }; use tracing::{error, warn}; use crate::{ client_service::FlattenedDeviceKeyUpload, constants::{ devices_table::{self, *}, USERS_TABLE, USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, }, database::parse_string_attribute, ddb_utils::AttributesOptionExt, - error::{ - DBItemAttributeError, DBItemError, DeviceListError, Error, - FromAttributeValue, - }, + error::{DeviceListError, Error, FromAttributeValue}, grpc_services::protos::unauth::DeviceType, }; use super::{parse_date_time_attribute, DatabaseClient}; #[derive(Clone, Debug)] pub struct DeviceRow { pub user_id: String, pub device_id: String, pub device_type: DeviceType, pub device_key_info: IdentityKeyInfo, pub content_prekey: PreKey, pub notif_prekey: PreKey, } #[derive(Clone, Debug)] pub struct DeviceListRow { pub user_id: String, pub timestamp: DateTime, pub device_ids: Vec, } #[derive(Clone, Debug)] pub struct IdentityKeyInfo { pub key_payload: String, pub key_payload_signature: String, pub social_proof: Option, } #[derive(Clone, Debug)] pub struct PreKey { pub pre_key: String, pub pre_key_signature: String, } impl DeviceRow { pub fn from_device_key_upload( user_id: impl Into, upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> Self { Self { user_id: user_id.into(), device_id: upload.device_id_key, device_type: DeviceType::from_str_name(upload.device_type.as_str_name()) .expect("DeviceType conversion failed. Identity client and server protos mismatch"), device_key_info: IdentityKeyInfo { key_payload: upload.key_payload, key_payload_signature: upload.key_payload_signature, social_proof, }, content_prekey: PreKey { pre_key: upload.content_prekey, pre_key_signature: upload.content_prekey_signature, }, notif_prekey: PreKey { pre_key: upload.notif_prekey, pre_key_signature: upload.notif_prekey_signature, } } } } impl DeviceListRow { /// Generates new device list row from given devices fn new(user_id: impl Into, device_ids: Vec) -> Self { Self { user_id: user_id.into(), device_ids, timestamp: Utc::now(), } } } // helper structs for converting to/from attribute values for sort key (a.k.a itemID) struct DeviceIDAttribute(String); struct DeviceListKeyAttribute(DateTime); impl From for AttributeValue { fn from(value: DeviceIDAttribute) -> Self { AttributeValue::S(format!("{DEVICE_ITEM_KEY_PREFIX}{}", value.0)) } } impl From for AttributeValue { fn from(value: DeviceListKeyAttribute) -> Self { AttributeValue::S(format!( "{DEVICE_LIST_KEY_PREFIX}{}", value.0.to_rfc3339() )) } } impl TryFrom> for DeviceIDAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = parse_string_attribute(ATTR_ITEM_ID, value)?; // remove the device- prefix let device_id = item_id .strip_prefix(DEVICE_ITEM_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), - attribute_value: Some(AttributeValue::S(item_id.clone())), + attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, })? .to_string(); Ok(Self(device_id)) } } impl TryFrom> for DeviceListKeyAttribute { type Error = DBItemError; fn try_from(value: Option) -> Result { let item_id = parse_string_attribute(ATTR_ITEM_ID, value)?; // remove the device-list- prefix, then parse the timestamp let timestamp: DateTime = item_id .strip_prefix(DEVICE_LIST_KEY_PREFIX) .ok_or_else(|| DBItemError { attribute_name: ATTR_ITEM_ID.to_string(), - attribute_value: Some(AttributeValue::S(item_id.clone())), + attribute_value: item_id.clone().into(), attribute_error: DBItemAttributeError::InvalidValue, }) .and_then(|s| { s.parse().map_err(|e| { DBItemError::new( ATTR_ITEM_ID.to_string(), - Some(AttributeValue::S(item_id.clone())), + item_id.clone().into(), DBItemAttributeError::InvalidTimestamp(e), ) }) })?; Ok(Self(timestamp)) } } impl TryFrom for DeviceRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = parse_string_attribute(ATTR_USER_ID, attrs.remove(ATTR_USER_ID))?; let DeviceIDAttribute(device_id) = attrs.remove(ATTR_ITEM_ID).try_into()?; let raw_device_type = parse_string_attribute(ATTR_DEVICE_TYPE, attrs.remove(ATTR_DEVICE_TYPE))?; let device_type = DeviceType::from_str_name(&raw_device_type).ok_or_else(|| { DBItemError::new( ATTR_DEVICE_TYPE.to_string(), - Some(AttributeValue::S(raw_device_type)), + raw_device_type.into(), DBItemAttributeError::InvalidValue, ) })?; let device_key_info = attrs .remove(ATTR_DEVICE_KEY_INFO) .ok_or_missing(ATTR_DEVICE_KEY_INFO)? .to_hashmap(ATTR_DEVICE_KEY_INFO) .cloned() .and_then(IdentityKeyInfo::try_from)?; let content_prekey = attrs .remove(ATTR_CONTENT_PREKEY) .ok_or_missing(ATTR_CONTENT_PREKEY)? .to_hashmap(ATTR_CONTENT_PREKEY) .cloned() .and_then(PreKey::try_from)?; let notif_prekey = attrs .remove(ATTR_NOTIF_PREKEY) .ok_or_missing(ATTR_NOTIF_PREKEY)? .to_hashmap(ATTR_NOTIF_PREKEY) .cloned() .and_then(PreKey::try_from)?; Ok(Self { user_id, device_id, device_type, device_key_info, content_prekey, notif_prekey, }) } } impl From for AttributeMap { fn from(value: DeviceRow) -> Self { HashMap::from([ (ATTR_USER_ID.to_string(), AttributeValue::S(value.user_id)), ( ATTR_ITEM_ID.to_string(), DeviceIDAttribute(value.device_id).into(), ), ( ATTR_DEVICE_TYPE.to_string(), AttributeValue::S(value.device_type.as_str_name().to_string()), ), ( ATTR_DEVICE_KEY_INFO.to_string(), value.device_key_info.into(), ), (ATTR_CONTENT_PREKEY.to_string(), value.content_prekey.into()), (ATTR_NOTIF_PREKEY.to_string(), value.notif_prekey.into()), ]) } } impl From for AttributeValue { fn from(value: IdentityKeyInfo) -> Self { let mut attrs = HashMap::from([ ( ATTR_KEY_PAYLOAD.to_string(), AttributeValue::S(value.key_payload), ), ( ATTR_KEY_PAYLOAD_SIGNATURE.to_string(), AttributeValue::S(value.key_payload_signature), ), ]); if let Some(social_proof) = value.social_proof { attrs.insert( ATTR_SOCIAL_PROOF.to_string(), AttributeValue::S(social_proof), ); } AttributeValue::M(attrs) } } impl TryFrom for IdentityKeyInfo { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let key_payload = parse_string_attribute(ATTR_KEY_PAYLOAD, attrs.remove(ATTR_KEY_PAYLOAD))?; let key_payload_signature = parse_string_attribute( ATTR_KEY_PAYLOAD_SIGNATURE, attrs.remove(ATTR_KEY_PAYLOAD_SIGNATURE), )?; // social proof is optional let social_proof = attrs .remove(ATTR_SOCIAL_PROOF) .map(|attr| attr.to_string(ATTR_SOCIAL_PROOF).cloned()) .transpose()?; Ok(Self { key_payload, key_payload_signature, social_proof, }) } } impl From for AttributeValue { fn from(value: PreKey) -> Self { let attrs = HashMap::from([ (ATTR_PREKEY.to_string(), AttributeValue::S(value.pre_key)), ( ATTR_PREKEY_SIGNATURE.to_string(), AttributeValue::S(value.pre_key_signature), ), ]); AttributeValue::M(attrs) } } impl TryFrom for PreKey { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let pre_key = parse_string_attribute(ATTR_PREKEY, attrs.remove(ATTR_PREKEY))?; let pre_key_signature = parse_string_attribute( ATTR_PREKEY_SIGNATURE, attrs.remove(ATTR_PREKEY_SIGNATURE), )?; Ok(Self { pre_key, pre_key_signature, }) } } impl TryFrom for DeviceListRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = parse_string_attribute(ATTR_USER_ID, attrs.remove(ATTR_USER_ID))?; let DeviceListKeyAttribute(timestamp) = attrs.remove(ATTR_ITEM_ID).try_into()?; // validate timestamps are in sync let timestamps_match = attrs .remove(ATTR_TIMESTAMP) .and_then(|attr| attr.as_n().ok().cloned()) .and_then(|val| val.parse::().ok()) .filter(|val| *val == timestamp.timestamp_millis()) .is_some(); if !timestamps_match { warn!( "DeviceList timestamp mismatch for (userID={}, itemID={})", &user_id, timestamp.to_rfc3339() ); } // this should be a list of strings let device_ids = attrs .remove(ATTR_DEVICE_IDS) .ok_or_else(|| { DBItemError::new( ATTR_DEVICE_IDS.to_string(), - None, + None.into(), DBItemAttributeError::Missing, ) })? .to_vec(ATTR_DEVICE_IDS)? .iter() .map(|v| v.to_string("device_ids[?]").cloned()) .collect::, DBItemError>>()?; Ok(Self { user_id, timestamp, device_ids, }) } } impl From for AttributeMap { fn from(device_list: DeviceListRow) -> Self { let mut attrs = HashMap::new(); attrs.insert( ATTR_USER_ID.to_string(), AttributeValue::S(device_list.user_id.clone()), ); attrs.insert( ATTR_ITEM_ID.to_string(), DeviceListKeyAttribute(device_list.timestamp).into(), ); attrs.insert( ATTR_TIMESTAMP.to_string(), AttributeValue::N(device_list.timestamp.timestamp_millis().to_string()), ); attrs.insert( ATTR_DEVICE_IDS.to_string(), AttributeValue::L( device_list .device_ids .into_iter() .map(AttributeValue::S) .collect(), ), ); attrs } } impl DatabaseClient { /// Retrieves user's current devices and their full data pub async fn get_current_devices( &self, user_id: impl Into, ) -> Result, Error> { let response = query_rows_with_prefix(self, user_id, DEVICE_ITEM_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to get current devices: {:?}", e); Error::AwsSdk(e.into()) })?; let Some(rows) = response.items else { return Ok(Vec::new()); }; rows .into_iter() .map(DeviceRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Gets user's device list history pub async fn get_device_list_history( &self, user_id: impl Into, since: Option>, ) -> Result, Error> { let rows = if let Some(since) = since { // When timestamp is provided, it's better to query device lists by timestamp LSI self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id AND #timestamp > :since") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#timestamp", ATTR_TIMESTAMP) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .expression_attribute_values( ":since", AttributeValue::N(since.timestamp_millis().to_string()), ) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items } else { // Query all device lists for user query_rows_with_prefix(self, user_id, DEVICE_LIST_KEY_PREFIX) .send() .await .map_err(|e| { error!("Failed to query device list updates (all): {:?}", e); Error::AwsSdk(e.into()) })? .items }; rows .unwrap_or_default() .into_iter() .map(DeviceListRow::try_from) .collect::, DBItemError>>() .map_err(Error::from) } /// Checks if given device exists on user's current device list pub async fn device_exists( &self, user_id: impl Into, device_id: impl Into, ) -> Result { let GetItemOutput { item, .. } = self .client .get_item() .table_name(devices_table::NAME) .key(ATTR_USER_ID, AttributeValue::S(user_id.into())) .key(ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into()) // only fetch the primary key, we don't need the rest .projection_expression(format!("{ATTR_USER_ID}, {ATTR_ITEM_ID}")) .send() .await .map_err(|e| { error!("Failed to check if device exists: {:?}", e); Error::AwsSdk(e.into()) })?; Ok(item.is_some()) } pub async fn get_current_device_list( &self, user_id: impl Into, ) -> Result, Error> { self .client .query() .table_name(devices_table::NAME) .index_name(devices_table::TIMESTAMP_INDEX_NAME) .consistent_read(true) .key_condition_expression("#user_id = :user_id") // sort descending .scan_index_forward(false) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .limit(1) .send() .await .map_err(|e| { error!("Failed to query device list updates by index: {:?}", e); Error::AwsSdk(e.into()) })? .items .and_then(|mut items| items.pop()) .map(DeviceListRow::try_from) .transpose() .map_err(Error::from) } /// Adds new device to user's device list. If the device already exists, the /// operation fails. Transactionally generates new device list version. pub async fn add_device( &self, user_id: impl Into, device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> Result<(), Error> { let user_id: String = user_id.into(); self .transact_update_devicelist(&user_id, |ref mut device_ids| { let new_device = DeviceRow::from_device_key_upload( &user_id, device_key_upload, social_proof, ); if device_ids.iter().any(|id| &new_device.device_id == id) { warn!( "Device already exists in user's device list \ (userID={}, deviceID={})", &user_id, &new_device.device_id ); return Err(Error::DeviceList(DeviceListError::DeviceAlreadyExists)); } device_ids.push(new_device.device_id.clone()); // Put new device let put_device = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_operation = TransactWriteItem::builder().put(put_device).build(); Ok(put_device_operation) }) .await } /// Performs a transactional update of the device list for the user. Afterwards /// generates a new device list and updates the timestamp in the users table. /// This is done in a transaction. Operation fails if the device list has been /// updated concurrently (timestamp mismatch). async fn transact_update_devicelist( &self, user_id: &str, // The closure performing a transactional update of the device list. It receives a mutable // reference to the current device list. The closure should return a transactional DDB // operation to be performed when updating the device list. action: impl FnOnce(&mut Vec) -> Result, ) -> Result<(), Error> { let previous_timestamp = get_current_devicelist_timestamp(self, user_id).await?; let mut device_ids = self .get_current_device_list(user_id) .await? .map(|device_list| device_list.device_ids) .unwrap_or_default(); // Perform the update action, then generate new device list let operation = action(&mut device_ids)?; let new_device_list = DeviceListRow::new(user_id, device_ids); // Update timestamp in users table let timestamp_update_operation = device_list_timestamp_update_operation( user_id, previous_timestamp, new_device_list.timestamp, ); // Put updated device list (a new version) let put_device_list = Put::builder() .table_name(devices_table::NAME) .set_item(Some(new_device_list.into())) .condition_expression( "attribute_not_exists(#user_id) AND attribute_not_exists(#item_id)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .build(); let put_device_list_operation = TransactWriteItem::builder().put(put_device_list).build(); self .client .transact_write_items() .transact_items(operation) .transact_items(put_device_list_operation) .transact_items(timestamp_update_operation) .send() .await .map_err(|e| match DynamoDBError::from(e) { DynamoDBError::TransactionCanceledException( TransactionCanceledException { cancellation_reasons: Some(reasons), .. }, ) if reasons .iter() .any(|reason| reason.code() == Some("ConditionalCheckFailed")) => { Error::DeviceList(DeviceListError::ConcurrentUpdateError) } other => { error!("Device list update transaction failed: {:?}", other); Error::AwsSdk(other) } })?; Ok(()) } /// Deletes all user data from devices table pub async fn delete_devices_table_rows_for_user( &self, user_id: impl Into, ) -> Result<(), Error> { // 1. get all rows // 2. batch write delete all // we project only the primary keys so we can pass these directly to delete requests let primary_keys = self .client .query() .table_name(devices_table::NAME) .projection_expression("#user_id, #item_id") .key_condition_expression("#user_id = :user_id") .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values( ":user_id", AttributeValue::S(user_id.into()), ) .consistent_read(true) .send() .await .map_err(|e| { error!("Failed to list user's items in devices table: {:?}", e); Error::AwsSdk(e.into()) })? .items .unwrap_or_default(); let delete_requests = primary_keys .into_iter() .map(|item| { let request = DeleteRequest::builder().set_key(Some(item)).build(); WriteRequest::builder().delete_request(request).build() }) .collect::>(); // TODO: We can use the batch write helper from comm-services-lib when integrated for batch in delete_requests.chunks(25) { self .client .batch_write_item() .request_items(devices_table::NAME, batch.to_vec()) .send() .await .map_err(|e| { error!("Failed to batch delete items from devices table: {:?}", e); Error::AwsSdk(e.into()) })?; } Ok(()) } } /// Gets timestamp of user's current device list. Returns None if the user /// doesn't have a device list yet. Storing the timestamp in the users table is /// required for consistency. It's used as a condition when updating the device /// list. async fn get_current_devicelist_timestamp( db: &crate::database::DatabaseClient, user_id: impl Into, ) -> Result>, Error> { let response = db .client .get_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .projection_expression(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME) .send() .await .map_err(|e| { error!("Failed to get user's device list timestamp: {:?}", e); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); let raw_datetime = user_item.remove(USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME); // existing records will not have this field when // updating device list for the first time if raw_datetime.is_none() { return Ok(None); } let timestamp = parse_date_time_attribute( USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, raw_datetime, )?; Ok(Some(timestamp)) } /// Generates update expression for current device list timestamp in users table. /// The previous timestamp is used as a condition to ensure that the value hasn't changed /// since we got it. This avoids race conditions when updating the device list. fn device_list_timestamp_update_operation( user_id: impl Into, previous_timestamp: Option>, new_timestamp: DateTime, ) -> TransactWriteItem { let update_builder = match previous_timestamp { Some(previous_timestamp) => Update::builder() .condition_expression("#device_list_timestamp = :previous_timestamp") .expression_attribute_values( ":previous_timestamp", AttributeValue::S(previous_timestamp.to_rfc3339()), ), // If there's no previous timestamp, the attribute shouldn't exist yet None => Update::builder() .condition_expression("attribute_not_exists(#device_list_timestamp)"), }; let update = update_builder .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.into())) .update_expression("SET #device_list_timestamp = :new_timestamp") .expression_attribute_names( "#device_list_timestamp", USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME, ) .expression_attribute_values( ":new_timestamp", AttributeValue::S(new_timestamp.to_rfc3339()), ) .build(); TransactWriteItem::builder().update(update).build() } /// Helper function to query rows by given sort key prefix fn query_rows_with_prefix( db: &crate::database::DatabaseClient, user_id: impl Into, prefix: &'static str, ) -> QueryFluentBuilder { db.client .query() .table_name(devices_table::NAME) .key_condition_expression( "#user_id = :user_id AND begins_with(#item_id, :device_prefix)", ) .expression_attribute_names("#user_id", ATTR_USER_ID) .expression_attribute_names("#item_id", ATTR_ITEM_ID) .expression_attribute_values(":user_id", AttributeValue::S(user_id.into())) .expression_attribute_values( ":device_prefix", AttributeValue::S(prefix.to_string()), ) .consistent_read(true) } diff --git a/services/identity/src/ddb_utils.rs b/services/identity/src/ddb_utils.rs index 3836ca3d4..50504c777 100644 --- a/services/identity/src/ddb_utils.rs +++ b/services/identity/src/ddb_utils.rs @@ -1,92 +1,95 @@ use chrono::{DateTime, NaiveDateTime, Utc}; -use comm_lib::aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}; +use comm_lib::{ + aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}, + database::Value, +}; use std::collections::HashMap; use std::iter::IntoIterator; -use crate::error::{DBItemAttributeError, DBItemError}; +use comm_lib::database::{DBItemAttributeError, DBItemError}; #[derive(Copy, Clone, Debug)] pub enum OlmAccountType { Content, Notification, } // Prefix the one time keys with the olm account variant. This allows for a single // DDB table to contain both notification and content keys for a device. pub fn create_one_time_key_partition_key( device_id: &str, account_type: OlmAccountType, ) -> String { match account_type { OlmAccountType::Content => format!("content_{device_id}"), OlmAccountType::Notification => format!("notification_{device_id}"), } } fn create_one_time_key_put_request( device_id: &str, one_time_key: String, account_type: OlmAccountType, ) -> WriteRequest { use crate::constants::one_time_keys_table::*; let partition_key = create_one_time_key_partition_key(device_id, account_type); let builder = PutRequest::builder(); let attrs = HashMap::from([ (PARTITION_KEY.to_string(), AttributeValue::S(partition_key)), (SORT_KEY.to_string(), AttributeValue::S(one_time_key)), ]); let put_request = builder.set_item(Some(attrs)).build(); WriteRequest::builder().put_request(put_request).build() } pub fn into_one_time_put_requests( device_id: &str, one_time_keys: T, account_type: OlmAccountType, ) -> Vec where T: IntoIterator, ::Item: ToString, { one_time_keys .into_iter() .map(|otk| { create_one_time_key_put_request(device_id, otk.to_string(), account_type) }) .collect() } pub trait AttributesOptionExt { fn ok_or_missing( self, attr_name: impl Into, ) -> Result; } impl AttributesOptionExt for Option { fn ok_or_missing( self, attr_name: impl Into, ) -> Result { self.ok_or_else(|| DBItemError { attribute_name: attr_name.into(), - attribute_value: None, + attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, }) } } pub trait DateTimeExt { fn from_utc_timestamp_millis(timestamp: i64) -> Option>; } impl DateTimeExt for DateTime { fn from_utc_timestamp_millis(timestamp: i64) -> Option { let naive = NaiveDateTime::from_timestamp_millis(timestamp)?; Some(Self::from_utc(naive, Utc)) } } diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs index 0e9a69e31..1486cbf4c 100644 --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -1,166 +1,126 @@ use comm_lib::aws::ddb::types::AttributeValue; use comm_lib::aws::DynamoDBError; +use comm_lib::database::{DBItemAttributeError, DBItemError, Value}; use std::collections::hash_map::HashMap; use std::fmt::{Display, Formatter, Result as FmtResult}; use tracing::error; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(...)] Transport(tonic::transport::Error), #[display(...)] Status(tonic::Status), #[display(...)] MissingItem, #[display(...)] DeviceList(DeviceListError), } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DeviceListError { DeviceAlreadyExists, ConcurrentUpdateError, } -#[derive(Debug, derive_more::Error, derive_more::Constructor)] -pub struct DBItemError { - pub attribute_name: String, - pub attribute_value: Option, - pub attribute_error: DBItemAttributeError, -} - -impl Display for DBItemError { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - match &self.attribute_error { - DBItemAttributeError::Missing => { - write!(f, "Attribute {} is missing", self.attribute_name) - } - DBItemAttributeError::IncorrectType => write!( - f, - "Value for attribute {} has incorrect type: {:?}", - self.attribute_name, self.attribute_value - ), - error => write!( - f, - "Error regarding attribute {} with value {:?}: {}", - self.attribute_name, self.attribute_value, error - ), - } - } -} - -#[derive(Debug, derive_more::Display, derive_more::Error)] -pub enum DBItemAttributeError { - #[display(...)] - Missing, - #[display(...)] - IncorrectType, - #[display(...)] - InvalidTimestamp(chrono::ParseError), - #[display(...)] - ExpiredTimestamp, - #[display(...)] - InvalidValue, -} - pub trait FromAttributeValue { fn to_vec( &self, attr_name: &str, ) -> Result<&Vec, DBItemError>; fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError>; fn to_hashmap( &self, attr_name: &str, ) -> Result<&HashMap, DBItemError>; } fn handle_attr_failure(value: &AttributeValue, attr_name: &str) -> DBItemError { DBItemError { attribute_name: attr_name.to_string(), - attribute_value: Some(value.clone()), + attribute_value: Value::AttributeValue(Some(value.clone())), attribute_error: DBItemAttributeError::IncorrectType, } } impl FromAttributeValue for AttributeValue { fn to_vec( &self, attr_name: &str, ) -> Result<&Vec, DBItemError> { self.as_l().map_err(|e| handle_attr_failure(e, attr_name)) } fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError> { self.as_s().map_err(|e| handle_attr_failure(e, attr_name)) } fn to_hashmap( &self, attr_name: &str, ) -> Result<&HashMap, DBItemError> { self.as_m().map_err(|e| handle_attr_failure(e, attr_name)) } } pub trait AttributeValueFromHashMap { fn get_string(&self, key: &str) -> Result<&String, DBItemError>; fn get_map( &self, key: &str, ) -> Result<&HashMap, DBItemError>; fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError>; } impl AttributeValueFromHashMap for HashMap { fn get_string(&self, key: &str) -> Result<&String, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), - attribute_value: None, + attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_string(key) } fn get_map( &self, key: &str, ) -> Result<&HashMap, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), - attribute_value: None, + attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_hashmap(key) } fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), - attribute_value: None, + attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_vec(key) } } pub fn consume_error(result: Result) { match result { Ok(_) => (), Err(e) => { error!("{}", e); } } } diff --git a/shared/comm-lib/src/database.rs b/shared/comm-lib/src/database.rs index 0abd0a60e..f29efbe72 100644 --- a/shared/comm-lib/src/database.rs +++ b/shared/comm-lib/src/database.rs @@ -1,764 +1,768 @@ use aws_sdk_dynamodb::types::AttributeValue; pub use aws_sdk_dynamodb::Error as DynamoDBError; use chrono::{DateTime, Utc}; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::num::ParseIntError; use std::str::FromStr; // # Useful type aliases // Rust exports `pub type` only into the so-called "type namespace", but in // order to use them e.g. with the `TryFromAttribute` trait, they also need // to be exported into the "value namespace" which is what `pub use` does. // // To overcome that, a dummy module is created and aliases are re-exported // with `pub use` construct mod aliases { use aws_sdk_dynamodb::types::AttributeValue; use std::collections::HashMap; pub type AttributeMap = HashMap; } pub use self::aliases::AttributeMap; // # Error handling #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(fmt = "Maximum retries exceeded")] MaxRetriesExceeded, } -#[derive(Debug)] +#[derive(Debug, derive_more::From)] pub enum Value { AttributeValue(Option), String(String), } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { - attribute_name: String, - attribute_value: Value, - attribute_error: DBItemAttributeError, + pub attribute_name: String, + pub attribute_value: Value, + pub attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] TimestampOutOfRange, #[display(...)] InvalidTimestamp(chrono::ParseError), #[display(...)] InvalidNumberFormat(ParseIntError), + #[display(...)] + ExpiredTimestamp, + #[display(...)] + InvalidValue, } /// Conversion trait for [`AttributeValue`] /// /// Types implementing this trait are able to do the following: /// ```ignore /// use comm_lib::database::{TryFromAttribute, AttributeTryInto}; /// /// let foo = SomeType::try_from_attr("MyAttribute", Some(attribute))?; /// /// // if `AttributeTryInto` is imported, also: /// let bar = Some(attribute).attr_try_into("MyAttribute")?; /// ``` pub trait TryFromAttribute: Sized { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result; } /// Do NOT implement this trait directly. Implement [`TryFromAttribute`] instead pub trait AttributeTryInto { fn attr_try_into( self, attribute_name: impl Into, ) -> Result; } // Automatic attr_try_into() for all attribute values // that have TryFromAttribute implemented impl AttributeTryInto for Option { fn attr_try_into( self, attribute_name: impl Into, ) -> Result { T::try_from_attr(attribute_name, self) } } /// Helper trait for extracting attributes from a collection pub trait AttributeExtractor { /// Gets an attribute from the map and tries to convert it to the given type /// This method does not consume the raw attribute - it gets cloned /// See [`AttributeExtractor::take_attr`] for a non-cloning method fn get_attr( &self, attribute_name: &str, ) -> Result; /// Takes an attribute from the map and tries to convert it to the given type /// This method consumes the raw attribute - it gets removed from the map /// See [`AttributeExtractor::get_attr`] for a non-mutating method fn take_attr( &mut self, attribute_name: &str, ) -> Result; } impl AttributeExtractor for AttributeMap { fn get_attr( &self, attribute_name: &str, ) -> Result { T::try_from_attr(attribute_name, self.get(attribute_name).cloned()) } fn take_attr( &mut self, attribute_name: &str, ) -> Result { T::try_from_attr(attribute_name, self.remove(attribute_name)) } } impl TryFromAttribute for String { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for bool { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::Bool(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for DateTime { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { match &attribute { Some(AttributeValue::S(datetime)) => datetime.parse().map_err(|e| { DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::InvalidTimestamp(e), ) }), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for AttributeMap { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for Vec { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::B(data)) => Ok(data.into_inner()), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for HashSet { fn try_from_attr( attribute_name: impl Into, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::Ss(set)) => Ok(set.into_iter().collect()), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } } impl TryFromAttribute for Vec { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { let attribute_name = attribute_name.into(); match attribute { Some(AttributeValue::L(list)) => Ok( list .into_iter() .map(|attribute| { T::try_from_attr(format!("{attribute_name}[i]"), Some(attribute)) }) .collect::, _>>()?, ), Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute), DBItemAttributeError::Missing, )), } } } #[deprecated = "Use `String::try_from_attr()` instead"] pub fn parse_string_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { String::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `bool::try_from_attr()` instead"] pub fn parse_bool_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { bool::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `DateTime::::try_from_attr()` instead"] pub fn parse_datetime_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result, DBItemError> { DateTime::::try_from_attr(attribute_name, attribute_value) } #[deprecated = "Use `AttributeMap::try_from_attr()` instead"] pub fn parse_map_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result { attribute_value.attr_try_into(attribute_name) } pub fn parse_int_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result where T: FromStr, { match &attribute_value { Some(AttributeValue::N(numeric_str)) => { parse_integer(attribute_name, numeric_str) } Some(_) => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.into(), Value::AttributeValue(attribute_value), DBItemAttributeError::Missing, )), } } /// Parses the UTC timestamp in milliseconds from a DynamoDB numeric attribute pub fn parse_timestamp_attribute( attribute_name: impl Into, attribute_value: Option, ) -> Result, DBItemError> { let attribute_name: String = attribute_name.into(); let timestamp = parse_int_attribute::( attribute_name.clone(), attribute_value.clone(), )?; let naive_datetime = chrono::NaiveDateTime::from_timestamp_millis(timestamp) .ok_or_else(|| { DBItemError::new( attribute_name, Value::AttributeValue(attribute_value), DBItemAttributeError::TimestampOutOfRange, ) })?; Ok(DateTime::from_naive_utc_and_offset(naive_datetime, Utc)) } pub fn parse_integer( attribute_name: impl Into, attribute_value: &str, ) -> Result where T: FromStr, { attribute_value.parse::().map_err(|e| { DBItemError::new( attribute_name.into(), Value::String(attribute_value.into()), DBItemAttributeError::InvalidNumberFormat(e), ) }) } pub mod batch_operations { use aws_sdk_dynamodb::{ error::SdkError, operation::batch_write_item::BatchWriteItemError, types::{KeysAndAttributes, WriteRequest}, Error as DynamoDBError, }; use rand::Rng; use std::time::Duration; use tracing::{debug, trace}; use super::AttributeMap; /// DynamoDB hard limit for single BatchWriteItem request const SINGLE_BATCH_WRITE_ITEM_LIMIT: usize = 25; const SINGLE_BATCH_GET_ITEM_LIMIT: usize = 100; /// Exponential backoff configuration for batch write operation #[derive(derive_more::Constructor, Debug)] pub struct ExponentialBackoffConfig { /// Maximum retry attempts before the function fails. /// Set this to 0 to disable exponential backoff. /// Defaults to **8**. pub max_attempts: u32, /// Base wait duration before retry. Defaults to **25ms**. /// It is doubled with each attempt: 25ms, 50, 100, 200... pub base_duration: Duration, /// Jitter factor for retry delay. Factor 0.5 for 100ms delay /// means that wait time will be between 50ms and 150ms. /// The value must be in range 0.0 - 1.0. It will be clamped /// if out of these bounds. Defaults to **0.3** pub jitter_factor: f32, /// Retry on [`ProvisionedThroughputExceededException`]. /// Defaults to **true**. /// /// [`ProvisionedThroughputExceededException`]: aws_sdk_dynamodb::Error::ProvisionedThroughputExceededException pub retry_on_provisioned_capacity_exceeded: bool, } impl Default for ExponentialBackoffConfig { fn default() -> Self { ExponentialBackoffConfig { max_attempts: 8, base_duration: Duration::from_millis(25), jitter_factor: 0.3, retry_on_provisioned_capacity_exceeded: true, } } } impl ExponentialBackoffConfig { fn new_counter(&self) -> ExponentialBackoffHelper { ExponentialBackoffHelper::new(self) } fn backoff_enabled(&self) -> bool { self.max_attempts > 0 } fn should_retry_on_capacity_exceeded(&self) -> bool { self.backoff_enabled() && self.retry_on_provisioned_capacity_exceeded } } #[tracing::instrument(name = "batch_get", skip(ddb, primary_keys, config))] pub async fn batch_get( ddb: &aws_sdk_dynamodb::Client, table_name: &str, primary_keys: K, projection_expression: Option, config: ExponentialBackoffConfig, ) -> Result, super::Error> where K: IntoIterator, K::Item: Into, { let mut primary_keys: Vec<_> = primary_keys.into_iter().map(Into::into).collect(); let mut results = Vec::with_capacity(primary_keys.len()); tracing::debug!( ?config, "Starting batch read operation of {} items...", primary_keys.len() ); let mut exponential_backoff = config.new_counter(); let mut backup = Vec::with_capacity(SINGLE_BATCH_GET_ITEM_LIMIT); loop { let items_to_drain = std::cmp::min(primary_keys.len(), SINGLE_BATCH_GET_ITEM_LIMIT); let chunk = primary_keys.drain(..items_to_drain).collect::>(); if chunk.is_empty() { // No more items tracing::trace!("No more items to process. Exiting"); break; } // we don't need the backup when we don't retry if config.should_retry_on_capacity_exceeded() { chunk.clone_into(&mut backup); } tracing::trace!("Attempting to get chunk of {} items...", chunk.len()); let result = ddb .batch_get_item() .request_items( table_name, KeysAndAttributes::builder() .set_keys(Some(chunk)) .consistent_read(true) .set_projection_expression(projection_expression.clone()) .build(), ) .send() .await; match result { Ok(output) => { if let Some(mut responses) = output.responses { if let Some(items) = responses.remove(table_name) { tracing::trace!("Successfully read {} items", items.len()); results.extend(items); } } else { tracing::warn!("Responses was None"); } if let Some(mut unprocessed) = output.unprocessed_keys { let keys_to_retry = match unprocessed.remove(table_name) { Some(KeysAndAttributes { keys: Some(keys), .. }) if !keys.is_empty() => keys, _ => { tracing::trace!("Chunk read successfully. Continuing."); exponential_backoff.reset(); continue; } }; exponential_backoff.sleep_and_retry().await?; tracing::debug!( "Some items failed. Retrying {} requests", keys_to_retry.len() ); primary_keys.extend(keys_to_retry); } else { tracing::trace!("Unprocessed items was None"); } } Err(error) => { let error: DynamoDBError = error.into(); if !matches!( error, DynamoDBError::ProvisionedThroughputExceededException(_) ) { tracing::error!("BatchGetItem failed: {0:?} - {0}", error); return Err(error.into()); } tracing::warn!("Provisioned capacity exceeded!"); if !config.retry_on_provisioned_capacity_exceeded { return Err(error.into()); } exponential_backoff.sleep_and_retry().await?; primary_keys.append(&mut backup); trace!("Retrying now..."); } }; } debug!("Batch read completed."); Ok(results) } /// Performs a single DynamoDB table batch write operation. If the batch /// contains more than 25 items, it is split into chunks. /// /// The function uses exponential backoff retries when AWS throttles /// the request or maximum provisioned capacity is exceeded #[tracing::instrument(name = "batch_write", skip(ddb, requests, config))] pub async fn batch_write( ddb: &aws_sdk_dynamodb::Client, table_name: &str, mut requests: Vec, config: ExponentialBackoffConfig, ) -> Result<(), super::Error> { tracing::debug!( ?config, "Starting batch write operation of {} items...", requests.len() ); let mut exponential_backoff = config.new_counter(); let mut backup = Vec::with_capacity(SINGLE_BATCH_WRITE_ITEM_LIMIT); loop { let items_to_drain = std::cmp::min(requests.len(), SINGLE_BATCH_WRITE_ITEM_LIMIT); let chunk = requests.drain(..items_to_drain).collect::>(); if chunk.is_empty() { // No more items tracing::trace!("No more items to process. Exiting"); break; } // we don't need the backup when we don't retry if config.should_retry_on_capacity_exceeded() { chunk.clone_into(&mut backup); } tracing::trace!("Attempting to write chunk of {} items...", chunk.len()); let result = ddb .batch_write_item() .request_items(table_name, chunk) .send() .await; match result { Ok(output) => { if let Some(mut items) = output.unprocessed_items { let requests_to_retry = items.remove(table_name).unwrap_or_default(); if requests_to_retry.is_empty() { tracing::trace!("Chunk written successfully. Continuing."); exponential_backoff.reset(); continue; } exponential_backoff.sleep_and_retry().await?; tracing::debug!( "Some items failed. Retrying {} requests", requests_to_retry.len() ); requests.extend(requests_to_retry); } else { tracing::trace!("Unprocessed items was None"); } } Err(error) => { if !is_provisioned_capacity_exceeded(&error) { tracing::error!("BatchWriteItem failed: {0:?} - {0}", error); return Err(super::Error::AwsSdk(error.into())); } tracing::warn!("Provisioned capacity exceeded!"); if !config.retry_on_provisioned_capacity_exceeded { return Err(super::Error::AwsSdk(error.into())); } exponential_backoff.sleep_and_retry().await?; requests.append(&mut backup); trace!("Retrying now..."); } }; } debug!("Batch write completed."); Ok(()) } /// internal helper struct struct ExponentialBackoffHelper<'cfg> { config: &'cfg ExponentialBackoffConfig, attempt: u32, } impl<'cfg> ExponentialBackoffHelper<'cfg> { fn new(config: &'cfg ExponentialBackoffConfig) -> Self { ExponentialBackoffHelper { config, attempt: 0 } } /// reset counter after successfull operation fn reset(&mut self) { self.attempt = 0; } /// increase counter and sleep in case of failure async fn sleep_and_retry(&mut self) -> Result<(), super::Error> { let jitter_factor = 1f32.min(0f32.max(self.config.jitter_factor)); let random_multiplier = 1.0 + rand::thread_rng().gen_range(-jitter_factor..=jitter_factor); let backoff_multiplier = 2u32.pow(self.attempt); let base_duration = self.config.base_duration * backoff_multiplier; let sleep_duration = base_duration.mul_f32(random_multiplier); self.attempt += 1; if self.attempt > self.config.max_attempts { tracing::warn!("Retry limit exceeded!"); return Err(super::Error::MaxRetriesExceeded); } tracing::debug!( attempt = self.attempt, "Batch failed. Sleeping for {}ms before retrying...", sleep_duration.as_millis() ); tokio::time::sleep(sleep_duration).await; Ok(()) } } /// Check if transaction failed due to /// `ProvisionedThroughputExceededException` exception fn is_provisioned_capacity_exceeded( err: &SdkError, ) -> bool { let SdkError::ServiceError(service_error) = err else { return false; }; matches!( service_error.err(), BatchWriteItemError::ProvisionedThroughputExceededException(_) ) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_integer() { assert!(parse_integer::("some_attr", "123").is_ok()); assert!(parse_integer::("negative", "-123").is_ok()); assert!(parse_integer::("float", "3.14").is_err()); assert!(parse_integer::("NaN", "foo").is_err()); assert!(parse_integer::("negative_uint", "-123").is_err()); assert!(parse_integer::("too_large", "65536").is_err()); } #[test] fn test_parse_timestamp() { let timestamp = Utc::now().timestamp_millis(); let attr = AttributeValue::N(timestamp.to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_ok()); assert_eq!(parsed_timestamp.unwrap().timestamp_millis(), timestamp); } #[test] fn test_parse_invalid_timestamp() { let attr = AttributeValue::N("foo".to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_err()); } #[test] fn test_parse_timestamp_out_of_range() { let attr = AttributeValue::N(i64::MAX.to_string()); let parsed_timestamp = parse_timestamp_attribute("some_attr", Some(attr)); assert!(parsed_timestamp.is_err()); assert!(matches!( parsed_timestamp.unwrap_err().attribute_error, DBItemAttributeError::TimestampOutOfRange )); } }