diff --git a/services/identity/src/device_list.rs b/services/identity/src/device_list.rs index 5ef68d021..505d17d95 100644 --- a/services/identity/src/device_list.rs +++ b/services/identity/src/device_list.rs @@ -1,603 +1,603 @@ use chrono::{DateTime, Duration, Utc}; use std::{collections::HashSet, str::FromStr}; use tracing::{debug, error, warn}; use crate::{ constants::{ error_types, tonic_status_messages, DEVICE_LIST_TIMESTAMP_VALID_FOR, }, database::{DeviceListRow, DeviceListUpdate}, error::DeviceListError, grpc_services::protos::auth::UpdateDeviceListRequest, }; // serde helper for serializing/deserializing // device list JSON payload #[derive(serde::Serialize, serde::Deserialize)] struct RawDeviceList { devices: Vec, timestamp: i64, } /// Signed device list payload that is serializable to JSON. /// For the DDB payload, see [`DeviceListUpdate`] #[derive(Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct SignedDeviceList { /// JSON-stringified [`RawDeviceList`] raw_device_list: String, /// Current primary device signature. /// NOTE: Present only when the payload is received from primary device. /// It's `None` for Identity-generated device-lists #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] cur_primary_signature: Option, /// Previous primary device signature. Present only /// if primary device has changed since last update. #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] last_primary_signature: Option, } impl SignedDeviceList { fn as_raw(&self) -> Result { // The device list payload is sent as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to deserialize serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) .map_err(|err| { warn!("Failed to deserialize raw device list: {}", err); tonic::Status::invalid_argument( tonic_status_messages::INVALID_DEVICE_LIST_PAYLOAD, ) }) } /// Serializes the signed device list to a JSON string pub fn as_json_string(&self) -> Result { serde_json::to_string(self).map_err(|err| { error!( errorType = error_types::GRPC_SERVICES_LOG, "Failed to serialize device list updates: {}", err ); tonic::Status::failed_precondition( tonic_status_messages::UNEXPECTED_ERROR, ) }) } } impl TryFrom for SignedDeviceList { type Error = tonic::Status; fn try_from(row: DeviceListRow) -> Result { let raw_list = RawDeviceList { devices: row.device_ids, timestamp: row.timestamp.timestamp_millis(), }; let stringified_list = serde_json::to_string(&raw_list).map_err(|err| { error!( errorType = error_types::GRPC_SERVICES_LOG, "Failed to serialize raw device list: {}", err ); tonic::Status::failed_precondition( tonic_status_messages::UNEXPECTED_ERROR, ) })?; Ok(Self { raw_device_list: stringified_list, cur_primary_signature: row.current_primary_signature, last_primary_signature: row.last_primary_signature, }) } } impl TryFrom for SignedDeviceList { type Error = tonic::Status; fn try_from(request: UpdateDeviceListRequest) -> Result { - request.new_device_list.parse().map_err(|err| { - warn!("Failed to deserialize device list update: {}", err); - tonic::Status::invalid_argument( - tonic_status_messages::INVALID_DEVICE_LIST_PAYLOAD, - ) - }) + request.new_device_list.parse() } } impl FromStr for SignedDeviceList { - type Err = serde_json::Error; + type Err = tonic::Status; fn from_str(s: &str) -> Result { - serde_json::from_str(s) + serde_json::from_str(s).map_err(|err| { + warn!("Failed to deserialize device list: {}", err); + tonic::Status::invalid_argument( + tonic_status_messages::INVALID_DEVICE_LIST_PAYLOAD, + ) + }) } } impl TryFrom for DeviceListUpdate { type Error = tonic::Status; fn try_from(signed_list: SignedDeviceList) -> Result { let RawDeviceList { devices, timestamp: raw_timestamp, } = signed_list.as_raw()?; let timestamp = DateTime::from_timestamp_millis(raw_timestamp).ok_or_else(|| { error!( errorType = error_types::GRPC_SERVICES_LOG, "Failed to parse RawDeviceList timestamp!" ); tonic::Status::invalid_argument( tonic_status_messages::INVALID_TIMESTAMP, ) })?; Ok(DeviceListUpdate { devices, timestamp, current_primary_signature: signed_list.cur_primary_signature, last_primary_signature: signed_list.last_primary_signature, raw_payload: signed_list.raw_device_list, }) } } /// Returns `true` if given timestamp is valid. The timestamp is considered /// valid under the following condition: /// - `new_timestamp` is greater than `previous_timestamp` (if provided) /// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] /// /// Note: For Identity-managed device lists, the timestamp can be `None`. /// Verification is then skipped fn is_new_timestamp_valid( previous_timestamp: Option<&DateTime>, new_timestamp: Option<&DateTime>, ) -> bool { let Some(new_timestamp) = new_timestamp else { return true; }; if let Some(previous_timestamp) = previous_timestamp { if new_timestamp < previous_timestamp { return false; } } let timestamp_valid_duration = Duration::from_std(DEVICE_LIST_TIMESTAMP_VALID_FOR) .expect("FATAL - Invalid duration constant provided"); Utc::now().signed_duration_since(new_timestamp) < timestamp_valid_duration } /// Returns error if new timestamp is invalid. The timestamp is considered /// valid under the following condition: /// - `new_timestamp` is greater than `previous_timestamp` (if provided) /// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] /// /// Note: For Identity-managed device lists, the timestamp can be `None`. /// Verification is then skipped pub fn verify_device_list_timestamp( previous_timestamp: Option<&DateTime>, new_timestamp: Option<&DateTime>, ) -> Result<(), DeviceListError> { if !is_new_timestamp_valid(previous_timestamp, new_timestamp) { return Err(DeviceListError::InvalidDeviceListUpdate); } Ok(()) } pub fn verify_device_list_signatures( previous_primary_device_id: Option<&String>, new_device_list: &DeviceListUpdate, ) -> Result<(), DeviceListError> { let Some(primary_device_id) = new_device_list.devices.first() else { return Ok(()); }; // verify current signature if let Some(signature) = &new_device_list.current_primary_signature { crate::grpc_utils::ed25519_verify( primary_device_id, &new_device_list.raw_payload, signature, ) .map_err(|err| { debug!("curPrimarySignature verification failed: {err}"); DeviceListError::InvalidSignature })?; } // verify last signature if primary device changed if let (Some(previous_primary_id), Some(last_signature)) = ( previous_primary_device_id.filter(|prev| *prev != primary_device_id), &new_device_list.last_primary_signature, ) { crate::grpc_utils::ed25519_verify( previous_primary_id, &new_device_list.raw_payload, last_signature, ) .map_err(|err| { debug!("lastPrimarySignature verification failed: {err}"); DeviceListError::InvalidSignature })?; } Ok(()) } pub fn verify_singleton_device_list( device_list: &DeviceListUpdate, expected_primary_device_id: &str, // expected primary device ID for "lastPrimarySignature". // Use `None` if the device list isn't expected to contain last signature. expected_previous_primary_device_id: Option<&String>, ) -> Result<(), tonic::Status> { use tonic::Status; use tonic_status_messages::INVALID_DEVICE_LIST_UPDATE as INVALID_DEVICE_LIST; match ( &device_list.last_primary_signature, expected_previous_primary_device_id, ) { (None, None) => (), (Some(_), None) => { debug!("Unexpected lastPrimarySignature for singleton device list"); return Err(Status::invalid_argument(INVALID_DEVICE_LIST)); } (None, Some(_)) => { debug!("Missing lastPrimarySignature for singleton device list"); return Err(Status::invalid_argument(INVALID_DEVICE_LIST)); } (Some(last_signature), Some(last_signing_public_key)) => { crate::grpc_utils::ed25519_verify( last_signing_public_key, &device_list.raw_payload, last_signature, )?; } }; let Some(signature) = &device_list.current_primary_signature else { debug!("Missing curPrimarySignature for singleton device list"); return Err(Status::invalid_argument(INVALID_DEVICE_LIST)); }; crate::grpc_utils::ed25519_verify( expected_primary_device_id, &device_list.raw_payload, signature, )?; if device_list.devices.len() != 1 { debug!("Invalid device list length"); return Err(Status::invalid_argument(INVALID_DEVICE_LIST)); } if device_list .devices .first() .filter(|it| **it == expected_primary_device_id) .is_none() { debug!("Invalid primary device ID for singleton device list"); return Err(Status::invalid_argument(INVALID_DEVICE_LIST)); } Ok(()) } pub mod validation { /// utility alias to help infer validator type if exact function is not provided. pub type DeviceListValidator = fn(&[&str], &[&str]) -> bool; use super::*; /// Returns `true` if `new_device_list` contains exactly one more new device /// compared to `previous_device_list` fn is_device_added( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_set: HashSet<_> = previous_device_list.iter().collect(); let new_set: HashSet<_> = new_device_list.iter().collect(); return new_set.difference(&previous_set).count() == 1; } /// Returns `true` if `new_device_list` contains exactly one fewer device /// compared to `previous_device_list` fn is_device_removed( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_set: HashSet<_> = previous_device_list.iter().collect(); let new_set: HashSet<_> = new_device_list.iter().collect(); return previous_set.difference(&new_set).count() == 1; } fn primary_device_changed( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_primary = previous_device_list.first(); let new_primary = new_device_list.first(); new_primary != previous_primary } /// Verifies if exactly one device has been replaced. /// No reorders are permitted. Both lists have to have the same length. fn is_device_replaced( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { if previous_device_list.len() != new_device_list.len() { return false; } // exactly 1 different device ID std::iter::zip(previous_device_list, new_device_list) .filter(|(a, b)| a != b) .count() == 1 } /// Verifies if the device list contains duplicated device IDs fn has_duplicates(device_list: &[&str]) -> bool { let devices_set: HashSet<&str> = device_list.iter().copied().collect(); devices_set.len() != device_list.len() } // This is going to be used when doing primary devicd keys rotation #[allow(unused)] pub fn primary_device_rotation_validator( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { primary_device_changed(previous_device_list, new_device_list) && !is_device_replaced(&previous_device_list[1..], &new_device_list[1..]) } /// The `UpdateDeviceList` RPC should be able to either add or remove /// one device, and it cannot currently switch primary devices. /// The RPC is also able to replace a keyserver device pub fn update_device_list_rpc_validator( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { if primary_device_changed(previous_device_list, new_device_list) { return false; } if has_duplicates(new_device_list) { return false; } // allow replacing a keyserver if is_device_replaced(previous_device_list, new_device_list) { return true; } let is_added = is_device_added(previous_device_list, new_device_list); let is_removed = is_device_removed(previous_device_list, new_device_list); is_added != is_removed } #[cfg(test)] mod tests { use super::*; #[test] fn test_device_added_or_removed() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device1", "device2"]; assert!(is_device_added(&list1, &list2)); assert!(is_device_removed(&list1, &list2).not()); assert!(is_device_added(&list2, &list1).not()); assert!(is_device_removed(&list2, &list1)); assert!(is_device_added(&list1, &list1).not()); assert!(is_device_removed(&list1, &list1).not()); } #[test] fn test_primary_device_changed() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device1", "device2"]; let list3 = vec!["device2"]; assert!(primary_device_changed(&list1, &list2).not()); assert!(primary_device_changed(&list1, &list3)); } #[test] fn test_device_replaced() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device2"]; let list3 = vec!["device1", "device2"]; let list4 = vec!["device2", "device1"]; let list5 = vec!["device2", "device3"]; assert!(is_device_replaced(&list1, &list2), "Singleton replacement"); assert!(is_device_replaced(&list4, &list5), "Standard replacement"); assert!(is_device_replaced(&list1, &list3).not(), "Length unequal"); assert!(is_device_replaced(&list3, &list3).not(), "Unchanged"); assert!(is_device_replaced(&list3, &list4).not(), "Reorder"); } #[test] fn test_duplicated_devices() { use std::ops::Not; let list1 = vec!["device1", "device2", "device3"]; let list2 = vec!["device1", "device2", "device2"]; assert!(has_duplicates(&list1).not(), "No duplicates"); assert!(has_duplicates(&list2), "With duplicates"); } } } #[cfg(test)] mod tests { use super::*; #[test] fn deserialize_device_list_signature() { let payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; let payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; let list_with_signature: SignedDeviceList = serde_json::from_str(payload_with_signature).unwrap(); let list_without_signatures: SignedDeviceList = serde_json::from_str(payload_without_signatures).unwrap(); assert_eq!( list_with_signature.cur_primary_signature, Some("foo".to_string()) ); assert!(list_with_signature.last_primary_signature.is_none()); assert!(list_without_signatures.cur_primary_signature.is_none()); assert!(list_without_signatures.last_primary_signature.is_none()); } #[test] fn serialize_device_list_signatures() { let raw_list = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let device_list_without_signature = SignedDeviceList { raw_device_list: raw_list.to_string(), cur_primary_signature: None, last_primary_signature: None, }; assert_eq!( device_list_without_signature.as_json_string().unwrap(), expected_payload_without_signatures ); let expected_payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; let device_list_with_cur_signature = SignedDeviceList { raw_device_list: raw_list.to_string(), cur_primary_signature: Some("foo".to_string()), last_primary_signature: None, }; assert_eq!( device_list_with_cur_signature.as_json_string().unwrap(), expected_payload_with_signature ); } #[test] fn serialize_device_list_updates() { let raw_updates = vec![ create_device_list_row(RawDeviceList { devices: vec!["device1".into()], timestamp: 111111111, }), create_device_list_row(RawDeviceList { devices: vec!["device1".into(), "device2".into()], timestamp: 222222222, }), ]; let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_raw_list2 = r#"{"devices":["device1","device2"],"timestamp":222222222}"#; let signed_updates = raw_updates .into_iter() .map(SignedDeviceList::try_from) .collect::, _>>() .expect("signing device list updates failed"); assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); let stringified_updates = signed_updates .iter() .map(serde_json::to_string) .collect::, _>>() .expect("serialize signed device lists failed"); let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; assert_eq!(stringified_updates[0], expected_stringified_list1); assert_eq!(stringified_updates[1], expected_stringified_list2); } #[test] fn deserialize_device_list_update() { let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; let request = UpdateDeviceListRequest { new_device_list: raw_payload.to_string(), }; let signed_list = SignedDeviceList::try_from(request) .expect("Failed to parse SignedDeviceList"); let update = DeviceListUpdate::try_from(signed_list) .expect("Failed to parse DeviceListUpdate from signed list"); let expected_timestamp = DateTime::from_timestamp_millis(123456789).unwrap(); assert_eq!(update.timestamp, expected_timestamp); assert_eq!( update.devices, vec!["device1".to_string(), "device2".to_string()] ); } #[test] fn test_timestamp_validation() { let valid_timestamp = Utc::now() - Duration::milliseconds(100); let previous_timestamp = Utc::now() - Duration::seconds(10); let too_old_timestamp = previous_timestamp - Duration::seconds(1); let expired_timestamp = Utc::now() - Duration::minutes(20); assert!( verify_device_list_timestamp( Some(&previous_timestamp), Some(&valid_timestamp) ) .is_ok(), "Valid timestamp should pass verification" ); assert!( verify_device_list_timestamp( Some(&previous_timestamp), Some(&too_old_timestamp) ) .is_err(), "Timestamp older than previous, should fail verification" ); assert!( verify_device_list_timestamp(None, Some(&expired_timestamp)).is_err(), "Expired timestamp should fail verification" ); assert!( verify_device_list_timestamp(None, None).is_ok(), "No provided timestamp should pass" ); } /// helper for mocking DB rows from raw device list payloads fn create_device_list_row(raw_list: RawDeviceList) -> DeviceListRow { DeviceListRow { user_id: "".to_string(), device_ids: raw_list.devices, timestamp: DateTime::from_timestamp_millis(raw_list.timestamp).unwrap(), current_primary_signature: None, last_primary_signature: None, } } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index 7aaeb0901..752022c06 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,1013 +1,1008 @@ use std::collections::{HashMap, HashSet}; use crate::config::CONFIG; use crate::database::{DeviceListUpdate, PlatformDetails}; use crate::device_list::validation::DeviceListValidator; use crate::device_list::SignedDeviceList; use crate::error::consume_error; use crate::log::redact_sensitive_data; use crate::{ client_service::{handle_db_error, WorkflowInProgress}, constants::{error_types, request_metadata, tonic_status_messages}, database::DatabaseClient, grpc_services::shared::{get_platform_metadata, get_value}, }; use chrono::DateTime; use comm_lib::auth::AuthService; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, trace, warn}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, PrimaryDeviceLogoutRequest, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserDevicesPlatformDetails, UserIdentitiesRequest, UserIdentitiesResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, comm_auth_service: AuthService, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { trace!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { trace!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::MISSING_CREDENTIALS) })?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted(tonic_status_messages::BAD_CREDENTIALS)); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::USER_ID_MISSING) })?; let device_id = get_value(request, request_metadata::DEVICE_ID).ok_or_else(|| { Status::unauthenticated(tonic_status_messages::DEVICE_ID_MISSING) })?; Ok((user_id, device_id)) } fn spawn_delete_tunnelbroker_data_task(device_ids: Vec) { tokio::spawn(async move { debug!( "Attempting to delete Tunnelbroker data for devices: {:?}", device_ids.as_slice() ); let result = crate::tunnelbroker::delete_devices_data(&device_ids).await; consume_error(result); }); } fn spawn_delete_backup_data_task(user_id: String, auth_service: AuthService) { tokio::spawn(async move { debug!("Attempting to delete Backup data for user: {}", &user_id); let result = crate::backup::delete_backup_user_data(&user_id, &auth_service).await; consume_error(result); }); } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { #[tracing::instrument(skip_all)] async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_key = message.new_content_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_CONTENT_KEYS) })?; let notif_key = message.new_notif_prekey.ok_or_else(|| { Status::invalid_argument(tonic_status_messages::MISSING_NOTIF_KEYS) })?; self .db_client .update_device_prekeys( user_id, device_id, content_key.into(), notif_key.into(), ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } #[tracing::instrument(skip_all)] async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identity(user_id) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } #[tracing::instrument(skip_all)] async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identity(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| { tonic::Status::not_found(tonic_status_messages::USER_NOT_FOUND) })?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? else { return Err(Status::not_found( tonic_status_messages::KEYSERVER_NOT_FOUND, )); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await .map_err(handle_db_error)?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } #[tracing::instrument(skip_all)] async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let Some((username, password_file)) = self .db_client .get_username_and_password_file(&user_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::permission_denied( tonic_status_messages::WALLET_USER, )); }; let message = request.into_inner(); let mut server_login = comm_opaque2::server::Login::new(); let login_response = server_login .start( &CONFIG.server_setup, &password_file, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let registration_response = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdatePasswordInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(Box::new(update_state))) .await .map_err(handle_db_error)?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: registration_response, opaque_login_response: login_response, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_access_token_data(&user_id, &device_id) .await .map_err(handle_db_error)?; let device_list = self .db_client .get_current_device_list(&user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(&user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition("no device list")); }; tokio::spawn(async move { debug!( "Sending device list updates to {:?}", device_list.device_ids ); let device_ids: Vec<&str> = device_list.device_ids.iter().map(AsRef::as_ref).collect(); let result = crate::tunnelbroker::send_device_list_update(&device_ids).await; consume_error(result); }); spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_primary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!( "Primary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; // Get and verify singleton device list let parsed_device_list: SignedDeviceList = - message.signed_device_list.parse().map_err(|err| { - warn!("Failed to deserialize device list: {}", err); - tonic::Status::invalid_argument( - tonic_status_messages::INVALID_DEVICE_LIST_PAYLOAD, - ) - })?; + message.signed_device_list.parse()?; let update_payload = DeviceListUpdate::try_from(parsed_device_list)?; crate::device_list::verify_singleton_device_list( &update_payload, &device_id, None, )?; self .db_client .apply_devicelist_update( &user_id, update_payload, // - We've already validated the list so no need to do it here. // - Need to pass the type because it cannot be inferred from None None::, // We don't want side effects - we'll take care of removing devices // on our own. (Side effect would skip the primary device). false, ) .await .map_err(handle_db_error)?; debug!(user_id, "Attempting to delete user's access tokens"); self .db_client .delete_all_tokens_for_user(&user_id) .await .map_err(handle_db_error)?; // We must delete the one-time keys first because doing so requires device // IDs from the devices table debug!(user_id, "Attempting to delete user's one-time keys"); self .db_client .delete_otks_table_rows_for_user(&user_id) .await .map_err(handle_db_error)?; debug!(user_id, "Attempting to delete user's devices"); let device_ids = self .db_client .delete_devices_data_for_user(&user_id) .await .map_err(handle_db_error)?; spawn_delete_tunnelbroker_data_task(device_ids); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_secondary_device( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; debug!( "Secondary device logout request for user_id={}, device_id={}", user_id, device_id ); self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Secondary, ) .await?; self .db_client .delete_access_token_data(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await .map_err(handle_db_error)?; spawn_delete_tunnelbroker_data_task([device_id].into()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); let user_is_password_authenticated = self .db_client .user_is_password_authenticated(&user_id) .await .map_err(handle_db_error)?; if user_is_password_authenticated { return Err(tonic::Status::permission_denied( tonic_status_messages::PASSWORD_USER, )); } let device_ids = self .db_client .delete_user(user_id.clone()) .await .map_err(handle_db_error)?; spawn_delete_tunnelbroker_data_task(device_ids); spawn_delete_backup_data_task(user_id, self.comm_auth_service.clone()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await .map_err(handle_db_error)?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found( tonic_status_messages::USER_NOT_FOUND, )); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = DeletePasswordUserInfo::new(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await .map_err(handle_db_error)?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found( tonic_status_messages::SESSION_NOT_FOUND, )); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; let device_ids = self .db_client .delete_user(user_id.clone()) .await .map_err(handle_db_error)?; spawn_delete_tunnelbroker_data_task(device_ids); spawn_delete_backup_data_task(user_id, self.comm_auth_service.clone()); let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::from_timestamp_millis(timestamp).ok_or_else(|| { tonic::Status::invalid_argument( tonic_status_messages::INVALID_TIMESTAMP, ) }) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(SignedDeviceList::try_from) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } #[tracing::instrument(skip_all)] async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); let request_count = user_ids.len(); let user_ids: HashSet = user_ids.into_iter().collect(); debug!( "Requesting device lists and platform details for {} users ({} unique)", request_count, user_ids.len() ); // 1. Fetch device lists let device_lists = self .db_client .get_current_device_lists(user_ids) .await .map_err(handle_db_error)?; trace!("Found device lists for {} users", device_lists.keys().len()); // 2. Fetch platform details let flattened_user_device_ids: Vec<(String, String)> = device_lists .iter() .flat_map(|(user_id, device_list)| { device_list .device_ids .iter() .map(|device_id| (user_id.clone(), device_id.clone())) .collect::>() }) .collect(); let platform_details = self .db_client .get_devices_platform_details(flattened_user_device_ids) .await .map_err(handle_db_error)?; trace!( "Found platform details for {} users", platform_details.keys().len() ); // 3. Prepare output format let users_device_lists: HashMap = device_lists .into_iter() .map(|(user_id, device_list_row)| { let signed_list = SignedDeviceList::try_from(device_list_row)?; let serialized_list = signed_list.as_json_string()?; Ok((user_id, serialized_list)) }) .collect::>()?; let users_devices_platform_details = platform_details .into_iter() .map(|(user_id, devices_map)| { (user_id, UserDevicesPlatformDetails::from(devices_map)) }) .collect(); let response = PeersDeviceListsResponse { users_device_lists, users_devices_platform_details, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .verify_device_on_device_list( &user_id, &device_id, DeviceListItemKind::Primary, ) .await?; let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; let validator = crate::device_list::validation::update_device_list_rpc_validator; self .db_client .apply_devicelist_update(&user_id, update, Some(validator), true) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await .map_err(handle_db_error)?; if get_farcaster_users_response.len() > 1 { error!( errorType = error_types::GRPC_SERVICES_LOG, "multiple users associated with the same Farcaster ID" ); return Err(Status::failed_precondition( tonic_status_messages::CANNOT_LINK_FID, )); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists(tonic_status_messages::FID_TAKEN)); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .remove_farcaster_id(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn find_user_identities( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_ids: HashSet = message.user_ids.into_iter().collect(); let users_table_results = self .db_client .find_db_user_identities(user_ids.clone()) .await .map_err(handle_db_error)?; // Look up only user IDs that haven't been found in users table let reserved_user_ids_to_query: Vec = user_ids .into_iter() .filter(|user_id| !users_table_results.contains_key(user_id)) .collect(); let reserved_user_identifiers = self .db_client .query_reserved_usernames_by_user_ids(reserved_user_ids_to_query) .await .map_err(handle_db_error)?; let identities = users_table_results .into_iter() .map(|(user_id, identifier)| (user_id, identifier.into())) .collect(); let response = UserIdentitiesResponse { identities, reserved_user_identifiers, }; return Ok(Response::new(response)); } #[tracing::instrument(skip_all)] async fn sync_platform_details( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let platform_metadata = get_platform_metadata(&request)?; let platform_details = PlatformDetails::new(platform_metadata, None) .map_err(|_| { Status::invalid_argument( tonic_status_messages::INVALID_PLATFORM_METADATA, ) })?; self .db_client .update_device_platform_details(user_id, device_id, platform_details) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } } #[allow(dead_code)] enum DeviceListItemKind { Any, Primary, Secondary, } impl AuthenticatedService { async fn verify_device_on_device_list( &self, user_id: &String, device_id: &String, device_kind: DeviceListItemKind, ) -> Result<(), tonic::Status> { let device_list = self .db_client .get_current_device_list(user_id) .await .map_err(|err| { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); handle_db_error(err) })?; let Some(device_list) = device_list else { error!( user_id = redact_sensitive_data(user_id), errorType = error_types::GRPC_SERVICES_LOG, "User has no device list!" ); return Err(Status::failed_precondition( tonic_status_messages::NO_DEVICE_LIST, )); }; use DeviceListItemKind as DeviceKind; let device_on_list = match device_kind { DeviceKind::Any => device_list.has_device(device_id), DeviceKind::Primary => device_list.is_primary_device(device_id), DeviceKind::Secondary => device_list.has_secondary_device(device_id), }; if !device_on_list { debug!( "Device {} not in device list for user {}", device_id, user_id ); return Err(Status::permission_denied( tonic_status_messages::DEVICE_NOT_IN_DEVICE_LIST, )); } Ok(()) } } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct DeletePasswordUserInfo { pub opaque_server_login: comm_opaque2::server::Login, } #[derive( Clone, serde::Serialize, serde::Deserialize, derive_more::Constructor, )] pub struct UpdatePasswordInfo { pub opaque_server_login: comm_opaque2::server::Login, }