diff --git a/keyserver/addons/rust-node-addon/src/delete_user.rs b/keyserver/addons/rust-node-addon/src/delete_user.rs deleted file mode 100644 index eaaf1fc41..000000000 --- a/keyserver/addons/rust-node-addon/src/delete_user.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::identity::identity_service_client::IdentityServiceClient; -use crate::identity::DeleteUserRequest; -use crate::IDENTITY_SERVICE_SOCKET_ADDR; -use napi::bindgen_prelude::{Error, Result, Status}; -use tonic::Request; -use tracing::instrument; - -#[napi] -#[instrument(skip_all)] -pub async fn delete_user(user_id: String) -> Result<()> { - let mut identity_client = - IdentityServiceClient::connect(IDENTITY_SERVICE_SOCKET_ADDR.as_str()) - .await - .map_err(|_| { - Error::new( - Status::GenericFailure, - "Unable to connect to identity service".to_string(), - ) - })?; - - let request = Request::new(DeleteUserRequest { - user_id: user_id.clone(), - }); - identity_client - .delete_user(request) - .await - .map_err(|e| Error::new(Status::GenericFailure, e.to_string()))?; - - Ok(()) -} diff --git a/keyserver/addons/rust-node-addon/src/identity_client/delete_user.rs b/keyserver/addons/rust-node-addon/src/identity_client/delete_user.rs new file mode 100644 index 000000000..8bdd331e3 --- /dev/null +++ b/keyserver/addons/rust-node-addon/src/identity_client/delete_user.rs @@ -0,0 +1,33 @@ +use super::*; + +#[napi] +#[instrument(skip_all)] +pub async fn delete_user(user_id: String) -> Result<()> { + let channel = Channel::from_static(&IDENTITY_SERVICE_SOCKET_ADDR) + .connect() + .await + .map_err(|_| { + Error::new( + Status::GenericFailure, + "Unable to connect to identity service".to_string(), + ) + })?; + let token: MetadataValue<_> = AUTH_TOKEN + .parse() + .map_err(|_| Error::from_status(Status::GenericFailure))?; + let mut identity_client = + IdentityServiceClient::with_interceptor(channel, |mut req: Request<()>| { + req.metadata_mut().insert("authorization", token.clone()); + Ok(req) + }); + + let request = Request::new(DeleteUserRequest { + user_id: user_id.clone(), + }); + identity_client + .delete_user(request) + .await + .map_err(|e| Error::new(Status::GenericFailure, e.to_string()))?; + + Ok(()) +} diff --git a/keyserver/addons/rust-node-addon/src/identity_client/mod.rs b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs new file mode 100644 index 000000000..19e245362 --- /dev/null +++ b/keyserver/addons/rust-node-addon/src/identity_client/mod.rs @@ -0,0 +1,94 @@ +pub mod delete_user; +pub mod register_user; +pub mod identity { + tonic::include_proto!("identity"); +} + +use comm_opaque::Cipher; +use identity::identity_service_client::IdentityServiceClient; +use identity::{ + pake_login_response::Data::AccessToken, + pake_login_response::Data::PakeCredentialResponse, + registration_request::Data::PakeCredentialFinalization as RegistrationPakeCredentialFinalization, + registration_request::Data::PakeRegistrationRequestAndUserId, + registration_request::Data::PakeRegistrationUploadAndCredentialRequest, + registration_response::Data::PakeLoginResponse as RegistrationPakeLoginResponse, + registration_response::Data::PakeRegistrationResponse, DeleteUserRequest, + PakeLoginResponse as PakeLoginResponseStruct, + PakeRegistrationRequestAndUserId as PakeRegistrationRequestAndUserIdStruct, + PakeRegistrationUploadAndCredentialRequest as PakeRegistrationUploadAndCredentialRequestStruct, + RegistrationRequest, RegistrationResponse as RegistrationResponseMessage, + SessionInitializationInfo, +}; +use lazy_static::lazy_static; +use napi::bindgen_prelude::*; +use opaque_ke::{ + ClientLogin, ClientLoginFinishParameters, ClientLoginStartParameters, + ClientLoginStartResult, ClientRegistration, + ClientRegistrationFinishParameters, CredentialFinalization, + CredentialResponse, RegistrationResponse, RegistrationUpload, +}; +use rand::{rngs::OsRng, CryptoRng, Rng}; +use std::collections::HashMap; +use std::env::var; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{metadata::MetadataValue, transport::Channel, Request}; +use tracing::{error, instrument}; + +lazy_static! { + pub static ref IDENTITY_SERVICE_SOCKET_ADDR: String = + var("COMM_IDENTITY_SERVICE_SOCKET_ADDR") + .unwrap_or_else(|_| "https://[::1]:50051".to_string()); + pub static ref AUTH_TOKEN: String = var("COMM_IDENTITY_SERVICE_AUTH_TOKEN") + .unwrap_or_else(|_| "test".to_string()); +} + +fn handle_unexpected_response(message: Option) -> Error { + error!("Received an unexpected message: {:?}", message); + Error::from_status(Status::GenericFailure) +} + +async fn send_to_mpsc(tx: mpsc::Sender, request: T) -> Result<()> { + if let Err(e) = tx.send(request).await { + error!("Response was dropped: {}", e); + return Err(Error::from_status(Status::GenericFailure)); + } + Ok(()) +} + +fn pake_login_start( + rng: &mut (impl Rng + CryptoRng), + password: &str, +) -> Result> { + ClientLogin::::start( + rng, + password.as_bytes(), + ClientLoginStartParameters::default(), + ) + .map_err(|e| { + error!("Failed to start PAKE login: {}", e); + Error::from_status(Status::GenericFailure) + }) +} + +fn pake_login_finish( + credential_response_bytes: &[u8], + client_login: ClientLogin, +) -> Result> { + client_login + .finish( + CredentialResponse::deserialize(credential_response_bytes).map_err( + |e| { + error!("Could not deserialize credential response bytes: {}", e); + Error::from_status(Status::GenericFailure) + }, + )?, + ClientLoginFinishParameters::default(), + ) + .map_err(|e| { + error!("Failed to finish PAKE login: {}", e); + Error::from_status(Status::GenericFailure) + }) + .map(|res| res.message) +} diff --git a/keyserver/addons/rust-node-addon/src/identity_client.rs b/keyserver/addons/rust-node-addon/src/identity_client/register_user.rs similarity index 70% rename from keyserver/addons/rust-node-addon/src/identity_client.rs rename to keyserver/addons/rust-node-addon/src/identity_client/register_user.rs index dda609bc9..edba3f1a8 100644 --- a/keyserver/addons/rust-node-addon/src/identity_client.rs +++ b/keyserver/addons/rust-node-addon/src/identity_client/register_user.rs @@ -1,308 +1,235 @@ -use crate::identity::identity_service_client::IdentityServiceClient; -use crate::identity::{ - pake_login_response::Data::AccessToken, - pake_login_response::Data::PakeCredentialResponse, - registration_request::Data::PakeCredentialFinalization as RegistrationPakeCredentialFinalization, - registration_request::Data::PakeRegistrationRequestAndUserId, - registration_request::Data::PakeRegistrationUploadAndCredentialRequest, - registration_response::Data::PakeLoginResponse as RegistrationPakeLoginResponse, - registration_response::Data::PakeRegistrationResponse, - PakeLoginResponse as PakeLoginResponseStruct, - PakeRegistrationRequestAndUserId as PakeRegistrationRequestAndUserIdStruct, - PakeRegistrationUploadAndCredentialRequest as PakeRegistrationUploadAndCredentialRequestStruct, - RegistrationRequest, RegistrationResponse as RegistrationResponseMessage, - SessionInitializationInfo, -}; -use crate::{AUTH_TOKEN, IDENTITY_SERVICE_SOCKET_ADDR}; -use comm_opaque::Cipher; -use napi::bindgen_prelude::*; -use opaque_ke::{ - ClientLogin, ClientLoginFinishParameters, ClientLoginStartParameters, - ClientLoginStartResult, ClientRegistration, - ClientRegistrationFinishParameters, CredentialFinalization, - CredentialResponse, RegistrationResponse, RegistrationUpload, -}; -use rand::{rngs::OsRng, CryptoRng, Rng}; -use std::collections::HashMap; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{metadata::MetadataValue, transport::Channel, Request}; -use tracing::{error, instrument}; +use super::*; #[napi] #[instrument(skip_all)] pub async fn register_user( user_id: String, signing_public_key: String, username: String, password: String, session_initialization_info: HashMap, ) -> Result { let channel = Channel::from_static(&IDENTITY_SERVICE_SOCKET_ADDR) .connect() .await - .map_err(|_| Error::from_status(Status::GenericFailure))?; + .map_err(|_| { + Error::new( + Status::GenericFailure, + "Unable to connect to identity service".to_string(), + ) + })?; let token: MetadataValue<_> = AUTH_TOKEN .parse() .map_err(|_| Error::from_status(Status::GenericFailure))?; let mut identity_client = IdentityServiceClient::with_interceptor(channel, |mut req: Request<()>| { req.metadata_mut().insert("authorization", token.clone()); Ok(req) }); // Create a RegistrationRequest channel and use ReceiverStream to turn the // MPSC receiver into a Stream for outbound messages let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); let request = Request::new(stream); // `response` is the Stream for inbound messages let mut response = identity_client .register_user(request) .await .map_err(|_| Error::from_status(Status::GenericFailure))? .into_inner(); // Start PAKE registration on client and send initial registration request // to Identity service let mut client_rng = OsRng; let (registration_request, client_registration) = pake_registration_start( &mut client_rng, user_id, signing_public_key, &password, username, SessionInitializationInfo { info: session_initialization_info, }, )?; send_to_mpsc(tx.clone(), registration_request).await?; // Handle responses from Identity service sequentially, making sure we get // messages in the correct order // Finish PAKE registration and begin PAKE login; send the final // registration request and initial login request together to reduce the // number of trips let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; let client_login = handle_registration_response( message, &mut client_rng, client_registration, &password, tx.clone(), ) .await?; // Finish PAKE login; send final login request to Identity service let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; handle_registration_credential_response(message, client_login, tx) .await .map_err(|_| Error::from_status(Status::GenericFailure))?; // Return access token let message = response .message() .await .map_err(|_| Error::from_status(Status::GenericFailure))?; handle_registration_token_response(message) } -fn handle_unexpected_response(message: Option) -> Error { - error!("Received an unexpected message: {:?}", message); - Error::from_status(Status::GenericFailure) -} - -async fn send_to_mpsc(tx: mpsc::Sender, request: T) -> Result<()> { - if let Err(e) = tx.send(request).await { - error!("Response was dropped: {}", e); - return Err(Error::from_status(Status::GenericFailure)); - } - Ok(()) -} - -fn pake_login_start( - rng: &mut (impl Rng + CryptoRng), - password: &str, -) -> Result> { - ClientLogin::::start( - rng, - password.as_bytes(), - ClientLoginStartParameters::default(), - ) - .map_err(|e| { - error!("Failed to start PAKE login: {}", e); - Error::from_status(Status::GenericFailure) - }) -} - -fn pake_login_finish( - credential_response_bytes: &[u8], - client_login: ClientLogin, -) -> Result> { - client_login - .finish( - CredentialResponse::deserialize(credential_response_bytes).map_err( - |e| { - error!("Could not deserialize credential response bytes: {}", e); - Error::from_status(Status::GenericFailure) - }, - )?, - ClientLoginFinishParameters::default(), - ) - .map_err(|e| { - error!("Failed to finish PAKE login: {}", e); - Error::from_status(Status::GenericFailure) - }) - .map(|res| res.message) -} - -fn pake_registration_start( - rng: &mut (impl Rng + CryptoRng), - user_id: String, - signing_public_key: String, - password: &str, - username: String, - session_initialization_info: SessionInitializationInfo, -) -> Result<(RegistrationRequest, ClientRegistration)> { - let client_registration_start_result = - ClientRegistration::::start(rng, password.as_bytes()).map_err( - |e| { - error!("Failed to start PAKE registration: {}", e); - Error::from_status(Status::GenericFailure) - }, - )?; - let pake_registration_request = - client_registration_start_result.message.serialize(); - Ok(( - RegistrationRequest { - data: Some(PakeRegistrationRequestAndUserId( - PakeRegistrationRequestAndUserIdStruct { - user_id, - pake_registration_request, - username, - signing_public_key, - session_initialization_info: Some(session_initialization_info), - }, - )), - }, - client_registration_start_result.state, - )) -} - async fn handle_registration_response( message: Option, client_rng: &mut (impl Rng + CryptoRng), client_registration: ClientRegistration, password: &str, tx: mpsc::Sender, ) -> Result> { if let Some(RegistrationResponseMessage { data: Some(PakeRegistrationResponse(registration_response_bytes)), .. }) = message { let pake_registration_upload = pake_registration_finish( client_rng, ®istration_response_bytes, client_registration, )? .serialize(); let client_login_start_result = pake_login_start(client_rng, password)?; // `registration_request` is a gRPC message containing serialized bytes to // complete PAKE registration and begin PAKE login let registration_request = RegistrationRequest { data: Some(PakeRegistrationUploadAndCredentialRequest( PakeRegistrationUploadAndCredentialRequestStruct { pake_registration_upload, pake_credential_request: client_login_start_result .message .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Error::from_status(Status::GenericFailure) })?, }, )), }; send_to_mpsc(tx, registration_request).await?; Ok(client_login_start_result.state) } else { Err(handle_unexpected_response(message)) } } async fn handle_registration_credential_response( message: Option, client_login: ClientLogin, tx: mpsc::Sender, ) -> Result<()> { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(PakeCredentialResponse(credential_response_bytes)), })), }) = message { let registration_request = RegistrationRequest { data: Some(RegistrationPakeCredentialFinalization( pake_login_finish(&credential_response_bytes, client_login)? .serialize() .map_err(|e| { error!("Could not serialize credential request: {}", e); Error::from_status(Status::GenericFailure) })?, )), }; send_to_mpsc(tx, registration_request).await } else { Err(handle_unexpected_response(message)) } } fn handle_registration_token_response( message: Option, ) -> Result { if let Some(RegistrationResponseMessage { data: Some(RegistrationPakeLoginResponse(PakeLoginResponseStruct { data: Some(AccessToken(access_token)), })), }) = message { Ok(access_token) } else { Err(handle_unexpected_response(message)) } } +fn pake_registration_start( + rng: &mut (impl Rng + CryptoRng), + user_id: String, + signing_public_key: String, + password: &str, + username: String, + session_initialization_info: SessionInitializationInfo, +) -> Result<(RegistrationRequest, ClientRegistration)> { + let client_registration_start_result = + ClientRegistration::::start(rng, password.as_bytes()).map_err( + |e| { + error!("Failed to start PAKE registration: {}", e); + Error::from_status(Status::GenericFailure) + }, + )?; + let pake_registration_request = + client_registration_start_result.message.serialize(); + Ok(( + RegistrationRequest { + data: Some(PakeRegistrationRequestAndUserId( + PakeRegistrationRequestAndUserIdStruct { + user_id, + pake_registration_request, + username, + signing_public_key, + session_initialization_info: Some(session_initialization_info), + }, + )), + }, + client_registration_start_result.state, + )) +} + fn pake_registration_finish( rng: &mut (impl Rng + CryptoRng), registration_response_bytes: &[u8], client_registration: ClientRegistration, ) -> Result> { client_registration .finish( rng, RegistrationResponse::deserialize(registration_response_bytes).map_err( |e| { error!("Could not deserialize registration response bytes: {}", e); Error::from_status(Status::GenericFailure) }, )?, ClientRegistrationFinishParameters::default(), ) .map_err(|e| { error!("Failed to finish PAKE registration: {}", e); Error::from_status(Status::GenericFailure) }) .map(|res| res.message) } diff --git a/keyserver/addons/rust-node-addon/src/lib.rs b/keyserver/addons/rust-node-addon/src/lib.rs index 7371ad075..0de42eacc 100644 --- a/keyserver/addons/rust-node-addon/src/lib.rs +++ b/keyserver/addons/rust-node-addon/src/lib.rs @@ -1,20 +1,5 @@ -pub mod delete_user; pub mod identity_client; -pub mod identity { - tonic::include_proto!("identity"); -} pub mod tunnelbroker_client; #[macro_use] extern crate napi_derive; - -use lazy_static::lazy_static; -use std::env::var; - -lazy_static! { - pub static ref IDENTITY_SERVICE_SOCKET_ADDR: String = - var("COMM_IDENTITY_SERVICE_SOCKET_ADDR") - .unwrap_or_else(|_| "https://[::1]:50051".to_string()); - pub static ref AUTH_TOKEN: String = var("COMM_IDENTITY_SERVICE_AUTH_TOKEN") - .unwrap_or_else(|_| "test".to_string()); -} diff --git a/keyserver/addons/rust-node-addon/src/tunnelbroker_client.rs b/keyserver/addons/rust-node-addon/src/tunnelbroker_client.rs index 6f8231980..1bc71c32c 100644 --- a/keyserver/addons/rust-node-addon/src/tunnelbroker_client.rs +++ b/keyserver/addons/rust-node-addon/src/tunnelbroker_client.rs @@ -1,94 +1,97 @@ use lazy_static::lazy_static; use napi::threadsafe_function::{ ThreadsafeFunction, ThreadsafeFunctionCallMode, }; use napi_derive::napi; use std::env::var; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::Request; use tracing::error; use tunnelbroker::tunnelbroker_pb::message_to_client::Data::MessagesToDeliver; use tunnelbroker_client as tunnelbroker; lazy_static! { static ref TUNNELBROKER_SERVICE_ADDR: String = var("COMM_TUNNELBROKER_SERVICE_ADDR") - .unwrap_or("https://[::1]:50051".to_string()); + .unwrap_or_else(|_| "https://[::1]:50051".to_string()); } #[napi] pub struct TunnelbrokerClient { tx: mpsc::Sender, } #[napi] impl TunnelbrokerClient { #[napi(constructor)] pub fn new( device_id: String, on_receive_callback: ThreadsafeFunction, ) -> Self { let mut client = tunnelbroker::initialize_client(TUNNELBROKER_SERVICE_ADDR.to_string()); let (tx, rx) = mpsc::channel(1); let stream = ReceiverStream::new(rx); // Spawning asynchronous Tokio task for handling incoming messages from the client // and calling the callback function with the received payload tunnelbroker::RUNTIME.spawn({ async move { let mut request = Request::new(stream); request.metadata_mut().insert( "deviceid", device_id.parse().expect("Failed to parse deviceID"), ); let response = client .messages_stream(request) .await .expect("Failed to receive messages stream from Tunnelbroker"); let mut resp_stream = response.into_inner(); while let Some(received) = resp_stream.next().await { if let Some(message_data) = received.expect("Error on getting messages data").data { match message_data { MessagesToDeliver(messages_to_send) => { for message in messages_to_send.messages { on_receive_callback.call( Ok(message.payload), ThreadsafeFunctionCallMode::NonBlocking, ); } } _ => { error!("Received an unexpected message type"); } } } } } }); TunnelbrokerClient { tx } } #[napi] pub async fn publish( &self, to_device_id: String, payload: String, ) -> napi::Result<()> { let messages = vec![tunnelbroker::tunnelbroker_pb::MessageToTunnelbrokerStruct { to_device_id, payload, blob_hashes: vec![], }]; - if let Err(_) = tunnelbroker::publish_messages(&self.tx, messages).await { + if tunnelbroker::publish_messages(&self.tx, messages) + .await + .is_err() + { return Err(napi::Error::from_status(napi::Status::GenericFailure)); } Ok(()) } }