diff --git a/services/identity/src/device_list.rs b/services/identity/src/device_list.rs index 5fe1542e4..f2fba3784 100644 --- a/services/identity/src/device_list.rs +++ b/services/identity/src/device_list.rs @@ -1,226 +1,442 @@ use chrono::{DateTime, Duration, Utc}; use std::collections::HashSet; +use tracing::{error, warn}; use crate::{ - constants::DEVICE_LIST_TIMESTAMP_VALID_FOR, error::DeviceListError, + constants::{error_types, DEVICE_LIST_TIMESTAMP_VALID_FOR}, + database::{DeviceListRow, DeviceListUpdate}, + ddb_utils::DateTimeExt, + error::DeviceListError, + grpc_services::protos::auth::UpdateDeviceListRequest, }; +// raw device list that can be serialized to JSON (and then signed in the future) +#[derive(serde::Serialize, serde::Deserialize)] +pub struct RawDeviceList { + devices: Vec, + timestamp: i64, +} + +impl From for RawDeviceList { + fn from(row: DeviceListRow) -> Self { + Self { + devices: row.device_ids, + timestamp: row.timestamp.timestamp_millis(), + } + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SignedDeviceList { + /// JSON-stringified [`RawDeviceList`] + raw_device_list: String, + /// Current primary device signature. + /// NOTE: Present only when the payload is received from primary device. + /// It's `None` for Identity-generated device-lists + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + cur_primary_signature: Option, + /// Previous primary device signature. Present only + /// if primary device has changed since last update. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + last_primary_signature: Option, +} + +impl SignedDeviceList { + /// Serialize (and sign in the future) a [`RawDeviceList`] + pub fn try_from_raw(raw: RawDeviceList) -> Result { + let stringified_list = serde_json::to_string(&raw).map_err(|err| { + error!( + errorType = error_types::GRPC_SERVICES_LOG, + "Failed to serialize raw device list: {}", err + ); + tonic::Status::failed_precondition("unexpected error") + })?; + + Ok(Self { + raw_device_list: stringified_list, + cur_primary_signature: None, + last_primary_signature: None, + }) + } + + fn as_raw(&self) -> Result { + // The device list payload is sent as an escaped JSON payload. + // Escaped double quotes need to be trimmed before attempting to deserialize + serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) + .map_err(|err| { + warn!("Failed to deserialize raw device list: {}", err); + tonic::Status::invalid_argument("invalid device list payload") + }) + } + + /// Serializes the signed device list to a JSON string + pub fn as_json_string(&self) -> Result { + serde_json::to_string(self).map_err(|err| { + error!( + errorType = error_types::GRPC_SERVICES_LOG, + "Failed to serialize device list updates: {}", err + ); + tonic::Status::failed_precondition("unexpected error") + }) + } +} + +impl TryFrom for SignedDeviceList { + type Error = tonic::Status; + fn try_from(request: UpdateDeviceListRequest) -> Result { + serde_json::from_str(&request.new_device_list).map_err(|err| { + warn!("Failed to deserialize device list update: {}", err); + tonic::Status::invalid_argument("invalid device list payload") + }) + } +} + +impl TryFrom for DeviceListUpdate { + type Error = tonic::Status; + fn try_from(signed_list: SignedDeviceList) -> Result { + let RawDeviceList { + devices, + timestamp: raw_timestamp, + } = signed_list.as_raw()?; + let timestamp = DateTime::::from_utc_timestamp_millis(raw_timestamp) + .ok_or_else(|| { + error!( + errorType = error_types::GRPC_SERVICES_LOG, + "Failed to parse RawDeviceList timestamp!" + ); + tonic::Status::invalid_argument("invalid timestamp") + })?; + Ok(DeviceListUpdate::new(devices, timestamp)) + } +} + /// Returns `true` if given timestamp is valid. The timestamp is considered /// valid under the following condition: /// - `new_timestamp` is greater than `previous_timestamp` (if provided) /// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] /// /// Note: For Identity-managed device lists, the timestamp can be `None`. /// Verification is then skipped fn is_new_timestamp_valid( previous_timestamp: Option<&DateTime>, new_timestamp: Option<&DateTime>, ) -> bool { let Some(new_timestamp) = new_timestamp else { return true; }; if let Some(previous_timestamp) = previous_timestamp { if new_timestamp < previous_timestamp { return false; } } let timestamp_valid_duration = Duration::from_std(DEVICE_LIST_TIMESTAMP_VALID_FOR) .expect("FATAL - Invalid duration constant provided"); Utc::now().signed_duration_since(new_timestamp) < timestamp_valid_duration } /// Returns error if new timestamp is invalid. The timestamp is considered /// valid under the following condition: /// - `new_timestamp` is greater than `previous_timestamp` (if provided) /// - `new_timestamp` is not older than [`DEVICE_LIST_TIMESTAMP_VALID_FOR`] /// /// Note: For Identity-managed device lists, the timestamp can be `None`. /// Verification is then skipped pub fn verify_device_list_timestamp( previous_timestamp: Option<&DateTime>, new_timestamp: Option<&DateTime>, ) -> Result<(), DeviceListError> { if !is_new_timestamp_valid(previous_timestamp, new_timestamp) { return Err(DeviceListError::InvalidDeviceListUpdate); } Ok(()) } pub mod validation { use super::*; /// Returns `true` if `new_device_list` contains exactly one more new device /// compared to `previous_device_list` fn is_device_added( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_set: HashSet<_> = previous_device_list.iter().collect(); let new_set: HashSet<_> = new_device_list.iter().collect(); return new_set.difference(&previous_set).count() == 1; } /// Returns `true` if `new_device_list` contains exactly one fewer device /// compared to `previous_device_list` fn is_device_removed( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_set: HashSet<_> = previous_device_list.iter().collect(); let new_set: HashSet<_> = new_device_list.iter().collect(); return previous_set.difference(&new_set).count() == 1; } fn primary_device_changed( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { let previous_primary = previous_device_list.first(); let new_primary = new_device_list.first(); new_primary != previous_primary } /// Verifies if exactly one device has been replaced. /// No reorders are permitted. Both lists have to have the same length. fn is_device_replaced( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { if previous_device_list.len() != new_device_list.len() { return false; } // exactly 1 different device ID std::iter::zip(previous_device_list, new_device_list) .filter(|(a, b)| a != b) .count() == 1 } // This is going to be used when doing primary devicd keys rotation #[allow(unused)] pub fn primary_device_rotation_validator( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { primary_device_changed(previous_device_list, new_device_list) && !is_device_replaced(&previous_device_list[1..], &new_device_list[1..]) } /// The `UpdateDeviceList` RPC should be able to either add or remove /// one device, and it cannot currently switch primary devices. /// The RPC is also able to replace a keyserver device pub fn update_device_list_rpc_validator( previous_device_list: &[&str], new_device_list: &[&str], ) -> bool { if primary_device_changed(previous_device_list, new_device_list) { return false; } // allow replacing a keyserver if is_device_replaced(previous_device_list, new_device_list) { return true; } let is_added = is_device_added(previous_device_list, new_device_list); let is_removed = is_device_removed(previous_device_list, new_device_list); is_added != is_removed } #[cfg(test)] mod tests { use super::*; #[test] fn test_device_added_or_removed() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device1", "device2"]; assert!(is_device_added(&list1, &list2)); assert!(is_device_removed(&list1, &list2).not()); assert!(is_device_added(&list2, &list1).not()); assert!(is_device_removed(&list2, &list1)); assert!(is_device_added(&list1, &list1).not()); assert!(is_device_removed(&list1, &list1).not()); } #[test] fn test_primary_device_changed() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device1", "device2"]; let list3 = vec!["device2"]; assert!(primary_device_changed(&list1, &list2).not()); assert!(primary_device_changed(&list1, &list3)); } #[test] fn test_device_replaced() { use std::ops::Not; let list1 = vec!["device1"]; let list2 = vec!["device2"]; let list3 = vec!["device1", "device2"]; let list4 = vec!["device2", "device1"]; let list5 = vec!["device2", "device3"]; assert!(is_device_replaced(&list1, &list2), "Singleton replacement"); assert!(is_device_replaced(&list4, &list5), "Standard replacement"); assert!(is_device_replaced(&list1, &list3).not(), "Length unequal"); assert!(is_device_replaced(&list3, &list3).not(), "Unchanged"); assert!(is_device_replaced(&list3, &list4).not(), "Reorder"); } } } #[cfg(test)] mod tests { use super::*; + #[test] + fn deserialize_device_list_signature() { + let payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; + let payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; + + let list_with_signature: SignedDeviceList = + serde_json::from_str(payload_with_signature).unwrap(); + let list_without_signatures: SignedDeviceList = + serde_json::from_str(payload_without_signatures).unwrap(); + + assert_eq!( + list_with_signature.cur_primary_signature, + Some("foo".to_string()) + ); + assert!(list_with_signature.last_primary_signature.is_none()); + + assert!(list_without_signatures.cur_primary_signature.is_none()); + assert!(list_without_signatures.last_primary_signature.is_none()); + } + + #[test] + fn serialize_device_list_signatures() { + let raw_list = r#"{"devices":["device1"],"timestamp":111111111}"#; + + let expected_payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; + let device_list_without_signature = SignedDeviceList { + raw_device_list: raw_list.to_string(), + cur_primary_signature: None, + last_primary_signature: None, + }; + assert_eq!( + device_list_without_signature.as_json_string().unwrap(), + expected_payload_without_signatures + ); + + let expected_payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; + let device_list_with_cur_signature = SignedDeviceList { + raw_device_list: raw_list.to_string(), + cur_primary_signature: Some("foo".to_string()), + last_primary_signature: None, + }; + assert_eq!( + device_list_with_cur_signature.as_json_string().unwrap(), + expected_payload_with_signature + ); + } + + #[test] + fn serialize_device_list_updates() { + let raw_updates = vec![ + RawDeviceList { + devices: vec!["device1".into()], + timestamp: 111111111, + }, + RawDeviceList { + devices: vec!["device1".into(), "device2".into()], + timestamp: 222222222, + }, + ]; + + let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; + let expected_raw_list2 = + r#"{"devices":["device1","device2"],"timestamp":222222222}"#; + + let signed_updates = raw_updates + .into_iter() + .map(SignedDeviceList::try_from_raw) + .collect::, _>>() + .expect("signing device list updates failed"); + + assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); + assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); + + let stringified_updates = signed_updates + .iter() + .map(serde_json::to_string) + .collect::, _>>() + .expect("serialize signed device lists failed"); + + let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; + let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; + + assert_eq!(stringified_updates[0], expected_stringified_list1); + assert_eq!(stringified_updates[1], expected_stringified_list2); + } + + #[test] + fn deserialize_device_list_update() { + let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; + let request = UpdateDeviceListRequest { + new_device_list: raw_payload.to_string(), + }; + + let signed_list = SignedDeviceList::try_from(request) + .expect("Failed to parse SignedDeviceList"); + let update = DeviceListUpdate::try_from(signed_list) + .expect("Failed to parse DeviceListUpdate from signed list"); + + let expected_timestamp = + DateTime::::from_utc_timestamp_millis(123456789).unwrap(); + + assert_eq!(update.timestamp, expected_timestamp); + assert_eq!( + update.devices, + vec!["device1".to_string(), "device2".to_string()] + ); + } + #[test] fn test_timestamp_validation() { let valid_timestamp = Utc::now() - Duration::milliseconds(100); let previous_timestamp = Utc::now() - Duration::seconds(10); let too_old_timestamp = previous_timestamp - Duration::seconds(1); let expired_timestamp = Utc::now() - Duration::minutes(20); assert!( verify_device_list_timestamp( Some(&previous_timestamp), Some(&valid_timestamp) ) .is_ok(), "Valid timestamp should pass verification" ); assert!( verify_device_list_timestamp( Some(&previous_timestamp), Some(&too_old_timestamp) ) .is_err(), "Timestamp older than previous, should fail verification" ); assert!( verify_device_list_timestamp(None, Some(&expired_timestamp)).is_err(), "Expired timestamp should fail verification" ); assert!( verify_device_list_timestamp(None, None).is_ok(), "No provided timestamp should pass" ); } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index 148318cd0..b3d6ab3c8 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,863 +1,648 @@ use std::collections::HashMap; use crate::config::CONFIG; -use crate::database::{DeviceListRow, DeviceListUpdate}; +use crate::database::DeviceListUpdate; +use crate::device_list::{RawDeviceList, SignedDeviceList}; use crate::{ client_service::{handle_db_error, UpdateState, WorkflowInProgress}, constants::{error_types, request_metadata}, database::DatabaseClient, ddb_utils::DateTimeExt, grpc_services::shared::get_value, }; use chrono::{DateTime, Utc}; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, warn}; use super::protos::auth::{ identity_client_service_server::IdentityClientService, DeletePasswordUserFinishRequest, DeletePasswordUserStartRequest, DeletePasswordUserStartResponse, GetDeviceListRequest, GetDeviceListResponse, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, PeersDeviceListsRequest, PeersDeviceListsResponse, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, UserIdentityRequest, UserIdentityResponse, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { debug!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID) .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, request_metadata::DEVICE_ID) .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { #[tracing::instrument(skip_all)] async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_prekeys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_prekeys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .update_device_prekeys( user_id, device_id, content_keys.into(), notif_keys.into(), ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } #[tracing::instrument(skip_all)] async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } #[tracing::instrument(skip_all)] async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identifier(user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(identifier.into()), })) } #[tracing::instrument(skip_all)] async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let Some(keyserver_info) = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? else { return Err(Status::not_found("keyserver not found")); }; let primary_device_data = self .db_client .get_primary_device_data(&message.user_id) .await .map_err(handle_db_error)?; let primary_device_keys = primary_device_data.device_key_info; let response = Response::new(KeyserverKeysResponse { keyserver_info: Some(keyserver_info.into()), identity: Some(identifier.into()), primary_device_identity_info: Some(primary_device_keys.into()), }); return Ok(response); } #[tracing::instrument(skip_all)] async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( &user_id, &device_id, &message.content_one_time_prekeys, &message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, user_id.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdateState { user_id }; let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(update_state)) .await .map_err(handle_db_error)?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(state.user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_otks_table_rows_for_user_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_access_token_data(user_id, device_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_wallet_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; debug!("Attempting to delete wallet user: {}", user_id); self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn delete_password_user_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to start deleting password user: {}", user_id); let maybe_username_and_password_file = self .db_client .get_username_and_password_file(&user_id) .await .map_err(handle_db_error)?; let Some((username, password_file_bytes)) = maybe_username_and_password_file else { return Err(tonic::Status::not_found("user not found")); }; let mut server_login = comm_opaque2::server::Login::new(); let server_response = server_login .start( &CONFIG.server_setup, &password_file_bytes, &message.opaque_login_request, username.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let delete_state = construct_delete_password_user_info(server_login); let session_id = self .db_client .insert_workflow(WorkflowInProgress::PasswordUserDeletion(Box::new( delete_state, ))) .await .map_err(handle_db_error)?; let response = Response::new(DeletePasswordUserStartResponse { session_id, opaque_login_response: server_response, }); Ok(response) } #[tracing::instrument(skip_all)] async fn delete_password_user_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to finish deleting password user: {}", user_id); let Some(WorkflowInProgress::PasswordUserDeletion(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let mut server_login = state.opaque_server_login; server_login .finish(&message.opaque_login_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::::from_utc_timestamp_millis(timestamp) .ok_or_else(|| tonic::Status::invalid_argument("Invalid timestamp")) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(RawDeviceList::from) .map(SignedDeviceList::try_from_raw) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(SignedDeviceList::as_json_string) .collect::, _>>()?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } #[tracing::instrument(skip_all)] async fn get_device_lists_for_users( &self, request: tonic::Request, ) -> Result, tonic::Status> { let PeersDeviceListsRequest { user_ids } = request.into_inner(); // do all fetches concurrently let mut fetch_tasks = tokio::task::JoinSet::new(); let mut device_lists = HashMap::with_capacity(user_ids.len()); for user_id in user_ids { let db_client = self.db_client.clone(); fetch_tasks.spawn(async move { let result = db_client.get_current_device_list(&user_id).await; (user_id, result) }); } while let Some(task_result) = fetch_tasks.join_next().await { match task_result { Ok((user_id, Ok(Some(device_list_row)))) => { let raw_list = RawDeviceList::from(device_list_row); let signed_list = SignedDeviceList::try_from_raw(raw_list)?; let serialized_list = signed_list.as_json_string()?; device_lists.insert(user_id, serialized_list); } Ok((user_id, Ok(None))) => { warn!(user_id, "User has no device list, skipping!"); } Ok((user_id, Err(err))) => { error!( user_id, errorType = error_types::GRPC_SERVICES_LOG, "Failed fetching device list: {err}" ); // abort fetching other users fetch_tasks.abort_all(); return Err(handle_db_error(err)); } Err(join_error) => { error!( errorType = error_types::GRPC_SERVICES_LOG, "Failed to join device list task: {join_error}" ); fetch_tasks.abort_all(); return Err(Status::aborted("unexpected error")); } } } let response = PeersDeviceListsResponse { users_device_lists: device_lists, }; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _device_id) = get_user_and_device_id(&request)?; // TODO: when we stop doing "primary device rotation" (migration procedure) // we should verify if this RPC is called by primary device only let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; self .db_client .apply_devicelist_update( &user_id, update, crate::device_list::validation::update_device_list_rpc_validator, ) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } #[tracing::instrument(skip_all)] async fn link_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let mut get_farcaster_users_response = self .db_client .get_farcaster_users(vec![message.farcaster_id.clone()]) .await .map_err(handle_db_error)?; if get_farcaster_users_response.len() > 1 { error!( errorType = error_types::GRPC_SERVICES_LOG, "multiple users associated with the same Farcaster ID" ); return Err(Status::failed_precondition("cannot link Farcaster ID")); } if let Some(u) = get_farcaster_users_response.pop() { if u.0.user_id == user_id { return Ok(Response::new(Empty {})); } else { return Err(Status::already_exists( "farcaster ID already associated with different user", )); } } self .db_client .add_farcaster_id(user_id, message.farcaster_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn unlink_farcaster_account( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .remove_farcaster_id(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } #[tracing::instrument(skip_all)] async fn find_user_identity( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let response = UserIdentityResponse { identity: Some(identifier.into()), }; return Ok(Response::new(response)); } } -// raw device list that can be serialized to JSON (and then signed in the future) -#[derive(serde::Serialize, serde::Deserialize)] -struct RawDeviceList { - devices: Vec, - timestamp: i64, -} - -impl From for RawDeviceList { - fn from(row: DeviceListRow) -> Self { - Self { - devices: row.device_ids, - timestamp: row.timestamp.timestamp_millis(), - } - } -} - -#[derive(serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -struct SignedDeviceList { - /// JSON-stringified [`RawDeviceList`] - raw_device_list: String, - /// Current primary device signature. - /// NOTE: Present only when the payload is received from primary device. - /// It's `None` for Identity-generated device-lists - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - cur_primary_signature: Option, - /// Previous primary device signature. Present only - /// if primary device has changed since last update. - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - last_primary_signature: Option, -} - -impl SignedDeviceList { - /// Serialize (and sign in the future) a [`RawDeviceList`] - fn try_from_raw(raw: RawDeviceList) -> Result { - let stringified_list = serde_json::to_string(&raw).map_err(|err| { - error!( - errorType = error_types::GRPC_SERVICES_LOG, - "Failed to serialize raw device list: {}", err - ); - tonic::Status::failed_precondition("unexpected error") - })?; - - Ok(Self { - raw_device_list: stringified_list, - cur_primary_signature: None, - last_primary_signature: None, - }) - } - - fn as_raw(&self) -> Result { - // The device list payload is sent as an escaped JSON payload. - // Escaped double quotes need to be trimmed before attempting to deserialize - serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) - .map_err(|err| { - warn!("Failed to deserialize raw device list: {}", err); - tonic::Status::invalid_argument("invalid device list payload") - }) - } - - /// Serializes the signed device list to a JSON string - fn as_json_string(&self) -> Result { - serde_json::to_string(self).map_err(|err| { - error!( - errorType = error_types::GRPC_SERVICES_LOG, - "Failed to serialize device list updates: {}", err - ); - tonic::Status::failed_precondition("unexpected error") - }) - } -} - -impl TryFrom for SignedDeviceList { - type Error = tonic::Status; - fn try_from(request: UpdateDeviceListRequest) -> Result { - serde_json::from_str(&request.new_device_list).map_err(|err| { - warn!("Failed to deserialize device list update: {}", err); - tonic::Status::invalid_argument("invalid device list payload") - }) - } -} - -impl TryFrom for DeviceListUpdate { - type Error = tonic::Status; - fn try_from(signed_list: SignedDeviceList) -> Result { - let RawDeviceList { - devices, - timestamp: raw_timestamp, - } = signed_list.as_raw()?; - let timestamp = DateTime::::from_utc_timestamp_millis(raw_timestamp) - .ok_or_else(|| { - error!( - errorType = error_types::GRPC_SERVICES_LOG, - "Failed to parse RawDeviceList timestamp!" - ); - tonic::Status::invalid_argument("invalid timestamp") - })?; - Ok(DeviceListUpdate::new(devices, timestamp)) - } -} - #[derive(Clone, serde::Serialize, serde::Deserialize)] pub struct DeletePasswordUserInfo { pub opaque_server_login: comm_opaque2::server::Login, } fn construct_delete_password_user_info( opaque_server_login: comm_opaque2::server::Login, ) -> DeletePasswordUserInfo { DeletePasswordUserInfo { opaque_server_login, } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn deserialize_device_list_signature() { - let payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; - let payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; - - let list_with_signature: SignedDeviceList = - serde_json::from_str(payload_with_signature).unwrap(); - let list_without_signatures: SignedDeviceList = - serde_json::from_str(payload_without_signatures).unwrap(); - - assert_eq!( - list_with_signature.cur_primary_signature, - Some("foo".to_string()) - ); - assert!(list_with_signature.last_primary_signature.is_none()); - - assert!(list_without_signatures.cur_primary_signature.is_none()); - assert!(list_without_signatures.last_primary_signature.is_none()); - } - - #[test] - fn serialize_device_list_signatures() { - let raw_list = r#"{"devices":["device1"],"timestamp":111111111}"#; - - let expected_payload_without_signatures = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; - let device_list_without_signature = SignedDeviceList { - raw_device_list: raw_list.to_string(), - cur_primary_signature: None, - last_primary_signature: None, - }; - assert_eq!( - device_list_without_signature.as_json_string().unwrap(), - expected_payload_without_signatures - ); - - let expected_payload_with_signature = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}","curPrimarySignature":"foo"}"#; - let device_list_with_cur_signature = SignedDeviceList { - raw_device_list: raw_list.to_string(), - cur_primary_signature: Some("foo".to_string()), - last_primary_signature: None, - }; - assert_eq!( - device_list_with_cur_signature.as_json_string().unwrap(), - expected_payload_with_signature - ); - } - - #[test] - fn serialize_device_list_updates() { - let raw_updates = vec![ - RawDeviceList { - devices: vec!["device1".into()], - timestamp: 111111111, - }, - RawDeviceList { - devices: vec!["device1".into(), "device2".into()], - timestamp: 222222222, - }, - ]; - - let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; - let expected_raw_list2 = - r#"{"devices":["device1","device2"],"timestamp":222222222}"#; - - let signed_updates = raw_updates - .into_iter() - .map(SignedDeviceList::try_from_raw) - .collect::, _>>() - .expect("signing device list updates failed"); - - assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); - assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); - - let stringified_updates = signed_updates - .iter() - .map(serde_json::to_string) - .collect::, _>>() - .expect("serialize signed device lists failed"); - - let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; - let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; - - assert_eq!(stringified_updates[0], expected_stringified_list1); - assert_eq!(stringified_updates[1], expected_stringified_list2); - } - - #[test] - fn deserialize_device_list_update() { - let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; - let request = UpdateDeviceListRequest { - new_device_list: raw_payload.to_string(), - }; - - let signed_list = SignedDeviceList::try_from(request) - .expect("Failed to parse SignedDeviceList"); - let update = DeviceListUpdate::try_from(signed_list) - .expect("Failed to parse DeviceListUpdate from signed list"); - - let expected_timestamp = - DateTime::::from_utc_timestamp_millis(123456789).unwrap(); - - assert_eq!(update.timestamp, expected_timestamp); - assert_eq!( - update.devices, - vec!["device1".to_string(), "device2".to_string()] - ); - } -}