diff --git a/services/commtest/build.rs b/services/commtest/build.rs index 5772aa137..81c04589b 100644 --- a/services/commtest/build.rs +++ b/services/commtest/build.rs @@ -1,21 +1,21 @@ use std::fs; use std::io::Error; -const PROTO_DIR: &'static str = "../../shared/protos"; +const PROTO_DIR: &str = "../../shared/protos"; fn main() -> Result<(), Error> { let proto_files = fs::read_dir(PROTO_DIR)?; for path in proto_files { let filename: String = path?.file_name().to_string_lossy().to_string(); // Avoid passing non protobuf files to tonic if !filename.ends_with(".proto") { continue; } println!("Compiling protobuf file: {}", filename); println!("cargo:rerun-if-changed={}/{}", PROTO_DIR, filename); tonic_build::compile_protos(format!("{}/{}", PROTO_DIR, filename))?; } Ok(()) } diff --git a/services/commtest/src/identity/device.rs b/services/commtest/src/identity/device.rs index 1a8fa2e0a..20e29abb4 100644 --- a/services/commtest/src/identity/device.rs +++ b/services/commtest/src/identity/device.rs @@ -1,93 +1,93 @@ use comm_opaque2::client::Registration; use rand::{distributions::Alphanumeric, Rng}; mod proto { tonic::include_proto!("identity.client"); } use proto as client; use proto::{ identity_client_service_client::IdentityClientServiceClient, DeviceKeyUpload, IdentityKeyInfo, PreKey, RegistrationFinishRequest, RegistrationStartRequest, }; pub struct DeviceInfo { pub username: String, pub user_id: String, pub device_id: String, pub access_token: String, } pub async fn create_device() -> DeviceInfo { let password = "pass"; let username: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); // TODO: Generate dynamic valid olm account info let example_payload = r#"{\"notificationIdentityPublicKeys\":{\"curve25519\":\"DYmV8VdkjwG/VtC8C53morogNJhpTPT/4jzW0/cxzQo\",\"ed25519\":\"D0BV2Y7Qm36VUtjwyQTJJWYAycN7aMSJmhEsRJpW2mk\"},\"primaryIdentityPublicKeys\":{\"curve25519\":\"Y4ZIqzpE1nv83kKGfvFP6rifya0itRg2hifqYtsISnk\",\"ed25519\":\"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg\"}}"#; // The ed25519 value from the olm payload let device_id = r#"cSlL+VLLJDgtKSPlIwoCZg0h0EmHlQoJC08uV/O+jvg"#; let mut client_registration = Registration::new(); let opaque_registration_request = - client_registration.start(&password).unwrap(); + client_registration.start(password).unwrap(); let registration_start_request = RegistrationStartRequest { opaque_registration_request, username: username.to_string(), device_key_upload: Some(DeviceKeyUpload { device_key_info: Some(IdentityKeyInfo { payload: example_payload.to_string(), payload_signature: "foo".to_string(), social_proof: None, }), content_upload: Some(PreKey { pre_key: "content_prekey".to_string(), pre_key_signature: "content_prekey_sig".to_string(), }), notif_upload: Some(PreKey { pre_key: "notif_prekey".to_string(), pre_key_signature: "notif_prekey_sig".to_string(), }), one_time_content_prekeys: Vec::new(), one_time_notif_prekeys: Vec::new(), device_type: client::DeviceType::Keyserver.into(), }), }; // TODO: allow endpoint to be configured let mut identity_client = IdentityClientServiceClient::connect("http://127.0.0.1:50054") .await .expect("Couldn't connect to identitiy service"); let registration_start_response = identity_client .register_password_user_start(registration_start_request) .await .unwrap() .into_inner(); let opaque_registration_upload = client_registration .finish( - &password, + password, ®istration_start_response.opaque_registration_response, ) .unwrap(); let registration_finish_request = RegistrationFinishRequest { session_id: registration_start_response.session_id, opaque_registration_upload, }; let registration_finish_response = identity_client .register_password_user_finish(registration_finish_request) .await .unwrap() .into_inner(); - return DeviceInfo { + DeviceInfo { username: username.to_string(), device_id: device_id.to_string(), user_id: registration_finish_response.user_id, access_token: registration_finish_response.access_token, - }; + } } diff --git a/services/commtest/src/tools.rs b/services/commtest/src/tools.rs index 73a47fec4..642bb431e 100644 --- a/services/commtest/src/tools.rs +++ b/services/commtest/src/tools.rs @@ -1,64 +1,70 @@ use hex::ToHex; use num_cpus; use sha2::{Digest, Sha512}; use std::env; use url::ParseError; pub fn generate_stable_nbytes( number_of_bytes: usize, predefined_byte_value: Option, ) -> Vec { let byte_value = predefined_byte_value.unwrap_or(b'A'); - return vec![byte_value; number_of_bytes]; + vec![byte_value; number_of_bytes] } #[derive( Debug, derive_more::Display, derive_more::From, derive_more::Error, )] pub enum Error { #[display(...)] Proto(std::io::Error), #[display(...)] Tonic(tonic::transport::Error), #[display(...)] TonicStatus(tonic::Status), #[display(...)] Reqwest(reqwest::Error), #[display(fmt = "HTTP status: {:?}.", _0)] HttpStatus(#[error(ignore)] reqwest::StatusCode), #[display(...)] ParseError(ParseError), #[display(...)] JsonError(serde_json::error::Error), #[display(...)] FromUtf8Error(std::string::FromUtf8Error), } pub fn obtain_number_of_threads() -> usize { let number_of_threads_str: String = env::var("COMM_NUMBER_OF_THREADS").unwrap(); if number_of_threads_str.is_empty() { return num_cpus::get(); } - return number_of_threads_str.parse::().unwrap(); + number_of_threads_str.parse::().unwrap() } pub struct DataHasher { hasher: Sha512, } impl DataHasher { pub fn new() -> DataHasher { - return DataHasher { + DataHasher { hasher: Sha512::new(), - }; + } } pub fn update(data_hasher: &mut DataHasher, bytes: Vec) { data_hasher.hasher.update(bytes); } pub fn get_hash(self) -> String { let hash = self.hasher.finalize(); - return hash.encode_hex::(); + hash.encode_hex::() + } +} + +impl Default for DataHasher { + fn default() -> Self { + Self::new() } } diff --git a/services/commtest/tests/identity_tunnelbroker_tests.rs b/services/commtest/tests/identity_tunnelbroker_tests.rs index 27f44683e..3a05ae329 100644 --- a/services/commtest/tests/identity_tunnelbroker_tests.rs +++ b/services/commtest/tests/identity_tunnelbroker_tests.rs @@ -1,114 +1,114 @@ mod client { tonic::include_proto!("identity.client"); } mod auth_proto { tonic::include_proto!("identity.authenticated"); } use auth_proto::identity_client_service_client::IdentityClientServiceClient as AuthClient; use client::identity_client_service_client::IdentityClientServiceClient; use client::UploadOneTimeKeysRequest; use commtest::identity::device::create_device; use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use tonic::transport::Endpoint; use tonic::Request; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] #[should_panic] async fn test_tunnelbroker_invalid_auth() { let mut device_info = create_device().await; device_info.access_token = "".to_string(); let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_tunnelbroker_valid_auth() { let device_info = create_device().await; let mut socket = create_socket(&device_info).await; socket .next() .await .expect("Failed to receive response") .expect("Failed to read the response"); } #[tokio::test] async fn test_refresh_keys_request_upon_depletion() { let device_info = create_device().await; let mut identity_client = IdentityClientServiceClient::connect("http://127.0.0.1:50054") .await .expect("Couldn't connect to identitiy service"); let upload_request = UploadOneTimeKeysRequest { user_id: device_info.user_id.clone(), device_id: device_info.device_id.clone(), access_token: device_info.access_token.clone(), content_one_time_pre_keys: vec!["content1".to_string()], notif_one_time_pre_keys: vec!["notif1".to_string()], }; identity_client .upload_one_time_keys(upload_request) .await .unwrap(); // Request outbound keys, which should trigger identity service to ask for more keys let channel = Endpoint::from_static("http://[::1]:50054") .connect() .await .unwrap(); let mut client = AuthClient::with_interceptor(channel, |mut inter_request: Request<()>| { let metadata = inter_request.metadata_mut(); metadata.insert("user_id", device_info.user_id.parse().unwrap()); metadata.insert("device_id", device_info.device_id.parse().unwrap()); metadata .insert("access_token", device_info.access_token.parse().unwrap()); Ok(inter_request) }); let keyserver_request = auth_proto::OutboundKeysForUserRequest { user_id: device_info.user_id.clone(), }; println!("Getting keyserver info for user, {}", device_info.user_id); - let first_reponse = client + let _first_reponse = client .get_keyserver_keys(keyserver_request.clone()) .await .expect("Second keyserver keys request failed") .into_inner() .keyserver_info .unwrap(); // The current threshold is 5, but we only upload two. Should receive request - // from tunnelbroker to refresh keys + // from Tunnelbroker to refresh keys // Create session as a keyserver let device_info = create_device().await; let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); let expected_response = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; assert_eq!(serialized_response, expected_response); }; } diff --git a/services/commtest/tests/tunnelbroker_integration_test.rs b/services/commtest/tests/tunnelbroker_integration_tests.rs similarity index 94% rename from services/commtest/tests/tunnelbroker_integration_test.rs rename to services/commtest/tests/tunnelbroker_integration_tests.rs index 87d59474a..766b67f56 100644 --- a/services/commtest/tests/tunnelbroker_integration_test.rs +++ b/services/commtest/tests/tunnelbroker_integration_tests.rs @@ -1,93 +1,92 @@ mod proto { tonic::include_proto!("tunnelbroker"); } use commtest::identity::device::create_device; use commtest::tunnelbroker::socket::create_socket; use futures_util::StreamExt; use proto::tunnelbroker_service_client::TunnelbrokerServiceClient; use proto::MessageToDevice; -use tunnelbroker_messages as messages; use tunnelbroker_messages::RefreshKeyRequest; #[tokio::test] async fn send_refresh_request() { // Create session as a keyserver let device_info = create_device().await; let mut socket = create_socket(&device_info).await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); - let refresh_request = messages::RefreshKeyRequest { + let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.clone(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.clone(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Have keyserver receive any websocket messages let response = socket.next().await.unwrap().unwrap(); // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); } /// Test that a message to an offline device gets pushed to dynamodb /// then recalled once a device connects #[tokio::test] -async fn presist_messages() { +async fn persist_messages() { let device_info = create_device().await; // Send request for keyserver to refresh keys (identity service) let mut tunnelbroker_client = TunnelbrokerServiceClient::connect("http://localhost:50051") .await .unwrap(); - let refresh_request = messages::RefreshKeyRequest { + let refresh_request = RefreshKeyRequest { device_id: device_info.device_id.to_string(), number_of_keys: 5, }; let payload = serde_json::to_string(&refresh_request).unwrap(); let request = MessageToDevice { device_id: device_info.device_id.to_string(), payload, }; let grpc_message = tonic::Request::new(request); tunnelbroker_client .send_message_to_device(grpc_message) .await .unwrap(); // Wait one second to ensure that message had time to persist use std::{thread, time}; let ten_millis = time::Duration::from_millis(50); thread::sleep(ten_millis); let mut socket = create_socket(&device_info).await; // Have keyserver receive any websocket messages if let Some(Ok(response)) = socket.next().await { // Check that message received by keyserver matches what identity server // issued let serialized_response: RefreshKeyRequest = serde_json::from_str(&response.to_text().unwrap()).unwrap(); assert_eq!(serialized_response, refresh_request); }; } diff --git a/services/identity/src/config.rs b/services/identity/src/config.rs index 296c9c690..e2bf32c2e 100644 --- a/services/identity/src/config.rs +++ b/services/identity/src/config.rs @@ -1,131 +1,131 @@ use base64::{engine::general_purpose, DecodeError, Engine as _}; use once_cell::sync::Lazy; use std::{collections::HashSet, env, fmt, fs, io, path}; use tracing::{error, info}; use crate::constants::{ DEFAULT_TUNNELBROKER_ENDPOINT, KEYSERVER_PUBLIC_KEY, LOCALSTACK_ENDPOINT, OPAQUE_SERVER_SETUP, SECRETS_DIRECTORY, SECRETS_SETUP_FILE, TUNNELBROKER_GRPC_ENDPOINT, }; pub static CONFIG: Lazy = Lazy::new(|| Config::load().expect("failed to load config")); pub(super) fn load_config() { Lazy::force(&CONFIG); } #[derive(Clone)] pub struct Config { pub localstack_endpoint: Option, // Opaque 2.0 server secrets pub server_setup: comm_opaque2::ServerSetup, // Reserved usernames pub reserved_usernames: HashSet, pub keyserver_public_key: Option, pub tunnelbroker_endpoint: String, } impl Config { fn load() -> Result { let localstack_endpoint = env::var(LOCALSTACK_ENDPOINT).ok(); let tunnelbroker_endpoint = match env::var(TUNNELBROKER_GRPC_ENDPOINT) { Ok(val) => { - info!("Using tunnelbroker endpoint from env var: {}", val); + info!("Using Tunnelbroker endpoint from env var: {}", val); val } Err(std::env::VarError::NotPresent) => { let val = DEFAULT_TUNNELBROKER_ENDPOINT; - info!("Falling back to default tunnelbroker endpoint: {}", val); + info!("Falling back to default Tunnelbroker endpoint: {}", val); val.to_string() } Err(e) => { error!( "Failed to read environment variable {}: {:?}", TUNNELBROKER_GRPC_ENDPOINT, e ); return Err(Error::Env(e)); } }; let mut path_buf = path::PathBuf::new(); path_buf.push(SECRETS_DIRECTORY); path_buf.push(SECRETS_SETUP_FILE); let server_setup = get_server_setup(path_buf.as_path())?; let reserved_usernames = get_reserved_usernames_set()?; let keyserver_public_key = env::var(KEYSERVER_PUBLIC_KEY).ok(); Ok(Self { localstack_endpoint, server_setup, reserved_usernames, keyserver_public_key, tunnelbroker_endpoint, }) } } impl fmt::Debug for Config { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Config") .field("server_keypair", &"** redacted **") .field("keyserver_auth_token", &"** redacted **") .field("localstack_endpoint", &self.localstack_endpoint) .finish() } } #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { #[display(...)] Opaque(comm_opaque2::ProtocolError), #[display(...)] Io(io::Error), #[display(...)] Env(env::VarError), #[display(...)] Json(serde_json::Error), #[display(...)] Decode(DecodeError), } fn get_server_setup( path: &path::Path, ) -> Result, Error> { let encoded_server_setup = if let Ok(env_setup) = env::var(OPAQUE_SERVER_SETUP) { info!( "Using OPAQUE server setup from env var: {}", OPAQUE_SERVER_SETUP ); env_setup } else if let Ok(file_setup) = fs::read_to_string(path) { info!("Using OPAQUE server setup from file: {}", path.display()); file_setup } else { error!("Unable to locate OPAQUE server setup. Please run `keygen` command and run Identity service again."); return Err(Error::Io(io::Error::new( io::ErrorKind::NotFound, "Missing server credentials", ))); }; let decoded_server_setup = general_purpose::STANDARD_NO_PAD.decode(encoded_server_setup)?; comm_opaque2::ServerSetup::deserialize(&decoded_server_setup) .map_err(Error::Opaque) } fn get_reserved_usernames_set() -> Result, Error> { // All entries in `reserved_usernames.json` must be lowercase and must also be // included in `lib/utils/reserved-users.js`!! let contents = include_str!("../reserved_usernames.json"); let reserved_usernames: Vec = serde_json::from_str(contents)?; Ok(reserved_usernames.into_iter().collect()) } diff --git a/shared/tunnelbroker_messages/src/messages/keys.rs b/shared/tunnelbroker_messages/src/messages/keys.rs index acd249fba..911ada168 100644 --- a/shared/tunnelbroker_messages/src/messages/keys.rs +++ b/shared/tunnelbroker_messages/src/messages/keys.rs @@ -1,29 +1,29 @@ -// Messages sent between tunnelbroker and a device +// Messages sent between Tunnelbroker and a device use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(tag = "type", rename_all = "camelCase")] pub struct RefreshKeyRequest { #[serde(rename = "deviceID")] pub device_id: String, pub number_of_keys: u32, } #[cfg(test)] mod key_tests { use super::*; #[test] fn test_refresh_deserialization() { let example_payload = r#"{ "type": "RefreshKeyRequest", "deviceID": "adfjEDFS", "numberOfKeys": 6 }"#; let request = serde_json::from_str::(example_payload).unwrap(); assert_eq!(request.number_of_keys, 6); } } diff --git a/shared/tunnelbroker_messages/src/messages/mod.rs b/shared/tunnelbroker_messages/src/messages/mod.rs index ce3e9bf21..217799816 100644 --- a/shared/tunnelbroker_messages/src/messages/mod.rs +++ b/shared/tunnelbroker_messages/src/messages/mod.rs @@ -1,15 +1,15 @@ -// Messages sent between tunnelbroker and a device +// Messages sent between Tunnelbroker and a device pub mod keys; pub mod session; pub use keys::*; pub use session::*; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] #[serde(untagged)] pub enum Messages { RefreshKeysRequest(RefreshKeyRequest), ConnectionInitializationMessage(ConnectionInitializationMessage), } diff --git a/shared/tunnelbroker_messages/src/messages/session.rs b/shared/tunnelbroker_messages/src/messages/session.rs index f7aaf81a6..7093786b3 100644 --- a/shared/tunnelbroker_messages/src/messages/session.rs +++ b/shared/tunnelbroker_messages/src/messages/session.rs @@ -1,73 +1,73 @@ -// Messages sent between tunnelbroker and a device +// Messages sent between Tunnelbroker and a device use serde::{Deserialize, Serialize}; -/// The workflow when estabilishing a tunnelbroker connection: +/// The workflow when estabilishing a Tunnelbroker connection: /// - Client sends ConnectionInitializationMessage /// - Tunnelbroker validates access_token with identity service /// - Tunnelbroker emits an AMQP message declaring that it has opened a new /// connection with a given device, so that the respective tunnelbroker /// instance can close the existing connection. /// - Tunnelbroker returns a session_id representing that the connection was /// accepted /// - Tunnelbroker will flush all messages related to device from RabbitMQ. /// This must be done first before flushing DynamoDB to prevent duplicated /// messages. /// - Tunnelbroker flushes all messages in DynamoDB /// - Tunnelbroker orders messages by creation date (oldest first), and sends /// messages to device /// - Tunnelbroker then polls for incoming messages from device #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "camelCase")] pub enum DeviceTypes { Mobile, Web, Keyserver, } -/// Message sent by a client to tunnelbroker to initiate a websocket +/// Message sent by a client to Tunnelbroker to initiate a websocket /// session. Tunnelbroker will then validate the access token with identity /// service before continuing with the request. #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub struct ConnectionInitializationMessage { #[serde(rename = "deviceID")] pub device_id: String, pub access_token: String, #[serde(rename = "userID")] pub user_id: String, pub notify_token: Option, pub device_type: DeviceTypes, pub device_app_version: Option, pub device_os: Option, } #[derive(Serialize, Deserialize)] pub struct ConnectionInitializationResponse { pub session_id: String, } #[cfg(test)] mod session_tests { use super::*; #[test] fn test_session_deserialization() { let example_payload = r#"{ "type": "sessionRequest", "accessToken": "xkdeifjsld", "deviceID": "foo", "userID": "alice", "deviceType": "keyserver" }"#; let request = serde_json::from_str::(example_payload) .unwrap(); assert_eq!(request.device_id, "foo"); assert_eq!(request.access_token, "xkdeifjsld"); assert_eq!(request.device_os, None); assert_eq!(request.device_type, DeviceTypes::Keyserver); } }