diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs index 3091aad3a..dacb9a6fb 100644 --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -1,93 +1,93 @@ use comm_opaque2::client::Registration; use rand::{distributions::Alphanumeric, Rng}; mod proto { tonic::include_proto!("identity.client"); } +use proto as client; use proto::{ identity_client_service_client::IdentityClientServiceClient, DeviceKeyUpload, - DeviceType, IdentityKeyInfo, PreKey, RegistrationFinishRequest, - RegistrationStartRequest, + IdentityKeyInfo, PreKey, RegistrationFinishRequest, RegistrationStartRequest, }; pub struct DeviceInfo { pub username: String, pub user_id: String, pub device_id: String, pub access_token: String, } pub async fn create_device() -> DeviceInfo { let password = "pass"; let username: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); // TODO: Generate dynamic valid olm account info let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; // The ed25519 value from the olm payload let device_id = r#"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg"#; let mut client_registration = Registration::new(); let opaque_registration_request = client_registration.start(&password).unwrap(); let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), social_proof: None, }), content_upload: Some(PreKey { pre_key: "content_prekey".to_string(), pre_key_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(PreKey { pre_key: "notif_prekey".to_string(), pre_key_signature: "notif_prekey_sig".to_string(), }), onetime_content_prekeys: Vec::new(), onetime_notif_prekeys: Vec::new(), - device_type: DeviceType::Keyserver.into(), + device_type: client::DeviceType::Keyserver.into(), }), }; // TODO: allow endpoint to be configured let mut identity_client = IdentityClientServiceClient::connect("http://127.0.0.1:50054") .await .expect("Couldn't connect to identitiy service"); let registration_start_response = identity_client .register_password_user_start(registration_start_request) .await .unwrap() .into_inner(); let opaque_registration_upload = client_registration .finish( &password, ®istration_start_response.opaque_registration_response, ) .unwrap(); let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let registration_finish_response = identity_client .register_password_user_finish(registration_finish_request) .await .unwrap() .into_inner(); return DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: registration_finish_response.user_id, access_token: registration_finish_response.access_token, }; } diff --git a/services/commtest/tests/identity_keyserver_tests.rs b/services/commtest/tests/identity_keyserver_tests.rs new file mode 100644 index 000000000..a220cdada --- /dev/null +++ b/services/commtest/tests/identity_keyserver_tests.rs @@ -0,0 +1,85 @@ +mod proto { + tonic::include_proto!("identity.client"); +} +use proto as client; +mod auth_proto { + tonic::include_proto!("identity.authenticated"); +} +use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; +use auth_proto::OutboundKeysForUserRequest; +use client::UploadOneTimeKeysRequest; +use commtest::identity::device::create_device; +use tonic::{transport::Endpoint, Request}; + +#[tokio::test] +async fn set_prekey() { + let device_info = create_device().await; + + let channel = Endpoint::from_static("http://[::1]:50054") + .connect() + .await + .unwrap(); + + let mut client = + AuthClient::with_interceptor(channel, |mut request: Request<()>| { + let metadata = request.metadata_mut(); + metadata.insert("user_id", device_info.user_id.parse().unwrap()); + metadata.insert("device_id", device_info.device_id.parse().unwrap()); + metadata + .insert("access_token", device_info.access_token.parse().unwrap()); + Ok(request) + }); + + let upload_request = UploadOneTimeKeysRequest { + user_id: device_info.user_id.to_string(), + device_id: device_info.device_id.to_string(), + access_token: device_info.access_token.to_string(), + content_one_time_pre_keys: vec!["content1".to_string()], + notif_one_time_pre_keys: vec!["notif1".to_string()], + }; + + let mut unauthenticated_client = + proto::identity_client_service_client::IdentityClientServiceClient::connect("http://127.0.0.1:50054") + .await + .expect("Couldn't connect to identitiy service"); + + unauthenticated_client + .upload_one_time_keys(upload_request) + .await + .expect("Failed to upload keys"); + + // Currently allowed to request your own outbound keys + let keyserver_request = OutboundKeysForUserRequest { + user_id: device_info.user_id.clone(), + }; + + println!("Getting keyserver info for user, {}", device_info.user_id); + let first_reponse = client + .get_keyserver_keys(keyserver_request.clone()) + .await + .expect("Second keyserver keys request failed") + .into_inner() + .keyserver_info + .unwrap(); + + assert_eq!( + first_reponse.onetime_content_prekey, + Some("content1".to_string()) + ); + assert_eq!( + first_reponse.onetime_notif_prekey, + Some("notif1".to_string()) + ); + + let second_reponse = client + .get_keyserver_keys(keyserver_request) + .await + .expect("Second keyserver keys request failed") + .into_inner() + .keyserver_info + .unwrap(); + + // The one time keys should be exhausted + assert_eq!(second_reponse.onetime_content_prekey, None); + assert_eq!(second_reponse.onetime_notif_prekey, None); +} diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs index a19863baf..ea4c4ac1b 100644 --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -1,1247 +1,1457 @@ use constant_time_eq::constant_time_eq; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::str::FromStr; use std::sync::Arc; -use crate::ddb_utils::{into_one_time_put_requests, OlmAccountType}; +use crate::ddb_utils::{ + create_one_time_key_partition_key, into_one_time_put_requests, OlmAccountType, +}; use crate::error::{DBItemAttributeError, DBItemError, Error}; use aws_config::SdkConfig; use aws_sdk_dynamodb::model::{AttributeValue, PutRequest, WriteRequest}; use aws_sdk_dynamodb::output::{ DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, }; use aws_sdk_dynamodb::{types::Blob, Client}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, warn}; use crate::client_service::{FlattenedDeviceKeyUpload, UserRegistrationInfo}; use crate::config::CONFIG; use crate::constants::{ ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, RESERVED_USERNAMES_TABLE, RESERVED_USERNAMES_TABLE_PARTITION_KEY, USERS_TABLE, USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, }; +use crate::error::{AttributeValueFromHashMap, FromAttributeValue}; use crate::id::generate_uuid; use crate::nonce::NonceData; use crate::token::{AccessTokenData, AuthType}; #[derive(Serialize, Deserialize)] pub struct OlmKeys { pub curve25519: String, pub ed25519: String, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct KeyPayload { pub notification_identity_public_keys: OlmKeys, pub primary_identity_public_keys: OlmKeys, } impl FromStr for KeyPayload { type Err = serde_json::Error; // The payload is held in the database as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to serialize fn from_str(payload: &str) -> Result { serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) } } #[derive(Clone, Copy)] pub enum Device { // Numeric values should match the protobuf definition Keyserver = 0, Native, Web, } impl TryFrom for Device { type Error = crate::error::Error; fn try_from(value: i32) -> Result { match value { 0 => Ok(Device::Keyserver), 1 => Ok(Device::Native), 2 => Ok(Device::Web), _ => Err(Error::Attribute(DBItemError { attribute_name: USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME .to_string(), attribute_value: Some(AttributeValue::N(value.to_string())), attribute_error: DBItemAttributeError::InvalidValue, })), } } } impl Display for Device { fn fmt(&self, f: &mut Formatter) -> FmtResult { match self { Device::Keyserver => write!(f, "keyserver"), Device::Native => write!(f, "native"), Device::Web => write!(f, "web"), } } } +// This is very similar to the protobuf definitions, however, +// coupling the protobuf schema to the database API should be avoided. +pub struct PreKey { + pub prekey: String, + pub prekey_signature: String, +} +pub struct OutboundKeys { + pub key_payload: String, + pub key_payload_signature: String, + pub social_proof: Option, + pub content_prekey: PreKey, + pub notif_prekey: PreKey, + pub content_one_time_key: Option, + pub notif_one_time_key: Option, +} + #[derive(Clone)] pub struct DatabaseClient { client: Arc, } impl DatabaseClient { pub fn new(aws_config: &SdkConfig) -> Self { let client = match &CONFIG.localstack_endpoint { Some(endpoint) => { info!( "Configuring DynamoDB client to use LocalStack endpoint: {}", endpoint ); let ddb_config_builder = aws_sdk_dynamodb::config::Builder::from(aws_config) .endpoint_url(endpoint); Client::from_conf(ddb_config_builder.build()) } None => Client::new(aws_config), }; DatabaseClient { client: Arc::new(client), } } pub async fn add_password_user_to_users_table( &self, registration_state: UserRegistrationInfo, password_file: Vec, ) -> Result { self .add_user_to_users_table( registration_state.flattened_device_key_upload, Some((registration_state.username, Blob::new(password_file))), None, None, ) .await } pub async fn add_wallet_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, wallet_address: String, social_proof: String, ) -> Result { self .add_user_to_users_table( flattened_device_key_upload, None, Some(wallet_address), Some(social_proof), ) .await } async fn add_user_to_users_table( &self, flattened_device_key_upload: FlattenedDeviceKeyUpload, username_and_password_file: Option<(String, Blob)>, wallet_address: Option, social_proof: Option, ) -> Result { let user_id = generate_uuid(); let device_info = create_device_info(flattened_device_key_upload.clone(), social_proof); let devices = HashMap::from([( flattened_device_key_upload.device_id_key.clone(), AttributeValue::M(device_info), )]); let mut user = HashMap::from([ ( USERS_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id.clone()), ), ( USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), AttributeValue::M(devices), ), ]); if let Some((username, password_file)) = username_and_password_file { user.insert( USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), AttributeValue::S(username), ); user.insert( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), AttributeValue::B(password_file), ); } if let Some(address) = wallet_address { user.insert( USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE.to_string(), AttributeValue::S(address), ); } self .client .put_item() .table_name(USERS_TABLE) .set_item(Some(user)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( flattened_device_key_upload.device_id_key, flattened_device_key_upload.content_onetime_keys, flattened_device_key_upload.notif_onetime_keys, ) .await?; Ok(user_id) } pub async fn add_password_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, ) -> Result<(), Error> { self .add_device_to_users_table(user_id, flattened_device_key_upload, None) .await } pub async fn add_wallet_user_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: String, ) -> Result<(), Error> { self .add_device_to_users_table( user_id, flattened_device_key_upload, Some(social_proof), ) .await } + pub async fn get_keyserver_keys_for_user( + &self, + user_id: &str, + ) -> Result, Error> { + // DynamoDB doesn't have a way to "pop" a value from a list, so we must + // first read in user info, then update one_time_keys with value we + // gave to requester + let user_info = self + .get_item_from_users_table(&user_id) + .await? + .item + .ok_or(Error::MissingItem)?; + + let devices = user_info + .get(USERS_TABLE_DEVICES_ATTRIBUTE) + .ok_or(Error::MissingItem)? + .to_hashmap(USERS_TABLE_DEVICES_ATTRIBUTE)?; + + let mut maybe_keyserver_id = None; + for (device_id, device_info) in devices { + let device_type = device_info + .to_hashmap("device_id")? + .get(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME) + .ok_or(Error::MissingItem)? + .to_string(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME)?; + + if device_type == "keyserver" { + maybe_keyserver_id = Some(device_id); + break; + } + } + + // Assert that the user has a keyserver, if they don't return None + let keyserver_id = match maybe_keyserver_id { + None => return Ok(None), + Some(id) => id, + }; + + let keyserver = devices.get_map(keyserver_id)?; + let notif_one_time_key: Option = self + .get_onetime_key(keyserver_id, OlmAccountType::Notification) + .await?; + let content_one_time_key: Option = self + .get_onetime_key(keyserver_id, OlmAccountType::Content) + .await?; + + debug!( + "Able to get notif key for keyserver {}: {}", + keyserver_id, + notif_one_time_key.is_some() + ); + debug!( + "Able to get content key for keyserver {}: {}", + keyserver_id, + content_one_time_key.is_some() + ); + + let content_prekey = keyserver + .get_string(USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME)?; + let content_prekey_signature = keyserver.get_string( + USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, + )?; + let notif_prekey = keyserver + .get_string(USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME)?; + let notif_prekey_signature = keyserver.get_string( + USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, + )?; + let key_payload = keyserver + .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME)? + .to_string(); + let key_payload_signature = keyserver + .get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME)? + .to_string(); + let social_proof = keyserver + .get(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) + .map(|s| { + s.to_string(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) + .ok() + }) + .flatten() + .map(|s| s.to_owned()); + + let full_content_prekey = PreKey { + prekey: content_prekey.to_string(), + prekey_signature: content_prekey_signature.to_string(), + }; + + let full_notif_prekey = PreKey { + prekey: notif_prekey.to_string(), + prekey_signature: notif_prekey_signature.to_string(), + }; + + let outbound_payload = OutboundKeys { + key_payload, + key_payload_signature, + social_proof, + content_prekey: full_content_prekey, + notif_prekey: full_notif_prekey, + content_one_time_key, + notif_one_time_key, + }; + + return Ok(Some(outbound_payload)); + } + + /// Will "mint" a single onetime key by attempting to successfully deleting + /// a key + pub async fn get_onetime_key( + &self, + device_id: &str, + account_type: OlmAccountType, + ) -> Result, Error> { + use crate::constants::one_time_keys_table as otk_table; + + let query_result = self.get_onetime_keys(device_id, account_type).await?; + let items = query_result.items(); + + // If no onetime keys exists, return none early + if items.is_none() { + debug!("Unable to find {:?} onetime-key", account_type); + return Ok(None); + } + + let mut result = None; + + // "items" was checked to be None above, will be safe to unwrap here. + // Attempt to delete the onetime keys individually, a successful delete + // mints the onetime key to the requester + for item in items.unwrap() { + let pk = item.get_string(otk_table::PARTITION_KEY)?; + let otk = item.get_string(otk_table::SORT_KEY)?; + + let composite_key = HashMap::from([ + ( + otk_table::PARTITION_KEY.to_string(), + AttributeValue::S(pk.to_string()), + ), + ( + otk_table::SORT_KEY.to_string(), + AttributeValue::S(otk.to_string()), + ), + ]); + + debug!("Attempting to delete a {:?} onetime-key", account_type); + match self + .client + .delete_item() + .set_key(Some(composite_key)) + .table_name(otk_table::NAME) + .send() + .await + { + Ok(_) => { + result = Some(otk.to_string()); + break; + } + // This err should only happen if a delete occurred between the read + // above and this delete + Err(e) => { + debug!("Unable to delete key: {:?}", e); + continue; + } + } + } + + // Return deleted key + Ok(result) + } + + pub async fn get_onetime_keys( + &self, + device_id: &str, + account_type: OlmAccountType, + ) -> Result { + use crate::constants::one_time_keys_table::*; + + // Add related prefix to partition key to grab the correct result set + let partition_key = + create_one_time_key_partition_key(device_id, account_type); + + self + .client + .query() + .table_name(NAME) + .key_condition_expression(format!("{} = :pk", PARTITION_KEY)) + .expression_attribute_values(":pk", AttributeValue::S(partition_key)) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into())) + } + pub async fn set_prekey( &self, user_id: String, device_id: String, content_prekey: String, content_prekey_signature: String, notif_prekey: String, notif_prekey_signature: String, ) -> Result<(), Error> { let notif_prekey_av = AttributeValue::S(notif_prekey); let notif_prekey_signature_av = AttributeValue::S(notif_prekey_signature); let content_prekey_av = AttributeValue::S(content_prekey); let content_prekey_signature_av = AttributeValue::S(content_prekey_signature); let update_expression = format!("SET {0}.#{1}.{2} = :n, {0}.#{1}.{3} = :p, {0}.#{1}.{4} = :c, {0}.#{1}.{5} = :d", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID", USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME, USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, ); let expression_attribute_names = HashMap::from([ (format!("#{}", "deviceID"), device_id), ( "#user_id".to_string(), USERS_TABLE_PARTITION_KEY.to_string(), ), ]); let expression_attribute_values = HashMap::from([ (":n".to_string(), notif_prekey_av), (":p".to_string(), notif_prekey_signature_av), (":c".to_string(), content_prekey_av), (":d".to_string(), content_prekey_signature_av), ]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .condition_expression("attribute_exists(#user_id)") .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn append_one_time_prekeys( &self, device_id: String, content_one_time_keys: Vec, notif_one_time_keys: Vec, ) -> Result<(), Error> { use crate::constants::one_time_keys_table; let mut otk_requests = into_one_time_put_requests( &device_id, content_one_time_keys, OlmAccountType::Content, ); let notif_otk_requests: Vec = into_one_time_put_requests( &device_id, notif_one_time_keys, OlmAccountType::Notification, ); otk_requests.extend(notif_otk_requests); // BatchWriteItem has a hard limit of 25 writes per call for requests in otk_requests.chunks(25) { self .client .batch_write_item() .request_items(one_time_keys_table::NAME, requests.to_vec()) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } Ok(()) } async fn add_device_to_users_table( &self, user_id: String, flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> Result<(), Error> { // Avoid borrowing from lifetime of flattened_device_key_upload let device_id = flattened_device_key_upload.device_id_key.clone(); let content_one_time_keys = flattened_device_key_upload.content_onetime_keys.clone(); let notif_one_time_keys = flattened_device_key_upload.notif_onetime_keys.clone(); let device_info = create_device_info(flattened_device_key_upload, social_proof); let update_expression = format!("SET {}.#{} = :v", USERS_TABLE_DEVICES_ATTRIBUTE, "deviceID",); let expression_attribute_names = HashMap::from([(format!("#{}", "deviceID"), device_id.clone())]); let expression_attribute_values = HashMap::from([(":v".to_string(), AttributeValue::M(device_info))]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_names(Some(expression_attribute_names)) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; self .append_one_time_prekeys( device_id, content_one_time_keys, notif_one_time_keys, ) .await?; Ok(()) } pub async fn remove_device_from_users_table( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { let update_expression = format!("REMOVE {}.{}", USERS_TABLE_DEVICES_ATTRIBUTE, ":deviceID"); let expression_attribute_values = HashMap::from([( ":deviceID".to_string(), AttributeValue::S(device_id_key), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn update_user_password( &self, user_id: String, password_file: Vec, ) -> Result<(), Error> { let update_expression = format!("SET {} = :p", USERS_TABLE_REGISTRATION_ATTRIBUTE); let expression_attribute_values = HashMap::from([( ":p".to_string(), AttributeValue::B(Blob::new(password_file)), )]); self .client .update_item() .table_name(USERS_TABLE) .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) .update_expression(update_expression) .set_expression_attribute_values(Some(expression_attribute_values)) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn delete_user( &self, user_id: String, ) -> Result { debug!("Attempting to delete user: {}", user_id); match self .client .delete_item() .table_name(USERS_TABLE) .key( USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id.clone()), ) .send() .await { Ok(out) => { info!("User has been deleted {}", user_id); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete user {}", user_id); Err(Error::AwsSdk(e.into())) } } } pub async fn get_access_token_data( &self, user_id: String, signing_public_key: String, ) -> Result, Error> { let primary_key = create_composite_primary_key( ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), user_id.clone(), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), signing_public_key.clone(), ), ); let get_item_result = self .client .get_item() .table_name(ACCESS_TOKEN_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await; match get_item_result { Ok(GetItemOutput { item: Some(mut item), .. }) => { let created = parse_date_time_attribute( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, item.remove(ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE), )?; let auth_type = parse_auth_type_attribute( item.remove(ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE), )?; let valid = parse_valid_attribute( item.remove(ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE), )?; let access_token = parse_token_attribute( item.remove(ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE), )?; Ok(Some(AccessTokenData { user_id, signing_public_key, access_token, created, auth_type, valid, })) } Ok(_) => { info!( "No item found for user {} and signing public key {} in token table", user_id, signing_public_key ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get token for user {} with signing public key {}: {}", user_id, signing_public_key, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn verify_access_token( &self, user_id: String, signing_public_key: String, access_token_to_verify: String, ) -> Result { let is_valid = self .get_access_token_data(user_id, signing_public_key) .await? .map(|access_token_data| { constant_time_eq( access_token_data.access_token.as_bytes(), access_token_to_verify.as_bytes(), ) && access_token_data.is_valid() }) .unwrap_or(false); Ok(is_valid) } pub async fn put_access_token_data( &self, access_token_data: AccessTokenData, ) -> Result { let item = HashMap::from([ ( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(access_token_data.user_id), ), ( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(access_token_data.signing_public_key), ), ( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.access_token), ), ( ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(access_token_data.created.to_rfc3339()), ), ( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), AttributeValue::S(match access_token_data.auth_type { AuthType::Password => "password".to_string(), AuthType::Wallet => "wallet".to_string(), }), ), ( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), AttributeValue::Bool(access_token_data.valid), ), ]); self .client .put_item() .table_name(ACCESS_TOKEN_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn delete_access_token_data( &self, user_id: String, device_id_key: String, ) -> Result<(), Error> { self .client .delete_item() .table_name(ACCESS_TOKEN_TABLE) .key( ACCESS_TOKEN_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(user_id), ) .key( ACCESS_TOKEN_SORT_KEY.to_string(), AttributeValue::S(device_id_key), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn username_taken(&self, username: String) -> Result { let result = self .get_user_id_from_user_info(username, AuthType::Password) .await?; Ok(result.is_some()) } pub async fn filter_out_taken_usernames( &self, usernames: Vec, ) -> Result, Error> { let db_usernames = self.get_all_usernames().await?; let db_usernames_set: HashSet = db_usernames.into_iter().collect(); let usernames_set: HashSet = usernames.into_iter().collect(); let available_usernames: Vec = usernames_set .difference(&db_usernames_set) .cloned() .collect(); Ok(available_usernames) } async fn get_user_from_user_info( &self, user_info: String, auth_type: AuthType, ) -> Result>, Error> { let (index, attribute_name) = match auth_type { AuthType::Password => { (USERS_TABLE_USERNAME_INDEX, USERS_TABLE_USERNAME_ATTRIBUTE) } AuthType::Wallet => ( USERS_TABLE_WALLET_ADDRESS_INDEX, USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, ), }; match self .client .query() .table_name(USERS_TABLE) .index_name(index) .key_condition_expression(format!("{} = :u", attribute_name)) .expression_attribute_values(":u", AttributeValue::S(user_info.clone())) .send() .await { Ok(QueryOutput { items: Some(items), .. }) => { let num_items = items.len(); if num_items == 0 { return Ok(None); } if num_items > 1 { warn!( "{} user IDs associated with {} {}: {:?}", num_items, attribute_name, user_info, items ); } let first_item = items[0].clone(); let user_id = first_item .get(USERS_TABLE_PARTITION_KEY) .ok_or(DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: None, attribute_error: DBItemAttributeError::Missing, })? .as_s() .map_err(|_| DBItemError { attribute_name: USERS_TABLE_PARTITION_KEY.to_string(), attribute_value: first_item.get(USERS_TABLE_PARTITION_KEY).cloned(), attribute_error: DBItemAttributeError::IncorrectType, })?; let result = self.get_item_from_users_table(user_id).await?; Ok(result.item) } Ok(_) => { info!( "No item found for {} {} in users table", attribute_name, user_info ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get user from {} {}: {}", attribute_name, user_info, e ); Err(Error::AwsSdk(e.into())) } } } pub async fn get_user_id_from_user_info( &self, user_info: String, auth_type: AuthType, ) -> Result, Error> { match self .get_user_from_user_info(user_info.clone(), auth_type) .await { Ok(Some(mut user)) => parse_string_attribute( USERS_TABLE_PARTITION_KEY, user.remove(USERS_TABLE_PARTITION_KEY), ) .map(Some) .map_err(Error::Attribute), Ok(_) => Ok(None), Err(e) => Err(e), } } pub async fn get_user_id_and_password_file_from_username( &self, username: &str, ) -> Result)>, Error> { match self .get_user_from_user_info(username.to_string(), AuthType::Password) .await { Ok(Some(mut user)) => { let user_id = parse_string_attribute( USERS_TABLE_PARTITION_KEY, user.remove(USERS_TABLE_PARTITION_KEY), )?; let password_file = parse_registration_data_attribute( user.remove(USERS_TABLE_REGISTRATION_ATTRIBUTE), )?; Ok(Some((user_id, password_file))) } Ok(_) => { info!( "No item found for user {} in PAKE registration table", username ); Ok(None) } Err(e) => { error!( "DynamoDB client failed to get registration data for user {}: {}", username, e ); Err(e) } } } pub async fn get_item_from_users_table( &self, user_id: &str, ) -> Result { let primary_key = create_simple_primary_key(( USERS_TABLE_PARTITION_KEY.to_string(), user_id.to_string(), )); self .client .get_item() .table_name(USERS_TABLE) .set_key(Some(primary_key)) .consistent_read(true) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } async fn get_all_usernames(&self) -> Result, Error> { let scan_output = self .client .scan() .table_name(USERS_TABLE) .projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let mut result = Vec::new(); if let Some(attributes) = scan_output.items { for mut attribute in attributes { if let Ok(username) = parse_string_attribute( USERS_TABLE_USERNAME_ATTRIBUTE, attribute.remove(USERS_TABLE_USERNAME_ATTRIBUTE), ) { result.push(username); } } } Ok(result) } pub async fn add_nonce_to_nonces_table( &self, nonce_data: NonceData, ) -> Result { let item = HashMap::from([ ( NONCE_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(nonce_data.nonce), ), ( NONCE_TABLE_CREATED_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.created.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string(), AttributeValue::S(nonce_data.expiration_time.to_rfc3339()), ), ( NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE.to_string(), AttributeValue::N(nonce_data.expiration_time.timestamp().to_string()), ), ]); self .client .put_item() .table_name(NONCE_TABLE) .set_item(Some(item)) .send() .await .map_err(|e| Error::AwsSdk(e.into())) } pub async fn get_nonce_from_nonces_table( &self, nonce_value: impl Into, ) -> Result, Error> { let get_response = self .client .get_item() .table_name(NONCE_TABLE) .key( NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce_value.into()), ) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; let Some(mut item) = get_response.item else { return Ok(None); }; let nonce = parse_string_attribute( NONCE_TABLE_PARTITION_KEY, item.remove(&NONCE_TABLE_PARTITION_KEY.to_string()), )?; let created = parse_date_time_attribute( NONCE_TABLE_CREATED_ATTRIBUTE, item.remove(&NONCE_TABLE_CREATED_ATTRIBUTE.to_string()), )?; let expiration_time = parse_date_time_attribute( NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE, item.remove(&NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE.to_string()), )?; Ok(Some(NonceData { nonce, created, expiration_time, })) } pub async fn remove_nonce_from_nonces_table( &self, nonce: impl Into, ) -> Result<(), Error> { self .client .delete_item() .table_name(NONCE_TABLE) .key(NONCE_TABLE_PARTITION_KEY, AttributeValue::S(nonce.into())) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; Ok(()) } pub async fn add_usernames_to_reserved_usernames_table( &self, usernames: Vec, ) -> Result<(), Error> { // A single call to BatchWriteItem can consist of up to 25 operations for usernames_chunk in usernames.chunks(25) { let write_requests = usernames_chunk .iter() .map(|username| { let put_request = PutRequest::builder() .item( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.to_string()), ) .build(); WriteRequest::builder().put_request(put_request).build() }) .collect(); self .client .batch_write_item() .request_items(RESERVED_USERNAMES_TABLE, write_requests) .send() .await .map_err(|e| Error::AwsSdk(e.into()))?; } info!("Batch write item to reserved usernames table succeeded"); Ok(()) } pub async fn delete_username_from_reserved_usernames_table( &self, username: String, ) -> Result { debug!( "Attempting to delete username {} from reserved usernames table", username ); match self .client .delete_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY, AttributeValue::S(username.clone()), ) .send() .await { Ok(out) => { info!( "Username {} has been deleted from reserved usernames table", username ); Ok(out) } Err(e) => { error!("DynamoDB client failed to delete username {} from reserved usernames table", username); Err(Error::AwsSdk(e.into())) } } } pub async fn username_in_reserved_usernames_table( &self, username: &str, ) -> Result { match self .client .get_item() .table_name(RESERVED_USERNAMES_TABLE) .key( RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(), AttributeValue::S(username.to_string()), ) .consistent_read(true) .send() .await { Ok(GetItemOutput { item: Some(_), .. }) => Ok(true), Ok(_) => Ok(false), Err(e) => Err(Error::AwsSdk(e.into())), } } } type AttributeName = String; fn create_simple_primary_key( partition_key: (AttributeName, String), ) -> HashMap { HashMap::from([(partition_key.0, AttributeValue::S(partition_key.1))]) } fn create_composite_primary_key( partition_key: (AttributeName, String), sort_key: (AttributeName, String), ) -> HashMap { let mut primary_key = create_simple_primary_key(partition_key); primary_key.insert(sort_key.0, AttributeValue::S(sort_key.1)); primary_key } fn parse_date_time_attribute( attribute_name: &str, attribute: Option, ) -> Result, DBItemError> { if let Some(AttributeValue::S(created)) = &attribute { created.parse().map_err(|e| { DBItemError::new( attribute_name.to_string(), attribute, DBItemAttributeError::InvalidTimestamp(e), ) }) } else { Err(DBItemError::new( attribute_name.to_string(), attribute, DBItemAttributeError::Missing, )) } } fn parse_auth_type_attribute( attribute: Option, ) -> Result { if let Some(AttributeValue::S(auth_type)) = &attribute { match auth_type.as_str() { "password" => Ok(AuthType::Password), "wallet" => Ok(AuthType::Wallet), _ => Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::IncorrectType, )), } } else { Err(DBItemError::new( ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::Missing, )) } } fn parse_valid_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::Bool(valid)) => Ok(valid), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::Missing, )), } } fn parse_token_attribute( attribute: Option, ) -> Result { match attribute { Some(AttributeValue::S(token)) => Ok(token), Some(_) => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::Missing, )), } } fn parse_registration_data_attribute( attribute: Option, ) -> Result, DBItemError> { match attribute { Some(AttributeValue::B(server_registration_bytes)) => { Ok(server_registration_bytes.into_inner()) } Some(_) => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), attribute, DBItemAttributeError::Missing, )), } } #[allow(dead_code)] fn parse_map_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result, DBItemError> { match attribute_value { Some(AttributeValue::M(map)) => Ok(map), Some(_) => Err(DBItemError::new( attribute_name.to_string(), attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.to_string(), attribute_value, DBItemAttributeError::Missing, )), } } fn parse_string_attribute( attribute_name: &'static str, attribute_value: Option, ) -> Result { match attribute_value { Some(AttributeValue::S(value)) => Ok(value), Some(_) => Err(DBItemError::new( attribute_name.to_string(), attribute_value, DBItemAttributeError::IncorrectType, )), None => Err(DBItemError::new( attribute_name.to_string(), attribute_value, DBItemAttributeError::Missing, )), } } fn create_device_info( flattened_device_key_upload: FlattenedDeviceKeyUpload, social_proof: Option, ) -> HashMap { let mut device_info = HashMap::from([ ( USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.device_type.to_string()), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload), ), ( USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.key_payload_signature), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey), ), ( USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME .to_string(), AttributeValue::S(flattened_device_key_upload.content_prekey_signature), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey), ), ( USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME.to_string(), AttributeValue::S(flattened_device_key_upload.notif_prekey_signature), ), ]); if let Some(social_proof) = social_proof { device_info.insert( USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME.to_string(), AttributeValue::S(social_proof), ); } device_info } #[cfg(test)] mod tests { use super::*; #[test] fn test_create_simple_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let mut primary_key = create_simple_primary_key(partition_key); assert_eq!(primary_key.len(), 1); let attribute = primary_key.remove(&partition_key_name); assert!(attribute.is_some()); assert_eq!(attribute, Some(AttributeValue::S(partition_key_value))); } #[test] fn test_create_composite_primary_key() { let partition_key_name = "userID".to_string(); let partition_key_value = "12345".to_string(); let partition_key = (partition_key_name.clone(), partition_key_value.clone()); let sort_key_name = "deviceID".to_string(); let sort_key_value = "54321".to_string(); let sort_key = (sort_key_name.clone(), sort_key_value.clone()); let mut primary_key = create_composite_primary_key(partition_key, sort_key); assert_eq!(primary_key.len(), 2); let partition_key_attribute = primary_key.remove(&partition_key_name); assert!(partition_key_attribute.is_some()); assert_eq!( partition_key_attribute, Some(AttributeValue::S(partition_key_value)) ); let sort_key_attribute = primary_key.remove(&sort_key_name); assert!(sort_key_attribute.is_some()); assert_eq!(sort_key_attribute, Some(AttributeValue::S(sort_key_value))) } #[test] fn validate_keys() { // Taken from test user let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; let serialized_payload = KeyPayload::from_str(&example_payload).unwrap(); assert_eq!( serialized_payload .notification_identity_public_keys .curve25519, "DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo" ); } } diff --git a/services/identity/src/ddb_utils.rs b/services/identity/src/ddb_utils.rs index 1fd4e623b..dbd50a537 100644 --- a/services/identity/src/ddb_utils.rs +++ b/services/identity/src/ddb_utils.rs @@ -1,58 +1,58 @@ use aws_sdk_dynamodb::model::{AttributeValue, PutRequest, WriteRequest}; use std::collections::HashMap; use std::iter::IntoIterator; #[derive(Copy, Clone, Debug)] pub enum OlmAccountType { Content, Notification, } // Prefix the one time keys with the olm account variant. This allows for a single // DDB table to contain both notification and content keys for a device. -fn create_one_time_key_partition_key( +pub fn create_one_time_key_partition_key( device_id: &str, account_type: OlmAccountType, ) -> String { match account_type { OlmAccountType::Content => format!("content_{device_id}"), OlmAccountType::Notification => format!("notification_{device_id}"), } } fn create_one_time_key_put_request( device_id: &str, one_time_key: String, account_type: OlmAccountType, ) -> WriteRequest { use crate::constants::one_time_keys_table::*; let partition_key = create_one_time_key_partition_key(device_id, account_type); let builder = PutRequest::builder(); let attrs = HashMap::from([ (PARTITION_KEY.to_string(), AttributeValue::S(partition_key)), (SORT_KEY.to_string(), AttributeValue::S(one_time_key)), ]); let put_request = builder.set_item(Some(attrs)).build(); WriteRequest::builder().put_request(put_request).build() } pub fn into_one_time_put_requests( device_id: &str, one_time_keys: T, account_type: OlmAccountType, ) -> Vec where T: IntoIterator, ::Item: ToString, { one_time_keys .into_iter() .map(|otk| { create_one_time_key_put_request(device_id, otk.to_string(), account_type) }) .collect() } diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs index 30fa3107c..c46974c9c 100644 --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -1,55 +1,143 @@ use aws_sdk_dynamodb::{model::AttributeValue, Error as DynamoDBError}; +use std::collections::hash_map::HashMap; use std::fmt::{Display, Formatter, Result as FmtResult}; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(...)] MissingItem, } #[derive(Debug, derive_more::Error, derive_more::Constructor)] pub struct DBItemError { pub attribute_name: String, pub attribute_value: Option, pub attribute_error: DBItemAttributeError, } impl Display for DBItemError { fn fmt(&self, f: &mut Formatter) -> FmtResult { match &self.attribute_error { DBItemAttributeError::Missing => { write!(f, "Attribute {} is missing", self.attribute_name) } DBItemAttributeError::IncorrectType => write!( f, "Value for attribute {} has incorrect type: {:?}", self.attribute_name, self.attribute_value ), error => write!( f, "Error regarding attribute {} with value {:?}: {}", self.attribute_name, self.attribute_value, error ), } } } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DBItemAttributeError { #[display(...)] Missing, #[display(...)] IncorrectType, #[display(...)] InvalidTimestamp(chrono::ParseError), #[display(...)] ExpiredTimestamp, #[display(...)] InvalidValue, } + +pub trait FromAttributeValue { + fn to_vec( + &self, + attr_name: &str, + ) -> Result<&Vec, DBItemError>; + fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError>; + fn to_hashmap( + &self, + attr_name: &str, + ) -> Result<&HashMap, DBItemError>; +} + +fn handle_attr_failure(value: &AttributeValue, attr_name: &str) -> DBItemError { + DBItemError { + attribute_name: attr_name.to_string(), + attribute_value: Some(value.clone()), + attribute_error: DBItemAttributeError::IncorrectType, + } +} + +impl FromAttributeValue for AttributeValue { + fn to_vec( + &self, + attr_name: &str, + ) -> Result<&Vec, DBItemError> { + self.as_l().map_err(|e| handle_attr_failure(e, attr_name)) + } + + fn to_string(&self, attr_name: &str) -> Result<&String, DBItemError> { + self.as_s().map_err(|e| handle_attr_failure(e, attr_name)) + } + + fn to_hashmap( + &self, + attr_name: &str, + ) -> Result<&HashMap, DBItemError> { + self.as_m().map_err(|e| handle_attr_failure(e, attr_name)) + } +} + +pub trait AttributeValueFromHashMap { + fn get_string(&self, key: &str) -> Result<&String, DBItemError>; + fn get_map( + &self, + key: &str, + ) -> Result<&HashMap, DBItemError>; + fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError>; +} + +impl AttributeValueFromHashMap for HashMap { + fn get_string(&self, key: &str) -> Result<&String, DBItemError> { + self + .get(key) + .ok_or(DBItemError { + attribute_name: key.to_string(), + attribute_value: None, + attribute_error: DBItemAttributeError::Missing, + })? + .to_string(key) + } + + fn get_map( + &self, + key: &str, + ) -> Result<&HashMap, DBItemError> { + self + .get(key) + .ok_or(DBItemError { + attribute_name: key.to_string(), + attribute_value: None, + attribute_error: DBItemAttributeError::Missing, + })? + .to_hashmap(key) + } + + fn get_vec(&self, key: &str) -> Result<&Vec, DBItemError> { + self + .get(key) + .ok_or(DBItemError { + attribute_name: key.to_string(), + attribute_value: None, + attribute_error: DBItemAttributeError::Missing, + })? + .to_vec(key) + } +} diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index bbb9277f2..c504b9334 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,123 +1,152 @@ use crate::{client_service::handle_db_error, database::DatabaseClient}; use tonic::{Request, Response, Status}; // This must be named client, because generated code from the authenticated // protobuf file references message structs from the client protobuf file // with the client:: namespace pub mod client { tonic::include_proto!("identity.client"); } pub mod auth_proto { tonic::include_proto!("identity.authenticated"); } use auth_proto::{ identity_client_service_server::IdentityClientService, KeyserverKeysResponse, - OutboundKeysForUserRequest, RefreshUserPreKeysRequest, + OutboundKeyInfo, OutboundKeysForUserRequest, RefreshUserPreKeysRequest, }; -use client::Empty; +use client::{Empty, IdentityKeyInfo}; use tracing::debug; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, } fn get_value(req: &Request, key: &str) -> Option { let raw_value = req.metadata().get(key)?; raw_value.to_str().ok().map(|s| s.to_string()) } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, "user_id")?; let device_id = get_value(req, "device_id")?; let access_token = get_value(req, "access_token")?; Some((user_id, device_id, access_token)) } pub fn auth_intercept( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { println!("Intercepting request: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, "user_id") .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, "device_id") .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { async fn refresh_user_pre_keys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_pre_keys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_pre_keys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .set_prekey( user_id, device_id, content_keys.pre_key, content_keys.pre_key_signature, notif_keys.pre_key, notif_keys.pre_key_signature, ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } async fn get_keyserver_keys( &self, - _request: Request, + request: Request, ) -> Result, Status> { - unimplemented!(); + let message = request.into_inner(); + + let inner_response = self + .db_client + .get_keyserver_keys_for_user(&message.user_id) + .await + .map_err(handle_db_error)? + .map(|db_keys| OutboundKeyInfo { + identity_info: Some(IdentityKeyInfo { + payload: db_keys.key_payload, + payload_signature: db_keys.key_payload_signature, + social_proof: db_keys.social_proof, + }), + content_prekey: Some(client::PreKey { + pre_key: db_keys.content_prekey.prekey, + pre_key_signature: db_keys.content_prekey.prekey_signature, + }), + notif_prekey: Some(client::PreKey { + pre_key: db_keys.notif_prekey.prekey, + pre_key_signature: db_keys.notif_prekey.prekey_signature, + }), + onetime_content_prekey: db_keys.content_one_time_key, + onetime_notif_prekey: db_keys.notif_one_time_key, + }); + + let response = Response::new(KeyserverKeysResponse { + keyserver_info: inner_response, + }); + + return Ok(response); } }