diff --git a/services/identity/src/database/one_time_keys.rs b/services/identity/src/database/one_time_keys.rs index d8347b8d6..24d758975 100644 --- a/services/identity/src/database/one_time_keys.rs +++ b/services/identity/src/database/one_time_keys.rs @@ -1,416 +1,466 @@ use std::collections::HashSet; use comm_lib::{ aws::{ - ddb::types::{AttributeValue, Delete, TransactWriteItem, Update}, + ddb::types::{ + AttributeValue, Delete, DeleteRequest, TransactWriteItem, Update, + WriteRequest, + }, DynamoDBError, }, database::{ + batch_operations::{batch_write, ExponentialBackoffConfig}, parse_int_attribute, AttributeExtractor, AttributeMap, DBItemAttributeError, DBItemError, }, }; use tracing::{debug, error, info}; use crate::{ constants::{MAX_ONE_TIME_KEYS, ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT}, database::DeviceIDAttribute, ddb_utils::{ create_one_time_key_partition_key, into_one_time_put_requests, into_one_time_update_and_delete_requests, is_transaction_retryable, OlmAccountType, }, error::{consume_error, Error}, olm::is_valid_olm_key, }; use super::DatabaseClient; impl DatabaseClient { /// Gets the next one-time key for the account and then, in a transaction, /// deletes the key and updates the key count /// /// Returns the retrieved one-time key if it exists and a boolean indicating /// whether the `spawn_refresh_keys_task`` was called pub(super) async fn get_one_time_key( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, can_request_more_keys: bool, ) -> Result<(Option, bool), Error> { use crate::constants::devices_table; use crate::constants::retry; use crate::constants::ONE_TIME_KEY_MINIMUM_THRESHOLD; let attr_otk_count = match account_type { OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, }; fn spawn_refresh_keys_task(device_id: &str) { // Clone the string slice to move into the async block let device_id = device_id.to_string(); tokio::spawn(async move { debug!("Attempting to request more keys for device: {}", &device_id); let result = crate::tunnelbroker::send_refresh_keys_request(&device_id).await; consume_error(result); }); } // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` // in `comm-lib` to handle transactions with retries let mut attempt = 0; // TODO: Introduce nanny task that handles calling `spawn_refresh_keys_task` let mut requested_more_keys = false; loop { attempt += 1; if attempt > retry::MAX_ATTEMPTS { return Err(Error::MaxRetriesExceeded); } let otk_count = self.get_otk_count(user_id, device_id, account_type).await?; if otk_count < ONE_TIME_KEY_MINIMUM_THRESHOLD && can_request_more_keys { spawn_refresh_keys_task(device_id); requested_more_keys = true; } if otk_count < 1 { return Ok((None, requested_more_keys)); } let Some(otk_row) = self - .get_one_time_keys(user_id, device_id, account_type, 1) + .get_one_time_keys(user_id, device_id, account_type, Some(1)) .await? .pop() else { return Err(Error::NotEnoughOneTimeKeys); }; let delete_otk_operation = otk_row.as_delete_request(); let update_otk_count = Update::builder() .table_name(devices_table::NAME) .key( devices_table::ATTR_USER_ID, AttributeValue::S(user_id.to_string()), ) .key( devices_table::ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into(), ) .update_expression(format!("ADD {} :decrement_val", attr_otk_count)) .expression_attribute_values( ":decrement_val", AttributeValue::N("-1".to_string()), ) .condition_expression(format!("{} = :old_val", attr_otk_count)) .expression_attribute_values( ":old_val", AttributeValue::N(otk_count.to_string()), ) .build(); let update_otk_count_operation = TransactWriteItem::builder() .update(update_otk_count) .build(); let transaction = self .client .transact_write_items() .set_transact_items(Some(vec![ delete_otk_operation, update_otk_count_operation, ])) .send() .await; match transaction { Ok(_) => return Ok((Some(otk_row.otk), requested_more_keys)), Err(e) => { let dynamo_db_error = DynamoDBError::from(e); let retryable_codes = HashSet::from([ retry::CONDITIONAL_CHECK_FAILED, retry::TRANSACTION_CONFLICT, ]); if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { info!("Encountered transaction conflict while retrieving one-time key - retrying"); } else { error!( "One-time key retrieval transaction failed: {:?}", dynamo_db_error ); return Err(Error::AwsSdk(dynamo_db_error)); } } } } } async fn get_one_time_keys( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, - num_keys: usize, + num_keys: Option, ) -> Result, Error> { use crate::constants::one_time_keys_table::*; - // DynamoDB will reject the `query` request if `limit < 1` - if num_keys < 1 { - return Ok(Vec::new()); - } - let partition_key = create_one_time_key_partition_key(user_id, device_id, account_type); - let otk_rows = self + let mut query = self .client .query() .table_name(NAME) .key_condition_expression("#pk = :pk") .expression_attribute_names("#pk", PARTITION_KEY) - .expression_attribute_values(":pk", AttributeValue::S(partition_key)) - .limit(num_keys as i32) + .expression_attribute_values(":pk", AttributeValue::S(partition_key)); + + if let Some(limit) = num_keys { + // DynamoDB will reject the `query` request if `limit < 1` + if limit < 1 { + return Ok(Vec::new()); + } + query = query.limit(limit as i32); + } + + let otk_rows = query .send() .await .map_err(|e| Error::AwsSdk(e.into()))? .items .unwrap_or_default() .into_iter() .map(OTKRow::try_from) .collect::, _>>() .map_err(Error::from)?; - if otk_rows.len() != num_keys { - error!("There are fewer one-time keys than the number requested"); - return Err(Error::NotEnoughOneTimeKeys); + if let Some(limit) = num_keys { + if otk_rows.len() != limit { + error!("There are fewer one-time keys than the number requested"); + return Err(Error::NotEnoughOneTimeKeys); + } } Ok(otk_rows) } pub async fn append_one_time_prekeys( &self, user_id: &str, device_id: &str, content_one_time_keys: &Vec, notif_one_time_keys: &Vec, ) -> Result<(), Error> { use crate::constants::retry; let num_content_keys_to_append = content_one_time_keys.len(); let num_notif_keys_to_append = notif_one_time_keys.len(); if num_content_keys_to_append > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT || num_notif_keys_to_append > ONE_TIME_KEY_UPLOAD_LIMIT_PER_ACCOUNT { return Err(Error::OneTimeKeyUploadLimitExceeded); } if content_one_time_keys .iter() .any(|otk| !is_valid_olm_key(otk)) || notif_one_time_keys.iter().any(|otk| !is_valid_olm_key(otk)) { debug!("Invalid one-time key format"); return Err(Error::InvalidFormat); } let current_time = chrono::Utc::now(); let content_otk_requests = into_one_time_put_requests( user_id, device_id, content_one_time_keys, OlmAccountType::Content, current_time, ); let notif_otk_requests = into_one_time_put_requests( user_id, device_id, notif_one_time_keys, OlmAccountType::Notification, current_time, ); let current_content_otk_count = self .get_otk_count(user_id, device_id, OlmAccountType::Content) .await?; let current_notif_otk_count = self .get_otk_count(user_id, device_id, OlmAccountType::Notification) .await?; let num_content_keys_to_delete = (num_content_keys_to_append + current_content_otk_count) .saturating_sub(MAX_ONE_TIME_KEYS); let num_notif_keys_to_delete = (num_notif_keys_to_append + current_notif_otk_count) .saturating_sub(MAX_ONE_TIME_KEYS); let content_keys_to_delete = self .get_one_time_keys( user_id, device_id, OlmAccountType::Content, - num_content_keys_to_delete, + Some(num_content_keys_to_delete), ) .await?; let notif_keys_to_delete = self .get_one_time_keys( user_id, device_id, OlmAccountType::Notification, - num_notif_keys_to_delete, + Some(num_notif_keys_to_delete), ) .await?; let update_and_delete_otk_count_operation = into_one_time_update_and_delete_requests( user_id, device_id, num_content_keys_to_append, num_notif_keys_to_append, content_keys_to_delete, notif_keys_to_delete, ); let mut operations = Vec::new(); operations.extend_from_slice(&content_otk_requests); operations.extend_from_slice(¬if_otk_requests); operations.extend_from_slice(&update_and_delete_otk_count_operation); // TODO: Introduce `transact_write_helper` similar to `batch_write_helper` // in `comm-lib` to handle transactions with retries let mut attempt = 0; loop { attempt += 1; if attempt > retry::MAX_ATTEMPTS { return Err(Error::MaxRetriesExceeded); } let transaction = self .client .transact_write_items() .set_transact_items(Some(operations.clone())) .send() .await; match transaction { Ok(_) => break, Err(e) => { let dynamo_db_error = DynamoDBError::from(e); let retryable_codes = HashSet::from([retry::TRANSACTION_CONFLICT]); if is_transaction_retryable(&dynamo_db_error, &retryable_codes) { info!("Encountered transaction conflict while uploading one-time keys - retrying"); } else { error!( "One-time key upload transaction failed: {:?}", dynamo_db_error ); return Err(Error::AwsSdk(dynamo_db_error)); } } } } Ok(()) } async fn get_otk_count( &self, user_id: &str, device_id: &str, account_type: OlmAccountType, ) -> Result { use crate::constants::devices_table; let attr_name = match account_type { OlmAccountType::Content => devices_table::ATTR_CONTENT_OTK_COUNT, OlmAccountType::Notification => devices_table::ATTR_NOTIF_OTK_COUNT, }; let response = self .client .get_item() .table_name(devices_table::NAME) .projection_expression(attr_name) .key( devices_table::ATTR_USER_ID, AttributeValue::S(user_id.to_string()), ) .key( devices_table::ATTR_ITEM_ID, DeviceIDAttribute(device_id.into()).into(), ) .send() .await .map_err(|e| { error!("Failed to get user's OTK count: {:?}", e); Error::AwsSdk(e.into()) })?; let mut user_item = response.item.unwrap_or_default(); match parse_int_attribute(attr_name, user_item.remove(attr_name)) { Ok(num) => Ok(num), Err(DBItemError { attribute_error: DBItemAttributeError::Missing, .. }) => Ok(0), Err(e) => Err(Error::Attribute(e)), } } + + /// Deletes all data for a user's device from one-time keys table + pub async fn delete_otks_table_rows_for_user_device( + &self, + user_id: &str, + device_id: &str, + ) -> Result<(), Error> { + use crate::constants::one_time_keys_table::*; + + let content_otk_primary_keys = self + .get_one_time_keys(user_id, device_id, OlmAccountType::Content, None) + .await?; + + let notif_otk_primary_keys = self + .get_one_time_keys(user_id, device_id, OlmAccountType::Notification, None) + .await?; + + let delete_requests = content_otk_primary_keys + .into_iter() + .chain(notif_otk_primary_keys) + .map(|otk_row| { + let request = DeleteRequest::builder() + .key(PARTITION_KEY, AttributeValue::S(otk_row.partition_key)) + .key(SORT_KEY, AttributeValue::S(otk_row.sort_key)) + .build(); + WriteRequest::builder().delete_request(request).build() + }) + .collect::>(); + + batch_write( + &self.client, + NAME, + delete_requests, + ExponentialBackoffConfig::default(), + ) + .await + .map_err(Error::from)?; + + Ok(()) + } } pub struct OTKRow { pub partition_key: String, pub sort_key: String, pub otk: String, } impl OTKRow { pub fn as_delete_request(&self) -> TransactWriteItem { use crate::constants::one_time_keys_table as otk_table; let delete_otk = Delete::builder() .table_name(otk_table::NAME) .key( otk_table::PARTITION_KEY, AttributeValue::S(self.partition_key.to_string()), ) .key( otk_table::SORT_KEY, AttributeValue::S(self.sort_key.to_string()), ) .condition_expression("attribute_exists(#otk)") .expression_attribute_names("#otk", otk_table::ATTR_ONE_TIME_KEY) .build(); TransactWriteItem::builder().delete(delete_otk).build() } } impl TryFrom for OTKRow { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { use crate::constants::one_time_keys_table as otk_table; let partition_key = attrs.take_attr(otk_table::PARTITION_KEY)?; let sort_key = attrs.take_attr(otk_table::SORT_KEY)?; let otk: String = attrs.take_attr(otk_table::ATTR_ONE_TIME_KEY)?; Ok(Self { partition_key, sort_key, otk, }) } } diff --git a/services/identity/src/error.rs b/services/identity/src/error.rs index 1335cc6e5..cd52eeb6c 100644 --- a/services/identity/src/error.rs +++ b/services/identity/src/error.rs @@ -1,52 +1,63 @@ use comm_lib::aws::DynamoDBError; use comm_lib::database::DBItemError; use tracing::error; #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] AwsSdk(DynamoDBError), #[display(...)] Attribute(DBItemError), #[display(...)] Transport(tonic::transport::Error), #[display(...)] Status(tonic::Status), #[display(...)] MissingItem, #[display(...)] DeviceList(DeviceListError), #[display(...)] MalformedItem, #[display(...)] Serde(serde_json::Error), #[display(...)] CannotOverwrite, #[display(...)] OneTimeKeyUploadLimitExceeded, #[display(...)] MaxRetriesExceeded, #[display(...)] InvalidFormat, #[display(...)] NotEnoughOneTimeKeys, } #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum DeviceListError { DeviceAlreadyExists, DeviceNotFound, ConcurrentUpdateError, InvalidDeviceListUpdate, } pub fn consume_error(result: Result) { match result { Ok(_) => (), Err(e) => { error!("{}", e); } } } + +impl From for Error { + fn from(value: comm_lib::database::Error) -> Self { + use comm_lib::database::Error as E; + match value { + E::AwsSdk(err) => Self::AwsSdk(err), + E::Attribute(err) => Self::Attribute(err), + E::MaxRetriesExceeded => Self::MaxRetriesExceeded, + } + } +} diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index b4ccb672f..2ddac126c 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,757 +1,763 @@ use std::collections::HashMap; use crate::config::CONFIG; use crate::database::{DeviceListRow, DeviceListUpdate}; use crate::{ client_service::{handle_db_error, UpdateState, WorkflowInProgress}, constants::request_metadata, database::DatabaseClient, ddb_utils::DateTimeExt, grpc_services::shared::get_value, }; use chrono::{DateTime, Utc}; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, warn}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserIdentityRequest, UserIdentityResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { debug!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID) .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, request_metadata::DEVICE_ID) .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_prekeys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_prekeys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .update_device_prekeys( user_id, device_id, content_keys.into(), notif_keys.into(), ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identifier(user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? else { return Err(Status::not_found("keyserver not found")); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await .map_err(handle_db_error)?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, user_id.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdateState { user_id }; let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(update_state)) .await .map_err(handle_db_error)?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(state.user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device(&user_id, &device_id) .await .map_err(handle_db_error)?; + self + .db_client + .delete_otks_table_rows_for_user_device(&user_id, &device_id) + .await + .map_err(handle_db_error)?; + self .db_client .delete_access_token_data(user_id, device_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await .map_err(handle_db_error)?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found("user not found")); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = construct_delete_password_user_info(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await .map_err(handle_db_error)?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::::from_utc_timestamp_millis(timestamp) .ok_or_else(|| tonic::Status::invalid_argument("Invalid timestamp")) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(RawDeviceList::from) .map(SignedDeviceList::try_from_raw) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); // do all fetches concurrently let mut fetch_tasks = tokio::task::JoinSet::new(); let mut device_lists = HashMap::with_capacity(user_ids.len()); for user_id in user_ids { let db_client = self.db_client.clone(); fetch_tasks.spawn(async move { let result = db_client.get_current_device_list(&user_id).await; (user_id, result) }); } while let Some(task_result) = fetch_tasks.join_next().await { match task_result { Ok((user_id, Ok(Some(device_list_row)))) => { let raw_list = RawDeviceList::from(device_list_row); let signed_list = SignedDeviceList::try_from_raw(raw_list)?; let serialized_list = signed_list.as_json_string()?; device_lists.insert(user_id, serialized_list); } Ok((user_id, Ok(None))) => { warn!(user_id, "User has no device list, skipping!"); } Ok((user_id, Err(err))) => { error!(user_id, "Failed fetching device list: {err}"); // abort fetching other users fetch_tasks.abort_all(); return Err(handle_db_error(err)); } Err(join_error) => { error!("Failed to join device list task: {join_error}"); fetch_tasks.abort_all(); return Err(Status::aborted("unexpected error")); } } } let response = PeersDeviceListsResponse { users_device_lists: device_lists, }; Ok(Response::new(response)) } async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _device_id) = get_user_and_device_id(&request)?; // TODO: when we stop doing "primary device rotation" (migration procedure) // we should verify if this RPC is called by primary device only let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; self .db_client .apply_devicelist_update(&user_id, update) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await .map_err(handle_db_error)?; if get_farcaster_users_response.len() > 1 { error!("multiple users associated with the same Farcaster ID"); return Err(Status::failed_precondition("cannot link Farcaster ID")); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists( "farcaster ID already associated with different user", )); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .remove_farcaster_id(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn find_user_identity( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let response = UserIdentityResponse { identity: Some(identifier.into()), }; return Ok(Response::new(response)); } } // raw device list that can be serialized to JSON (and then signed in the future) #[derive(serde::Serialize, serde::Deserialize)] struct RawDeviceList { devices: Vec, timestamp: i64, } impl From for RawDeviceList { fn from(row: DeviceListRow) -> Self { Self { devices: row.device_ids, timestamp: row.timestamp.timestamp_millis(), } } } #[derive(serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] struct SignedDeviceList { /// JSON-stringified [`RawDeviceList`] raw_device_list: String, } impl SignedDeviceList { /// Serialize (and sign in the future) a [`RawDeviceList`] fn try_from_raw(raw: RawDeviceList) -> Result { let stringified_list = serde_json::to_string(&raw).map_err(|err| { error!("Failed to serialize raw device list: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Self { raw_device_list: stringified_list, }) } fn as_raw(&self) -> Result { // The device list payload is sent as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to deserialize serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) .map_err(|err| { warn!("Failed to deserialize raw device list: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } /// Serializes the signed device list to a JSON string fn as_json_string(&self) -> Result { serde_json::to_string(self).map_err(|err| { error!("Failed to serialize device list updates: {}", err); tonic::Status::failed_precondition("unexpected error") }) } } impl TryFrom for SignedDeviceList { type Error = tonic::Status; fn try_from(request: UpdateDeviceListRequest) -> Result { serde_json::from_str(&request.new_device_list).map_err(|err| { warn!("Failed to deserialize device list update: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } } impl TryFrom for DeviceListUpdate { type Error = tonic::Status; fn try_from(signed_list: SignedDeviceList) -> Result { let RawDeviceList { devices, timestamp: raw_timestamp, } = signed_list.as_raw()?; let timestamp = DateTime::::from_utc_timestamp_millis(raw_timestamp) .ok_or_else(|| { error!("Failed to parse RawDeviceList timestamp!"); tonic::Status::invalid_argument("invalid timestamp") })?; Ok(DeviceListUpdate::new(devices, timestamp)) } } #[derive(Clone, serde::Serialize, serde::Deserialize)] pub struct DeletePasswordUserInfo { pub opaque_server_login: comm_opaque2::server::Login, } fn construct_delete_password_user_info( opaque_server_login: comm_opaque2::server::Login, ) -> DeletePasswordUserInfo { DeletePasswordUserInfo { opaque_server_login, } } #[cfg(test)] mod tests { use super::*; #[test] fn serialize_device_list_updates() { let raw_updates = vec![ RawDeviceList { devices: vec!["device1".into()], timestamp: 111111111, }, RawDeviceList { devices: vec!["device1".into(), "device2".into()], timestamp: 222222222, }, ]; let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_raw_list2 = r#"{"devices":["device1","device2"],"timestamp":222222222}"#; let signed_updates = raw_updates .into_iter() .map(SignedDeviceList::try_from_raw) .collect::, _>>() .expect("signing device list updates failed"); assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); let stringified_updates = signed_updates .iter() .map(serde_json::to_string) .collect::, _>>() .expect("serialize signed device lists failed"); let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; assert_eq!(stringified_updates[0], expected_stringified_list1); assert_eq!(stringified_updates[1], expected_stringified_list2); } #[test] fn deserialize_device_list_update() { let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; let request = UpdateDeviceListRequest { new_device_list: raw_payload.to_string(), }; let signed_list = SignedDeviceList::try_from(request) .expect("Failed to parse SignedDeviceList"); let update = DeviceListUpdate::try_from(signed_list) .expect("Failed to parse DeviceListUpdate from signed list"); let expected_timestamp = DateTime::::from_utc_timestamp_millis(123456789).unwrap(); assert_eq!(update.timestamp, expected_timestamp); assert_eq!( update.devices, vec!["device1".to_string(), "device2".to_string()] ); } }