diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -315,14 +315,14 @@ ); } - if let Some(fid) = farcaster_id { + if let Some(fid) = farcaster_id.clone() { user.insert( USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME.to_string(), AttributeValue::S(fid), ); } - if let Some(token) = farcaster_dcs_token { + if let Some(token) = farcaster_dcs_token.clone() { user.insert( USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME.to_string(), AttributeValue::S(token), @@ -363,13 +363,50 @@ .delete(delete_user_from_reserved_usernames) .build(); + let mut transact_items = vec![ + put_user_operation, + delete_user_from_reserved_usernames_operation, + ]; + + // Add entry to farcaster_tokens table if farcaster data is provided + if let (Some(fid), Some(token)) = (farcaster_id, farcaster_dcs_token) { + use comm_lib::database::shared_tables::farcaster_tokens; + + let farcaster_item = HashMap::from([ + ( + farcaster_tokens::PARTITION_KEY.to_string(), + AttributeValue::S(user_id.clone()), + ), + ( + farcaster_tokens::FARCASTER_ID.to_string(), + AttributeValue::S(fid), + ), + ( + farcaster_tokens::FARCASTER_DCS_TOKEN.to_string(), + AttributeValue::S(token), + ), + ]); + + let put_farcaster_token = Put::builder() + .table_name(farcaster_tokens::TABLE_NAME) + .set_item(Some(farcaster_item)) + // make sure we don't accidentally overwrite existing row + .condition_expression("attribute_not_exists(#pk)") + .expression_attribute_names("#pk", farcaster_tokens::PARTITION_KEY) + .build() + .expect("key, update_expression or table_name not set in Put builder"); + + let put_farcaster_token_operation = TransactWriteItem::builder() + .put(put_farcaster_token) + .build(); + + transact_items.push(put_farcaster_token_operation); + } + self .client .transact_write_items() - .set_transact_items(Some(vec![ - put_user_operation, - delete_user_from_reserved_usernames_operation, - ])) + .set_transact_items(Some(transact_items)) .send() .await .map_err(|e| { @@ -598,6 +635,9 @@ debug!(user_id, "Attempting to delete user's access tokens"); self.delete_all_tokens_for_user(&user_id).await?; + debug!(user_id, "Attempting to delete user's farcaster tokens"); + self.delete_farcaster_tokens_for_user(&user_id).await?; + debug!(user_id, "Attempting to delete user"); match self .client diff --git a/services/identity/src/database/farcaster.rs b/services/identity/src/database/farcaster.rs --- a/services/identity/src/database/farcaster.rs +++ b/services/identity/src/database/farcaster.rs @@ -114,28 +114,91 @@ user_id: String, farcaster_dcs_token: String, ) -> Result<(), Error> { - let update_expression = format!( - "SET {0} = :val", - USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME, - ); + use comm_lib::aws::ddb::types::{Put, TransactWriteItem, Update}; + use comm_lib::database::shared_tables::farcaster_tokens; - self + // First, get the current farcaster_id to create farcaster_tokens entry + let user_item = self .client - .update_item() + .get_item() .table_name(USERS_TABLE) - .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) - .update_expression(update_expression) + .key( + USERS_TABLE_PARTITION_KEY, + AttributeValue::S(user_id.clone()), + ) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))?; + + let Some(mut user) = user_item.item else { + return Err(Error::MissingItem); + }; + + let farcaster_id: String = user + .take_attr(USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME) + .map_err(|_| Error::MissingItem)?; // User must have farcaster_id first + + // Update users table with DCS token + let update_users = Update::builder() + .table_name(USERS_TABLE) + .key( + USERS_TABLE_PARTITION_KEY, + AttributeValue::S(user_id.clone()), + ) + .update_expression(format!( + "SET {0} = :val", + USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME + )) .expression_attribute_values( ":val", AttributeValue::S(farcaster_dcs_token.clone()), ) - .return_values(ReturnValue::UpdatedNew) + .build() + .expect("Failed to build Update for users table"); + + let update_users_operation = + TransactWriteItem::builder().update(update_users).build(); + + // Create farcaster_tokens entry + let farcaster_item = std::collections::HashMap::from([ + ( + farcaster_tokens::PARTITION_KEY.to_string(), + AttributeValue::S(user_id.clone()), + ), + ( + farcaster_tokens::FARCASTER_ID.to_string(), + AttributeValue::S(farcaster_id), + ), + ( + farcaster_tokens::FARCASTER_DCS_TOKEN.to_string(), + AttributeValue::S(farcaster_dcs_token), + ), + ]); + + let put_farcaster_token = Put::builder() + .table_name(farcaster_tokens::TABLE_NAME) + .set_item(Some(farcaster_item)) + .build() + .expect("Failed to build Put for farcaster_tokens table"); + + let put_farcaster_token_operation = TransactWriteItem::builder() + .put(put_farcaster_token) + .build(); + + // Execute both operations atomically + self + .client + .transact_write_items() + .set_transact_items(Some(vec![ + update_users_operation, + put_farcaster_token_operation, + ])) .send() .await .map_err(|e| { error!( errorType = error_types::FARCASTER_DB_LOG, - "DDB client failed to add Farcaster DCs token: {:?}", e + "Transaction failed to add Farcaster DCs token: {:?}", e ); Error::AwsSdk(e.into()) })?; @@ -144,30 +207,100 @@ } pub async fn unlink_farcaster(&self, user_id: String) -> Result<(), Error> { + use comm_lib::aws::ddb::types::{Delete, TransactWriteItem, Update}; + use comm_lib::database::shared_tables::farcaster_tokens; + + // Remove farcaster data from users table let update_expression = format!( "REMOVE {}, {}", USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME ); - self - .client - .update_item() + let update_users = Update::builder() .table_name(USERS_TABLE) - .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) + .key( + USERS_TABLE_PARTITION_KEY, + AttributeValue::S(user_id.clone()), + ) .update_expression(update_expression) + .build() + .expect("Failed to build Update for users table"); + + let update_users_operation = + TransactWriteItem::builder().update(update_users).build(); + + // Remove from farcaster_tokens table + let delete_farcaster_token = Delete::builder() + .table_name(farcaster_tokens::TABLE_NAME) + .key( + farcaster_tokens::PARTITION_KEY, + AttributeValue::S(user_id.clone()), + ) + .build() + .expect("Failed to build Delete for farcaster_tokens table"); + + let delete_farcaster_token_operation = TransactWriteItem::builder() + .delete(delete_farcaster_token) + .build(); + + // Execute both operations atomically + self + .client + .transact_write_items() + .set_transact_items(Some(vec![ + update_users_operation, + delete_farcaster_token_operation, + ])) .send() .await .map_err(|e| { error!( errorType = error_types::FARCASTER_DB_LOG, - "DDB client failed to remove farcasterID: {:?}", e + "Transaction failed to unlink farcaster: {:?}", e ); Error::AwsSdk(e.into()) })?; Ok(()) } + + #[tracing::instrument(skip_all)] + pub async fn delete_farcaster_tokens_for_user( + &self, + user_id: &str, + ) -> Result<(), Error> { + use comm_lib::database::shared_tables::farcaster_tokens; + + match self + .client + .delete_item() + .table_name(farcaster_tokens::TABLE_NAME) + .key( + farcaster_tokens::PARTITION_KEY, + AttributeValue::S(user_id.to_string()), + ) + .send() + .await + { + Ok(_) => { + tracing::debug!( + "Farcaster token has been deleted for user {}", + user_id + ); + Ok(()) + } + Err(e) => { + error!( + errorType = error_types::FARCASTER_DB_LOG, + "DynamoDB client failed to delete farcaster token for user {}: {}", + user_id, + e + ); + Err(Error::AwsSdk(e.into())) + } + } + } } impl TryFrom for FarcasterUserData {