diff --git a/services/identity/src/database.rs b/services/identity/src/database.rs --- a/services/identity/src/database.rs +++ b/services/identity/src/database.rs @@ -441,6 +441,30 @@ .await .map_err(|e| Error::AwsSdk(e.into())) } + + pub async fn get_users(&self) -> Result, Error> { + let scan_output = self + .client + .scan() + .table_name(USERS_TABLE) + .projection_expression(USERS_TABLE_PARTITION_KEY) + .send() + .await + .map_err(|e| Error::AwsSdk(e.into()))?; + + let mut result = Vec::new(); + if let Some(attributes) = scan_output.items { + for mut attribute in attributes { + let id = parse_string_attribute( + USERS_TABLE_PARTITION_KEY, + attribute.remove(USERS_TABLE_PARTITION_KEY), + ) + .map_err(Error::Attribute)?; + result.push(id); + } + } + Ok(result) + } } #[derive( diff --git a/services/identity/src/service.rs b/services/identity/src/service.rs --- a/services/identity/src/service.rs +++ b/services/identity/src/service.rs @@ -13,6 +13,7 @@ use rand::rngs::OsRng; use rand::{CryptoRng, Rng}; use siwe::Message; +use std::collections::HashSet; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -40,10 +41,11 @@ registration_request::Data::PakeRegistrationRequestAndUserId, registration_request::Data::PakeRegistrationUploadAndCredentialRequest, registration_response::Data::PakeLoginResponse as PakeRegistrationLoginResponse, - registration_response::Data::PakeRegistrationResponse, DeleteUserRequest, - DeleteUserResponse, GetUserIdRequest, GetUserIdResponse, - GetUserPublicKeyRequest, GetUserPublicKeyResponse, LoginRequest, - LoginResponse, PakeLoginRequest as PakeLoginRequestStruct, + registration_response::Data::PakeRegistrationResponse, CompareUsersRequest, + CompareUsersResponse, DeleteUserRequest, DeleteUserResponse, + GetUserIdRequest, GetUserIdResponse, GetUserPublicKeyRequest, + GetUserPublicKeyResponse, LoginRequest, LoginResponse, + PakeLoginRequest as PakeLoginRequestStruct, PakeLoginResponse as PakeLoginResponseStruct, RegistrationRequest, RegistrationResponse, VerifyUserTokenRequest, VerifyUserTokenResponse, WalletLoginRequest as WalletLoginRequestStruct, @@ -244,6 +246,29 @@ Err(e) => Err(handle_db_error(e)), } } + + #[instrument(skip(self))] + async fn compare_users( + &self, + request: Request, + ) -> Result, Status> { + let message = request.into_inner(); + let mut mysql_users_vec = message.users; + let mut ddb_users_vec = match self.client.get_users().await { + Ok(user_list) => user_list, + Err(e) => return Err(handle_db_error(e)), + }; + // We use HashSets here for faster lookups + let mysql_users_set = HashSet::::from_iter(mysql_users_vec.clone()); + let ddb_users_set = HashSet::::from_iter(ddb_users_vec.clone()); + + ddb_users_vec.retain(|user| !mysql_users_set.contains(user)); + mysql_users_vec.retain(|user| !ddb_users_set.contains(user)); + Ok(Response::new(CompareUsersResponse { + users_missing_from_keyserver: ddb_users_vec, + users_missing_from_identity: mysql_users_vec, + })) + } } async fn put_token_helper( diff --git a/shared/protos/identity.proto b/shared/protos/identity.proto --- a/shared/protos/identity.proto +++ b/shared/protos/identity.proto @@ -20,6 +20,10 @@ (GetUserPublicKeyResponse) {} rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse) {} + // Called by Ashoat's keyserver with a list of user IDs in MySQL and returns: + // 1. a list of user IDs that are in DynamoDB but not in the supplied list + // 2. a list of user IDs that are in the supplied list but not in DynamoDB + rpc CompareUsers(CompareUsersRequest) returns (CompareUsersResponse) {} } // Helper types @@ -165,3 +169,14 @@ // Need to respond with a message to show success, an // empty reponse should work just fine message DeleteUserResponse {} + +// CompareUsers + +message CompareUsersRequest { + repeated string users = 1; +} + +message CompareUsersResponse { + repeated string usersMissingFromKeyserver = 1; + repeated string usersMissingFromIdentity = 2; +}