Changeset View
Changeset View
Standalone View
Standalone View
services/identity/src/database.rs
use std::collections::HashMap; | use std::collections::HashMap; | ||||
use std::fmt::{Display, Formatter, Result as FmtResult}; | use std::fmt::{Display, Formatter, Result as FmtResult}; | ||||
use std::str::FromStr; | use std::str::FromStr; | ||||
use std::sync::Arc; | use std::sync::Arc; | ||||
use aws_config::SdkConfig; | use aws_config::SdkConfig; | ||||
use aws_sdk_dynamodb::model::AttributeValue; | use aws_sdk_dynamodb::model::AttributeValue; | ||||
use aws_sdk_dynamodb::output::{ | use aws_sdk_dynamodb::output::{ | ||||
DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, UpdateItemOutput, | DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, | ||||
}; | }; | ||||
use aws_sdk_dynamodb::types::Blob; | use aws_sdk_dynamodb::types::Blob; | ||||
use aws_sdk_dynamodb::{Client, Error as DynamoDBError}; | use aws_sdk_dynamodb::{Client, Error as DynamoDBError}; | ||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
use opaque_ke::{errors::ProtocolError, ServerRegistration}; | use opaque_ke::{errors::ProtocolError, ServerRegistration}; | ||||
use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||
use tracing::{debug, error, info, warn}; | use tracing::{debug, error, info, warn}; | ||||
use crate::client_service::UserRegistrationInfo; | |||||
use crate::config::CONFIG; | use crate::config::CONFIG; | ||||
use crate::constants::{ | use crate::constants::{ | ||||
ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, | ACCESS_TOKEN_SORT_KEY, ACCESS_TOKEN_TABLE, | ||||
ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, | ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE, ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE, | ||||
ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, | ACCESS_TOKEN_TABLE_PARTITION_KEY, ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE, | ||||
ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, | ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE, NONCE_TABLE, | ||||
NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, USERS_TABLE, | NONCE_TABLE_CREATED_ATTRIBUTE, NONCE_TABLE_PARTITION_KEY, USERS_TABLE, | ||||
USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, | USERS_TABLE_DEVICES_ATTRIBUTE, | ||||
USERS_TABLE_DEVICE_ATTRIBUTE_NAME, USERS_TABLE_INITIALIZATION_INFO, | USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME, | ||||
USERS_TABLE_DEVICES_MAP_IDENTITY_ONETIME_KEYS_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_IDENTITY_PREKEY_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_IDENTITY_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_ONETIME_KEYS_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | |||||
USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, | USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, | ||||
USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, | USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, | ||||
USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, | USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, | ||||
}; | }; | ||||
use crate::id::generate_uuid; | |||||
use crate::nonce::NonceData; | use crate::nonce::NonceData; | ||||
use crate::token::{AccessTokenData, AuthType}; | use crate::token::{AccessTokenData, AuthType}; | ||||
use comm_opaque::Cipher; | use comm_opaque::Cipher; | ||||
#[derive(Serialize, Deserialize)] | #[derive(Serialize, Deserialize)] | ||||
pub struct OlmKeys { | pub struct OlmKeys { | ||||
pub curve25519: String, | pub curve25519: String, | ||||
pub ed25519: String, | pub ed25519: String, | ||||
Show All 11 Lines | impl FromStr for KeyPayload { | ||||
// The payload is held in the database as an escaped JSON payload. | // The payload is held in the database as an escaped JSON payload. | ||||
// Escaped double quotes need to be trimmed before attempting to serialize | // Escaped double quotes need to be trimmed before attempting to serialize | ||||
fn from_str(payload: &str) -> Result<KeyPayload, Self::Err> { | fn from_str(payload: &str) -> Result<KeyPayload, Self::Err> { | ||||
serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) | serde_json::from_str(&payload.replace(r#"\""#, r#"""#)) | ||||
} | } | ||||
} | } | ||||
pub enum Device { | |||||
Client, | |||||
Keyserver, | |||||
} | |||||
impl Display for Device { | |||||
fn fmt(&self, f: &mut Formatter) -> FmtResult { | |||||
match self { | |||||
Device::Client => write!(f, "client"), | |||||
Device::Keyserver => write!(f, "keyserver"), | |||||
} | |||||
} | |||||
} | |||||
#[derive(Clone)] | #[derive(Clone)] | ||||
pub struct DatabaseClient { | pub struct DatabaseClient { | ||||
client: Arc<Client>, | client: Arc<Client>, | ||||
} | } | ||||
impl DatabaseClient { | impl DatabaseClient { | ||||
pub fn new(aws_config: &SdkConfig) -> Self { | pub fn new(aws_config: &SdkConfig) -> Self { | ||||
let client = match &CONFIG.localstack_endpoint { | let client = match &CONFIG.localstack_endpoint { | ||||
Show All 40 Lines | match self.get_item_from_users_table(&user_id).await { | ||||
"DynamoDB client failed to get registration data for user {}: {}", | "DynamoDB client failed to get registration data for user {}: {}", | ||||
user_id, e | user_id, e | ||||
); | ); | ||||
Err(e) | Err(e) | ||||
} | } | ||||
} | } | ||||
} | } | ||||
pub async fn get_session_initialization_info( | |||||
&self, | |||||
user_id: &str, | |||||
) -> Result<Option<HashMap<String, HashMap<String, String>>>, Error> { | |||||
match self.get_item_from_users_table(user_id).await { | |||||
Ok(GetItemOutput { | |||||
item: Some(mut item), | |||||
.. | |||||
}) => parse_devices_attribute(item.remove(USERS_TABLE_DEVICES_ATTRIBUTE)) | |||||
.map(Some) | |||||
.map_err(Error::Attribute), | |||||
Ok(_) => { | |||||
info!("No item found for user {} in users table", user_id); | |||||
Ok(None) | |||||
} | |||||
Err(e) => { | |||||
error!( | |||||
"DynamoDB client failed to get session initialization info for user {}: {}", | |||||
user_id, e | |||||
); | |||||
Err(e) | |||||
} | |||||
} | |||||
} | |||||
pub async fn update_users_table( | |||||
&self, | |||||
user_id: String, | |||||
signing_public_key: Option<String>, | |||||
registration: Option<ServerRegistration<Cipher>>, | |||||
username: Option<String>, | |||||
session_initialization_info: Option<&HashMap<String, String>>, | |||||
) -> Result<UpdateItemOutput, Error> { | |||||
let mut update_expression_parts = Vec::new(); | |||||
let mut expression_attribute_names = HashMap::new(); | |||||
let mut expression_attribute_values = HashMap::new(); | |||||
if let Some(reg) = registration { | |||||
update_expression_parts | |||||
.push(format!("{} = :r", USERS_TABLE_REGISTRATION_ATTRIBUTE)); | |||||
expression_attribute_values.insert( | |||||
":r".to_string(), | |||||
AttributeValue::B(Blob::new(reg.serialize())), | |||||
); | |||||
}; | |||||
if let Some(username) = username { | |||||
update_expression_parts | |||||
.push(format!("{} = :u", USERS_TABLE_USERNAME_ATTRIBUTE)); | |||||
expression_attribute_values | |||||
.insert(":u".to_string(), AttributeValue::S(username)); | |||||
}; | |||||
if let Some(public_key) = signing_public_key { | |||||
let device_info = match session_initialization_info { | |||||
Some(info) => info | |||||
.iter() | |||||
.map(|(k, v)| (k.to_string(), AttributeValue::S(v.to_string()))) | |||||
.collect(), | |||||
None => HashMap::new(), | |||||
}; | |||||
// How we construct the update expression will depend on whether the user | |||||
// already exists or not | |||||
if let GetItemOutput { item: Some(_), .. } = | |||||
self.get_item_from_users_table(&user_id).await? | |||||
{ | |||||
update_expression_parts.push(format!( | |||||
"{}.#{} = :k", | |||||
USERS_TABLE_DEVICES_ATTRIBUTE, USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME, | |||||
)); | |||||
expression_attribute_names.insert( | |||||
format!("#{}", USERS_TABLE_DEVICES_MAP_ATTRIBUTE_NAME), | |||||
public_key, | |||||
); | |||||
expression_attribute_values | |||||
.insert(":k".to_string(), AttributeValue::M(device_info)); | |||||
} else { | |||||
update_expression_parts | |||||
.push(format!("{} = :k", USERS_TABLE_DEVICES_ATTRIBUTE)); | |||||
let mut devices = HashMap::new(); | |||||
devices.insert(public_key, AttributeValue::M(device_info)); | |||||
expression_attribute_values | |||||
.insert(":k".to_string(), AttributeValue::M(devices)); | |||||
}; | |||||
}; | |||||
self | |||||
.client | |||||
.update_item() | |||||
.table_name(USERS_TABLE) | |||||
.key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) | |||||
.update_expression(format!("SET {}", update_expression_parts.join(","))) | |||||
.set_expression_attribute_names( | |||||
if expression_attribute_names.is_empty() { | |||||
None | |||||
} else { | |||||
Some(expression_attribute_names) | |||||
}, | |||||
) | |||||
.set_expression_attribute_values( | |||||
if expression_attribute_values.is_empty() { | |||||
None | |||||
} else { | |||||
Some(expression_attribute_values) | |||||
}, | |||||
) | |||||
.send() | |||||
.await | |||||
.map_err(|e| Error::AwsSdk(e.into())) | |||||
} | |||||
pub async fn add_user_to_users_table( | pub async fn add_user_to_users_table( | ||||
&self, | &self, | ||||
user_id: String, | registration_state: UserRegistrationInfo, | ||||
registration: ServerRegistration<Cipher>, | password_file: Vec<u8>, | ||||
username: String, | ) -> Result<String, Error> { | ||||
signing_public_key: String, | let user_id = generate_uuid(); | ||||
session_initialization_info: &HashMap<String, String>, | let device_info = HashMap::from([ | ||||
) -> Result<PutItemOutput, Error> { | ( | ||||
let device_info: HashMap<String, AttributeValue> = | USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME.to_string(), | ||||
session_initialization_info | AttributeValue::S(Device::Client.to_string()), | ||||
.iter() | ), | ||||
.map(|(k, v)| (k.to_string(), AttributeValue::S(v.to_string()))) | ( | ||||
.collect(); | USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME.to_string(), | ||||
AttributeValue::S(registration_state.key_payload), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME | |||||
.to_string(), | |||||
AttributeValue::S(registration_state.key_payload_signature), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_IDENTITY_PREKEY_ATTRIBUTE_NAME.to_string(), | |||||
AttributeValue::S(registration_state.identity_prekey), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_IDENTITY_PREKEY_SIGNATURE_ATTRIBUTE_NAME | |||||
.to_string(), | |||||
AttributeValue::S(registration_state.identity_prekey_signature), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_IDENTITY_ONETIME_KEYS_ATTRIBUTE_NAME | |||||
.to_string(), | |||||
AttributeValue::L( | |||||
registration_state | |||||
.identity_onetime_keys | |||||
.into_iter() | |||||
.map(AttributeValue::S) | |||||
.collect(), | |||||
), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME.to_string(), | |||||
AttributeValue::S(registration_state.notif_prekey), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME | |||||
.to_string(), | |||||
AttributeValue::S(registration_state.notif_prekey_signature), | |||||
), | |||||
( | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_ONETIME_KEYS_ATTRIBUTE_NAME.to_string(), | |||||
AttributeValue::L( | |||||
registration_state | |||||
.notif_onetime_keys | |||||
.into_iter() | |||||
.map(AttributeValue::S) | |||||
.collect(), | |||||
), | |||||
), | |||||
]); | |||||
let devices = HashMap::from([( | |||||
registration_state.device_id_key, | |||||
AttributeValue::M(device_info), | |||||
)]); | |||||
let item = HashMap::from([ | let user = HashMap::from([ | ||||
( | ( | ||||
USERS_TABLE_PARTITION_KEY.to_string(), | USERS_TABLE_PARTITION_KEY.to_string(), | ||||
AttributeValue::S(user_id), | AttributeValue::S(user_id.clone()), | ||||
), | ), | ||||
( | ( | ||||
USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), | USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), | ||||
AttributeValue::S(username), | AttributeValue::S(registration_state.username), | ||||
), | ), | ||||
( | ( | ||||
USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), | USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), | ||||
AttributeValue::B(Blob::new(registration.serialize())), | AttributeValue::M(devices), | ||||
), | ), | ||||
( | ( | ||||
USERS_TABLE_DEVICES_ATTRIBUTE.to_string(), | USERS_TABLE_REGISTRATION_ATTRIBUTE.to_string(), | ||||
AttributeValue::M(HashMap::from([( | AttributeValue::B(Blob::new(password_file)), | ||||
signing_public_key, | |||||
AttributeValue::M(device_info), | |||||
)])), | |||||
), | ), | ||||
]); | ]); | ||||
self | self | ||||
.client | .client | ||||
.put_item() | .put_item() | ||||
.table_name(USERS_TABLE) | .table_name(USERS_TABLE) | ||||
.set_item(Some(item)) | .set_item(Some(user)) | ||||
.send() | .send() | ||||
.await | .await | ||||
.map_err(|e| Error::AwsSdk(e.into())) | .map_err(|e| Error::AwsSdk(e.into()))?; | ||||
} | |||||
Ok(user_id) | |||||
} | |||||
pub async fn delete_user( | pub async fn delete_user( | ||||
&self, | &self, | ||||
user_id: String, | user_id: String, | ||||
) -> Result<DeleteItemOutput, Error> { | ) -> Result<DeleteItemOutput, Error> { | ||||
debug!("Attempting to delete user: {}", user_id); | debug!("Attempting to delete user: {}", user_id); | ||||
match self | match self | ||||
.client | .client | ||||
▲ Show 20 Lines • Show All 432 Lines • ▼ Show 20 Lines | match &attribute { | ||||
None => Err(DBItemError::new( | None => Err(DBItemError::new( | ||||
USERS_TABLE_REGISTRATION_ATTRIBUTE, | USERS_TABLE_REGISTRATION_ATTRIBUTE, | ||||
attribute, | attribute, | ||||
DBItemAttributeError::Missing, | DBItemAttributeError::Missing, | ||||
)), | )), | ||||
} | } | ||||
} | } | ||||
fn parse_devices_attribute( | |||||
attribute: Option<AttributeValue>, | |||||
) -> Result<HashMap<String, HashMap<String, String>>, DBItemError> { | |||||
let mut devices = HashMap::new(); | |||||
let ddb_devices = | |||||
parse_map_attribute(USERS_TABLE_DEVICES_ATTRIBUTE, attribute)?; | |||||
for (signing_public_key, session_initialization_info) in ddb_devices { | |||||
let session_initialization_info_map = parse_map_attribute( | |||||
USERS_TABLE_DEVICE_ATTRIBUTE_NAME, | |||||
Some(session_initialization_info), | |||||
)?; | |||||
let mut inner_hash_map = HashMap::new(); | |||||
for (initialization_component_name, initialization_component_value) in | |||||
session_initialization_info_map | |||||
{ | |||||
let initialization_piece_value_string = parse_string_attribute( | |||||
USERS_TABLE_INITIALIZATION_INFO, | |||||
Some(initialization_component_value), | |||||
)?; | |||||
inner_hash_map.insert( | |||||
initialization_component_name, | |||||
initialization_piece_value_string, | |||||
); | |||||
} | |||||
devices.insert(signing_public_key, inner_hash_map); | |||||
} | |||||
Ok(devices) | |||||
} | |||||
fn parse_map_attribute( | fn parse_map_attribute( | ||||
attribute_name: &'static str, | attribute_name: &'static str, | ||||
attribute_value: Option<AttributeValue>, | attribute_value: Option<AttributeValue>, | ||||
) -> Result<HashMap<String, AttributeValue>, DBItemError> { | ) -> Result<HashMap<String, AttributeValue>, DBItemError> { | ||||
match attribute_value { | match attribute_value { | ||||
Some(AttributeValue::M(map)) => Ok(map), | Some(AttributeValue::M(map)) => Ok(map), | ||||
Some(_) => Err(DBItemError::new( | Some(_) => Err(DBItemError::new( | ||||
attribute_name, | attribute_name, | ||||
▲ Show 20 Lines • Show All 83 Lines • Show Last 20 Lines |