Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3525727
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs
index 74ce62ea4..cae4a3dce 100644
--- a/services/identity/src/constants.rs
+++ b/services/identity/src/constants.rs
@@ -1,221 +1,221 @@
use tokio::time::Duration;
// Secrets
// Directory name under which secrets are looked up.
// NOTE(review): presumably resolved relative to the service's working
// directory — confirm at the call site.
pub const SECRETS_DIRECTORY: &str = "secrets";
// File name of the server setup secret.
// NOTE(review): presumably located inside SECRETS_DIRECTORY — confirm.
pub const SECRETS_SETUP_FILE: &str = "server_setup.txt";
// DynamoDB
// User table information, supporting opaque_ke 2.0 and X3DH information
// Users can sign in either through username+password or Eth wallet.
//
// This structure should be aligned with the messages defined in
// shared/protos/identity_unauthenticated.proto
//
// Structure for a user should be:
// {
// userID: String,
// opaqueRegistrationData: Option<String>,
// username: Option<String>,
// walletAddress: Option<String>,
// devices: HashMap<String, Device>
// }
//
// A device is defined as:
// {
// deviceType: String, # client or keyserver
// keyPayload: String,
// keyPayloadSignature: String,
// identityPreKey: String,
// identityPreKeySignature: String,
// identityOneTimeKeys: Vec<String>,
// notifPreKey: String,
// notifPreKeySignature: String,
// notifOneTimeKeys: Vec<String>,
// socialProof: Option<String>
// }
//
// Additional context:
// "devices" uses the signing public identity key of the device as a key for the devices map
// "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification)
// if "deviceType" == "keyserver", then the device will not have any notif key information
// Users table: table name, partition key, and attribute names.
pub const USERS_TABLE: &str = "identity-users";
pub const USERS_TABLE_PARTITION_KEY: &str = "userID";
pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData";
pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username";
pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str =
"deviceType";
pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress";
pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof";
pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str =
"deviceListTimestamp";
// Index names on the users table, one per sign-in identifier
// (username and wallet address).
pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index";
pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index";
// Access token table: table name, partition key, sort key, and attribute
// names.
pub const ACCESS_TOKEN_TABLE: &str = "identity-tokens";
pub const ACCESS_TOKEN_TABLE_PARTITION_KEY: &str = "userID";
pub const ACCESS_TOKEN_SORT_KEY: &str = "signingPublicKey";
pub const ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE: &str = "created";
pub const ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE: &str = "authType";
pub const ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE: &str = "valid";
pub const ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE: &str = "token";
// Nonce table: table name, partition key, and attribute names.
pub const NONCE_TABLE: &str = "identity-nonces";
pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce";
pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created";
pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime";
// NOTE(review): presumably the same expiration expressed as a Unix
// timestamp — confirm where the attribute is written.
pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str =
"expirationTimeUnix";
// Usernames reserved because they exist in Ashoat's keyserver already
pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames";
pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username";
pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID";
/// Table, index, key, and attribute names for the devices table.
///
/// Items are distinguished by the prefix of their `itemID` sort key:
/// `device-` for device items and `devicelist-` for device-list items.
pub mod devices_table {
/// table name
pub const NAME: &str = "identity-devices";
/// name of the index over device-list timestamps
/// (NOTE(review): inferred from the name — confirm the index definition)
pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index";
/// partition key
pub const ATTR_USER_ID: &str = "userID";
/// sort key
pub const ATTR_ITEM_ID: &str = "itemID";
// itemID prefixes (one shouldn't be a prefix of the other)
pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-";
pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-";
// device-specific attrs
pub const ATTR_DEVICE_TYPE: &str = "deviceType";
pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo";
pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey";
pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey";
// IdentityKeyInfo constants
pub const ATTR_KEY_PAYLOAD: &str = "keyPayload";
pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature";
// PreKey constants
pub const ATTR_PREKEY: &str = "preKey";
pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature";
// device-list-specific attrs
pub const ATTR_TIMESTAMP: &str = "timestamp";
pub const ATTR_DEVICE_IDS: &str = "deviceIDs";
// migration-specific attrs
pub const ATTR_CODE_VERSION: &str = "codeVersion";
pub const ATTR_LOGIN_TIME: &str = "loginTime";
}
// One time keys table, which need to exist in their own table to ensure
// atomicity of additions and removals
/// Table and key names for the one-time keys table.
pub mod one_time_keys_table {
// The `PARTITION_KEY` will contain "notification_${deviceID}" or
// "content_${deviceID}" to allow for both key sets to coexist in the same table
pub const NAME: &str = "identity-one-time-keys";
pub const PARTITION_KEY: &str = "deviceID";
// Alias of `PARTITION_KEY`, named after the value it holds.
pub const DEVICE_ID: &str = PARTITION_KEY;
pub const SORT_KEY: &str = "oneTimeKey";
// Alias of `SORT_KEY`, named after the value it holds.
pub const ONE_TIME_KEY: &str = SORT_KEY;
}
// One-time key constants for device info map
pub const CONTENT_ONE_TIME_KEY: &str = "contentOneTimeKey";
pub const NOTIF_ONE_TIME_KEY: &str = "notifOneTimeKey";
// Tokio
pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
// Listen addresses. `[::]` is the IPv6 unspecified address, so both
// listeners accept connections on every interface.
pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054";
pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004";
// Interval of the WebSocket heartbeat timer (3 seconds).
pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3);
// Token
pub const ACCESS_TOKEN_LENGTH: usize = 512;
// Temporary config
// NOTE(review): these values look like environment variable names — confirm
// where they are read.
pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN";
pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY";
// Nonce
pub const NONCE_LENGTH: usize = 17;
pub const NONCE_TTL_DURATION: i64 = 120; // seconds
// Identity
pub const DEFAULT_IDENTITY_ENDPOINT: &str = "http://localhost:50054";
// LocalStack
// NOTE(review): value looks like an environment variable name — confirm.
pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT";
// OPAQUE Server Setup
// NOTE(review): value looks like an environment variable name — confirm.
pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP";
-// Opensearch Domain
+// Identity Search
// Key under which the OpenSearch endpoint is configured.
// NOTE(review): value looks like an environment variable name — confirm
// where it is read.
pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT";
// Default OpenSearch endpoint: host and port only, no scheme. The literal
// must stay on a single line: a line break inside the quotes would become
// part of the host name and produce an unusable URL.
pub const DEFAULT_OPENSEARCH_ENDPOINT: &str =
  "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566";
// Name of the search index that is queried for users.
pub const IDENTITY_SEARCH_INDEX: &str = "users";
// Value sent as the `size` field of a search query.
pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20;
// Tunnelbroker
// NOTE(review): value looks like an environment variable name — confirm
// where it is read.
pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT";
pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051";
// X3DH key management
// Threshold for requesting more one_time keys
pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5;
// Number of keys to be refreshed when below the threshold
pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5;
// Minimum supported code versions
pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270;
// Request metadata
/// Names of the request metadata keys. The `cors` module's
/// `DEFAULT_ALLOW_HEADERS` list includes each of them.
pub mod request_metadata {
pub const CODE_VERSION: &str = "code_version";
pub const DEVICE_TYPE: &str = "device_type";
pub const USER_ID: &str = "user_id";
pub const DEVICE_ID: &str = "device_id";
pub const ACCESS_TOKEN: &str = "access_token";
}
// CORS
/// Default CORS settings: max age, exposed headers, allowed request headers,
/// and allowed origins.
pub mod cors {
use std::time::Duration;
/// 24 hours, expressed in seconds.
pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
/// gRPC status headers exposed by default.
pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
["grpc-status", "grpc-message", "grpc-status-details-bin"];
/// Four gRPC-web/transport headers followed by the five request metadata
/// keys defined in `request_metadata`.
pub const DEFAULT_ALLOW_HEADERS: [&str; 9] = [
"x-grpc-web",
"content-type",
"x-user-agent",
"grpc-timeout",
super::request_metadata::CODE_VERSION,
super::request_metadata::DEVICE_TYPE,
super::request_metadata::USER_ID,
super::request_metadata::DEVICE_ID,
super::request_metadata::ACCESS_TOKEN,
];
/// Origins allowed by default: the production web app and a local server
/// on port 3000.
pub const DEFAULT_ALLOW_ORIGIN: [&str; 2] =
["https://web.comm.app", "http://localhost:3000"];
}
diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs
index 0b22f60f2..a2d9de0e5 100644
--- a/services/identity/src/websockets/mod.rs
+++ b/services/identity/src/websockets/mod.rs
@@ -1,334 +1,336 @@
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use elastic::client::responses::SearchResponse as ElasticSearchResponse;
use futures::lock::Mutex;
use futures_util::{SinkExt, StreamExt};
use hyper::{Body, Request, Response, StatusCode};
use hyper_tungstenite::tungstenite::Message;
use hyper_tungstenite::HyperWebsocket;
use identity_search_messages::{
ConnectionInitializationResponse, ConnectionInitializationStatus, Heartbeat,
IdentitySearchFailure, IdentitySearchMethod, IdentitySearchResponse,
IdentitySearchResult, IdentitySearchUser, MessagesToServer,
};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tracing::{debug, error, info};
mod auth;
mod send;
use crate::config::CONFIG;
use crate::constants::{
IDENTITY_SEARCH_INDEX, IDENTITY_SEARCH_RESULT_SIZE,
IDENTITY_SERVICE_WEBSOCKET_ADDR, SOCKET_HEARTBEAT_TIMEOUT,
};
use send::{send_message, WebsocketSink};
pub mod errors;
/// JSON body of a prefix search request, as built by `handle_prefix_search`.
///
/// With the derived serializer the field names give the shape
/// `{"size": <n>, "query": {"prefix": {"username": "<text>"}}}`.
#[derive(Serialize, Deserialize)]
struct Query {
// Populated from IDENTITY_SEARCH_RESULT_SIZE.
size: u32,
query: Prefix,
}
/// The `query` object of a `Query`; serializes as `{"prefix": {...}}`.
#[derive(Serialize, Deserialize)]
struct Prefix {
prefix: Username,
}
/// The `prefix` object of a `Prefix`; serializes as `{"username": "<text>"}`.
#[derive(Serialize, Deserialize)]
struct Username {
// The prefix text to match; `handle_prefix_search` trims it first.
username: String,
}
/// Per-connection hyper service that upgrades WebSocket requests and answers
/// plain HTTP requests. `run_server` creates one per accepted TCP connection.
struct WebsocketService {
// Peer address returned by `listener.accept()`; used in log messages.
addr: SocketAddr,
}
impl hyper::service::Service<Request<Body>> for WebsocketService {
  type Response = Response<Body>;
  type Error = errors::BoxedError;
  type Future =
    Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

  /// The service holds no resources that need to become available, so it
  /// always reports itself as ready.
  fn poll_ready(
    &mut self,
    _: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Result<(), Self::Error>> {
    std::task::Poll::Ready(Ok(()))
  }

  /// Handles one HTTP request arriving on the WebSocket port.
  ///
  /// - Upgrade requests are upgraded, and the resulting WebSocket is handed
  ///   to `accept_connection` on a spawned task; the upgrade response is
  ///   returned immediately.
  /// - `GET /health`-style requests (any method, path `/health`) get `OK`.
  /// - Everything else gets `404 Not found`.
  ///
  /// Errors from the upgrade or from building the 404 response are
  /// propagated as `errors::BoxedError`.
  fn call(&mut self, mut req: Request<Body>) -> Self::Future {
    let addr = self.addr;
    let future = async move {
      // Logged exactly once per request, whether or not it is an upgrade.
      debug!(
        "Incoming HTTP request on WebSocket port: {} {}",
        req.method(),
        req.uri().path()
      );

      if hyper_tungstenite::is_upgrade_request(&req) {
        let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?;
        // The WebSocket future only resolves after the response has been
        // sent, so it has to be driven on its own task.
        tokio::spawn(async move {
          accept_connection(websocket, addr).await;
        });
        return Ok(response);
      }

      let response = match req.uri().path() {
        "/health" => Response::new(Body::from("OK")),
        _ => Response::builder()
          .status(StatusCode::NOT_FOUND)
          .body(Body::from("Not found"))?,
      };
      Ok(response)
    };
    Box::pin(future)
  }
}
pub async fn run_server() -> Result<(), errors::BoxedError> {
let addr: SocketAddr = IDENTITY_SERVICE_WEBSOCKET_ADDR.parse()?;
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("Listening to WebSocket traffic on {}", addr);
let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
while let Ok((stream, addr)) = listener.accept().await {
let connection = http
.serve_connection(stream, WebsocketService { addr })
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
error!("Error serving HTTP/WebSocket connection: {:?}", err);
}
});
}
Ok(())
}
/// POSTs `json_body`, serialized as JSON, to `url` and returns the raw
/// response.
///
/// A fresh `reqwest::Client` is built for every call. Any error from
/// sending the request is returned unchanged.
async fn send_search_request<T: Serialize>(
  url: &str,
  json_body: T,
) -> Result<reqwest::Response, reqwest::Error> {
  let http_client = reqwest::Client::new();
  let request = http_client
    .post(url)
    .header(reqwest::header::CONTENT_TYPE, "application/json")
    .json(&json_body);
  request.send().await
}
async fn close_connection(outgoing: WebsocketSink) {
if let Err(e) = outgoing.lock().await.close().await {
error!("Error closing connection: {}", e);
}
}
/// Runs a username prefix search and wraps the hits in an
/// `IdentitySearchResult` tagged with `request_id`.
///
/// The prefix is trimmed before it is sent, and the request asks for
/// `IDENTITY_SEARCH_RESULT_SIZE` results. The query is POSTed over HTTPS to
/// the `_search` path of the configured OpenSearch endpoint.
///
/// Errors from sending the request or from decoding the JSON response are
/// propagated with `?` as `errors::WebsocketError`.
async fn handle_prefix_search(
request_id: &str,
prefix_request: identity_search_messages::IdentitySearchPrefix,
) -> Result<IdentitySearchResult, errors::WebsocketError> {
// Body shape: {"size": .., "query": {"prefix": {"username": ".."}}}
let prefix_query = Query {
size: IDENTITY_SEARCH_RESULT_SIZE,
query: Prefix {
prefix: Username {
username: prefix_request.prefix.trim().to_string(),
},
},
};
- let opensearch_url =
- format!("https://{}/users/_search/", &CONFIG.opensearch_endpoint);
+ let opensearch_url = format!(
+ "https://{}/{}/_search/",
+ &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX
+ );
// Send the query, then decode the body as a search response whose
// documents are `IdentitySearchUser` values.
let search_response = send_search_request(&opensearch_url, prefix_query)
.await?
.json::<ElasticSearchResponse<IdentitySearchUser>>()
.await?;
// Keep only the matched documents from the response.
let usernames: Vec<IdentitySearchUser> =
search_response.into_documents().collect();
let search_result = IdentitySearchResult {
id: request_id.to_string(),
hits: usernames,
};
Ok(search_result)
}
/// Decodes one text frame from a client and acts on it.
///
/// - Heartbeats are logged and otherwise ignored.
/// - Search queries are executed, and the outcome (hits or an error message
///   carrying the query id) is serialized and sent back on `outgoing`.
/// - Any other message is rejected with `WebsocketError::InvalidMessage`.
///
/// Returns `WebsocketError::SerializationError` when `text` is not a valid
/// `MessagesToServer` payload. A failure to serialize the reply is
/// propagated with `?`.
async fn handle_websocket_frame(
  text: String,
  outgoing: WebsocketSink,
) -> Result<(), errors::WebsocketError> {
  let incoming_message = match serde_json::from_str::<MessagesToServer>(&text) {
    Ok(parsed) => parsed,
    Err(_) => return Err(errors::WebsocketError::SerializationError),
  };

  // Only a search query needs further work; everything else returns here.
  let search_query = match incoming_message {
    MessagesToServer::Heartbeat(Heartbeat {}) => {
      debug!("Received heartbeat");
      return Ok(());
    }
    MessagesToServer::IdentitySearchQuery(query) => query,
    _ => return Err(errors::WebsocketError::InvalidMessage),
  };

  let outcome = match search_query.search_method {
    IdentitySearchMethod::IdentitySearchPrefix(prefix_query) => {
      handle_prefix_search(&search_query.id, prefix_query).await
    }
  };

  // A failed search is reported to the client, not returned as an error.
  let reply = match outcome {
    Ok(search_result) => IdentitySearchResponse::Success(search_result),
    Err(e) => IdentitySearchResponse::Error(IdentitySearchFailure {
      id: search_query.id,
      error: e.to_string(),
    }),
  };

  let payload = serde_json::to_string(&reply)?;
  send_message(Message::Text(payload), outgoing.clone()).await;
  Ok(())
}
/// Drives one WebSocket connection from handshake to close.
///
/// Steps:
/// 1. Await the WebSocket handshake; log and return if it fails.
/// 2. Require the first message to be a text auth message. On failure an
///    error response is sent and the connection is closed; on success a
///    success response is sent.
/// 3. Loop over incoming messages and a heartbeat timer until the peer
///    disconnects, sends an unsupported message type, or misses a heartbeat
///    interval.
/// 4. Close the outgoing sink.
///
/// `addr` is only used in log messages.
async fn accept_connection(hyper_ws: HyperWebsocket, addr: SocketAddr) {
debug!("Incoming WebSocket connection from {}", addr);
// Wait for the upgrade to complete.
let ws_stream = match hyper_ws.await {
Ok(stream) => stream,
Err(e) => {
error!("WebSocket handshake error: {}", e);
return;
}
};
let (outgoing, mut incoming) = ws_stream.split();
// The sink is shared behind a mutex because it is cloned into every
// send_message / handle_websocket_frame call below.
let outgoing = Arc::new(Mutex::new(outgoing));
// The first message on the connection must authenticate the client.
if let Some(Ok(auth_message)) = incoming.next().await {
match auth_message {
Message::Text(text) => {
if let Err(auth_error) = auth::handle_auth_message(&text).await {
// Tell the client why authentication failed, then hang up.
let error_response = ConnectionInitializationResponse {
status: ConnectionInitializationStatus::Error(
auth_error.to_string(),
),
};
let serialized_response = serde_json::to_string(&error_response)
.expect("Error serializing auth error response");
send_message(Message::Text(serialized_response), outgoing.clone())
.await;
close_connection(outgoing).await;
return;
} else {
let success_response = ConnectionInitializationResponse {
status: ConnectionInitializationStatus::Success,
};
let serialized_response = serde_json::to_string(&success_response)
.expect("Error serializing auth success response");
send_message(Message::Text(serialized_response), outgoing.clone())
.await;
}
}
// Any non-text first message is treated as a failed authentication.
_ => {
error!("Invalid authentication message from {}", addr);
close_connection(outgoing).await;
return;
}
}
} else {
// The stream ended or errored before an auth message arrived.
error!("No authentication message from {}", addr);
close_connection(outgoing).await;
return;
}
// Heartbeat state: the timer fires every SOCKET_HEARTBEAT_TIMEOUT, and the
// flag records whether a text frame has arrived since the last heartbeat
// was sent. It starts as true so the first timer tick sends a heartbeat
// instead of dropping the connection.
let mut ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
let mut got_heartbeat_response = true;
loop {
tokio::select! {
client_message = incoming.next() => {
// A stream error or the end of the stream both end the connection.
let message: Message = match client_message {
Some(Ok(msg)) => msg,
_ => {
debug!("Connection to {} closed remotely.", addr);
break;
}
};
match message {
Message::Close(_) => {
debug!("Connection to {} closed.", addr);
break;
}
// Protocol-level pongs are logged but do not reset the heartbeat state.
Message::Pong(_) => {
debug!("Received Pong message from {}", addr);
}
Message::Ping(msg) => {
debug!("Received Ping message from {}", addr);
send_message(Message::Pong(msg), outgoing.clone()).await;
}
Message::Text(text) => {
// Any text frame, not only a heartbeat reply, counts as a sign of life
// and restarts the timer.
got_heartbeat_response = true;
ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
// A frame that cannot be handled is logged; the connection stays open.
if let Err(e) = handle_websocket_frame(text, outgoing.clone()).await {
error!("Error handling WebSocket frame: {}", e);
continue;
};
}
_ => {
error!("Client sent invalid message type");
break;
}
}
}
_ = &mut ping_timeout => {
// No text frame since the previous heartbeat was sent: give up.
if !got_heartbeat_response {
error!("Connection to {} died.", addr);
break;
}
// Send an application-level heartbeat (a JSON text frame) and wait for
// the next interval.
let serialized = serde_json::to_string(&Heartbeat {}).unwrap();
send_message(Message::text(serialized), outgoing.clone()).await;
got_heartbeat_response = false;
ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
}
// NOTE(review): `else` only runs when every other branch is disabled; both
// patterns above always match, so this looks unreachable — confirm.
else => {
debug!("Unhealthy connection for: {}", addr);
break;
}
}
}
info!("unregistering connection to: {}", addr);
close_connection(outgoing).await;
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Dec 25, 6:25 PM (7 h, 3 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2700695
Default Alt Text
(17 KB)
Attached To
Mode
rCOMM Comm
Attached
Detach File
Event Timeline
Log In to Comment