Changeset View
Changeset View
Standalone View
Standalone View
services/identity/src/database.rs
use constant_time_eq::constant_time_eq; | use constant_time_eq::constant_time_eq; | ||||
use std::collections::{HashMap, HashSet}; | use std::collections::{HashMap, HashSet}; | ||||
use std::fmt::{Display, Formatter, Result as FmtResult}; | use std::fmt::{Display, Formatter, Result as FmtResult}; | ||||
use std::str::FromStr; | use std::str::FromStr; | ||||
use std::sync::Arc; | use std::sync::Arc; | ||||
use crate::ddb_utils::{into_onetime_put_requests, OlmAccountType}; | use crate::ddb_utils::{ | ||||
create_onetime_key_partition_key, into_onetime_put_requests, OlmAccountType, | |||||
}; | |||||
use crate::error::{DBItemAttributeError, DBItemError, Error}; | use crate::error::{DBItemAttributeError, DBItemError, Error}; | ||||
use aws_config::SdkConfig; | use aws_config::SdkConfig; | ||||
use aws_sdk_dynamodb::model::{AttributeValue, PutRequest, WriteRequest}; | use aws_sdk_dynamodb::model::{AttributeValue, PutRequest, WriteRequest}; | ||||
use aws_sdk_dynamodb::output::{ | use aws_sdk_dynamodb::output::{ | ||||
DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, | DeleteItemOutput, GetItemOutput, PutItemOutput, QueryOutput, | ||||
}; | }; | ||||
use aws_sdk_dynamodb::{types::Blob, Client}; | use aws_sdk_dynamodb::{types::Blob, Client}; | ||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
Show All 17 Lines | use crate::constants::{ | ||||
USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, | USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME, | ||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, | USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME, | ||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | ||||
USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, | USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME, | ||||
USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, | USERS_TABLE_PARTITION_KEY, USERS_TABLE_REGISTRATION_ATTRIBUTE, | ||||
USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, | USERS_TABLE_USERNAME_ATTRIBUTE, USERS_TABLE_USERNAME_INDEX, | ||||
USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, | USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE, USERS_TABLE_WALLET_ADDRESS_INDEX, | ||||
}; | }; | ||||
use crate::error::{AttributeValueFromHashMap, FromAttributeValue}; | |||||
use crate::id::generate_uuid; | use crate::id::generate_uuid; | ||||
use crate::nonce::NonceData; | use crate::nonce::NonceData; | ||||
use crate::token::{AccessTokenData, AuthType}; | use crate::token::{AccessTokenData, AuthType}; | ||||
#[derive(Serialize, Deserialize)] | #[derive(Serialize, Deserialize)] | ||||
pub struct OlmKeys { | pub struct OlmKeys { | ||||
pub curve25519: String, | pub curve25519: String, | ||||
pub ed25519: String, | pub ed25519: String, | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | fn fmt(&self, f: &mut Formatter) -> FmtResult { | ||||
match self { | match self { | ||||
Device::Keyserver => write!(f, "keyserver"), | Device::Keyserver => write!(f, "keyserver"), | ||||
Device::Native => write!(f, "native"), | Device::Native => write!(f, "native"), | ||||
Device::Web => write!(f, "web"), | Device::Web => write!(f, "web"), | ||||
} | } | ||||
} | } | ||||
} | } | ||||
// This is very similar to the protobuf definitions, however, | |||||
// coupling the protobuf schema to the database API should be avoided. | |||||
pub struct PreKey { | |||||
pub prekey: String, | |||||
pub prekey_signature: String, | |||||
} | |||||
pub struct OutboundKeys { | |||||
pub key_payload: String, | |||||
pub key_payload_signature: String, | |||||
pub social_proof: Option<String>, | |||||
pub content_prekey: PreKey, | |||||
pub notif_prekey: PreKey, | |||||
pub content_one_time_key: Option<String>, | |||||
pub notif_one_time_key: Option<String>, | |||||
} | |||||
#[derive(Clone)] | #[derive(Clone)] | ||||
pub struct DatabaseClient { | pub struct DatabaseClient { | ||||
client: Arc<Client>, | client: Arc<Client>, | ||||
} | } | ||||
impl DatabaseClient { | impl DatabaseClient { | ||||
pub fn new(aws_config: &SdkConfig) -> Self { | pub fn new(aws_config: &SdkConfig) -> Self { | ||||
let client = match &CONFIG.localstack_endpoint { | let client = match &CONFIG.localstack_endpoint { | ||||
▲ Show 20 Lines • Show All 121 Lines • ▼ Show 20 Lines | self | ||||
.add_device_to_users_table( | .add_device_to_users_table( | ||||
user_id, | user_id, | ||||
flattened_device_key_upload, | flattened_device_key_upload, | ||||
Some(social_proof), | Some(social_proof), | ||||
) | ) | ||||
.await | .await | ||||
} | } | ||||
pub async fn get_keyserver_keys_for_user( | |||||
&self, | |||||
user_id: &str, | |||||
) -> Result<Option<OutboundKeys>, Error> { | |||||
// DynamoDB doesn't have a way to "pop" a value from a list, so we must | |||||
// first read in user info, then update one_time_keys with value we | |||||
// gave to requester | |||||
let user_info = self | |||||
.get_item_from_users_table(&user_id) | |||||
.await? | |||||
.item | |||||
.ok_or(Error::MissingItem)?; | |||||
let devices = user_info | |||||
.get(USERS_TABLE_DEVICES_ATTRIBUTE) | |||||
.ok_or(Error::MissingItem)? | |||||
.to_hashmap(USERS_TABLE_DEVICES_ATTRIBUTE)?; | |||||
let mut maybe_keyserver_id = None; | |||||
for (device_id, device_info) in devices { | |||||
let device_type = device_info | |||||
.to_hashmap("device_id")? | |||||
.get(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME) | |||||
.ok_or(Error::MissingItem)? | |||||
.to_string(USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME)?; | |||||
if device_type == "keyserver" { | |||||
maybe_keyserver_id = Some(device_id); | |||||
break; | |||||
} | |||||
} | |||||
// Assert that the user has a keyserver, if they don't return None | |||||
let keyserver_id = match maybe_keyserver_id { | |||||
None => return Ok(None), | |||||
Some(id) => id, | |||||
}; | |||||
let keyserver = devices.get_map(keyserver_id)?; | |||||
let notif_one_time_key: Option<String> = self | |||||
.mint_onetime_key(keyserver_id, OlmAccountType::Notification) | |||||
.await?; | |||||
let content_one_time_key: Option<String> = self | |||||
.mint_onetime_key(keyserver_id, OlmAccountType::Content) | |||||
.await?; | |||||
debug!( | |||||
"Able to get notif key for keyserver {}: {}", | |||||
keyserver_id, | |||||
notif_one_time_key.is_some() | |||||
); | |||||
debug!( | |||||
"Able to get content key for keyserver {}: {}", | |||||
keyserver_id, | |||||
content_one_time_key.is_some() | |||||
); | |||||
let content_prekey = keyserver | |||||
.get_string(USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_ATTRIBUTE_NAME)?; | |||||
let content_prekey_signature = keyserver.get_string( | |||||
USERS_TABLE_DEVICES_MAP_CONTENT_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | |||||
)?; | |||||
let notif_prekey = keyserver | |||||
.get_string(USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_ATTRIBUTE_NAME)?; | |||||
let notif_prekey_signature = keyserver.get_string( | |||||
USERS_TABLE_DEVICES_MAP_NOTIF_PREKEY_SIGNATURE_ATTRIBUTE_NAME, | |||||
)?; | |||||
let key_payload = keyserver | |||||
.get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_ATTRIBUTE_NAME)? | |||||
.to_string(); | |||||
let key_payload_signature = keyserver | |||||
.get_string(USERS_TABLE_DEVICES_MAP_KEY_PAYLOAD_SIGNATURE_ATTRIBUTE_NAME)? | |||||
.to_string(); | |||||
let social_proof = keyserver | |||||
.get(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) | |||||
.map(|s| { | |||||
s.to_string(USERS_TABLE_DEVICES_MAP_SOCIAL_PROOF_ATTRIBUTE_NAME) | |||||
.ok() | |||||
}) | |||||
.flatten() | |||||
.map(|s| s.to_owned()); | |||||
let full_content_prekey = PreKey { | |||||
prekey: content_prekey.to_string(), | |||||
prekey_signature: content_prekey_signature.to_string(), | |||||
}; | |||||
let full_notif_prekey = PreKey { | |||||
prekey: notif_prekey.to_string(), | |||||
prekey_signature: notif_prekey_signature.to_string(), | |||||
}; | |||||
let outbound_payload = OutboundKeys { | |||||
key_payload, | |||||
key_payload_signature, | |||||
social_proof, | |||||
content_prekey: full_content_prekey, | |||||
notif_prekey: full_notif_prekey, | |||||
content_one_time_key, | |||||
notif_one_time_key, | |||||
}; | |||||
return Ok(Some(outbound_payload)); | |||||
} | |||||
/// Will "mint" a single onetime key by attempting to successfully deleting | |||||
/// a key | |||||
pub async fn mint_onetime_key( | |||||
&self, | |||||
device_id: &str, | |||||
account_type: OlmAccountType, | |||||
) -> Result<Option<String>, Error> { | |||||
use crate::constants::one_time_keys_table as otk_table; | |||||
let query_result = self.get_onetime_keys(device_id, account_type).await?; | |||||
let items = match query_result.items() { | |||||
Some(x) => x, | |||||
None => { | |||||
debug!("Unable to find {:?} onetime-key", account_type); | |||||
return Ok(None); | |||||
} | |||||
}; | |||||
// Attempt to delete the onetime keys individually, a successful delete | |||||
// mints the onetime key to the requester | |||||
for item in items { | |||||
let pk = item.get_string(otk_table::PARTITION_KEY)?; | |||||
let otk = item.get_string(otk_table::SORT_KEY)?; | |||||
let composite_key = HashMap::from([ | |||||
( | |||||
otk_table::PARTITION_KEY.to_string(), | |||||
AttributeValue::S(pk.to_string()), | |||||
), | |||||
( | |||||
otk_table::SORT_KEY.to_string(), | |||||
AttributeValue::S(otk.to_string()), | |||||
), | |||||
]); | |||||
debug!("Attempting to delete a {:?} onetime-key", account_type); | |||||
match self | |||||
.client | |||||
.delete_item() | |||||
.set_key(Some(composite_key)) | |||||
.table_name(otk_table::NAME) | |||||
.condition_expression("attribute_exists(sort_key)") | |||||
.send() | |||||
.await | |||||
{ | |||||
Ok(_) => return Ok(Some(otk.to_string())), | |||||
// This err should only happen if a delete occurred between the read | |||||
// above and this delete | |||||
Err(e) => { | |||||
debug!("Unable to delete key: {:?}", e); | |||||
continue; | |||||
} | |||||
} | |||||
} | |||||
// All potential keys exhausted | |||||
Ok(None) | |||||
} | |||||
pub async fn get_onetime_keys( | |||||
&self, | |||||
device_id: &str, | |||||
account_type: OlmAccountType, | |||||
) -> Result<QueryOutput, Error> { | |||||
use crate::constants::one_time_keys_table::*; | |||||
// Add related prefix to partition key to grab the correct result set | |||||
let partition_key = | |||||
create_onetime_key_partition_key(device_id, account_type); | |||||
self | |||||
.client | |||||
.query() | |||||
.table_name(NAME) | |||||
.key_condition_expression(format!("{} = :pk", PARTITION_KEY)) | |||||
.expression_attribute_values(":pk", AttributeValue::S(partition_key)) | |||||
.send() | |||||
.await | |||||
.map_err(|e| Error::AwsSdk(e.into())) | |||||
bartek: If we decide to go this way and defer this, can we at least add these lines for easier… | |||||
jonAuthorUnsubmitted Done Inline ActionsSeems reasonable to me jon: Seems reasonable to me | |||||
} | |||||
pub async fn set_prekey( | pub async fn set_prekey( | ||||
&self, | &self, | ||||
user_id: String, | user_id: String, | ||||
device_id: String, | device_id: String, | ||||
content_prekey: String, | content_prekey: String, | ||||
content_prekey_signature: String, | content_prekey_signature: String, | ||||
notif_prekey: String, | notif_prekey: String, | ||||
notif_prekey_signature: String, | notif_prekey_signature: String, | ||||
▲ Show 20 Lines • Show All 885 Lines • Show Last 20 Lines |
If we decide to go this way and defer this, can we at least add these lines for easier monitoring?