Page MenuHomePhabricator

D8918.id31125.diff
No OneTemporary

D8918.id31125.diff

diff --git a/services/tunnelbroker/Cargo.lock b/services/tunnelbroker/Cargo.lock
--- a/services/tunnelbroker/Cargo.lock
+++ b/services/tunnelbroker/Cargo.lock
@@ -1077,6 +1077,18 @@
"wasi",
]
+[[package]]
+name = "grpc_clients"
+version = "0.1.0"
+dependencies = [
+ "derive_more",
+ "prost",
+ "tonic 0.9.2",
+ "tonic-build 0.9.2",
+ "tracing",
+ "tracing-subscriber",
+]
+
[[package]]
name = "h2"
version = "0.3.18"
@@ -1208,7 +1220,7 @@
"rustls 0.20.8",
"rustls-native-certs",
"tokio",
- "tokio-rustls",
+ "tokio-rustls 0.23.4",
]
[[package]]
@@ -1232,8 +1244,8 @@
"hyper",
"pin-project-lite",
"tokio",
- "tokio-tungstenite",
- "tungstenite",
+ "tokio-tungstenite 0.20.0",
+ "tungstenite 0.20.0",
]
[[package]]
@@ -2273,6 +2285,16 @@
"webpki",
]
+[[package]]
+name = "tokio-rustls"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
+dependencies = [
+ "rustls 0.21.1",
+ "tokio",
+]
+
[[package]]
name = "tokio-stream"
version = "0.1.14"
@@ -2284,6 +2306,18 @@
"tokio",
]
+[[package]]
+name = "tokio-tungstenite"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite 0.18.0",
+]
+
[[package]]
name = "tokio-tungstenite"
version = "0.20.0"
@@ -2293,7 +2327,7 @@
"futures-util",
"log",
"tokio",
- "tungstenite",
+ "tungstenite 0.20.0",
]
[[package]]
@@ -2342,6 +2376,37 @@
"tracing-futures",
]
+[[package]]
+name = "tonic"
+version = "0.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "axum",
+ "base64 0.21.0",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "hyper",
+ "hyper-timeout",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "rustls-pemfile",
+ "tokio",
+ "tokio-rustls 0.24.1",
+ "tokio-stream",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
[[package]]
name = "tonic-build"
version = "0.8.4"
@@ -2355,6 +2420,19 @@
"syn 1.0.109",
]
+[[package]]
+name = "tonic-build"
+version = "0.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "tower"
version = "0.4.13"
@@ -2466,6 +2544,25 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
+[[package]]
+name = "tungstenite"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
+dependencies = [
+ "base64 0.13.1",
+ "byteorder",
+ "bytes",
+ "http",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
[[package]]
name = "tungstenite"
version = "0.20.0"
@@ -2495,6 +2592,7 @@
"clap",
"derive_more",
"futures-util",
+ "grpc_clients",
"hyper",
"hyper-tungstenite",
"lapin",
@@ -2502,8 +2600,9 @@
"prost",
"serde_json",
"tokio",
- "tonic",
- "tonic-build",
+ "tokio-tungstenite 0.18.0",
+ "tonic 0.8.3",
+ "tonic-build 0.8.4",
"tracing",
"tracing-subscriber",
"tunnelbroker_messages",
diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml
--- a/services/tunnelbroker/Cargo.toml
+++ b/services/tunnelbroker/Cargo.toml
@@ -13,12 +13,14 @@
aws-sdk-dynamodb = "0.27"
clap = { version = "4.2", features = ["derive", "env"] }
futures-util = "0.3"
+grpc_clients = { path = "../../shared/grpc_clients" }
hyper = "0.14"
hyper-tungstenite = "0.11"
once_cell = "1.17"
prost = "0.11"
serde_json = "1.0"
-tokio = { version = "1.24", features = ["rt-multi-thread"] }
+tokio = { version = "1.24", features = ["rt-multi-thread"]}
+tokio-tungstenite = { version = "0.18.0", features = [ ] }
tonic = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
diff --git a/services/tunnelbroker/src/config.rs b/services/tunnelbroker/src/config.rs
--- a/services/tunnelbroker/src/config.rs
+++ b/services/tunnelbroker/src/config.rs
@@ -21,6 +21,10 @@
#[arg(env = "LOCALSTACK_ENDPOINT")]
#[arg(long)]
pub localstack_endpoint: Option<String>,
+ /// Comm Identity service URL
+ #[arg(env = "COMM_TUNNELBROKER_IDENTITY_ENDPOINT")]
+ #[arg(long, default_value = "http://localhost:50054")]
+ pub identity_endpoint: String,
}
/// Stores configuration parsed from command-line arguments
diff --git a/services/tunnelbroker/src/database/message.rs b/services/tunnelbroker/src/database/message.rs
--- a/services/tunnelbroker/src/database/message.rs
+++ b/services/tunnelbroker/src/database/message.rs
@@ -13,7 +13,7 @@
pub payload: String,
}
-#[derive(Debug, derive_more::Display)]
+#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum MessageErrors {
SerializationError,
}
diff --git a/services/tunnelbroker/src/error.rs b/services/tunnelbroker/src/error.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/error.rs
@@ -0,0 +1,19 @@
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
+pub enum Error {
+ #[display(...)]
+ TonicError(tonic::transport::Error),
+ #[display(...)]
+ ClientError(grpc_clients::tonic::Status),
+ #[display(...)]
+ ServerError(tonic::Status),
+ #[display(...)]
+ GrpcClient(grpc_clients::error::Error),
+ #[display(...)]
+ SessionError(crate::websockets::session::SessionError),
+ #[display(...)]
+ LapinError(lapin::Error),
+ #[display(...)]
+ SerdeError(serde_json::Error),
+}
diff --git a/services/tunnelbroker/src/identity/mod.rs b/services/tunnelbroker/src/identity/mod.rs
new file mode 100644
--- /dev/null
+++ b/services/tunnelbroker/src/identity/mod.rs
@@ -0,0 +1,27 @@
+use client_proto::VerifyUserAccessTokenRequest;
+use grpc_clients::identity;
+use grpc_clients::tonic::Request;
+use identity::get_unauthenticated_client;
+use identity::protos::unauthenticated as client_proto;
+
+use crate::config::CONFIG;
+use crate::error::Error;
+
+/// Returns true if access token is valid
+pub async fn verify_user_access_token(
+ user_id: &str,
+ device_id: &str,
+ access_token: &str,
+) -> Result<bool, Error> {
+ let mut grpc_client =
+ get_unauthenticated_client(&CONFIG.identity_endpoint).await?;
+ let message = VerifyUserAccessTokenRequest {
+ user_id: user_id.to_string(),
+ signing_public_key: device_id.to_string(),
+ access_token: access_token.to_string(),
+ };
+
+ let request = Request::new(message);
+ let response = grpc_client.verify_user_access_token(request).await?;
+ Ok(response.into_inner().token_valid)
+}
diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs
--- a/services/tunnelbroker/src/main.rs
+++ b/services/tunnelbroker/src/main.rs
@@ -2,7 +2,9 @@
pub mod config;
pub mod constants;
pub mod database;
+pub mod error;
pub mod grpc;
+pub mod identity;
pub mod websockets;
use anyhow::{anyhow, Result};
diff --git a/services/tunnelbroker/src/websockets/mod.rs b/services/tunnelbroker/src/websockets/mod.rs
--- a/services/tunnelbroker/src/websockets/mod.rs
+++ b/services/tunnelbroker/src/websockets/mod.rs
@@ -1,4 +1,4 @@
-mod session;
+pub mod session;
use crate::database::DatabaseClient;
use crate::websockets::session::SessionError;
diff --git a/services/tunnelbroker/src/websockets/session.rs b/services/tunnelbroker/src/websockets/session.rs
--- a/services/tunnelbroker/src/websockets/session.rs
+++ b/services/tunnelbroker/src/websockets/session.rs
@@ -8,10 +8,12 @@
use lapin::types::FieldTable;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
-use tracing::{debug, error};
+use tracing::{debug, error, info};
use tunnelbroker_messages::{session::DeviceTypes, Messages};
use crate::database::{self, DatabaseClient, DeviceMessage};
+use crate::error::Error;
+use crate::identity;
pub struct DeviceInfo {
pub device_id: String,
@@ -29,12 +31,16 @@
amqp_consumer: lapin::Consumer,
}
-#[derive(Debug, derive_more::Display, derive_more::From)]
+#[derive(
+ Debug, derive_more::Display, derive_more::From, derive_more::Error,
+)]
pub enum SessionError {
InvalidMessage,
SerializationError(serde_json::Error),
MessageError(database::MessageErrors),
AmqpError(lapin::Error),
+ InternalError,
+ UnauthorizedDevice,
}
pub fn consume_error<T>(result: Result<T, SessionError>) {
@@ -44,9 +50,9 @@
}
// Parse a session request and retrieve the device information
-pub fn handle_first_message_from_device(
+pub async fn handle_first_message_from_device(
message: &str,
-) -> Result<DeviceInfo, SessionError> {
+) -> Result<DeviceInfo, Error> {
let serialized_message = serde_json::from_str::<Messages>(message)?;
match serialized_message {
@@ -59,11 +65,37 @@
device_os: session_info.device_os.take(),
};
+ // Authenticate device
+ debug!("Authenticating device: {}", &session_info.device_id);
+ let auth_request = identity::verify_user_access_token(
+ &session_info.user_id,
+ &device_info.device_id,
+ &session_info.access_token,
+ )
+ .await;
+
+ match auth_request {
+ Err(e) => {
+ debug!("Failed to complete request to identity service: {:?}", e);
+ return Err(SessionError::InternalError.into());
+ }
+ Ok(false) => {
+ info!("Device failed authentication: {}", &session_info.device_id);
+ return Err(SessionError::UnauthorizedDevice.into());
+ }
+ Ok(true) => {
+ debug!(
+ "Successfully authenticated device: {}",
+ &session_info.device_id
+ );
+ }
+ }
+
Ok(device_info)
}
_ => {
debug!("Received invalid request");
- Err(SessionError::InvalidMessage)
+ Err(SessionError::InvalidMessage.into())
}
}
}
@@ -74,12 +106,14 @@
db_client: DatabaseClient,
frame: Message,
amqp_channel: &lapin::Channel,
- ) -> Result<WebsocketSession<S>, SessionError> {
+ ) -> Result<WebsocketSession<S>, Error> {
let device_info = match frame {
- Message::Text(payload) => handle_first_message_from_device(&payload)?,
+ Message::Text(payload) => {
+ handle_first_message_from_device(&payload).await?
+ }
_ => {
error!("Client sent wrong frame type for establishing connection");
- return Err(SessionError::InvalidMessage);
+ return Err(SessionError::InvalidMessage.into());
}
};

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 4:56 AM (18 h, 3 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2569000
Default Alt Text
D8918.id31125.diff (10 KB)

Event Timeline