diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -15,12 +15,21 @@ use crate::token_distributor::{TokenDistributor, TokenDistributorConfig}; use amqp_client::amqp; use anyhow::{anyhow, Result}; +use comm_lib::auth::AuthService; use config::CONFIG; use constants::COMM_SERVICES_USE_JSON_LOGS; +use grpc_clients::identity::authenticated::get_services_auth_client; +use grpc_clients::identity::PlatformMetadata; use std::env; use tracing::{self, Level}; use tracing_subscriber::EnvFilter; +// Identity service gRPC clients require a code version and device type. +// We can supply some placeholder values for services for the time being, since +// this metadata is only relevant for devices. +const PLACEHOLDER_CODE_VERSION: u64 = 0; +const DEVICE_TYPE: &str = "service"; + #[tokio::main] async fn main() -> Result<()> { let use_json_logs: bool = env::var(COMM_SERVICES_USE_JSON_LOGS) @@ -71,9 +80,22 @@ farcaster_client.clone(), ); + let auth_service = AuthService::new(&aws_config, &CONFIG.identity_endpoint); + let services_token = auth_service.get_services_token().await?; + let grpc_client = get_services_auth_client( + &CONFIG.identity_endpoint, + services_token.as_str().to_owned(), + PlatformMetadata::new(PLACEHOLDER_CODE_VERSION, DEVICE_TYPE), + ) + .await?; + let token_config = TokenDistributorConfig::default(); - let mut token_distributor = - TokenDistributor::new(db_client.clone(), token_config, &amqp_connection); + let mut token_distributor = TokenDistributor::new( + db_client.clone(), + token_config, + &amqp_connection, + grpc_client, + ); tokio::select! { grpc_result = grpc_server => { diff --git a/services/tunnelbroker/src/token_distributor/mod.rs b/services/tunnelbroker/src/token_distributor/mod.rs --- a/services/tunnelbroker/src/token_distributor/mod.rs +++ b/services/tunnelbroker/src/token_distributor/mod.rs @@ -9,6 +9,7 @@ use crate::token_distributor::token_connection::TokenConnection; use comm_lib::database::Error; use futures_util::future; +use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; use std::collections::HashMap; use tokio::time::interval; use tokio_util::sync::CancellationToken; @@ -19,6 +20,7 @@ config: TokenDistributorConfig, connections: HashMap, amqp_connection: AmqpConnection, + grpc_client: ChainedInterceptedServicesAuthClient, } impl TokenDistributor { @@ -26,6 +28,7 @@ db: DatabaseClient, config: TokenDistributorConfig, amqp_connection: &AmqpConnection, + grpc_client: ChainedInterceptedServicesAuthClient, ) -> Self { info!( "Initializing TokenDistributor - max_connections: {}, \ @@ -53,6 +56,7 @@ config, connections: HashMap::new(), amqp_connection: amqp_connection.clone(), + grpc_client, } } @@ -189,6 +193,7 @@ token_data, self.amqp_connection.clone(), cancel_token.clone(), + self.grpc_client.clone(), ); // Store the cancellation token diff --git a/services/tunnelbroker/src/token_distributor/token_connection.rs b/services/tunnelbroker/src/token_distributor/token_connection.rs --- a/services/tunnelbroker/src/token_distributor/token_connection.rs +++ b/services/tunnelbroker/src/token_distributor/token_connection.rs @@ -3,6 +3,7 @@ use crate::token_distributor::config::TokenDistributorConfig; use crate::token_distributor::error::TokenConnectionError; use futures_util::{SinkExt, StreamExt}; +use grpc_clients::identity::authenticated::ChainedInterceptedServicesAuthClient; use lapin::{options::*, types::FieldTable, ExchangeKind}; use std::time::Duration; use tokio::time::{interval, Instant}; @@ -16,6 +17,7 @@ user_id: String, token_data: String, amqp_connection: AmqpConnection, + grpc_client: ChainedInterceptedServicesAuthClient, } impl TokenConnection { @@ -26,6 +28,7 @@ token_data: String, amqp_connection: AmqpConnection, cancellation_token: CancellationToken, + grpc_client: ChainedInterceptedServicesAuthClient, ) { let connection = Self { db: db.clone(), @@ -33,6 +36,7 @@ user_id: user_id.clone(), token_data, amqp_connection, + grpc_client, }; tokio::spawn(async move { diff --git a/shared/grpc_clients/src/identity/authenticated.rs b/shared/grpc_clients/src/identity/authenticated.rs --- a/shared/grpc_clients/src/identity/authenticated.rs +++ b/shared/grpc_clients/src/identity/authenticated.rs @@ -19,6 +19,7 @@ access_token: String, } +#[derive(Clone, Debug)] pub struct ServicesAuthLayer { services_token: String, } diff --git a/shared/grpc_clients/src/identity/shared.rs b/shared/grpc_clients/src/identity/shared.rs --- a/shared/grpc_clients/src/identity/shared.rs +++ b/shared/grpc_clients/src/identity/shared.rs @@ -12,6 +12,7 @@ pub major_desktop_version: Option, } +#[derive(Clone, Debug)] pub struct CodeVersionLayer { pub(crate) code_version: u64, pub(crate) device_type: String, @@ -74,6 +75,7 @@ } } +#[derive(Clone, Debug)] pub struct ChainedInterceptor where A: Interceptor + Send + Sync + 'static,