diff --git a/keyserver/addons/rust-node-addon/src/identity_client/get_inbound_keys_for_user.rs b/keyserver/addons/rust-node-addon/src/identity_client/get_inbound_keys_for_user.rs index 6dc0d05b9..16a0b91fc 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client/get_inbound_keys_for_user.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client/get_inbound_keys_for_user.rs @@ -1,38 +1,66 @@ -use grpc_clients::identity::protos::authenticated::InboundKeysForUserRequest; +use grpc_clients::identity::protos::authenticated::{ + identity::IdentityInfo, EthereumIdentity, Identity, InboundKeysForUserRequest, +}; use super::*; #[napi] #[instrument(skip_all)] pub async fn get_inbound_keys_for_user_device( auth_user_id: String, auth_device_id: String, auth_access_token: String, user_id: String, device_id: String, ) -> Result { // Set up the gRPC client that will be used to talk to the Identity service let mut identity_client = get_authenticated_identity_client( auth_user_id, auth_device_id, auth_access_token, ) .await?; let inbound_keys_for_user_request = InboundKeysForUserRequest { user_id }; let mut response = identity_client .get_inbound_keys_for_user(inbound_keys_for_user_request) .await .map_err(|e| Error::new(Status::GenericFailure, e.to_string()))? .into_inner(); - let inbound_key_info = InboundKeyInfoResponse::try_from( + let device_inbound_key_info = DeviceInboundKeyInfo::try_from( response .devices .remove(&device_id) .ok_or(Error::from_status(Status::GenericFailure))?, )?; - Ok(inbound_key_info) + let (username, wallet_address) = match response.identity { + Some(Identity { + identity_info: Some(IdentityInfo::Username(u)), + }) => (Some(u), None), + Some(Identity { + identity_info: + Some(IdentityInfo::EthIdentity(EthereumIdentity { + wallet_address: w, + .. // We ignore the social proof for now + })), + }) => (None, Some(w)), + _ => (None, None), + }; + + let inbound_key_info_response = InboundKeyInfoResponse { + payload: device_inbound_key_info.payload, + payload_signature: device_inbound_key_info.payload_signature, + social_proof: device_inbound_key_info.social_proof, + content_prekey: device_inbound_key_info.content_prekey, + content_prekey_signature: device_inbound_key_info.content_prekey_signature, + notif_prekey: device_inbound_key_info.notif_prekey, + notif_prekey_signature: device_inbound_key_info.notif_prekey_signature, + username, + wallet_address, + }; + + Ok(inbound_key_info_response) } diff --git a/keyserver/addons/rust-node-addon/src/identity_client/mod.rs b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs index 80cc9845a..bf164fb5b 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client/mod.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs @@ -1,211 +1,223 @@ pub mod add_reserved_usernames; pub mod config; pub mod get_inbound_keys_for_user; pub mod login; pub mod prekey; pub mod register_user; pub mod remove_reserved_usernames; pub mod upload_one_time_keys; use client_proto::identity_client_service_client::IdentityClientServiceClient; use client_proto::{ AddReservedUsernamesRequest, DeviceKeyUpload, DeviceType, IdentityKeyInfo, Prekey, RegistrationFinishRequest, RegistrationStartRequest, RemoveReservedUsernameRequest, }; use config::get_identity_service_config; use generated::CODE_VERSION; use grpc_clients::identity::authenticated::ChainedInterceptedAuthClient; use grpc_clients::identity::protos::authenticated::{ InboundKeyInfo, UploadOneTimeKeysRequest, }; use grpc_clients::identity::protos::unauthenticated as client_proto; use grpc_clients::identity::shared::CodeVersionLayer; use grpc_clients::identity::{ REQUEST_METADATA_COOKIE_KEY, RESPONSE_METADATA_COOKIE_KEY, }; use lazy_static::lazy_static; use napi::bindgen_prelude::*; use serde::{Deserialize, Serialize}; use std::env::var; use tonic::codegen::InterceptedService; use tonic::{transport::Channel, Request}; use tracing::{self, info, instrument, warn, Level}; use tracing_subscriber::EnvFilter; mod generated { // We get the CODE_VERSION from this generated file include!(concat!(env!("OUT_DIR"), "/version.rs")); } const DEVICE_TYPE: &str = "keyserver"; const ENV_NODE_ENV: &str = "NODE_ENV"; const ENV_DEVELOPMENT: &str = "development"; lazy_static! { static ref IDENTITY_SERVICE_CONFIG: IdentityServiceConfig = { let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(EnvFilter::DEFAULT_ENV) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber) .expect("Unable to configure tracing"); get_identity_service_config() .unwrap_or_else(|_| IdentityServiceConfig::default()) }; } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct IdentityServiceConfig { identity_socket_addr: String, } impl Default for IdentityServiceConfig { fn default() -> Self { info!("Using default identity configuration based on NODE_ENV"); const DEV_SOCKET_ADDR: &str = "https://identity.staging.commtechnologies.org:50054"; const PROD_SOCKET_ADDR: &str = "https://identity.commtechnologies.org:50054"; let default_socket_addr = match var(ENV_NODE_ENV) { Ok(val) if val == ENV_DEVELOPMENT => DEV_SOCKET_ADDR, _ => PROD_SOCKET_ADDR, } .to_string(); Self { identity_socket_addr: default_socket_addr, } } } async fn get_identity_client() -> Result< IdentityClientServiceClient>, > { info!("Connecting to identity service"); grpc_clients::identity::get_unauthenticated_client( &IDENTITY_SERVICE_CONFIG.identity_socket_addr, CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .map_err(|_| { Error::new( Status::GenericFailure, "Unable to connect to identity service".to_string(), ) }) } async fn get_authenticated_identity_client( user_id: String, device_id: String, access_token: String, ) -> Result { info!("Connecting to identity service"); grpc_clients::identity::get_auth_client( &IDENTITY_SERVICE_CONFIG.identity_socket_addr, user_id, device_id, access_token, CODE_VERSION, DEVICE_TYPE.to_string(), ) .await .map_err(|_| { Error::new( Status::GenericFailure, "Unable to connect to identity service".to_string(), ) }) } #[napi(object)] pub struct SignedIdentityKeysBlob { pub payload: String, pub signature: String, } #[napi(object)] pub struct UserLoginInfo { pub user_id: String, pub access_token: String, } -#[napi(object)] -pub struct InboundKeyInfoResponse { +pub struct DeviceInboundKeyInfo { pub payload: String, pub payload_signature: String, pub social_proof: Option, pub content_prekey: String, pub content_prekey_signature: String, pub notif_prekey: String, pub notif_prekey_signature: String, } -impl TryFrom for InboundKeyInfoResponse { +impl TryFrom for DeviceInboundKeyInfo { type Error = Error; fn try_from(key_info: InboundKeyInfo) -> Result { let identity_info = key_info .identity_info .ok_or(Error::from_status(Status::GenericFailure))?; let IdentityKeyInfo { payload, payload_signature, social_proof, } = identity_info; let content_prekey = key_info .content_prekey .ok_or(Error::from_status(Status::GenericFailure))?; let Prekey { prekey: content_prekey_value, prekey_signature: content_prekey_signature, } = content_prekey; let notif_prekey = key_info .notif_prekey .ok_or(Error::from_status(Status::GenericFailure))?; let Prekey { prekey: notif_prekey_value, prekey_signature: notif_prekey_signature, } = notif_prekey; Ok(Self { payload, payload_signature, social_proof, content_prekey: content_prekey_value, content_prekey_signature, notif_prekey: notif_prekey_value, notif_prekey_signature, }) } } +#[napi(object)] +pub struct InboundKeyInfoResponse { + pub payload: String, + pub payload_signature: String, + pub social_proof: Option, + pub content_prekey: String, + pub content_prekey_signature: String, + pub notif_prekey: String, + pub notif_prekey_signature: String, + pub username: Option, + pub wallet_address: Option, +} + pub fn handle_grpc_error(error: tonic::Status) -> napi::Error { warn!("Received error: {}", error.message()); Error::new(Status::GenericFailure, error.message()) } #[cfg(test)] mod tests { use super::CODE_VERSION; #[test] fn test_code_version_exists() { assert!(CODE_VERSION > 0); } } diff --git a/lib/types/identity-service-types.js b/lib/types/identity-service-types.js index 1fe2a3162..2631c0197 100644 --- a/lib/types/identity-service-types.js +++ b/lib/types/identity-service-types.js @@ -1,46 +1,48 @@ // @flow export type UserLoginResponse = { +userId: string, +accessToken: string, }; // This type should not be altered without also updating // OutboundKeyInfoResponse in native/native_rust_library/src/lib.rs export type OutboundKeyInfoResponse = { +payload: string, +payloadSignature: string, +socialProof: ?string, +contentPrekey: string, +contentPrekeySignature: string, +notifPrekey: string, +notifPrekeySignature: string, +oneTimeContentPrekey: ?string, +oneTimeNotifPrekey: ?string, }; export interface IdentityServiceClient { +deleteUser: ( userID: string, deviceID: string, accessToken: string, ) => Promise; } export type IdentityServiceAuthLayer = { +userID: string, +deviceID: string, +commServicesAccessToken: string, }; // This type should not be altered without also updating // InboundKeyInfoResponse in native/native_rust_library/src/lib.rs export type InboundKeyInfoResponse = { +payload: string, +payloadSignature: string, +socialProof?: ?string, +contentPrekey: string, +contentPrekeySignature: string, +notifPrekey: string, +notifPrekeySignature: string, + +username?: ?string, + +walletAddress?: ?string, }; diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index d3cba44f0..2f05dc49a 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1541 +1,1556 @@ use comm_lib::aws::ddb::{ operation::{ delete_item::DeleteItemOutput, get_item::GetItemOutput, put_item::PutItemOutput, query::QueryOutput, }, primitives::Blob, types::{AttributeValue, PutRequest, ReturnConsumedCapacity, WriteRequest}, }; use comm_lib::aws::{AwsConfig, DynamoDBClient}; use comm_lib::database::{ AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, TryFromAttribute, }; use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use crate::ddb_utils::{ - create_one_time_key_partition_key, into_one_time_put_requests, OlmAccountType, + create_one_time_key_partition_key, into_one_time_put_requests, + EthereumIdentity, Identifier, OlmAccountType, }; use crate::error::{consume_error, Error}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, CONTENT_ONE_TIME_KEY, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, NOTIF_ONE_TIME_KEY, RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, USERS_TABLE, USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_CONTENT_ONE_TIME_KEYS_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_ONE_TIME_KEYS_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; use crate::error::{AttributeValueFromHashMap, FromAttributeValue}; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::{AccessTokenData, AuthType}; pub use grpc_clients::identity::DeviceType; mod device_list; pub use device_list::DeviceListRow; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } pub struct DBDeviceTypeInt(pub i32); impl TryFrom for DeviceType { type Error = crate::error::Error; fn try_from(value: DBDeviceTypeInt) -> Result { let device_result = DeviceType::try_from(value.0); device_result.map_err(|_| { Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), attribute_value: Some(AttributeValue::N(value.0.to_string())).into(), attribute_error: DBItemAttributeError::InvalidValue, }) }) } } // This is very similar to the protobuf definitions, however, // coupling the protobuf schema to the database API should be avoided. pub struct PreKey { pub prekey: String, pub prekey_signature: String, } pub struct OutboundKeys { pub key_payload: String, pub key_payload_signature: String, pub social_proof: Option, pub content_prekey: PreKey, pub notif_prekey: PreKey, pub content_one_time_key: Option, pub notif_one_time_key: Option, } #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &AwsConfig) -> Self { let client = match &CONFIG.localstack_endpoint { Some(endpoint) => { info!( "Configuring DynamoDB client to use LocalStack endpoint: {}", endpoint ); let ddb_config_builder = comm_lib::aws::ddb::config::Builder::from(aws_config) .endpoint_url(endpoint); DynamoDBClient::from_conf(ddb_config_builder.build()) } None => DynamoDBClient::new(aws_config), }; DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, ) -> Result { self .add_user_to_users_table( registration_state.flattened_device_key_upload, Some((registration_state.username, Blob::new(password_file))), None, None, registration_state.user_id, ) .await } pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: String, user_id: Option, ) -> Result { self .add_user_to_users_table( flattened_device_key_upload, None, Some(wallet_address), Some(social_proof), user_id, ) .await } async fn add_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, username_and_password_file: Option<(String, Blob)>, wallet_address: Option, social_proof: Option, user_id: Option, ) -> Result { let user_id = user_id.unwrap_or_else(generate_uuid); let device_info = create_device_info(flattened_device_key_upload.clone(), social_proof); let devices = HashMap::from([( flattened_device_key_upload.device_id_key.clone(), AttributeValue::M(device_info), )]); let mut user = HashMap::from([ ( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), ), ( USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), AttributeValue::M(devices), ), ]); if let Some((username, password_file)) = username_and_password_file { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); } if let Some(address) = wallet_address { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(address), ); } self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(user)) // make sure we don't accidentaly overwrite existing row .condition_expression("attribute_not_exists(#pk)") .expression_attribute_names("#pk", USERS_TABLE_PARTITION_KEY) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( flattened_device_key_upload.device_id_key, flattened_device_key_upload.content_one_time_keys, flattened_device_key_upload.notif_one_time_keys, ) .await?; Ok(user_id) } pub async fn add_password_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, ) -> Result<(), Error> { self .add_device_to_users_table(user_id, flattened_device_key_upload, None) .await } pub async fn add_wallet_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: String, ) -> Result<(), Error> { self .add_device_to_users_table( user_id, flattened_device_key_upload, Some(social_proof), ) .await } pub async fn get_keyserver_keys_for_user( &self, user_id: &str, ) -> Result, Error> { // DynamoDB doesn't have a way to "pop" a value from a list, so we must // first read in user info, then update one_time_keys with value we // gave to requester let user_info = self .get_item_from_users_table(user_id) .await? .item .ok_or(Error::MissingItem)?; let devices = user_info .get(USERS_TABLE_DEVICES_ATTRIBUTE) .ok_or(Error::MissingItem)? .to_hashmap(USERS_TABLE_DEVICES_ATTRIBUTE)?; let mut maybe_keyserver_id = None; for (device_id, device_info) in devices { let device_type = device_info .to_hashmap("device_id")? .get(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME) .ok_or(Error::MissingItem)? .to_string(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME)?; if device_type == "keyserver" { maybe_keyserver_id = Some(device_id); break; } } // Assert that the user has a keyserver, if they don't return None let keyserver_id = match maybe_keyserver_id { None => return Ok(None), Some(id) => id, }; let keyserver = devices.get_map(keyserver_id)?; let notif_one_time_key: Option = self .get_one_time_key(keyserver_id, OlmAccountType::Notification) .await?; let content_one_time_key: Option = self .get_one_time_key(keyserver_id, OlmAccountType::Content) .await?; debug!( "Able to get notif one-time key for keyserver {}: {}", keyserver_id, notif_one_time_key.is_some() ); debug!( "Able to get content one-time key for keyserver {}: {}", keyserver_id, content_one_time_key.is_some() ); let content_prekey = keyserver .get_string(USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME)?; let content_prekey_signature = keyserver.get_string( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?; let notif_prekey = keyserver .get_string(USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME)?; let notif_prekey_signature = keyserver.get_string( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?; let key_payload = keyserver .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME)? .to_string(); let key_payload_signature = keyserver .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME)? .to_string(); let social_proof = keyserver .get(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) .and_then(|s| { s.to_string(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) .ok() }) .map(|s| s.to_owned()); let full_content_prekey = PreKey { prekey: content_prekey.to_string(), prekey_signature: content_prekey_signature.to_string(), }; let full_notif_prekey = PreKey { prekey: notif_prekey.to_string(), prekey_signature: notif_prekey_signature.to_string(), }; let outbound_payload = OutboundKeys { key_payload, key_payload_signature, social_proof, content_prekey: full_content_prekey, notif_prekey: full_notif_prekey, content_one_time_key, notif_one_time_key, }; Ok(Some(outbound_payload)) } /// Will "mint" a single one-time key by attempting to successfully delete a /// key pub async fn get_one_time_key( &self, device_id: &str, account_type: OlmAccountType, ) -> Result, Error> { use crate::constants::one_time_keys_table as otk_table; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; let query_result = self.get_one_time_keys(device_id, account_type).await?; let items = query_result.items(); fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block let device_id = device_id.to_string(); tokio::spawn(async move { debug!("Attempting to request more keys for device: {}", &device_id); let result = crate::tunnelbroker::send_refresh_keys_request(&device_id).await; consume_error(result); }); } // If no one-time keys exist, or if there aren't enough, request more. // Additionally, if no one-time keys exist, return early. let item_vec = if let Some(items_list) = items { if items_list.len() < ONE_TIME_KEY_MINIMUM_THRESHOLD { spawn_refresh_keys_task(device_id); } items_list } else { debug!("Unable to find {:?} one-time key", account_type); spawn_refresh_keys_task(device_id); return Ok(None); }; let mut result = None; // Attempt to delete the one-time keys individually, a successful delete // mints the one-time key to the requester for item in item_vec { let pk = item.get_string(otk_table::PARTITION_KEY)?; let otk = item.get_string(otk_table::SORT_KEY)?; let composite_key = HashMap::from([ ( otk_table::PARTITION_KEY.to_string(), AttributeValue::S(pk.to_string()), ), ( otk_table::SORT_KEY.to_string(), AttributeValue::S(otk.to_string()), ), ]); debug!("Attempting to delete a {:?} one time key", account_type); match self .client .delete_item() .set_key(Some(composite_key)) .table_name(otk_table::NAME) .send() .await { Ok(_) => { result = Some(otk.to_string()); break; } // This err should only happen if a delete occurred between the read // above and this delete Err(e) => { debug!("Unable to delete key: {:?}", e); continue; } } } // Return deleted key Ok(result) } pub async fn get_one_time_keys( &self, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::one_time_keys_table::*; // Add related prefix to partition key to grab the correct result set let partition_key = create_one_time_key_partition_key(device_id, account_type); self .client .query() .table_name(NAME) .key_condition_expression(format!("{} = :pk", PARTITION_KEY)) .expression_attribute_values(":pk", AttributeValue::S(partition_key)) .return_consumed_capacity(ReturnConsumedCapacity::Total) .send() .await .map_err(|e| Error::AwsSdk(e.into())) .map(|response| { let capacity_units = response .consumed_capacity() .and_then(|it| it.capacity_units()); debug!("OTK read consumed capacity: {:?}", capacity_units); response }) } pub async fn set_prekey( &self, user_id: String, device_id: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, ) -> Result<(), Error> { let notif_prekey_av = AttributeValue::S(notif_prekey); let notif_prekey_signature_av = AttributeValue::S(notif_prekey_signature); let content_prekey_av = AttributeValue::S(content_prekey); let content_prekey_signature_av = AttributeValue::S(content_prekey_signature); let update_expression = format!("SET {0}.#{1}.{2} = :n, {0}.#{1}.{3} = :p, {0}.#{1}.{4} = :c, {0}.#{1}.{5} = :d", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID", USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, ); let expression_attribute_names = HashMap::from([ (format!("#{}", "deviceID"), device_id), ( "#user_id".to_string(), USERS_TABLE_PARTITION_KEY.to_string(), ), ]); let expression_attribute_values = HashMap::from([ (":n".to_string(), notif_prekey_av), (":p".to_string(), notif_prekey_signature_av), (":c".to_string(), content_prekey_av), (":d".to_string(), content_prekey_signature_av), ]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .condition_expression("attribute_exists(#user_id)") .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn append_one_time_prekeys( &self, device_id: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, ) -> Result<(), Error> { use crate::constants::one_time_keys_table; let mut otk_requests = into_one_time_put_requests( &device_id, content_one_time_keys, OlmAccountType::Content, ); let notif_otk_requests: Vec = into_one_time_put_requests( &device_id, notif_one_time_keys, OlmAccountType::Notification, ); otk_requests.extend(notif_otk_requests); // BatchWriteItem has a hard limit of 25 writes per call for requests in otk_requests.chunks(25) { self .client .batch_write_item() .request_items(one_time_keys_table::NAME, requests.to_vec()) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } Ok(()) } async fn add_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> Result<(), Error> { // Avoid borrowing from lifetime of flattened_device_key_upload let device_id = flattened_device_key_upload.device_id_key.clone(); let content_one_time_keys = flattened_device_key_upload.content_one_time_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_one_time_keys.clone(); let device_info = create_device_info(flattened_device_key_upload, social_proof); let update_expression = format!("SET {}.#{} = :v", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID",); let expression_attribute_names = HashMap::from([(format!("#{}", "deviceID"), device_id.clone())]); let expression_attribute_values = HashMap::from([(":v".to_string(), AttributeValue::M(device_info))]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( device_id, content_one_time_keys, notif_one_time_keys, ) .await?; Ok(()) } pub async fn remove_device_from_users_table( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { let update_expression = format!("REMOVE {}.{}", USERS_TABLE_DEVICES_ATTRIBUTE, "#deviceID"); let expression_attribute_names = HashMap::from([("#deviceID".to_string(), device_id_key)]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_names(Some(expression_attribute_names)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn delete_user( &self, user_id: String, ) -> Result { debug!(user_id, "Attempting to delete user's devices"); self.delete_devices_table_rows_for_user(&user_id).await?; debug!(user_id, "Attempting to delete user"); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete user {}", user_id); Err(Error::AwsSdk(e.into())) } } } pub async fn get_access_token_data( &self, user_id: String, signing_public_key: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), signing_public_key.clone(), ), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = DateTime::::try_from_attr( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, signing_public_key, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and signing public key {} in token table", user_id, signing_public_key ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn verify_access_token( &self, user_id: String, signing_public_key: String, access_token_to_verify: String, ) -> Result { let is_valid = self .get_access_token_data(user_id, signing_public_key) .await? .map(|access_token_data| { constant_time_eq( access_token_data.access_token.as_bytes(), access_token_to_verify.as_bytes(), ) && access_token_data.is_valid() }) .unwrap_or(false); Ok(is_valid) } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.signing_public_key), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn delete_access_token_data( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { self .client .delete_item() .table_name(ACCESS_TOKEN_TABLE) .key( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ) .key( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(device_id_key), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn wallet_address_taken( &self, wallet_address: String, ) -> Result { let result = self .get_user_id_from_user_info(wallet_address, &AuthType::Wallet) .await?; Ok(result.is_some()) } pub async fn username_taken(&self, username: String) -> Result { let result = self .get_user_id_from_user_info(username, &AuthType::Password) .await?; Ok(result.is_some()) } pub async fn filter_out_taken_usernames( &self, usernames: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; let db_usernames_set: HashSet = db_usernames.into_iter().collect(); let usernames_set: HashSet = usernames.into_iter().collect(); let available_usernames: Vec = usernames_set .difference(&db_usernames_set) .cloned() .collect(); Ok(available_usernames) } async fn get_user_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result>, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: first_item .get(USERS_TABLE_PARTITION_KEY) .cloned() .into(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_keys_for_user_id( &self, user_id: &str, get_one_time_keys: bool, ) -> Result, Error> { - let Some(user) = - self.get_item_from_users_table(user_id).await?.item - else { + let Some(user) = self.get_item_from_users_table(user_id).await?.item else { return Ok(None); }; self.get_keys_for_user(user, get_one_time_keys).await } async fn get_keys_for_user( &self, mut user: AttributeMap, get_one_time_keys: bool, ) -> Result, Error> { let devices: AttributeMap = user.take_attr(USERS_TABLE_DEVICES_ATTRIBUTE)?; let mut devices_response = HashMap::with_capacity(devices.len()); for (device_id_key, device_info) in devices { let device_info_map = AttributeMap::try_from_attr(&device_id_key, Some(device_info))?; let mut device_info_string_map = HashMap::new(); for (attribute_name, attribute_value) in device_info_map { // Excluding one-time keys since we're moving them to a separate table if attribute_name == USERS_TABLE_DEVICES_MAP_NOTIF_ONE_TIME_KEYS_ATTRIBUTE_NAME || attribute_name == USERS_TABLE_DEVICES_MAP_CONTENT_ONE_TIME_KEYS_ATTRIBUTE_NAME { continue; } let attribute_value_str = String::try_from_attr(&attribute_name, Some(attribute_value))?; device_info_string_map.insert(attribute_name, attribute_value_str); } if get_one_time_keys { if let Some(notif_one_time_key) = self .get_one_time_key(&device_id_key, OlmAccountType::Notification) .await? { device_info_string_map .insert(NOTIF_ONE_TIME_KEY.to_string(), notif_one_time_key); } if let Some(content_one_time_key) = self .get_one_time_key(&device_id_key, OlmAccountType::Content) .await? { device_info_string_map .insert(CONTENT_ONE_TIME_KEY.to_string(), content_one_time_key); } } devices_response.insert(device_id_key, device_info_string_map); } Ok(Some(devices_response)) } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: &AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => user .take_attr(USERS_TABLE_PARTITION_KEY) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } pub async fn get_user_id_and_password_file_from_username( &self, username: &str, ) -> Result)>, Error> { match self .get_user_from_user_info(username.to_string(), &AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = user.take_attr(USERS_TABLE_PARTITION_KEY)?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((user_id, password_file))) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } + pub async fn get_user_identifier( + &self, + user_id: &str, + ) -> Result { + let user_info = self + .get_item_from_users_table(user_id) + .await? + .item + .ok_or(Error::MissingItem)?; + + Identifier::try_from(user_info).map_err(|e| { + error!(user_id, "Database item is missing an identifier"); + e + }) + } + async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let Ok(username) = attribute.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE) { result.push(username); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = item.take_attr(NONCE_TABLE_PARTITION_KEY)?; let created = DateTime::::try_from_attr( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(NONCE_TABLE_CREATED_ATTRIBUTE), )?; let expiration_time = DateTime::::try_from_attr( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, usernames: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for usernames_chunk in usernames.chunks(25) { let write_requests = usernames_chunk .iter() .map(|username| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.to_string()), ) .build(); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn username_in_reserved_usernames_table( &self, username: &str, ) -> Result { match self .client .get_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(username.to_string()), ) .consistent_read(true) .send() .await { Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), Ok(_) => Ok(false), Err(e) => Err(Error::AwsSdk(e.into())), } } } type AttributeName = String; pub type DeviceKeys = HashMap; type Devices = HashMap; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute.into(), DBItemAttributeError::Missing, )), } } #[deprecated(note = "Use `comm_lib` counterpart instead")] #[allow(dead_code)] fn parse_map_attribute( attribute_name: &str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => { error!( attribute = attribute_name, value = ?attribute_value, error_type = "IncorrectType", "Unexpected attribute type when parsing map attribute" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::IncorrectType, )) } None => { error!( attribute = attribute_name, error_type = "Missing", "Attribute is missing" ); Err(DBItemError::new( attribute_name.to_string(), attribute_value.into(), DBItemAttributeError::Missing, )) } } } fn create_device_info( flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> AttributeMap { let mut device_info = HashMap::from([ ( USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.device_type.to_string()), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload_signature), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME .to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey_signature), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey_signature), ), ]); if let Some(social_proof) = social_proof { device_info.insert( USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), AttributeValue::S(social_proof), ); } device_info } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } #[test] fn test_int_to_device_type() { let valid_result = DeviceType::try_from(3); assert!(valid_result.is_ok()); assert_eq!(valid_result.unwrap(), DeviceType::Android); let invalid_result = DeviceType::try_from(6); assert!(invalid_result.is_err()); } } diff --git a/services/identity/src/ddb_utils.rs b/services/identity/src/ddb_utils.rs index 50504c777..f09143d75 100644 --- a/services/identity/src/ddb_utils.rs +++ b/services/identity/src/ddb_utils.rs @@ -1,95 +1,138 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use comm_lib::{ aws::ddb::types::{AttributeValue, PutRequest, WriteRequest}, - database::Value, + database::{AttributeExtractor, AttributeMap, Value}, }; use std::collections::HashMap; use std::iter::IntoIterator; use comm_lib::database::{DBItemAttributeError, DBItemError}; +use crate::constants::{ + USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, + USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, +}; + #[derive(Copy, Clone, Debug)] pub enum OlmAccountType { Content, Notification, } // Prefix the one time keys with the olm account variant. This allows for a single // DDB table to contain both notification and content keys for a device. pub fn create_one_time_key_partition_key( device_id: &str, account_type: OlmAccountType, ) -> String { match account_type { OlmAccountType::Content => format!("content_{device_id}"), OlmAccountType::Notification => format!("notification_{device_id}"), } } fn create_one_time_key_put_request( device_id: &str, one_time_key: String, account_type: OlmAccountType, ) -> WriteRequest { use crate::constants::one_time_keys_table::*; let partition_key = create_one_time_key_partition_key(device_id, account_type); let builder = PutRequest::builder(); let attrs = HashMap::from([ (PARTITION_KEY.to_string(), AttributeValue::S(partition_key)), (SORT_KEY.to_string(), AttributeValue::S(one_time_key)), ]); let put_request = builder.set_item(Some(attrs)).build(); WriteRequest::builder().put_request(put_request).build() } pub fn into_one_time_put_requests( device_id: &str, one_time_keys: T, account_type: OlmAccountType, ) -> Vec where T: IntoIterator, ::Item: ToString, { one_time_keys .into_iter() .map(|otk| { create_one_time_key_put_request(device_id, otk.to_string(), account_type) }) .collect() } pub trait AttributesOptionExt { fn ok_or_missing( self, attr_name: impl Into, ) -> Result; } impl AttributesOptionExt for Option { fn ok_or_missing( self, attr_name: impl Into, ) -> Result { self.ok_or_else(|| DBItemError { attribute_name: attr_name.into(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, }) } } pub trait DateTimeExt { fn from_utc_timestamp_millis(timestamp: i64) -> Option>; } impl DateTimeExt for DateTime { fn from_utc_timestamp_millis(timestamp: i64) -> Option { let naive = NaiveDateTime::from_timestamp_millis(timestamp)?; Some(Self::from_utc(naive, Utc)) } } + +pub enum Identifier { + Username(String), + WalletAddress(EthereumIdentity), +} + +pub struct EthereumIdentity { + pub wallet_address: String, + pub social_proof: String, +} + +impl TryFrom for Identifier { + type Error = crate::error::Error; + + fn try_from(mut value: AttributeMap) -> Result { + let username_result = value.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE); + + if let Ok(username) = username_result { + return Ok(Identifier::Username(username)); + } + + let wallet_address_result = + value.take_attr(USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE); + let social_proof_result = + value.take_attr(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME); + + if let (Ok(wallet_address), Ok(social_proof)) = + (wallet_address_result, social_proof_result) + { + Ok(Identifier::WalletAddress(EthereumIdentity { + wallet_address, + social_proof, + })) + } else { + Err(Self::Error::MalformedItem) + } + } +} diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs index 5351036c6..44e93c561 100644 --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -1,127 +1,129 @@ use comm_lib::aws::ddb::types::AttributeValue; use comm_lib::aws::DynamoDBError; use comm_lib::database::{DBItemAttributeError, DBItemError, Value}; use std::collections::hash_map::HashMap; use std::fmt::{Display, Formatter, Result as FmtResult}; use tracing::error; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(...)] Transport(tonic::transport::Error), #[display(...)] Status(tonic::Status), #[display(...)] MissingItem, #[display(...)] DeviceList(DeviceListError), + #[display(...)] + MalformedItem, } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DeviceListError { DeviceAlreadyExists, ConcurrentUpdateError, } #[deprecated(note = "Use comm-lib traits instead")] pub trait FromAttributeValue { fn to_vec( &self, attr_name: &str, ) -> Result<&Vec, DBItemError>; fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError>; fn to_hashmap( &self, attr_name: &str, ) -> Result<&HashMap, DBItemError>; } fn handle_attr_failure(value: &AttributeValue, attr_name: &str) -> DBItemError { DBItemError { attribute_name: attr_name.to_string(), attribute_value: Value::AttributeValue(Some(value.clone())), attribute_error: DBItemAttributeError::IncorrectType, } } impl FromAttributeValue for AttributeValue { fn to_vec( &self, attr_name: &str, ) -> Result<&Vec, DBItemError> { self.as_l().map_err(|e| handle_attr_failure(e, attr_name)) } fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError> { self.as_s().map_err(|e| handle_attr_failure(e, attr_name)) } fn to_hashmap( &self, attr_name: &str, ) -> Result<&HashMap, DBItemError> { self.as_m().map_err(|e| handle_attr_failure(e, attr_name)) } } pub trait AttributeValueFromHashMap { fn get_string(&self, key: &str) -> Result<&String, DBItemError>; fn get_map( &self, key: &str, ) -> Result<&HashMap, DBItemError>; fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError>; } impl AttributeValueFromHashMap for HashMap { fn get_string(&self, key: &str) -> Result<&String, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_string(key) } fn get_map( &self, key: &str, ) -> Result<&HashMap, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_hashmap(key) } fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError> { self .get(key) .ok_or(DBItemError { attribute_name: key.to_string(), attribute_value: None.into(), attribute_error: DBItemAttributeError::Missing, })? .to_vec(key) } } pub fn consume_error(result: Result) { match result { Ok(_) => (), Err(e) => { error!("{}", e); } } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index bf6565ddf..175c9af68 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,508 +1,522 @@ use std::collections::HashMap; use crate::config::CONFIG; use crate::database::DeviceListRow; use crate::grpc_utils::DeviceInfoWithAuth; use crate::{ client_service::{ handle_db_error, CacheExt, UpdateState, WorkflowInProgress, }, constants::request_metadata, database::DatabaseClient, ddb_utils::DateTimeExt, grpc_services::shared::get_value, token::AuthType, }; use chrono::{DateTime, Utc}; use comm_opaque2::grpc::protocol_error_to_grpc_status; use moka::future::Cache; use tonic::{Request, Response, Status}; use tracing::{debug, error}; use super::protos::auth::{ - find_user_id_request, identity_client_service_server::IdentityClientService, + find_user_id_request, identity, + identity_client_service_server::IdentityClientService, EthereumIdentity, FindUserIdRequest, FindUserIdResponse, GetDeviceListRequest, - GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, + GetDeviceListResponse, Identity, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, RefreshUserPrekeysRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, }; use super::protos::unauth::{Empty, IdentityKeyInfo, Prekey}; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, cache: Cache, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { debug!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID) .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, request_metadata::DEVICE_ID) .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_prekeys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_prekeys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .set_prekey( user_id, device_id, content_keys.prekey, content_keys.prekey_signature, notif_keys.prekey, notif_keys.prekey_signature, ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let devices_map = self .db_client .get_keys_for_user_id(&message.user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .filter_map(|(key, device_info)| { let device_info_with_auth = DeviceInfoWithAuth { device_info, auth_type: None, }; match OutboundKeyInfo::try_from(device_info_with_auth) { Ok(key_info) => Some((key, key_info)), Err(_) => { error!("Failed to transform device info for key {}", key); None } } }) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { + use identity::IdentityInfo; + let message = request.into_inner(); let devices_map = self .db_client .get_keys_for_user_id(&message.user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .filter_map(|(key, device_info)| { let device_info_with_auth = DeviceInfoWithAuth { device_info, auth_type: None, }; match InboundKeyInfo::try_from(device_info_with_auth) { Ok(key_info) => Some((key, key_info)), Err(_) => { error!("Failed to transform device info for key {}", key); None } } }) .collect::>(); + let identifier = self + .db_client + .get_user_identifier(&message.user_id) + .await + .map_err(handle_db_error)?; + + let identity_info = IdentityInfo::try_from(identifier)?; + Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, + identity: Some(Identity { + identity_info: Some(identity_info), + }), })) } async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let inner_response = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? .map(|db_keys| OutboundKeyInfo { identity_info: Some(IdentityKeyInfo { payload: db_keys.key_payload, payload_signature: db_keys.key_payload_signature, social_proof: db_keys.social_proof, }), content_prekey: Some(Prekey { prekey: db_keys.content_prekey.prekey, prekey_signature: db_keys.content_prekey.prekey_signature, }), notif_prekey: Some(Prekey { prekey: db_keys.notif_prekey.prekey, prekey_signature: db_keys.notif_prekey.prekey_signature, }), one_time_content_prekey: db_keys.content_one_time_key, one_time_notif_prekey: db_keys.notif_one_time_key, }); let response = Response::new(KeyserverKeysResponse { keyserver_info: inner_response, }); return Ok(response); } async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( device_id, message.content_one_time_prekeys, message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } async fn find_user_id( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); use find_user_id_request::Identifier; let (user_ident, auth_type) = match message.identifier { None => { return Err(tonic::Status::invalid_argument("no identifier provided")) } Some(Identifier::Username(username)) => (username, AuthType::Password), Some(Identifier::WalletAddress(address)) => (address, AuthType::Wallet), }; let (is_reserved_result, user_id_result) = tokio::join!( self .db_client .username_in_reserved_usernames_table(&user_ident), self .db_client .get_user_id_from_user_info(user_ident.clone(), &auth_type), ); let is_reserved = is_reserved_result.map_err(handle_db_error)?; let user_id = user_id_result.map_err(handle_db_error)?; Ok(Response::new(FindUserIdResponse { user_id, is_reserved, })) } async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, user_id.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdateState { user_id }; let session_id = self .cache .insert_with_uuid_key(WorkflowInProgress::Update(update_state)) .await; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self.cache.get(&message.session_id) else { return Err(tonic::Status::not_found("session not found")); }; self.cache.invalidate(&message.session_id).await; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(state.user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device_from_users_table(user_id.clone(), device_id.clone()) .await .map_err(handle_db_error)?; self .db_client .delete_access_token_data(user_id, device_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn delete_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::::from_utc_timestamp_millis(timestamp) .ok_or_else(|| tonic::Status::invalid_argument("Invalid timestamp")) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(RawDeviceList::from) .map(SignedDeviceList::try_from_raw) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(serde_json::to_string) .collect::, _>>() .map_err(|err| { error!("Failed to serialize device list updates: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } } // raw device list that can be serialized to JSON (and then signed in the future) #[derive(serde::Serialize)] struct RawDeviceList { devices: Vec, timestamp: i64, } impl From for RawDeviceList { fn from(row: DeviceListRow) -> Self { Self { devices: row.device_ids, timestamp: row.timestamp.timestamp_millis(), } } } #[derive(serde::Serialize)] #[serde(rename_all = "camelCase")] struct SignedDeviceList { /// JSON-stringified [`RawDeviceList`] raw_device_list: String, } impl SignedDeviceList { /// Serialize (and sign in the future) a [`RawDeviceList`] fn try_from_raw(raw: RawDeviceList) -> Result { let stringified_list = serde_json::to_string(&raw).map_err(|err| { error!("Failed to serialize raw device list: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Self { raw_device_list: stringified_list, }) } } #[cfg(test)] mod tests { use super::*; #[test] fn serialize_device_list_updates() { let raw_updates = vec![ RawDeviceList { devices: vec!["device1".into()], timestamp: 111111111, }, RawDeviceList { devices: vec!["device1".into(), "device2".into()], timestamp: 222222222, }, ]; let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_raw_list2 = r#"{"devices":["device1","device2"],"timestamp":222222222}"#; let signed_updates = raw_updates .into_iter() .map(SignedDeviceList::try_from_raw) .collect::, _>>() .expect("signing device list updates failed"); assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); let stringified_updates = signed_updates .iter() .map(serde_json::to_string) .collect::, _>>() .expect("serialize signed device lists failed"); let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; assert_eq!(stringified_updates[0], expected_stringified_list1); assert_eq!(stringified_updates[1], expected_stringified_list2); } } diff --git a/services/identity/src/grpc_utils.rs b/services/identity/src/grpc_utils.rs index 4c8fe5da2..0dbe7dde4 100644 --- a/services/identity/src/grpc_utils.rs +++ b/services/identity/src/grpc_utils.rs @@ -1,258 +1,277 @@ use std::collections::HashMap; use tonic::Status; use tracing::error; use crate::{ constants::{ CONTENT_ONE_TIME_KEY, NOTIF_ONE_TIME_KEY, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, }, database::DeviceKeys, + ddb_utils::Identifier as DBIdentifier, grpc_services::protos::{ - auth::{InboundKeyInfo, OutboundKeyInfo}, + auth::{ + identity::IdentityInfo, EthereumIdentity, InboundKeyInfo, OutboundKeyInfo, + }, unauth::{ DeviceKeyUpload, IdentityKeyInfo, OpaqueLoginStartRequest, Prekey, RegistrationStartRequest, ReservedRegistrationStartRequest, ReservedWalletLoginRequest, WalletLoginRequest, }, }, token::AuthType, }; pub struct DeviceInfoWithAuth<'a> { pub device_info: HashMap, pub auth_type: Option<&'a AuthType>, } impl TryFrom> for InboundKeyInfo { type Error = Status; fn try_from(data: DeviceInfoWithAuth) -> Result { let mut device_info = data.device_info; let identity_info = extract_identity_info(&mut device_info, data.auth_type)?; Ok(InboundKeyInfo { identity_info: Some(identity_info), content_prekey: Some(create_prekey( &mut device_info, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?), notif_prekey: Some(create_prekey( &mut device_info, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?), }) } } impl TryFrom> for OutboundKeyInfo { type Error = Status; fn try_from(data: DeviceInfoWithAuth) -> Result { let mut device_info = data.device_info; let identity_info = extract_identity_info(&mut device_info, data.auth_type)?; let content_one_time_key = device_info.remove(CONTENT_ONE_TIME_KEY); let notif_one_time_key = device_info.remove(NOTIF_ONE_TIME_KEY); Ok(OutboundKeyInfo { identity_info: Some(identity_info), content_prekey: Some(create_prekey( &mut device_info, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?), notif_prekey: Some(create_prekey( &mut device_info, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, )?), one_time_content_prekey: content_one_time_key, one_time_notif_prekey: notif_one_time_key, }) } } fn extract_key( device_info: &mut DeviceKeys, key: &str, ) -> Result { device_info.remove(key).ok_or_else(|| { error!("{} missing from device info", key); Status::failed_precondition("Database item malformed") }) } fn extract_identity_info( device_info: &mut HashMap, auth_type: Option<&AuthType>, ) -> Result { let payload = extract_key( device_info, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, )?; let payload_signature = extract_key( device_info, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, )?; let social_proof = device_info.remove(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME); if social_proof.is_none() && auth_type == Some(&AuthType::Wallet) { error!("Social proof missing for wallet user"); return Err(Status::failed_precondition("Database item malformed")); } Ok(IdentityKeyInfo { payload, payload_signature, social_proof, }) } fn create_prekey( device_info: &mut HashMap, key_attr: &str, signature_attr: &str, ) -> Result { Ok(Prekey { prekey: extract_key(device_info, key_attr)?, prekey_signature: extract_key(device_info, signature_attr)?, }) } pub trait DeviceKeyUploadData { fn device_key_upload(&self) -> Option<&DeviceKeyUpload>; } impl DeviceKeyUploadData for RegistrationStartRequest { fn device_key_upload(&self) -> Option<&DeviceKeyUpload> { self.device_key_upload.as_ref() } } impl DeviceKeyUploadData for ReservedRegistrationStartRequest { fn device_key_upload(&self) -> Option<&DeviceKeyUpload> { self.device_key_upload.as_ref() } } impl DeviceKeyUploadData for OpaqueLoginStartRequest { fn device_key_upload(&self) -> Option<&DeviceKeyUpload> { self.device_key_upload.as_ref() } } impl DeviceKeyUploadData for WalletLoginRequest { fn device_key_upload(&self) -> Option<&DeviceKeyUpload> { self.device_key_upload.as_ref() } } impl DeviceKeyUploadData for ReservedWalletLoginRequest { fn device_key_upload(&self) -> Option<&DeviceKeyUpload> { self.device_key_upload.as_ref() } } pub trait DeviceKeyUploadActions { fn payload(&self) -> Result; fn payload_signature(&self) -> Result; fn content_prekey(&self) -> Result; fn content_prekey_signature(&self) -> Result; fn notif_prekey(&self) -> Result; fn notif_prekey_signature(&self) -> Result; fn one_time_content_prekeys(&self) -> Result, Status>; fn one_time_notif_prekeys(&self) -> Result, Status>; fn device_type(&self) -> Result; fn social_proof(&self) -> Result, Status>; } impl DeviceKeyUploadActions for T { fn payload(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.device_key_info.as_ref()) .map(|info| info.payload.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn payload_signature(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.device_key_info.as_ref()) .map(|info| info.payload_signature.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn content_prekey(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.content_upload.as_ref()) .map(|prekey| prekey.prekey.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn content_prekey_signature(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.content_upload.as_ref()) .map(|prekey| prekey.prekey_signature.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn notif_prekey(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.notif_upload.as_ref()) .map(|prekey| prekey.prekey.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn notif_prekey_signature(&self) -> Result { self .device_key_upload() .and_then(|upload| upload.notif_upload.as_ref()) .map(|prekey| prekey.prekey_signature.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn one_time_content_prekeys(&self) -> Result, Status> { self .device_key_upload() .map(|upload| upload.one_time_content_prekeys.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn one_time_notif_prekeys(&self) -> Result, Status> { self .device_key_upload() .map(|upload| upload.one_time_notif_prekeys.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn device_type(&self) -> Result { self .device_key_upload() .map(|upload| upload.device_type) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } fn social_proof(&self) -> Result, Status> { self .device_key_upload() .and_then(|upload| upload.device_key_info.as_ref()) .map(|info| info.social_proof.clone()) .ok_or_else(|| Status::invalid_argument("unexpected message data")) } } + +impl TryFrom for IdentityInfo { + type Error = Status; + + fn try_from(value: DBIdentifier) -> Result { + match value { + DBIdentifier::Username(username) => Ok(IdentityInfo::Username(username)), + DBIdentifier::WalletAddress(eth_identity) => { + Ok(IdentityInfo::EthIdentity(EthereumIdentity { + wallet_address: eth_identity.wallet_address, + social_proof: eth_identity.social_proof, + })) + } + } + } +} diff --git a/shared/protos/identity_auth.proto b/shared/protos/identity_auth.proto index af60151f6..94ddc8c2f 100644 --- a/shared/protos/identity_auth.proto +++ b/shared/protos/identity_auth.proto @@ -1,184 +1,198 @@ syntax = "proto3"; import "identity_unauth.proto"; package identity.auth; // RPCs from a client (iOS, Android, or web) to identity service // // This service will assert authenticity of a device by verifying the access // token through an interceptor, thus avoiding the need to explicitly pass // the credentials on every request service IdentityClientService { // X3DH actions // Replenish one-time preKeys rpc UploadOneTimeKeys(UploadOneTimeKeysRequest) returns (identity.unauth.Empty) {} // Rotate a device's prekey and prekey signature // Rotated for deniability of older messages rpc RefreshUserPrekeys(RefreshUserPrekeysRequest) returns (identity.unauth.Empty) {} // Called by clients to get all device keys associated with a user in order // to open a new channel of communication on any of their devices. // Specially, this will return the following per device: // - Identity keys (both Content and Notif Keys) // - Prekey (including prekey signature) // - One-time Prekey rpc GetOutboundKeysForUser(OutboundKeysForUserRequest) returns (OutboundKeysForUserResponse) {} - // Called by receivers of a communication request. The reponse will only - // return identity keys (both content and notif keys) and related prekeys per - // device, but will not contain one-time keys. + // Called by receivers of a communication request. The reponse will return + // identity keys (both content and notif keys) and related prekeys per device, + // but will not contain one-time keys. Additionally, the response will contain + // the other user's username. rpc GetInboundKeysForUser(InboundKeysForUserRequest) returns (InboundKeysForUserResponse) {} // Called by user to update password and receive new access token rpc UpdateUserPasswordStart(UpdateUserPasswordStartRequest) returns (UpdateUserPasswordStartResponse) {} rpc UpdateUserPasswordFinish(UpdateUserPasswordFinishRequest) returns (identity.unauth.Empty) {} // Called by user to log out (clears device's keys and access token) rpc LogOutUser(identity.unauth.Empty) returns (identity.unauth.Empty) {} // Called by a user to delete their own account rpc DeleteUser(identity.unauth.Empty) returns (identity.unauth.Empty) {} // Called by clients to get required keys for opening a connection // to a user's keyserver rpc GetKeyserverKeys(OutboundKeysForUserRequest) returns (KeyserverKeysResponse) {} // Returns userID for given username or wallet address rpc FindUserID(FindUserIDRequest) returns (FindUserIDResponse) {} // Returns device list history rpc GetDeviceListForUser(GetDeviceListRequest) returns (GetDeviceListResponse) {} } // Helper types +message EthereumIdentity { + string wallet_address = 1; + string social_proof = 2; +} + +message Identity { + oneof identity_info { + string username = 1; + EthereumIdentity eth_identity = 2; + } +} + // UploadOneTimeKeys // As OPKs get exhausted, they need to be refreshed message UploadOneTimeKeysRequest { repeated string content_one_time_prekeys = 1; repeated string notif_one_time_prekeys = 2; } // RefreshUserPreKeys message RefreshUserPrekeysRequest { identity.unauth.Prekey new_content_prekeys = 1; identity.unauth.Prekey new_notif_prekeys = 2; } // Information needed when establishing communication to someone else's device message OutboundKeyInfo { identity.unauth.IdentityKeyInfo identity_info = 1; identity.unauth.Prekey content_prekey = 2; identity.unauth.Prekey notif_prekey = 3; optional string one_time_content_prekey = 4; optional string one_time_notif_prekey = 5; } message KeyserverKeysResponse { optional OutboundKeyInfo keyserver_info = 1; } // GetOutboundKeysForUser message OutboundKeysForUserResponse { // Map is keyed on devices' public ed25519 key used for signing map devices = 1; } // Information needed by a device to establish communcation when responding // to a request. // The device receiving a request only needs the content key and prekey. message OutboundKeysForUserRequest { string user_id = 1; } // GetInboundKeysForUser message InboundKeyInfo { identity.unauth.IdentityKeyInfo identity_info = 1; identity.unauth.Prekey content_prekey = 2; identity.unauth.Prekey notif_prekey = 3; } message InboundKeysForUserResponse { // Map is keyed on devices' public ed25519 key used for signing map devices = 1; + Identity identity = 2; } message InboundKeysForUserRequest { string user_id = 1; } // FindUserID message FindUserIDRequest { oneof identifier { string username = 1; string wallet_address = 2; } } message FindUserIDResponse { // userID if the user is registered with Identity Service, null otherwise optional string user_id = 1; // true if the identifier (username or wallet address) exists in the // reserved usernames list, false otherwise. It doesn't take into account // whether the user is registered with Identity Service (userID != null). bool is_reserved = 2; } // UpdateUserPassword // Request for updating a user, similar to registration but need a // access token to validate user before updating password message UpdateUserPasswordStartRequest { // Message sent to initiate PAKE registration (step 1) bytes opaque_registration_request = 1; } // Do a user registration, but overwrite the existing credentials // after validation of user message UpdateUserPasswordFinishRequest { // Identifier used to correlate start and finish request string session_id = 1; // Opaque client registration upload (step 3) bytes opaque_registration_upload = 2; } message UpdateUserPasswordStartResponse { // Identifier used to correlate start request with finish request string session_id = 1; bytes opaque_registration_response = 2; } // GetDeviceListForUser message GetDeviceListRequest { // User whose device lists we want to retrieve string user_id = 1; // UTC timestamp in milliseconds // If none, whole device list history will be retrieved optional int64 since_timestamp = 2; } message GetDeviceListResponse { // A list of stringified JSON objects of the following format: // { // "rawDeviceList": JSON.stringify({ // "devices": [, ...] // "timestamp": , // }) // } repeated string device_list_updates = 1; }