Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F32197391
D15208.1765103271.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
11 KB
Referenced Files
None
Subscribers
None
D15208.1765103271.diff
View Options
diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs
--- a/services/identity/src/database.rs
+++ b/services/identity/src/database.rs
@@ -315,17 +315,17 @@
);
}
- if let Some(fid) = farcaster_id {
+ if let Some(fid) = &farcaster_id {
user.insert(
USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME.to_string(),
- AttributeValue::S(fid),
+ AttributeValue::S(fid.to_string()),
);
}
- if let Some(token) = farcaster_dcs_token {
+ if let Some(token) = &farcaster_dcs_token {
user.insert(
USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME.to_string(),
- AttributeValue::S(token),
+ AttributeValue::S(token.to_string()),
);
}
@@ -363,13 +363,50 @@
.delete(delete_user_from_reserved_usernames)
.build();
+ let mut transact_items = vec![
+ put_user_operation,
+ delete_user_from_reserved_usernames_operation,
+ ];
+
+ // Add entry to farcaster_tokens table if farcaster data is provided
+ if let (Some(fid), Some(token)) = (farcaster_id, farcaster_dcs_token) {
+ use comm_lib::database::shared_tables::farcaster_tokens;
+
+ let farcaster_item = HashMap::from([
+ (
+ farcaster_tokens::PARTITION_KEY.to_string(),
+ AttributeValue::S(user_id.clone()),
+ ),
+ (
+ farcaster_tokens::FARCASTER_ID.to_string(),
+ AttributeValue::S(fid),
+ ),
+ (
+ farcaster_tokens::FARCASTER_DCS_TOKEN.to_string(),
+ AttributeValue::S(token),
+ ),
+ ]);
+
+ let put_farcaster_token = Put::builder()
+ .table_name(farcaster_tokens::TABLE_NAME)
+ .set_item(Some(farcaster_item))
+ // make sure we don't accidentally overwrite existing row
+ .condition_expression("attribute_not_exists(#pk)")
+ .expression_attribute_names("#pk", farcaster_tokens::PARTITION_KEY)
+ .build()
+ .expect("key, update_expression or table_name not set in Put builder");
+
+ let put_farcaster_token_operation = TransactWriteItem::builder()
+ .put(put_farcaster_token)
+ .build();
+
+ transact_items.push(put_farcaster_token_operation);
+ }
+
self
.client
.transact_write_items()
- .set_transact_items(Some(vec![
- put_user_operation,
- delete_user_from_reserved_usernames_operation,
- ]))
+ .set_transact_items(Some(transact_items))
.send()
.await
.map_err(|e| {
@@ -598,6 +635,9 @@
debug!(user_id, "Attempting to delete user's access tokens");
self.delete_all_tokens_for_user(&user_id).await?;
+ debug!(user_id, "Attempting to delete user's farcaster tokens");
+ self.delete_farcaster_tokens_for_user(&user_id).await?;
+
debug!(user_id, "Attempting to delete user");
match self
.client
diff --git a/services/identity/src/database/farcaster.rs b/services/identity/src/database/farcaster.rs
--- a/services/identity/src/database/farcaster.rs
+++ b/services/identity/src/database/farcaster.rs
@@ -1,12 +1,5 @@
-use comm_lib::aws::ddb::types::AttributeValue;
-use comm_lib::aws::ddb::types::ReturnValue;
-use comm_lib::database::AttributeExtractor;
-use comm_lib::database::AttributeMap;
-use comm_lib::database::DBItemAttributeError;
-use comm_lib::database::DBItemError;
-use comm_lib::database::Value;
-use tracing::error;
-
+use super::DatabaseClient;
+use super::Error;
use crate::constants::error_types;
use crate::constants::USERS_TABLE;
use crate::constants::USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME;
@@ -16,9 +9,17 @@
use crate::constants::USERS_TABLE_USERNAME_ATTRIBUTE;
use crate::constants::USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE;
use crate::grpc_services::protos::unauth::FarcasterUser;
-
-use super::DatabaseClient;
-use super::Error;
+use crate::log::redact_sensitive_data;
+use comm_lib::aws::ddb::types::AttributeValue;
+use comm_lib::aws::ddb::types::ReturnValue;
+use comm_lib::aws::DynamoDBError;
+use comm_lib::database::batch_operations::ExponentialBackoffConfig;
+use comm_lib::database::AttributeExtractor;
+use comm_lib::database::DBItemAttributeError;
+use comm_lib::database::DBItemError;
+use comm_lib::database::Value;
+use comm_lib::database::{is_transaction_conflict, AttributeMap};
+use tracing::{error, warn};
pub struct FarcasterUserData(pub FarcasterUser);
@@ -69,9 +70,8 @@
farcaster_id: String,
) -> Result<(), Error> {
let update_expression = format!(
- "SET {0} = if_not_exists({0}, :val) REMOVE {1}",
+ "SET {0} = if_not_exists({0}, :val)",
USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME,
- USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME,
);
let response = self
@@ -114,28 +114,97 @@
user_id: String,
farcaster_dcs_token: String,
) -> Result<(), Error> {
- let update_expression = format!(
- "SET {0} = :val",
- USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME,
- );
+ use comm_lib::aws::ddb::types::{Put, TransactWriteItem, Update};
+ use comm_lib::database::shared_tables::farcaster_tokens;
- self
+ // First, get the current farcaster_id to create farcaster_tokens entry
+ let user_item = self
.client
- .update_item()
+ .get_item()
.table_name(USERS_TABLE)
- .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id))
- .update_expression(update_expression)
+ .key(
+ USERS_TABLE_PARTITION_KEY,
+ AttributeValue::S(user_id.clone()),
+ )
+ .send()
+ .await
+ .map_err(|e| Error::AwsSdk(e.into()))?;
+
+ let Some(mut user) = user_item.item else {
+ return Err(Error::MissingItem);
+ };
+
+ let farcaster_id: String = user
+ .take_attr(USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME)
+ .map_err(|_| {
+ warn!(
+ user_id = redact_sensitive_data(&user_id),
+ "User should have FID defined before assigning DCs token"
+ );
+ Error::MissingItem
+ })?; // User must have farcaster_id first
+
+ // Update users table with DCS token
+ let update_users = Update::builder()
+ .table_name(USERS_TABLE)
+ .key(
+ USERS_TABLE_PARTITION_KEY,
+ AttributeValue::S(user_id.clone()),
+ )
+ .update_expression(format!(
+ "SET {0} = :val",
+ USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME
+ ))
.expression_attribute_values(
":val",
AttributeValue::S(farcaster_dcs_token.clone()),
)
- .return_values(ReturnValue::UpdatedNew)
+ .build()
+ .expect("Failed to build Update for users table");
+
+ let update_users_operation =
+ TransactWriteItem::builder().update(update_users).build();
+
+ // Create farcaster_tokens entry
+ let farcaster_item = std::collections::HashMap::from([
+ (
+ farcaster_tokens::PARTITION_KEY.to_string(),
+ AttributeValue::S(user_id.clone()),
+ ),
+ (
+ farcaster_tokens::FARCASTER_ID.to_string(),
+ AttributeValue::S(farcaster_id),
+ ),
+ (
+ farcaster_tokens::FARCASTER_DCS_TOKEN.to_string(),
+ AttributeValue::S(farcaster_dcs_token),
+ ),
+ ]);
+
+ let put_farcaster_token = Put::builder()
+ .table_name(farcaster_tokens::TABLE_NAME)
+ .set_item(Some(farcaster_item))
+ .build()
+ .expect("Failed to build Put for farcaster_tokens table");
+
+ let put_farcaster_token_operation = TransactWriteItem::builder()
+ .put(put_farcaster_token)
+ .build();
+
+ // Execute both operations atomically
+ self
+ .client
+ .transact_write_items()
+ .set_transact_items(Some(vec![
+ update_users_operation,
+ put_farcaster_token_operation,
+ ]))
.send()
.await
.map_err(|e| {
error!(
errorType = error_types::FARCASTER_DB_LOG,
- "DDB client failed to add Farcaster DCs token: {:?}", e
+ "Transaction failed to add Farcaster DCs token: {:?}", e
);
Error::AwsSdk(e.into())
})?;
@@ -144,29 +213,106 @@
}
pub async fn unlink_farcaster(&self, user_id: String) -> Result<(), Error> {
+ use comm_lib::aws::ddb::types::{Delete, TransactWriteItem, Update};
+ use comm_lib::database::shared_tables::farcaster_tokens;
+
+ // Remove farcaster data from users table
let update_expression = format!(
"REMOVE {}, {}",
USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME,
USERS_TABLE_FARCASTER_DCS_TOKEN_ATTRIBUTE_NAME
);
- self
- .client
- .update_item()
+ let update_users = Update::builder()
.table_name(USERS_TABLE)
- .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id))
+ .key(
+ USERS_TABLE_PARTITION_KEY,
+ AttributeValue::S(user_id.clone()),
+ )
.update_expression(update_expression)
+ .build()
+ .expect("Failed to build Update for users table");
+
+ let update_users_operation =
+ TransactWriteItem::builder().update(update_users).build();
+
+ // Remove from farcaster_tokens table
+ let delete_farcaster_token = Delete::builder()
+ .table_name(farcaster_tokens::TABLE_NAME)
+ .key(farcaster_tokens::PARTITION_KEY, AttributeValue::S(user_id))
+ .build()
+ .expect("Failed to build Delete for farcaster_tokens table");
+
+ let delete_farcaster_token_operation = TransactWriteItem::builder()
+ .delete(delete_farcaster_token)
+ .build();
+
+ // Execute both operations atomically
+ let transaction = self
+ .client
+ .transact_write_items()
+ .transact_items(update_users_operation)
+ .transact_items(delete_farcaster_token_operation);
+
+ let retry_config = ExponentialBackoffConfig::default();
+ let mut exponential_backoff = retry_config.new_counter();
+
+ loop {
+ let result = transaction.clone().send().await;
+ match result {
+ Ok(_) => return Ok(()),
+ Err(err) => match DynamoDBError::from(err) {
+ ref conflict_err if is_transaction_conflict(conflict_err) => {
+ warn!("Encountered transaction conflict while while unlinking farcaster - retrying");
+ exponential_backoff.sleep_and_retry().await?;
+ }
+ error => {
+ error!(
+ errorType = error_types::FARCASTER_DB_LOG,
+ "Transaction failed to unlink farcaster: {:?}", error
+ );
+ return Err(error.into());
+ }
+ },
+ }
+ }
+ }
+
+ #[tracing::instrument(skip_all)]
+ pub async fn delete_farcaster_tokens_for_user(
+ &self,
+ user_id: &str,
+ ) -> Result<(), Error> {
+ use comm_lib::database::shared_tables::farcaster_tokens;
+
+ match self
+ .client
+ .delete_item()
+ .table_name(farcaster_tokens::TABLE_NAME)
+ .key(
+ farcaster_tokens::PARTITION_KEY,
+ AttributeValue::S(user_id.to_string()),
+ )
.send()
.await
- .map_err(|e| {
+ {
+ Ok(_) => {
+ tracing::debug!(
+ "Farcaster token has been deleted for user {}",
+ user_id
+ );
+ Ok(())
+ }
+ Err(e) => {
error!(
errorType = error_types::FARCASTER_DB_LOG,
- "DDB client failed to remove farcasterID: {:?}", e
+ "DynamoDB client failed to delete farcaster token for user {}: {:?}",
+ user_id,
+ e
);
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
+ Err(Error::AwsSdk(e.into()))
+ }
+ }
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Dec 7, 10:27 AM (21 h, 21 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5843530
Default Alt Text
D15208.1765103271.diff (11 KB)
Attached To
Mode
D15208: [identity] update Farcaster tokens table when updating users table
Attached
Detach File
Event Timeline
Log In to Comment