Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3361155
D11674.id39182.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Referenced Files
None
Subscribers
None
D11674.id39182.diff
View Options
diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs
--- a/services/identity/src/database.rs
+++ b/services/identity/src/database.rs
@@ -1,48 +1,31 @@
-use comm_lib::aws::ddb::types::Delete;
+use comm_lib::aws::ddb::{
+ operation::{
+ delete_item::DeleteItemOutput, get_item::GetItemOutput,
+ put_item::PutItemOutput, query::QueryOutput,
+ },
+ primitives::Blob,
+ types::{AttributeValue, PutRequest, WriteRequest},
+};
use comm_lib::aws::{AwsConfig, DynamoDBClient};
use comm_lib::database::{
AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError,
TryFromAttribute,
};
-use comm_lib::{
- aws::{
- ddb::{
- operation::{
- delete_item::DeleteItemOutput, get_item::GetItemOutput,
- put_item::PutItemOutput, query::QueryOutput,
- },
- primitives::Blob,
- types::{
- AttributeValue, PutRequest, TransactWriteItem, Update, WriteRequest,
- },
- },
- DynamoDBError,
- },
- database::parse_int_attribute,
-};
use constant_time_eq::constant_time_eq;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
pub use crate::database::device_list::DeviceIDAttribute;
-use crate::ddb_utils::into_one_time_update_requests;
-use crate::olm::is_valid_olm_key;
use crate::{
constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME,
ddb_utils::EthereumIdentity, reserved_users::UserDetail, siwe::SocialProof,
};
use crate::{
- ddb_utils::{
- create_one_time_key_partition_key, into_one_time_put_requests,
- is_transaction_retryable, Identifier, OlmAccountType,
- },
+ ddb_utils::{Identifier, OlmAccountType},
grpc_services::protos,
};
-use crate::{
- error::{consume_error, Error},
- grpc_utils::DeviceKeysInfo,
-};
+use crate::{error::Error, grpc_utils::DeviceKeysInfo};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, warn};
@@ -56,8 +39,7 @@
ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE,
NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE,
NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY,
- ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, RESERVED_USERNAMES_TABLE,
- RESERVED_USERNAMES_TABLE_PARTITION_KEY,
+ RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY,
RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE,
USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME,
USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY,
@@ -72,6 +54,7 @@
mod device_list;
mod farcaster;
+mod one_time_keys;
mod workflows;
pub use device_list::{DeviceListRow, DeviceListUpdate, DeviceRow};
@@ -428,292 +411,6 @@
Ok(maybe_keyserver_device_id)
}
- /// Will "mint" a single one-time key by attempting to successfully delete a
- /// key
- pub async fn get_one_time_key(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<Option<String>, Error> {
- use crate::constants::devices_table;
- use crate::constants::one_time_keys_table as otk_table;
- use crate::constants::retry;
- use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD;
-
- let attr_otk_count = match account_type {
- OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
- OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
- };
-
- fn spawn_refresh_keys_task(device_id: &str) {
- // Clone the string slice to move into the async block
- let device_id = device_id.to_string();
- tokio::spawn(async move {
- debug!("Attempting to request more keys for device: {}", &device_id);
- let result =
- crate::tunnelbroker::send_refresh_keys_request(&device_id).await;
- consume_error(result);
- });
- }
-
- // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
- // in `comm-lib` to handle transactions with retries
- let mut attempt = 0;
-
- loop {
- attempt += 1;
- if attempt > retry::MAX_ATTEMPTS {
- return Err(Error::MaxRetriesExceeded);
- }
-
- let otk_count =
- self.get_otk_count(user_id, device_id, account_type).await?;
- if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD {
- spawn_refresh_keys_task(device_id);
- }
- if otk_count < 1 {
- return Ok(None);
- }
-
- let query_result = self
- .get_one_time_keys(user_id, device_id, account_type)
- .await?;
- let mut items = query_result.items.unwrap_or_default();
- let mut item = items.pop().unwrap_or_default();
- let pk = item.take_attr(otk_table::PARTITION_KEY)?;
- let sk = item.take_attr(otk_table::SORT_KEY)?;
- let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?;
-
- let delete_otk = Delete::builder()
- .table_name(otk_table::NAME)
- .key(otk_table::PARTITION_KEY, AttributeValue::S(pk))
- .key(otk_table::SORT_KEY, AttributeValue::S(sk))
- .build();
-
- let delete_otk_operation =
- TransactWriteItem::builder().delete(delete_otk).build();
-
- let update_otk_count = Update::builder()
- .table_name(devices_table::NAME)
- .key(
- devices_table::ATTR_USER_ID,
- AttributeValue::S(user_id.to_string()),
- )
- .key(
- devices_table::ATTR_ITEM_ID,
- DeviceIDAttribute(device_id.into()).into(),
- )
- .update_expression(format!("ADD {} :decrement_val", attr_otk_count))
- .expression_attribute_values(
- ":decrement_val",
- AttributeValue::N("-1".to_string()),
- )
- .condition_expression(format!("{} = :old_val", attr_otk_count))
- .expression_attribute_values(
- ":old_val",
- AttributeValue::N(otk_count.to_string()),
- )
- .build();
-
- let update_otk_count_operation = TransactWriteItem::builder()
- .update(update_otk_count)
- .build();
-
- let transaction = self
- .client
- .transact_write_items()
- .set_transact_items(Some(vec![
- delete_otk_operation,
- update_otk_count_operation,
- ]))
- .send()
- .await;
-
- match transaction {
- Ok(_) => return Ok(Some(otk)),
- Err(e) => {
- let dynamo_db_error = DynamoDBError::from(e);
- let retryable_codes = HashSet::from([
- retry::CONDITIONAL_CHECK_FAILED,
- retry::TRANSACTION_CONFLICT,
- ]);
- if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
- info!("Encountered transaction conflict while retrieving one-time key - retrying");
- } else {
- error!(
- "One-time key retrieval transaction failed: {:?}",
- dynamo_db_error
- );
- return Err(Error::AwsSdk(dynamo_db_error));
- }
- }
- }
- }
- }
-
- pub async fn get_one_time_keys(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<QueryOutput, Error> {
- use crate::constants::one_time_keys_table::*;
-
- let partition_key =
- create_one_time_key_partition_key(user_id, device_id, account_type);
-
- self
- .client
- .query()
- .table_name(NAME)
- .key_condition_expression("#pk = :pk")
- .expression_attribute_names("#pk", PARTITION_KEY)
- .expression_attribute_values(":pk", AttributeValue::S(partition_key))
- .limit(1)
- .send()
- .await
- .map_err(|e| Error::AwsSdk(e.into()))
- }
-
- pub async fn append_one_time_prekeys(
- &self,
- user_id: &str,
- device_id: &str,
- content_one_time_keys: &Vec<String>,
- notif_one_time_keys: &Vec<String>,
- ) -> Result<(), Error> {
- use crate::constants::retry;
-
- let num_content_keys = content_one_time_keys.len();
- let num_notif_keys = notif_one_time_keys.len();
-
- if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
- || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
- {
- return Err(Error::OneTimeKeyUploadLimitExceeded);
- }
-
- if content_one_time_keys
- .iter()
- .any(|otk| !is_valid_olm_key(otk))
- || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk))
- {
- debug!("Invalid one-time key format");
- return Err(Error::InvalidFormat);
- }
-
- let current_time = chrono::Utc::now();
-
- let content_otk_requests = into_one_time_put_requests(
- user_id,
- device_id,
- content_one_time_keys,
- OlmAccountType::Content,
- current_time,
- );
- let notif_otk_requests = into_one_time_put_requests(
- user_id,
- device_id,
- notif_one_time_keys,
- OlmAccountType::Notification,
- current_time,
- );
-
- let update_otk_count_operation = into_one_time_update_requests(
- user_id,
- device_id,
- num_content_keys,
- num_notif_keys,
- );
-
- let mut operations = Vec::new();
- operations.extend_from_slice(&content_otk_requests);
- operations.extend_from_slice(¬if_otk_requests);
- operations.push(update_otk_count_operation);
-
- // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
- // in `comm-lib` to handle transactions with retries
- let mut attempt = 0;
-
- loop {
- attempt += 1;
- if attempt > retry::MAX_ATTEMPTS {
- return Err(Error::MaxRetriesExceeded);
- }
-
- let transaction = self
- .client
- .transact_write_items()
- .set_transact_items(Some(operations.clone()))
- .send()
- .await;
-
- match transaction {
- Ok(_) => break,
- Err(e) => {
- let dynamo_db_error = DynamoDBError::from(e);
- let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]);
- if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
- info!("Encountered transaction conflict while uploading one-time keys - retrying");
- } else {
- error!(
- "One-time key upload transaction failed: {:?}",
- dynamo_db_error
- );
- return Err(Error::AwsSdk(dynamo_db_error));
- }
- }
- }
- }
-
- Ok(())
- }
-
- async fn get_otk_count(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<usize, Error> {
- use crate::constants::devices_table;
-
- let attr_name = match account_type {
- OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
- OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
- };
-
- let response = self
- .client
- .get_item()
- .table_name(devices_table::NAME)
- .projection_expression(attr_name)
- .key(
- devices_table::ATTR_USER_ID,
- AttributeValue::S(user_id.to_string()),
- )
- .key(
- devices_table::ATTR_ITEM_ID,
- DeviceIDAttribute(device_id.into()).into(),
- )
- .send()
- .await
- .map_err(|e| {
- error!("Failed to get user's OTK count: {:?}", e);
- Error::AwsSdk(e.into())
- })?;
-
- let mut user_item = response.item.unwrap_or_default();
- match parse_int_attribute(attr_name, user_item.remove(attr_name)) {
- Ok(num) => Ok(num),
- Err(DBItemError {
- attribute_error: DBItemAttributeError::Missing,
- ..
- }) => Ok(0),
- Err(e) => Err(Error::Attribute(e)),
- }
- }
-
pub async fn update_user_password(
&self,
user_id: String,
diff --git a/services/identity/src/database/one_time_keys.rs b/services/identity/src/database/one_time_keys.rs
new file mode 100644
--- /dev/null
+++ b/services/identity/src/database/one_time_keys.rs
@@ -0,0 +1,316 @@
+use std::collections::HashSet;
+
+use comm_lib::{
+ aws::{
+ ddb::{
+ operation::query::QueryOutput,
+ types::{AttributeValue, Delete, TransactWriteItem, Update},
+ },
+ DynamoDBError,
+ },
+ database::{
+ parse_int_attribute, AttributeExtractor, DBItemAttributeError, DBItemError,
+ },
+};
+use tracing::{debug, error, info};
+
+use crate::{
+ constants::ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT,
+ database::DeviceIDAttribute,
+ ddb_utils::{
+ create_one_time_key_partition_key, into_one_time_put_requests,
+ into_one_time_update_requests, is_transaction_retryable, OlmAccountType,
+ },
+ error::{consume_error, Error},
+ olm::is_valid_olm_key,
+};
+
+use super::DatabaseClient;
+
+impl DatabaseClient {
+ /// Will "mint" a single one-time key by attempting to successfully delete a
+ /// key
+ pub(super) async fn get_one_time_key(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<Option<String>, Error> {
+ use crate::constants::devices_table;
+ use crate::constants::one_time_keys_table as otk_table;
+ use crate::constants::retry;
+ use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD;
+
+ let attr_otk_count = match account_type {
+ OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
+ OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
+ };
+
+ fn spawn_refresh_keys_task(device_id: &str) {
+ // Clone the string slice to move into the async block
+ let device_id = device_id.to_string();
+ tokio::spawn(async move {
+ debug!("Attempting to request more keys for device: {}", &device_id);
+ let result =
+ crate::tunnelbroker::send_refresh_keys_request(&device_id).await;
+ consume_error(result);
+ });
+ }
+
+ // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
+ // in `comm-lib` to handle transactions with retries
+ let mut attempt = 0;
+
+ loop {
+ attempt += 1;
+ if attempt > retry::MAX_ATTEMPTS {
+ return Err(Error::MaxRetriesExceeded);
+ }
+
+ let otk_count =
+ self.get_otk_count(user_id, device_id, account_type).await?;
+ if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD {
+ spawn_refresh_keys_task(device_id);
+ }
+ if otk_count < 1 {
+ return Ok(None);
+ }
+
+ let query_result = self
+ .get_one_time_keys(user_id, device_id, account_type)
+ .await?;
+ let mut items = query_result.items.unwrap_or_default();
+ let mut item = items.pop().unwrap_or_default();
+ let pk = item.take_attr(otk_table::PARTITION_KEY)?;
+ let sk = item.take_attr(otk_table::SORT_KEY)?;
+ let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?;
+
+ let delete_otk = Delete::builder()
+ .table_name(otk_table::NAME)
+ .key(otk_table::PARTITION_KEY, AttributeValue::S(pk))
+ .key(otk_table::SORT_KEY, AttributeValue::S(sk))
+ .build();
+
+ let delete_otk_operation =
+ TransactWriteItem::builder().delete(delete_otk).build();
+
+ let update_otk_count = Update::builder()
+ .table_name(devices_table::NAME)
+ .key(
+ devices_table::ATTR_USER_ID,
+ AttributeValue::S(user_id.to_string()),
+ )
+ .key(
+ devices_table::ATTR_ITEM_ID,
+ DeviceIDAttribute(device_id.into()).into(),
+ )
+ .update_expression(format!("ADD {} :decrement_val", attr_otk_count))
+ .expression_attribute_values(
+ ":decrement_val",
+ AttributeValue::N("-1".to_string()),
+ )
+ .condition_expression(format!("{} = :old_val", attr_otk_count))
+ .expression_attribute_values(
+ ":old_val",
+ AttributeValue::N(otk_count.to_string()),
+ )
+ .build();
+
+ let update_otk_count_operation = TransactWriteItem::builder()
+ .update(update_otk_count)
+ .build();
+
+ let transaction = self
+ .client
+ .transact_write_items()
+ .set_transact_items(Some(vec![
+ delete_otk_operation,
+ update_otk_count_operation,
+ ]))
+ .send()
+ .await;
+
+ match transaction {
+ Ok(_) => return Ok(Some(otk)),
+ Err(e) => {
+ let dynamo_db_error = DynamoDBError::from(e);
+ let retryable_codes = HashSet::from([
+ retry::CONDITIONAL_CHECK_FAILED,
+ retry::TRANSACTION_CONFLICT,
+ ]);
+ if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
+ info!("Encountered transaction conflict while retrieving one-time key - retrying");
+ } else {
+ error!(
+ "One-time key retrieval transaction failed: {:?}",
+ dynamo_db_error
+ );
+ return Err(Error::AwsSdk(dynamo_db_error));
+ }
+ }
+ }
+ }
+ }
+
+ async fn get_one_time_keys(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<QueryOutput, Error> {
+ use crate::constants::one_time_keys_table::*;
+
+ let partition_key =
+ create_one_time_key_partition_key(user_id, device_id, account_type);
+
+ self
+ .client
+ .query()
+ .table_name(NAME)
+ .key_condition_expression("#pk = :pk")
+ .expression_attribute_names("#pk", PARTITION_KEY)
+ .expression_attribute_values(":pk", AttributeValue::S(partition_key))
+ .limit(1)
+ .send()
+ .await
+ .map_err(|e| Error::AwsSdk(e.into()))
+ }
+
+ pub async fn append_one_time_prekeys(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ content_one_time_keys: &Vec<String>,
+ notif_one_time_keys: &Vec<String>,
+ ) -> Result<(), Error> {
+ use crate::constants::retry;
+
+ let num_content_keys = content_one_time_keys.len();
+ let num_notif_keys = notif_one_time_keys.len();
+
+ if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
+ || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
+ {
+ return Err(Error::OneTimeKeyUploadLimitExceeded);
+ }
+
+ if content_one_time_keys
+ .iter()
+ .any(|otk| !is_valid_olm_key(otk))
+ || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk))
+ {
+ debug!("Invalid one-time key format");
+ return Err(Error::InvalidFormat);
+ }
+
+ let current_time = chrono::Utc::now();
+
+ let content_otk_requests = into_one_time_put_requests(
+ user_id,
+ device_id,
+ content_one_time_keys,
+ OlmAccountType::Content,
+ current_time,
+ );
+ let notif_otk_requests = into_one_time_put_requests(
+ user_id,
+ device_id,
+ notif_one_time_keys,
+ OlmAccountType::Notification,
+ current_time,
+ );
+
+ let update_otk_count_operation = into_one_time_update_requests(
+ user_id,
+ device_id,
+ num_content_keys,
+ num_notif_keys,
+ );
+
+ let mut operations = Vec::new();
+ operations.extend_from_slice(&content_otk_requests);
+ operations.extend_from_slice(¬if_otk_requests);
+ operations.push(update_otk_count_operation);
+
+ // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
+ // in `comm-lib` to handle transactions with retries
+ let mut attempt = 0;
+
+ loop {
+ attempt += 1;
+ if attempt > retry::MAX_ATTEMPTS {
+ return Err(Error::MaxRetriesExceeded);
+ }
+
+ let transaction = self
+ .client
+ .transact_write_items()
+ .set_transact_items(Some(operations.clone()))
+ .send()
+ .await;
+
+ match transaction {
+ Ok(_) => break,
+ Err(e) => {
+ let dynamo_db_error = DynamoDBError::from(e);
+ let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]);
+ if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
+ info!("Encountered transaction conflict while uploading one-time keys - retrying");
+ } else {
+ error!(
+ "One-time key upload transaction failed: {:?}",
+ dynamo_db_error
+ );
+ return Err(Error::AwsSdk(dynamo_db_error));
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn get_otk_count(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<usize, Error> {
+ use crate::constants::devices_table;
+
+ let attr_name = match account_type {
+ OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
+ OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
+ };
+
+ let response = self
+ .client
+ .get_item()
+ .table_name(devices_table::NAME)
+ .projection_expression(attr_name)
+ .key(
+ devices_table::ATTR_USER_ID,
+ AttributeValue::S(user_id.to_string()),
+ )
+ .key(
+ devices_table::ATTR_ITEM_ID,
+ DeviceIDAttribute(device_id.into()).into(),
+ )
+ .send()
+ .await
+ .map_err(|e| {
+ error!("Failed to get user's OTK count: {:?}", e);
+ Error::AwsSdk(e.into())
+ })?;
+
+ let mut user_item = response.item.unwrap_or_default();
+ match parse_int_attribute(attr_name, user_item.remove(attr_name)) {
+ Ok(num) => Ok(num),
+ Err(DBItemError {
+ attribute_error: DBItemAttributeError::Missing,
+ ..
+ }) => Ok(0),
+ Err(e) => Err(Error::Attribute(e)),
+ }
+ }
+}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Nov 25, 3:32 PM (20 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2579926
Default Alt Text
D11674.id39182.diff (21 KB)
Attached To
Mode
D11674: [identity] move otk db stuff to new sub module
Attached
Detach File
Event Timeline
Log In to Comment