diff --git a/services/identity/src/database/farcaster.rs b/services/identity/src/database/farcaster.rs index 6ec30381d..c66a1b4f4 100644 --- a/services/identity/src/database/farcaster.rs +++ b/services/identity/src/database/farcaster.rs @@ -1,88 +1,110 @@ use comm_lib::aws::ddb::types::AttributeValue; use comm_lib::database::AttributeExtractor; use comm_lib::database::AttributeMap; use comm_lib::database::DBItemAttributeError; use comm_lib::database::DBItemError; use comm_lib::database::Value; use tracing::error; use crate::constants::USERS_TABLE; use crate::constants::USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME; use crate::constants::USERS_TABLE_FARCASTER_ID_INDEX; use crate::constants::USERS_TABLE_PARTITION_KEY; use crate::constants::USERS_TABLE_USERNAME_ATTRIBUTE; use crate::constants::USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE; use crate::grpc_services::protos::unauth::FarcasterUser; use super::DatabaseClient; use super::Error; pub struct FarcasterUserData(pub FarcasterUser); impl DatabaseClient { pub async fn get_farcaster_users( &self, farcaster_ids: Vec, ) -> Result, Error> { let mut users: Vec = Vec::new(); for id in farcaster_ids { let query_response = self .client .query() .table_name(USERS_TABLE) .index_name(USERS_TABLE_FARCASTER_ID_INDEX) .key_condition_expression(format!( "{} = :val", USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME )) .expression_attribute_values(":val", AttributeValue::S(id)) .send() .await .map_err(|e| { error!("Failed to query users by farcasterID: {:?}", e); Error::AwsSdk(e.into()) })? .items .and_then(|mut items| items.pop()) .map(FarcasterUserData::try_from) .transpose() .map_err(Error::from)?; if let Some(data) = query_response { users.push(data); } } Ok(users) } + + pub async fn add_farcaster_id( + &self, + user_id: String, + farcaster_id: String, + ) -> Result<(), Error> { + let update_expression = + format!("SET {} = :val", USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME); + + self + .client + .update_item() + .table_name(USERS_TABLE) + .key(USERS_TABLE_PARTITION_KEY, AttributeValue::S(user_id)) + .update_expression(update_expression) + .expression_attribute_values(":val", AttributeValue::S(farcaster_id)) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))?; + + Ok(()) + } } impl TryFrom for FarcasterUserData { type Error = DBItemError; fn try_from(mut attrs: AttributeMap) -> Result { let user_id = attrs.take_attr(USERS_TABLE_PARTITION_KEY)?; let maybe_username = attrs.take_attr(USERS_TABLE_USERNAME_ATTRIBUTE)?; let maybe_wallet_address = attrs.take_attr(USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE)?; let username = match (maybe_username, maybe_wallet_address) { (Some(u), _) => u, (_, Some(w)) => w, (_, _) => { return Err(DBItemError { attribute_name: USERS_TABLE_USERNAME_ATTRIBUTE.to_string(), attribute_value: Value::AttributeValue(None), attribute_error: DBItemAttributeError::Missing, }); } }; let farcaster_id = attrs.take_attr(USERS_TABLE_FARCASTER_ID_ATTRIBUTE_NAME)?; Ok(Self(FarcasterUser { user_id, username, farcaster_id, })) } } diff --git a/services/identity/src/grpc_services/authenticated.rs b/services/identity/src/grpc_services/authenticated.rs index 14085db59..29a48511b 100644 --- a/services/identity/src/grpc_services/authenticated.rs +++ b/services/identity/src/grpc_services/authenticated.rs @@ -1,544 +1,554 @@ use std::collections::HashMap; use crate::config::CONFIG; use crate::database::{DeviceListRow, DeviceListUpdate}; use crate::{ client_service::{handle_db_error, UpdateState, WorkflowInProgress}, constants::request_metadata, database::DatabaseClient, ddb_utils::DateTimeExt, grpc_services::shared::get_value, }; use chrono::{DateTime, Utc}; use comm_opaque2::grpc::protocol_error_to_grpc_status; use tonic::{Request, Response, Status}; use tracing::{debug, error, warn}; use super::protos::auth::{ identity, identity_client_service_server::IdentityClientService, GetDeviceListRequest, GetDeviceListResponse, Identity, InboundKeyInfo, InboundKeysForUserRequest, InboundKeysForUserResponse, KeyserverKeysResponse, LinkFarcasterAccountRequest, OutboundKeyInfo, OutboundKeysForUserRequest, OutboundKeysForUserResponse, RefreshUserPrekeysRequest, UpdateDeviceListRequest, UpdateUserPasswordFinishRequest, UpdateUserPasswordStartRequest, UpdateUserPasswordStartResponse, UploadOneTimeKeysRequest, }; use super::protos::unauth::Empty; #[derive(derive_more::Constructor)] pub struct AuthenticatedService { db_client: DatabaseClient, } fn get_auth_info(req: &Request<()>) -> Option<(String, String, String)> { debug!("Retrieving auth info for request: {:?}", req); let user_id = get_value(req, request_metadata::USER_ID)?; let device_id = get_value(req, request_metadata::DEVICE_ID)?; let access_token = get_value(req, request_metadata::ACCESS_TOKEN)?; Some((user_id, device_id, access_token)) } pub fn auth_interceptor( req: Request<()>, db_client: &DatabaseClient, ) -> Result, Status> { debug!("Intercepting request to check auth info: {:?}", req); let (user_id, device_id, access_token) = get_auth_info(&req) .ok_or_else(|| Status::unauthenticated("Missing credentials"))?; let handle = tokio::runtime::Handle::current(); let new_db_client = db_client.clone(); // This function cannot be `async`, yet must call the async db call // Force tokio to resolve future in current thread without an explicit .await let valid_token = tokio::task::block_in_place(move || { handle .block_on(new_db_client.verify_access_token( user_id, device_id, access_token, )) .map_err(handle_db_error) })?; if !valid_token { return Err(Status::aborted("Bad Credentials")); } Ok(req) } pub fn get_user_and_device_id( request: &Request, ) -> Result<(String, String), Status> { let user_id = get_value(request, request_metadata::USER_ID) .ok_or_else(|| Status::unauthenticated("Missing user_id field"))?; let device_id = get_value(request, request_metadata::DEVICE_ID) .ok_or_else(|| Status::unauthenticated("Missing device_id field"))?; Ok((user_id, device_id)) } #[tonic::async_trait] impl IdentityClientService for AuthenticatedService { async fn refresh_user_prekeys( &self, request: Request, ) -> Result, Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Refreshing prekeys for user: {}", user_id); let content_keys = message .new_content_prekeys .ok_or_else(|| Status::invalid_argument("Missing content keys"))?; let notif_keys = message .new_notif_prekeys .ok_or_else(|| Status::invalid_argument("Missing notification keys"))?; self .db_client .update_device_prekeys( user_id, device_id, content_keys.into(), notif_keys.into(), ) .await .map_err(handle_db_error)?; let response = Response::new(Empty {}); Ok(response) } async fn get_outbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, true) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, OutboundKeyInfo::from(device_info))) .collect::>(); Ok(tonic::Response::new(OutboundKeysForUserResponse { devices: transformed_devices, })) } async fn get_inbound_keys_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { use identity::IdentityInfo; let message = request.into_inner(); let user_id = &message.user_id; let devices_map = self .db_client .get_keys_for_user(user_id, false) .await .map_err(handle_db_error)? .ok_or_else(|| tonic::Status::not_found("user not found"))?; let transformed_devices = devices_map .into_iter() .map(|(key, device_info)| (key, InboundKeyInfo::from(device_info))) .collect::>(); let identifier = self .db_client .get_user_identifier(user_id) .await .map_err(handle_db_error)?; let identity_info = IdentityInfo::try_from(identifier)?; Ok(tonic::Response::new(InboundKeysForUserResponse { devices: transformed_devices, identity: Some(Identity { identity_info: Some(identity_info), }), })) } async fn get_keyserver_keys( &self, request: Request, ) -> Result, Status> { use identity::IdentityInfo; let message = request.into_inner(); let keyserver_info = self .db_client .get_keyserver_keys_for_user(&message.user_id) .await .map_err(handle_db_error)? .map(OutboundKeyInfo::from); let identifier = self .db_client .get_user_identifier(&message.user_id) .await .map_err(handle_db_error)?; let identity_info = IdentityInfo::try_from(identifier)?; let identity = Some(Identity { identity_info: Some(identity_info), }); let response = Response::new(KeyserverKeysResponse { keyserver_info, identity, }); return Ok(response); } async fn upload_one_time_keys( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; let message = request.into_inner(); debug!("Attempting to update one time keys for user: {}", user_id); self .db_client .append_one_time_prekeys( device_id, message.content_one_time_prekeys, message.notif_one_time_prekeys, ) .await .map_err(handle_db_error)?; Ok(tonic::Response::new(Empty {})) } async fn update_user_password_start( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; let message = request.into_inner(); let server_registration = comm_opaque2::server::Registration::new(); let server_message = server_registration .start( &CONFIG.server_setup, &message.opaque_registration_request, user_id.as_bytes(), ) .map_err(protocol_error_to_grpc_status)?; let update_state = UpdateState { user_id }; let session_id = self .db_client .insert_workflow(WorkflowInProgress::Update(update_state)) .await .map_err(handle_db_error)?; let response = UpdateUserPasswordStartResponse { session_id, opaque_registration_response: server_message, }; Ok(Response::new(response)) } async fn update_user_password_finish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let message = request.into_inner(); let Some(WorkflowInProgress::Update(state)) = self .db_client .get_workflow(message.session_id) .await .map_err(handle_db_error)? else { return Err(tonic::Status::not_found("session not found")); }; let server_registration = comm_opaque2::server::Registration::new(); let password_file = server_registration .finish(&message.opaque_registration_upload) .map_err(protocol_error_to_grpc_status)?; self .db_client .update_user_password(state.user_id, password_file) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn log_out_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, device_id) = get_user_and_device_id(&request)?; self .db_client .remove_device(&user_id, &device_id) .await .map_err(handle_db_error)?; self .db_client .delete_access_token_data(user_id, device_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn delete_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _) = get_user_and_device_id(&request)?; self .db_client .delete_user(user_id) .await .map_err(handle_db_error)?; let response = Empty {}; Ok(Response::new(response)) } async fn get_device_list_for_user( &self, request: tonic::Request, ) -> Result, tonic::Status> { let GetDeviceListRequest { user_id, since_timestamp, } = request.into_inner(); let since = since_timestamp .map(|timestamp| { DateTime::::from_utc_timestamp_millis(timestamp) .ok_or_else(|| tonic::Status::invalid_argument("Invalid timestamp")) }) .transpose()?; let mut db_result = self .db_client .get_device_list_history(user_id, since) .await .map_err(handle_db_error)?; // these should be sorted already, but just in case db_result.sort_by_key(|list| list.timestamp); let device_list_updates: Vec = db_result .into_iter() .map(RawDeviceList::from) .map(SignedDeviceList::try_from_raw) .collect::, _>>()?; let stringified_updates = device_list_updates .iter() .map(serde_json::to_string) .collect::, _>>() .map_err(|err| { error!("Failed to serialize device list updates: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Response::new(GetDeviceListResponse { device_list_updates: stringified_updates, })) } async fn update_device_list( &self, request: tonic::Request, ) -> Result, tonic::Status> { let (user_id, _device_id) = get_user_and_device_id(&request)?; // TODO: when we stop doing "primary device rotation" (migration procedure) // we should verify if this RPC is called by primary device only let new_list = SignedDeviceList::try_from(request.into_inner())?; let update = DeviceListUpdate::try_from(new_list)?; self .db_client .apply_devicelist_update(&user_id, update) .await .map_err(handle_db_error)?; Ok(Response::new(Empty {})) } async fn link_farcaster_account( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - unimplemented!(); + let (user_id, _) = get_user_and_device_id(&request)?; + let message = request.into_inner(); + + self + .db_client + .add_farcaster_id(user_id, message.farcaster_id) + .await + .map_err(handle_db_error)?; + + let response = Empty {}; + Ok(Response::new(response)) } } // raw device list that can be serialized to JSON (and then signed in the future) #[derive(serde::Serialize, serde::Deserialize)] struct RawDeviceList { devices: Vec, timestamp: i64, } impl From for RawDeviceList { fn from(row: DeviceListRow) -> Self { Self { devices: row.device_ids, timestamp: row.timestamp.timestamp_millis(), } } } #[derive(serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] struct SignedDeviceList { /// JSON-stringified [`RawDeviceList`] raw_device_list: String, } impl SignedDeviceList { /// Serialize (and sign in the future) a [`RawDeviceList`] fn try_from_raw(raw: RawDeviceList) -> Result { let stringified_list = serde_json::to_string(&raw).map_err(|err| { error!("Failed to serialize raw device list: {}", err); tonic::Status::failed_precondition("unexpected error") })?; Ok(Self { raw_device_list: stringified_list, }) } fn as_raw(&self) -> Result { // The device list payload is sent as an escaped JSON payload. // Escaped double quotes need to be trimmed before attempting to deserialize serde_json::from_str(&self.raw_device_list.replace(r#"\""#, r#"""#)) .map_err(|err| { warn!("Failed to deserialize raw device list: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } } impl TryFrom for SignedDeviceList { type Error = tonic::Status; fn try_from(request: UpdateDeviceListRequest) -> Result { serde_json::from_str(&request.new_device_list).map_err(|err| { warn!("Failed to deserialize device list update: {}", err); tonic::Status::invalid_argument("invalid device list payload") }) } } impl TryFrom for DeviceListUpdate { type Error = tonic::Status; fn try_from(signed_list: SignedDeviceList) -> Result { let RawDeviceList { devices, timestamp: raw_timestamp, } = signed_list.as_raw()?; let timestamp = DateTime::::from_utc_timestamp_millis(raw_timestamp) .ok_or_else(|| { error!("Failed to parse RawDeviceList timestamp!"); tonic::Status::invalid_argument("invalid timestamp") })?; Ok(DeviceListUpdate::new(devices, timestamp)) } } #[cfg(test)] mod tests { use super::*; #[test] fn serialize_device_list_updates() { let raw_updates = vec![ RawDeviceList { devices: vec!["device1".into()], timestamp: 111111111, }, RawDeviceList { devices: vec!["device1".into(), "device2".into()], timestamp: 222222222, }, ]; let expected_raw_list1 = r#"{"devices":["device1"],"timestamp":111111111}"#; let expected_raw_list2 = r#"{"devices":["device1","device2"],"timestamp":222222222}"#; let signed_updates = raw_updates .into_iter() .map(SignedDeviceList::try_from_raw) .collect::, _>>() .expect("signing device list updates failed"); assert_eq!(signed_updates[0].raw_device_list, expected_raw_list1); assert_eq!(signed_updates[1].raw_device_list, expected_raw_list2); let stringified_updates = signed_updates .iter() .map(serde_json::to_string) .collect::, _>>() .expect("serialize signed device lists failed"); let expected_stringified_list1 = r#"{"rawDeviceList":"{\"devices\":[\"device1\"],\"timestamp\":111111111}"}"#; let expected_stringified_list2 = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":222222222}"}"#; assert_eq!(stringified_updates[0], expected_stringified_list1); assert_eq!(stringified_updates[1], expected_stringified_list2); } #[test] fn deserialize_device_list_update() { let raw_payload = r#"{"rawDeviceList":"{\"devices\":[\"device1\",\"device2\"],\"timestamp\":123456789}"}"#; let request = UpdateDeviceListRequest { new_device_list: raw_payload.to_string(), }; let signed_list = SignedDeviceList::try_from(request) .expect("Failed to parse SignedDeviceList"); let update = DeviceListUpdate::try_from(signed_list) .expect("Failed to parse DeviceListUpdate from signed list"); let expected_timestamp = DateTime::::from_utc_timestamp_millis(123456789).unwrap(); assert_eq!(update.timestamp, expected_timestamp); assert_eq!( update.devices, vec!["device1".to_string(), "device2".to_string()] ); } }