Page MenuHomePhabricator

No OneTemporary

diff --git a/services/identity/src/constants.rs b/services/identity/src/constants.rs
index 74ce62ea4..cae4a3dce 100644
--- a/services/identity/src/constants.rs
+++ b/services/identity/src/constants.rs
@@ -1,221 +1,221 @@
use tokio::time::Duration;
// Secrets
// NOTE(review): presumably the on-disk directory and file name of the
// server setup secret — confirm against the code that reads them.
pub const SECRETS_DIRECTORY: &str = "secrets";
pub const SECRETS_SETUP_FILE: &str = "server_setup.txt";
// DynamoDB
// User table information, supporting opaque_ke 2.0 and X3DH information
// Users can sign in either through username+password or Eth wallet.
//
// This structure should be aligned with the messages defined in
// shared/protos/identity_unauthenticated.proto
//
// Structure for a user should be:
// {
// userID: String,
// opaqueRegistrationData: Option<String>,
// username: Option<String>,
// walletAddress: Option<String>,
// devices: HashMap<String, Device>
// }
//
// A device is defined as:
// {
// deviceType: String, # client or keyserver
// keyPayload: String,
// keyPayloadSignature: String,
// identityPreKey: String,
// identityPreKeySignature: String,
// identityOneTimeKeys: Vec<String>,
// notifPreKey: String,
// notifPreKeySignature: String,
// notifOneTimeKeys: Vec<String>,
// socialProof: Option<String>
// }
//
// Additional context:
// "devices" uses the signing public identity key of the device as a key for the devices map
// "keyPayload" is a JSON encoded string containing identity and notif keys (both signature and verification)
// if "deviceType" == "keyserver", then the device will not have any notif key information
//
// Users table: table name, partition key and attribute names
pub const USERS_TABLE: &str = "identity-users";
pub const USERS_TABLE_PARTITION_KEY: &str = "userID";
pub const USERS_TABLE_REGISTRATION_ATTRIBUTE: &str = "opaqueRegistrationData";
pub const USERS_TABLE_USERNAME_ATTRIBUTE: &str = "username";
pub const USERS_TABLE_DEVICES_MAP_DEVICE_TYPE_ATTRIBUTE_NAME: &str =
"deviceType";
pub const USERS_TABLE_WALLET_ADDRESS_ATTRIBUTE: &str = "walletAddress";
pub const USERS_TABLE_SOCIAL_PROOF_ATTRIBUTE_NAME: &str = "socialProof";
pub const USERS_TABLE_DEVICELIST_TIMESTAMP_ATTRIBUTE_NAME: &str =
"deviceListTimestamp";
// Users table: index names
pub const USERS_TABLE_USERNAME_INDEX: &str = "username-index";
pub const USERS_TABLE_WALLET_ADDRESS_INDEX: &str = "walletAddress-index";
// Access token table: keyed by userID (partition) and signingPublicKey (sort)
pub const ACCESS_TOKEN_TABLE: &str = "identity-tokens";
pub const ACCESS_TOKEN_TABLE_PARTITION_KEY: &str = "userID";
pub const ACCESS_TOKEN_SORT_KEY: &str = "signingPublicKey";
pub const ACCESS_TOKEN_TABLE_CREATED_ATTRIBUTE: &str = "created";
pub const ACCESS_TOKEN_TABLE_AUTH_TYPE_ATTRIBUTE: &str = "authType";
pub const ACCESS_TOKEN_TABLE_VALID_ATTRIBUTE: &str = "valid";
pub const ACCESS_TOKEN_TABLE_TOKEN_ATTRIBUTE: &str = "token";
// Nonce table: keyed by the nonce value itself
pub const NONCE_TABLE: &str = "identity-nonces";
pub const NONCE_TABLE_PARTITION_KEY: &str = "nonce";
pub const NONCE_TABLE_CREATED_ATTRIBUTE: &str = "created";
pub const NONCE_TABLE_EXPIRATION_TIME_ATTRIBUTE: &str = "expirationTime";
pub const NONCE_TABLE_EXPIRATION_TIME_UNIX_ATTRIBUTE: &str =
"expirationTimeUnix";
// Usernames reserved because they exist in Ashoat's keyserver already
pub const RESERVED_USERNAMES_TABLE: &str = "identity-reserved-usernames";
pub const RESERVED_USERNAMES_TABLE_PARTITION_KEY: &str = "username";
pub const RESERVED_USERNAMES_TABLE_USER_ID_ATTRIBUTE: &str = "userID";
/// Table, index, key and attribute names for the "identity-devices" table.
///
/// Items are addressed by `userID` (partition key) plus `itemID` (sort key);
/// the item ID prefixes below distinguish device items from device-list items.
pub mod devices_table {
/// table name
pub const NAME: &str = "identity-devices";
/// name of the index over device-list timestamps
pub const TIMESTAMP_INDEX_NAME: &str = "deviceList-timestamp-index";
/// partition key
pub const ATTR_USER_ID: &str = "userID";
/// sort key
pub const ATTR_ITEM_ID: &str = "itemID";
// itemID prefixes (one shouldn't be a prefix of the other)
pub const DEVICE_ITEM_KEY_PREFIX: &str = "device-";
pub const DEVICE_LIST_KEY_PREFIX: &str = "devicelist-";
// device-specific attrs
pub const ATTR_DEVICE_TYPE: &str = "deviceType";
pub const ATTR_DEVICE_KEY_INFO: &str = "deviceKeyInfo";
pub const ATTR_CONTENT_PREKEY: &str = "contentPreKey";
pub const ATTR_NOTIF_PREKEY: &str = "notifPreKey";
// IdentityKeyInfo constants
pub const ATTR_KEY_PAYLOAD: &str = "keyPayload";
pub const ATTR_KEY_PAYLOAD_SIGNATURE: &str = "keyPayloadSignature";
// PreKey constants
pub const ATTR_PREKEY: &str = "preKey";
pub const ATTR_PREKEY_SIGNATURE: &str = "preKeySignature";
// device-list-specific attrs
pub const ATTR_TIMESTAMP: &str = "timestamp";
pub const ATTR_DEVICE_IDS: &str = "deviceIDs";
// migration-specific attrs
pub const ATTR_CODE_VERSION: &str = "codeVersion";
pub const ATTR_LOGIN_TIME: &str = "loginTime";
}
// One time keys table, which need to exist in their own table to ensure
// atomicity of additions and removals
pub mod one_time_keys_table {
// The `PARTITION_KEY` will contain "notification_${deviceID}" or
// "content_${deviceID}" to allow for both key sets to coexist in the same table
pub const NAME: &str = "identity-one-time-keys";
pub const PARTITION_KEY: &str = "deviceID";
// Alias of `PARTITION_KEY`, so both names resolve to the same attribute
pub const DEVICE_ID: &str = PARTITION_KEY;
pub const SORT_KEY: &str = "oneTimeKey";
// Alias of `SORT_KEY`, so both names resolve to the same attribute
pub const ONE_TIME_KEY: &str = SORT_KEY;
}
// One-time key constants for device info map
pub const CONTENT_ONE_TIME_KEY: &str = "contentOneTimeKey";
pub const NOTIF_ONE_TIME_KEY: &str = "notifOneTimeKey";
// Tokio
pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
// Listen addresses: "[::]" is the unspecified IPv6 address, i.e. all interfaces
pub const IDENTITY_SERVICE_SOCKET_ADDR: &str = "[::]:50054";
pub const IDENTITY_SERVICE_WEBSOCKET_ADDR: &str = "[::]:51004";
// Interval of the WebSocket heartbeat timer (3 seconds)
pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3);
// Token
pub const ACCESS_TOKEN_LENGTH: usize = 512;
// Temporary config
// NOTE(review): the values look like environment variable names — confirm
// against the config loader.
pub const AUTH_TOKEN: &str = "COMM_IDENTITY_SERVICE_AUTH_TOKEN";
pub const KEYSERVER_PUBLIC_KEY: &str = "KEYSERVER_PUBLIC_KEY";
// Nonce
pub const NONCE_LENGTH: usize = 17;
pub const NONCE_TTL_DURATION: i64 = 120; // seconds
// Identity
// Same port as IDENTITY_SERVICE_SOCKET_ADDR above
pub const DEFAULT_IDENTITY_ENDPOINT: &str = "http://localhost:50054";
// LocalStack
pub const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT";
// OPAQUE Server Setup
pub const OPAQUE_SERVER_SETUP: &str = "OPAQUE_SERVER_SETUP";
-// Opensearch Domain
+// Identity Search
// NOTE(review): presumably the name of the setting that overrides the
// default endpoint below — confirm against the config loader.
pub const OPENSEARCH_ENDPOINT: &str = "OPENSEARCH_ENDPOINT";
// Host and port only, no scheme. The literal must stay on a single line: a
// line break inside a Rust string literal becomes part of the value and would
// corrupt the host name.
pub const DEFAULT_OPENSEARCH_ENDPOINT: &str =
  "identity-search-domain.us-east-2.opensearch.localhost.localstack.cloud:4566";
// Index name used as the path segment of the `_search` URL
pub const IDENTITY_SEARCH_INDEX: &str = "users";
// Value of the `size` field sent with each prefix search query
pub const IDENTITY_SEARCH_RESULT_SIZE: u32 = 20;
// Tunnelbroker
// NOTE(review): the first constant looks like the name of the setting that
// overrides the default endpoint on the next line — confirm.
pub const TUNNELBROKER_GRPC_ENDPOINT: &str = "TUNNELBROKER_GRPC_ENDPOINT";
pub const DEFAULT_TUNNELBROKER_ENDPOINT: &str = "http://localhost:50051";
// X3DH key management
// Threshold for requesting more one_time keys
pub const ONE_TIME_KEY_MINIMUM_THRESHOLD: usize = 5;
// Number of keys to be refreshed when below the threshold
// (declared as u32, unlike the usize threshold above)
pub const ONE_TIME_KEY_REFRESH_NUMBER: u32 = 5;
// Minimum supported code versions
pub const MIN_SUPPORTED_NATIVE_VERSION: u64 = 270;
// Request metadata
// Metadata key names. `cors::DEFAULT_ALLOW_HEADERS` lists every one of them,
// so a key added here likely needs to be added there as well.
pub mod request_metadata {
pub const CODE_VERSION: &str = "code_version";
pub const DEVICE_TYPE: &str = "device_type";
pub const USER_ID: &str = "user_id";
pub const DEVICE_ID: &str = "device_id";
pub const ACCESS_TOKEN: &str = "access_token";
}
// CORS
// Default values for the CORS configuration.
pub mod cors {
use std::time::Duration;
// 24 hours, expressed in seconds
pub const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
pub const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
["grpc-status", "grpc-message", "grpc-status-details-bin"];
// Four fixed headers followed by the five `request_metadata` keys; the array
// length (9) must be updated whenever an entry is added or removed.
pub const DEFAULT_ALLOW_HEADERS: [&str; 9] = [
"x-grpc-web",
"content-type",
"x-user-agent",
"grpc-timeout",
super::request_metadata::CODE_VERSION,
super::request_metadata::DEVICE_TYPE,
super::request_metadata::USER_ID,
super::request_metadata::DEVICE_ID,
super::request_metadata::ACCESS_TOKEN,
];
pub const DEFAULT_ALLOW_ORIGIN: [&str; 2] =
["https://web.comm.app", "http://localhost:3000"];
}
diff --git a/services/identity/src/websockets/mod.rs b/services/identity/src/websockets/mod.rs
index 0b22f60f2..a2d9de0e5 100644
--- a/services/identity/src/websockets/mod.rs
+++ b/services/identity/src/websockets/mod.rs
@@ -1,334 +1,336 @@
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use elastic::client::responses::SearchResponse as ElasticSearchResponse;
use futures::lock::Mutex;
use futures_util::{SinkExt, StreamExt};
use hyper::{Body, Request, Response, StatusCode};
use hyper_tungstenite::tungstenite::Message;
use hyper_tungstenite::HyperWebsocket;
use identity_search_messages::{
ConnectionInitializationResponse, ConnectionInitializationStatus, Heartbeat,
IdentitySearchFailure, IdentitySearchMethod, IdentitySearchResponse,
IdentitySearchResult, IdentitySearchUser, MessagesToServer,
};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tracing::{debug, error, info};
mod auth;
mod send;
use crate::config::CONFIG;
use crate::constants::{
IDENTITY_SEARCH_INDEX, IDENTITY_SEARCH_RESULT_SIZE,
IDENTITY_SERVICE_WEBSOCKET_ADDR, SOCKET_HEARTBEAT_TIMEOUT,
};
use send::{send_message, WebsocketSink};
pub mod errors;
/// JSON body of a prefix search request.
///
/// With serde's default field naming this serializes as
/// `{"size": <n>, "query": {"prefix": {"username": "<text>"}}}`.
#[derive(Serialize, Deserialize)]
struct Query {
// Result count requested from the search backend
size: u32,
query: Prefix,
}
/// The `query` object of a [`Query`]; wraps the field being prefix-matched
/// under a `prefix` key.
#[derive(Serialize, Deserialize)]
struct Prefix {
prefix: Username,
}
/// The `prefix` object of a [`Prefix`]; carries the username prefix to match.
#[derive(Serialize, Deserialize)]
struct Username {
username: String,
}
/// Per-connection hyper service; one instance is created for every accepted
/// TCP connection.
struct WebsocketService {
// Remote address of the connection, passed on to `accept_connection`
addr: SocketAddr,
}
// HTTP entry point for the WebSocket port. Upgrade requests are handed off to
// `accept_connection` on a spawned task; `/health` answers "OK"; every other
// path gets a 404.
impl hyper::service::Service<Request<Body>> for WebsocketService {
  type Response = Response<Body>;
  type Error = errors::BoxedError;
  type Future =
    Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

  // The service keeps no per-request state, so it is always ready.
  fn poll_ready(
    &mut self,
    _: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Result<(), Self::Error>> {
    std::task::Poll::Ready(Ok(()))
  }

  // Returns a boxed future resolving to the HTTP response. Errors from the
  // upgrade handshake or from building the 404 response are propagated.
  fn call(&mut self, mut req: Request<Body>) -> Self::Future {
    let addr = self.addr;
    let future = async move {
      // Logged exactly once per request, whether or not it is an upgrade.
      debug!(
        "Incoming HTTP request on WebSocket port: {} {}",
        req.method(),
        req.uri().path()
      );

      if hyper_tungstenite::is_upgrade_request(&req) {
        let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?;
        // The connection is driven on its own task so the upgrade response
        // can be returned to the client right away.
        tokio::spawn(async move {
          accept_connection(websocket, addr).await;
        });
        return Ok(response);
      }

      let response = match req.uri().path() {
        "/health" => Response::new(Body::from("OK")),
        _ => Response::builder()
          .status(StatusCode::NOT_FOUND)
          .body(Body::from("Not found"))?,
      };
      Ok(response)
    };
    Box::pin(future)
  }
}
pub async fn run_server() -> Result<(), errors::BoxedError> {
let addr: SocketAddr = IDENTITY_SERVICE_WEBSOCKET_ADDR.parse()?;
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("Listening to WebSocket traffic on {}", addr);
let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
while let Ok((stream, addr)) = listener.accept().await {
let connection = http
.serve_connection(stream, WebsocketService { addr })
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
error!("Error serving HTTP/WebSocket connection: {:?}", err);
}
});
}
Ok(())
}
/// POSTs `json_body`, serialized as JSON, to `url` with a freshly created
/// `reqwest::Client` and returns the raw response.
///
/// # Errors
///
/// Returns the `reqwest::Error` produced while sending the request.
async fn send_search_request<T: Serialize>(
  url: &str,
  json_body: T,
) -> Result<reqwest::Response, reqwest::Error> {
  let request = reqwest::Client::new()
    .post(url)
    .header(reqwest::header::CONTENT_TYPE, "application/json")
    .json(&json_body);

  let response = request.send().await?;
  Ok(response)
}
/// Closes the outgoing half of the WebSocket; a failure to close is logged
/// and otherwise ignored.
async fn close_connection(outgoing: WebsocketSink) {
  let mut sink = outgoing.lock().await;
  let close_result = sink.close().await;
  if let Err(e) = close_result {
    error!("Error closing connection: {}", e);
  }
}
/// Runs a username prefix search and wraps the hits in an
/// `IdentitySearchResult` tagged with `request_id`.
///
/// The prefix is trimmed, sent as a `Query` limited to
/// `IDENTITY_SEARCH_RESULT_SIZE` results to the `_search` endpoint under
/// `CONFIG.opensearch_endpoint`, and the documents of the response become the
/// `hits` of the result.
///
/// # Errors
///
/// Failures while sending the request or decoding the response body are
/// propagated with `?` as `errors::WebsocketError`.
async fn handle_prefix_search(
request_id: &str,
prefix_request: identity_search_messages::IdentitySearchPrefix,
) -> Result<IdentitySearchResult, errors::WebsocketError> {
let prefix_query = Query {
size: IDENTITY_SEARCH_RESULT_SIZE,
query: Prefix {
prefix: Username {
username: prefix_request.prefix.trim().to_string(),
},
},
};
- let opensearch_url =
- format!("https://{}/users/_search/", &CONFIG.opensearch_endpoint);
+ let opensearch_url = format!(
+ "https://{}/{}/_search/",
+ &CONFIG.opensearch_endpoint, IDENTITY_SEARCH_INDEX
+ );
// The response is decoded with the `elastic` crate's search response type.
let search_response = send_search_request(&opensearch_url, prefix_query)
.await?
.json::<ElasticSearchResponse<IdentitySearchUser>>()
.await?;
let usernames: Vec<IdentitySearchUser> =
search_response.into_documents().collect();
let search_result = IdentitySearchResult {
id: request_id.to_string(),
hits: usernames,
};
Ok(search_result)
}
/// Decodes one text frame as a `MessagesToServer` and acts on it.
///
/// * Heartbeats are only logged.
/// * Search queries are executed and the outcome, success or failure, is sent
///   back over `outgoing` as a serialized `IdentitySearchResponse`.
/// * Any other message kind is rejected.
///
/// # Errors
///
/// * `SerializationError` if `text` is not a valid `MessagesToServer`.
/// * `InvalidMessage` for a message kind that is not handled here.
/// * Whatever serializing the response produces, via `?`.
async fn handle_websocket_frame(
  text: String,
  outgoing: WebsocketSink,
) -> Result<(), errors::WebsocketError> {
  let incoming_message = serde_json::from_str::<MessagesToServer>(&text)
    .map_err(|_| errors::WebsocketError::SerializationError)?;

  // Everything except a search query is dealt with right here.
  let search_query = match incoming_message {
    MessagesToServer::Heartbeat(Heartbeat {}) => {
      debug!("Received heartbeat");
      return Ok(());
    }
    MessagesToServer::IdentitySearchQuery(query) => query,
    _ => return Err(errors::WebsocketError::InvalidMessage),
  };

  let outcome = match search_query.search_method {
    IdentitySearchMethod::IdentitySearchPrefix(prefix_query) => {
      handle_prefix_search(&search_query.id, prefix_query).await
    }
  };

  // A failed search is reported to the client instead of being returned as
  // an error of this function.
  let reply = match outcome {
    Ok(search_result) => IdentitySearchResponse::Success(search_result),
    Err(e) => IdentitySearchResponse::Error(IdentitySearchFailure {
      id: search_query.id,
      error: e.to_string(),
    }),
  };

  let payload = serde_json::to_string(&reply)?;
  send_message(Message::Text(payload), outgoing.clone()).await;
  Ok(())
}
/// Drives one WebSocket connection from handshake to close.
///
/// Steps:
/// 1. Await the upgraded stream; return if the handshake fails.
/// 2. Require the first frame to be a text auth message; reply with a
///    `ConnectionInitializationResponse` and close the connection on failure.
/// 3. Loop over incoming frames while running a heartbeat timer; leave the
///    loop on close, on a disallowed frame type, or when no text frame
///    arrived between two consecutive timer expirations.
/// 4. Close the outgoing sink.
async fn accept_connection(hyper_ws: HyperWebsocket, addr: SocketAddr) {
debug!("Incoming WebSocket connection from {}", addr);
let ws_stream = match hyper_ws.await {
Ok(stream) => stream,
Err(e) => {
error!("WebSocket handshake error: {}", e);
return;
}
};
// The sink is shared behind Arc<Mutex<..>> so clones of it can be handed to
// `send_message` and `handle_websocket_frame`.
let (outgoing, mut incoming) = ws_stream.split();
let outgoing = Arc::new(Mutex::new(outgoing));
// Authentication: the very first frame decides whether the connection stays open.
if let Some(Ok(auth_message)) = incoming.next().await {
match auth_message {
Message::Text(text) => {
if let Err(auth_error) = auth::handle_auth_message(&text).await {
// Tell the client why authentication failed, then hang up.
let error_response = ConnectionInitializationResponse {
status: ConnectionInitializationStatus::Error(
auth_error.to_string(),
),
};
let serialized_response = serde_json::to_string(&error_response)
.expect("Error serializing auth error response");
send_message(Message::Text(serialized_response), outgoing.clone())
.await;
close_connection(outgoing).await;
return;
} else {
let success_response = ConnectionInitializationResponse {
status: ConnectionInitializationStatus::Success,
};
let serialized_response = serde_json::to_string(&success_response)
.expect("Error serializing auth success response");
send_message(Message::Text(serialized_response), outgoing.clone())
.await;
}
}
// Any non-text first frame is rejected without a response message.
_ => {
error!("Invalid authentication message from {}", addr);
close_connection(outgoing).await;
return;
}
}
} else {
// The stream ended or errored before any frame arrived.
error!("No authentication message from {}", addr);
close_connection(outgoing).await;
return;
}
// Heartbeat state: each time the timer fires a Heartbeat is sent and the flag
// is cleared; a text frame from the client sets it again. If the timer fires
// while the flag is still cleared, the connection is considered dead.
let mut ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
let mut got_heartbeat_response = true;
loop {
tokio::select! {
client_message = incoming.next() => {
// End of stream and stream errors are both treated as a remote close.
let message: Message = match client_message {
Some(Ok(msg)) => msg,
_ => {
debug!("Connection to {} closed remotely.", addr);
break;
}
};
match message {
Message::Close(_) => {
debug!("Connection to {} closed.", addr);
break;
}
// Protocol-level pings and pongs do not touch the heartbeat state.
Message::Pong(_) => {
debug!("Received Pong message from {}", addr);
}
Message::Ping(msg) => {
debug!("Received Ping message from {}", addr);
send_message(Message::Pong(msg), outgoing.clone()).await;
}
Message::Text(text) => {
// Any text frame, not only a Heartbeat, counts as a sign of life and
// restarts the timer.
got_heartbeat_response = true;
ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
// A frame that fails to process is logged; the connection stays open.
if let Err(e) = handle_websocket_frame(text, outgoing.clone()).await {
error!("Error handling WebSocket frame: {}", e);
continue;
};
}
// Every remaining frame type ends the connection.
_ => {
error!("Client sent invalid message type");
break;
}
}
}
_ = &mut ping_timeout => {
// No text frame since the previous heartbeat was sent: give up.
if !got_heartbeat_response {
error!("Connection to {} died.", addr);
break;
}
let serialized = serde_json::to_string(&Heartbeat {}).unwrap();
send_message(Message::text(serialized), outgoing.clone()).await;
got_heartbeat_response = false;
ping_timeout = Box::pin(tokio::time::sleep(SOCKET_HEARTBEAT_TIMEOUT));
}
// `select!` runs `else` only when every other branch is disabled.
else => {
debug!("Unhealthy connection for: {}", addr);
break;
}
}
}
info!("unregistering connection to: {}", addr);
close_connection(outgoing).await;
}

File Metadata

Mime Type
text/x-diff
Expires
Wed, Dec 25, 6:25 PM (7 h, 3 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2700695
Default Alt Text
(17 KB)

Event Timeline