Page Menu · Home · Phabricator

D11674.diff
No One · Temporary

D11674.diff

diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs
--- a/services/identity/src/database.rs
+++ b/services/identity/src/database.rs
@@ -1,48 +1,31 @@
-use comm_lib::aws::ddb::types::Delete;
+use comm_lib::aws::ddb::{
+ operation::{
+ delete_item::DeleteItemOutput, get_item::GetItemOutput,
+ put_item::PutItemOutput, query::QueryOutput,
+ },
+ primitives::Blob,
+ types::{AttributeValue, PutRequest, WriteRequest},
+};
use comm_lib::aws::{AwsConfig, DynamoDBClient};
use comm_lib::database::{
AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError,
TryFromAttribute,
};
-use comm_lib::{
- aws::{
- ddb::{
- operation::{
- delete_item::DeleteItemOutput, get_item::GetItemOutput,
- put_item::PutItemOutput, query::QueryOutput,
- },
- primitives::Blob,
- types::{
- AttributeValue, PutRequest, TransactWriteItem, Update, WriteRequest,
- },
- },
- DynamoDBError,
- },
- database::parse_int_attribute,
-};
use constant_time_eq::constant_time_eq;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
pub use crate::database::device_list::DeviceIDAttribute;
-use crate::ddb_utils::into_one_time_update_requests;
-use crate::olm::is_valid_olm_key;
use crate::{
constants::USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME,
ddb_utils::EthereumIdentity, reserved_users::UserDetail, siwe::SocialProof,
};
use crate::{
- ddb_utils::{
- create_one_time_key_partition_key, into_one_time_put_requests,
- is_transaction_retryable, Identifier, OlmAccountType,
- },
+ ddb_utils::{Identifier, OlmAccountType},
grpc_services::protos,
};
-use crate::{
- error::{consume_error, Error},
- grpc_utils::DeviceKeysInfo,
-};
+use crate::{error::Error, grpc_utils::DeviceKeysInfo};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, warn};
@@ -56,8 +39,7 @@
ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE,
NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE,
NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY,
- ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT, RESERVED_USERNAMES_TABLE,
- RESERVED_USERNAMES_TABLE_PARTITION_KEY,
+ RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY,
RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE, USERS_TABLE,
USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME,
USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY,
@@ -72,6 +54,7 @@
mod device_list;
mod farcaster;
+mod one_time_keys;
mod workflows;
pub use device_list::{DeviceListRow, DeviceListUpdate, DeviceRow};
@@ -428,292 +411,6 @@
Ok(maybe_keyserver_device_id)
}
- /// Will "mint" a single one-time key by attempting to successfully delete a
- /// key
- pub async fn get_one_time_key(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<Option<String>, Error> {
- use crate::constants::devices_table;
- use crate::constants::one_time_keys_table as otk_table;
- use crate::constants::retry;
- use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD;
-
- let attr_otk_count = match account_type {
- OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
- OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
- };
-
- fn spawn_refresh_keys_task(device_id: &str) {
- // Clone the string slice to move into the async block
- let device_id = device_id.to_string();
- tokio::spawn(async move {
- debug!("Attempting to request more keys for device: {}", &device_id);
- let result =
- crate::tunnelbroker::send_refresh_keys_request(&device_id).await;
- consume_error(result);
- });
- }
-
- // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
- // in `comm-lib` to handle transactions with retries
- let mut attempt = 0;
-
- loop {
- attempt += 1;
- if attempt > retry::MAX_ATTEMPTS {
- return Err(Error::MaxRetriesExceeded);
- }
-
- let otk_count =
- self.get_otk_count(user_id, device_id, account_type).await?;
- if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD {
- spawn_refresh_keys_task(device_id);
- }
- if otk_count < 1 {
- return Ok(None);
- }
-
- let query_result = self
- .get_one_time_keys(user_id, device_id, account_type)
- .await?;
- let mut items = query_result.items.unwrap_or_default();
- let mut item = items.pop().unwrap_or_default();
- let pk = item.take_attr(otk_table::PARTITION_KEY)?;
- let sk = item.take_attr(otk_table::SORT_KEY)?;
- let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?;
-
- let delete_otk = Delete::builder()
- .table_name(otk_table::NAME)
- .key(otk_table::PARTITION_KEY, AttributeValue::S(pk))
- .key(otk_table::SORT_KEY, AttributeValue::S(sk))
- .build();
-
- let delete_otk_operation =
- TransactWriteItem::builder().delete(delete_otk).build();
-
- let update_otk_count = Update::builder()
- .table_name(devices_table::NAME)
- .key(
- devices_table::ATTR_USER_ID,
- AttributeValue::S(user_id.to_string()),
- )
- .key(
- devices_table::ATTR_ITEM_ID,
- DeviceIDAttribute(device_id.into()).into(),
- )
- .update_expression(format!("ADD {} :decrement_val", attr_otk_count))
- .expression_attribute_values(
- ":decrement_val",
- AttributeValue::N("-1".to_string()),
- )
- .condition_expression(format!("{} = :old_val", attr_otk_count))
- .expression_attribute_values(
- ":old_val",
- AttributeValue::N(otk_count.to_string()),
- )
- .build();
-
- let update_otk_count_operation = TransactWriteItem::builder()
- .update(update_otk_count)
- .build();
-
- let transaction = self
- .client
- .transact_write_items()
- .set_transact_items(Some(vec![
- delete_otk_operation,
- update_otk_count_operation,
- ]))
- .send()
- .await;
-
- match transaction {
- Ok(_) => return Ok(Some(otk)),
- Err(e) => {
- let dynamo_db_error = DynamoDBError::from(e);
- let retryable_codes = HashSet::from([
- retry::CONDITIONAL_CHECK_FAILED,
- retry::TRANSACTION_CONFLICT,
- ]);
- if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
- info!("Encountered transaction conflict while retrieving one-time key - retrying");
- } else {
- error!(
- "One-time key retrieval transaction failed: {:?}",
- dynamo_db_error
- );
- return Err(Error::AwsSdk(dynamo_db_error));
- }
- }
- }
- }
- }
-
- pub async fn get_one_time_keys(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<QueryOutput, Error> {
- use crate::constants::one_time_keys_table::*;
-
- let partition_key =
- create_one_time_key_partition_key(user_id, device_id, account_type);
-
- self
- .client
- .query()
- .table_name(NAME)
- .key_condition_expression("#pk = :pk")
- .expression_attribute_names("#pk", PARTITION_KEY)
- .expression_attribute_values(":pk", AttributeValue::S(partition_key))
- .limit(1)
- .send()
- .await
- .map_err(|e| Error::AwsSdk(e.into()))
- }
-
- pub async fn append_one_time_prekeys(
- &self,
- user_id: &str,
- device_id: &str,
- content_one_time_keys: &Vec<String>,
- notif_one_time_keys: &Vec<String>,
- ) -> Result<(), Error> {
- use crate::constants::retry;
-
- let num_content_keys = content_one_time_keys.len();
- let num_notif_keys = notif_one_time_keys.len();
-
- if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
- || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
- {
- return Err(Error::OneTimeKeyUploadLimitExceeded);
- }
-
- if content_one_time_keys
- .iter()
- .any(|otk| !is_valid_olm_key(otk))
- || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk))
- {
- debug!("Invalid one-time key format");
- return Err(Error::InvalidFormat);
- }
-
- let current_time = chrono::Utc::now();
-
- let content_otk_requests = into_one_time_put_requests(
- user_id,
- device_id,
- content_one_time_keys,
- OlmAccountType::Content,
- current_time,
- );
- let notif_otk_requests = into_one_time_put_requests(
- user_id,
- device_id,
- notif_one_time_keys,
- OlmAccountType::Notification,
- current_time,
- );
-
- let update_otk_count_operation = into_one_time_update_requests(
- user_id,
- device_id,
- num_content_keys,
- num_notif_keys,
- );
-
- let mut operations = Vec::new();
- operations.extend_from_slice(&content_otk_requests);
- operations.extend_from_slice(&notif_otk_requests);
- operations.push(update_otk_count_operation);
-
- // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
- // in `comm-lib` to handle transactions with retries
- let mut attempt = 0;
-
- loop {
- attempt += 1;
- if attempt > retry::MAX_ATTEMPTS {
- return Err(Error::MaxRetriesExceeded);
- }
-
- let transaction = self
- .client
- .transact_write_items()
- .set_transact_items(Some(operations.clone()))
- .send()
- .await;
-
- match transaction {
- Ok(_) => break,
- Err(e) => {
- let dynamo_db_error = DynamoDBError::from(e);
- let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]);
- if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
- info!("Encountered transaction conflict while uploading one-time keys - retrying");
- } else {
- error!(
- "One-time key upload transaction failed: {:?}",
- dynamo_db_error
- );
- return Err(Error::AwsSdk(dynamo_db_error));
- }
- }
- }
- }
-
- Ok(())
- }
-
- async fn get_otk_count(
- &self,
- user_id: &str,
- device_id: &str,
- account_type: OlmAccountType,
- ) -> Result<usize, Error> {
- use crate::constants::devices_table;
-
- let attr_name = match account_type {
- OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
- OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
- };
-
- let response = self
- .client
- .get_item()
- .table_name(devices_table::NAME)
- .projection_expression(attr_name)
- .key(
- devices_table::ATTR_USER_ID,
- AttributeValue::S(user_id.to_string()),
- )
- .key(
- devices_table::ATTR_ITEM_ID,
- DeviceIDAttribute(device_id.into()).into(),
- )
- .send()
- .await
- .map_err(|e| {
- error!("Failed to get user's OTK count: {:?}", e);
- Error::AwsSdk(e.into())
- })?;
-
- let mut user_item = response.item.unwrap_or_default();
- match parse_int_attribute(attr_name, user_item.remove(attr_name)) {
- Ok(num) => Ok(num),
- Err(DBItemError {
- attribute_error: DBItemAttributeError::Missing,
- ..
- }) => Ok(0),
- Err(e) => Err(Error::Attribute(e)),
- }
- }
-
pub async fn update_user_password(
&self,
user_id: String,
diff --git a/services/identity/src/database/one_time_keys.rs b/services/identity/src/database/one_time_keys.rs
new file mode 100644
--- /dev/null
+++ b/services/identity/src/database/one_time_keys.rs
@@ -0,0 +1,316 @@
+use std::collections::HashSet;
+
+use comm_lib::{
+ aws::{
+ ddb::{
+ operation::query::QueryOutput,
+ types::{AttributeValue, Delete, TransactWriteItem, Update},
+ },
+ DynamoDBError,
+ },
+ database::{
+ parse_int_attribute, AttributeExtractor, DBItemAttributeError, DBItemError,
+ },
+};
+use tracing::{debug, error, info};
+
+use crate::{
+ constants::ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT,
+ database::DeviceIDAttribute,
+ ddb_utils::{
+ create_one_time_key_partition_key, into_one_time_put_requests,
+ into_one_time_update_requests, is_transaction_retryable, OlmAccountType,
+ },
+ error::{consume_error, Error},
+ olm::is_valid_olm_key,
+};
+
+use super::DatabaseClient;
+
+impl DatabaseClient {
+ /// "Mints" a single one-time key: deletes the key and decrements the device's
+ /// OTK count in one transaction. Returns `None` when the count is below 1.
+ pub(super) async fn get_one_time_key(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<Option<String>, Error> {
+ use crate::constants::devices_table;
+ use crate::constants::one_time_keys_table as otk_table;
+ use crate::constants::retry;
+ use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD;
+
+ let attr_otk_count = match account_type {
+ OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
+ OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
+ };
+
+ fn spawn_refresh_keys_task(device_id: &str) {
+ // Copy the borrowed ID into an owned String so the spawned task can own it
+ let device_id = device_id.to_string();
+ tokio::spawn(async move {
+ debug!("Attempting to request more keys for device: {}", &device_id);
+ let result =
+ crate::tunnelbroker::send_refresh_keys_request(&device_id).await;
+ consume_error(result);
+ });
+ }
+
+ // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
+ // in `comm-lib` to handle transactions with retries
+ let mut attempt = 0;
+
+ loop {
+ attempt += 1;
+ if attempt > retry::MAX_ATTEMPTS {
+ return Err(Error::MaxRetriesExceeded);
+ }
+
+ let otk_count =
+ self.get_otk_count(user_id, device_id, account_type).await?;
+ if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD {
+ spawn_refresh_keys_task(device_id);
+ }
+ if otk_count < 1 {
+ return Ok(None);
+ }
+
+ let query_result = self
+ .get_one_time_keys(user_id, device_id, account_type)
+ .await?;
+ let mut items = query_result.items.unwrap_or_default();
+ let mut item = items.pop().unwrap_or_default();
+ let pk = item.take_attr(otk_table::PARTITION_KEY)?;
+ let sk = item.take_attr(otk_table::SORT_KEY)?;
+ let otk: String = item.take_attr(otk_table::ATTR_ONE_TIME_KEY)?;
+
+ let delete_otk = Delete::builder()
+ .table_name(otk_table::NAME)
+ .key(otk_table::PARTITION_KEY, AttributeValue::S(pk))
+ .key(otk_table::SORT_KEY, AttributeValue::S(sk))
+ .build();
+
+ let delete_otk_operation =
+ TransactWriteItem::builder().delete(delete_otk).build();
+
+ let update_otk_count = Update::builder()
+ .table_name(devices_table::NAME)
+ .key(
+ devices_table::ATTR_USER_ID,
+ AttributeValue::S(user_id.to_string()),
+ )
+ .key(
+ devices_table::ATTR_ITEM_ID,
+ DeviceIDAttribute(device_id.into()).into(),
+ )
+ .update_expression(format!("ADD {} :decrement_val", attr_otk_count))
+ .expression_attribute_values(
+ ":decrement_val",
+ AttributeValue::N("-1".to_string()),
+ )
+ .condition_expression(format!("{} = :old_val", attr_otk_count))
+ .expression_attribute_values(
+ ":old_val",
+ AttributeValue::N(otk_count.to_string()),
+ )
+ .build();
+
+ let update_otk_count_operation = TransactWriteItem::builder()
+ .update(update_otk_count)
+ .build();
+
+ let transaction = self
+ .client
+ .transact_write_items()
+ .set_transact_items(Some(vec![
+ delete_otk_operation,
+ update_otk_count_operation,
+ ]))
+ .send()
+ .await;
+
+ match transaction {
+ Ok(_) => return Ok(Some(otk)),
+ Err(e) => {
+ let dynamo_db_error = DynamoDBError::from(e);
+ let retryable_codes = HashSet::from([
+ retry::CONDITIONAL_CHECK_FAILED,
+ retry::TRANSACTION_CONFLICT,
+ ]);
+ if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
+ info!("Encountered transaction conflict while retrieving one-time key - retrying");
+ } else {
+ error!(
+ "One-time key retrieval transaction failed: {:?}",
+ dynamo_db_error
+ );
+ return Err(Error::AwsSdk(dynamo_db_error));
+ }
+ }
+ }
+ }
+ }
+
+ async fn get_one_time_keys(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<QueryOutput, Error> {
+ use crate::constants::one_time_keys_table::*;
+
+ let partition_key =
+ create_one_time_key_partition_key(user_id, device_id, account_type);
+
+ self
+ .client
+ .query()
+ .table_name(NAME)
+ .key_condition_expression("#pk = :pk")
+ .expression_attribute_names("#pk", PARTITION_KEY)
+ .expression_attribute_values(":pk", AttributeValue::S(partition_key))
+ .limit(1)
+ .send()
+ .await
+ .map_err(|e| Error::AwsSdk(e.into()))
+ }
+
+ pub async fn append_one_time_prekeys(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ content_one_time_keys: &Vec<String>,
+ notif_one_time_keys: &Vec<String>,
+ ) -> Result<(), Error> {
+ use crate::constants::retry;
+
+ let num_content_keys = content_one_time_keys.len();
+ let num_notif_keys = notif_one_time_keys.len();
+
+ if num_content_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
+ || num_notif_keys > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT
+ {
+ return Err(Error::OneTimeKeyUploadLimitExceeded);
+ }
+
+ if content_one_time_keys
+ .iter()
+ .any(|otk| !is_valid_olm_key(otk))
+ || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk))
+ {
+ debug!("Invalid one-time key format");
+ return Err(Error::InvalidFormat);
+ }
+
+ let current_time = chrono::Utc::now();
+
+ let content_otk_requests = into_one_time_put_requests(
+ user_id,
+ device_id,
+ content_one_time_keys,
+ OlmAccountType::Content,
+ current_time,
+ );
+ let notif_otk_requests = into_one_time_put_requests(
+ user_id,
+ device_id,
+ notif_one_time_keys,
+ OlmAccountType::Notification,
+ current_time,
+ );
+
+ let update_otk_count_operation = into_one_time_update_requests(
+ user_id,
+ device_id,
+ num_content_keys,
+ num_notif_keys,
+ );
+
+ let mut operations = Vec::new();
+ operations.extend_from_slice(&content_otk_requests);
+ operations.extend_from_slice(&notif_otk_requests);
+ operations.push(update_otk_count_operation);
+
+ // TODO: Introduce `transact_write_helper` similar to `batch_write_helper`
+ // in `comm-lib` to handle transactions with retries
+ let mut attempt = 0;
+
+ loop {
+ attempt += 1;
+ if attempt > retry::MAX_ATTEMPTS {
+ return Err(Error::MaxRetriesExceeded);
+ }
+
+ let transaction = self
+ .client
+ .transact_write_items()
+ .set_transact_items(Some(operations.clone()))
+ .send()
+ .await;
+
+ match transaction {
+ Ok(_) => break,
+ Err(e) => {
+ let dynamo_db_error = DynamoDBError::from(e);
+ let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]);
+ if is_transaction_retryable(&dynamo_db_error, &retryable_codes) {
+ info!("Encountered transaction conflict while uploading one-time keys - retrying");
+ } else {
+ error!(
+ "One-time key upload transaction failed: {:?}",
+ dynamo_db_error
+ );
+ return Err(Error::AwsSdk(dynamo_db_error));
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn get_otk_count(
+ &self,
+ user_id: &str,
+ device_id: &str,
+ account_type: OlmAccountType,
+ ) -> Result<usize, Error> {
+ use crate::constants::devices_table;
+
+ let attr_name = match account_type {
+ OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT,
+ OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT,
+ };
+
+ let response = self
+ .client
+ .get_item()
+ .table_name(devices_table::NAME)
+ .projection_expression(attr_name)
+ .key(
+ devices_table::ATTR_USER_ID,
+ AttributeValue::S(user_id.to_string()),
+ )
+ .key(
+ devices_table::ATTR_ITEM_ID,
+ DeviceIDAttribute(device_id.into()).into(),
+ )
+ .send()
+ .await
+ .map_err(|e| {
+ error!("Failed to get user's OTK count: {:?}", e);
+ Error::AwsSdk(e.into())
+ })?;
+
+ let mut user_item = response.item.unwrap_or_default();
+ match parse_int_attribute(attr_name, user_item.remove(attr_name)) {
+ Ok(num) => Ok(num),
+ Err(DBItemError {
+ attribute_error: DBItemAttributeError::Missing,
+ ..
+ }) => Ok(0),
+ Err(e) => Err(Error::Attribute(e)),
+ }
+ }
+}

File Metadata

Mime Type
text/plain
Expires
Mon, Nov 25, 1:31 PM (21 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2579926
Default Alt Text
D11674.diff (21 KB)

Event Timeline