diff --git a/services/terraform/remote/service_tunnelbroker.tf b/services/terraform/remote/service_tunnelbroker.tf index 03ce514b2..836d140d5 100644 --- a/services/terraform/remote/service_tunnelbroker.tf +++ b/services/terraform/remote/service_tunnelbroker.tf @@ -1,344 +1,348 @@ locals { tunnelbroker_config = { docker_image = "commapp/tunnelbroker" docker_tag = local.is_staging ? "0.16-staging" : "0.16" grpc_port = 50051 websocket_port = 51001 container_name = "tunnelbroker-server" domain_name = "tunnelbroker.${local.root_domain}" local_dns_name = "tunnelbroker" grpc_port_name = "tunnelbroker_grpc" } # Used for other services to connect to Tunnelbroker gRPC endpoint tunnelbroker_local_grpc_url = "http://${local.tunnelbroker_config.local_dns_name}:${local.tunnelbroker_config.grpc_port}" # utility locals tunnelbroker_docker_image = "${local.tunnelbroker_config.docker_image}:${local.tunnelbroker_config.docker_tag}" rabbitmq_password = local.secrets.amqpPassword[local.environment] apns_config_secret_name = "tunnelbroker/APNsConfig" fcm_config_secret_name = "tunnelbroker/FCMConfig" web_push_config_secret_name = "tunnelbroker/WebPushConfig" wns_config_secret_name = "tunnelbroker/WNSConfig" } data "aws_secretsmanager_secret" "tunnelbroker_apns" { name = local.apns_config_secret_name } data "aws_secretsmanager_secret" "tunnelbroker_fcm" { name = local.fcm_config_secret_name } data "aws_secretsmanager_secret" "tunnelbroker_web_push" { name = local.web_push_config_secret_name } data "aws_secretsmanager_secret" "tunnelbroker_wns" { name = local.wns_config_secret_name } # RabbitMQ resource "aws_mq_broker" "tunnelbroker_rabbitmq" { broker_name = "tunnelbroker-rabbitmq" # Keep RabbitMQ version in sync with docker-compose.yml engine_type = "RabbitMQ" engine_version = "3.11.16" host_instance_type = local.is_staging ? "mq.t3.micro" : "mq.m5.large" apply_immediately = local.is_staging deployment_mode = "SINGLE_INSTANCE" # Access from outside VPC - this allows to access the RabbitMQ console from browser publicly_accessible = true user { username = "comm" password = local.rabbitmq_password } } locals { amqp_endpoint = aws_mq_broker.tunnelbroker_rabbitmq.instances[0].endpoints[0] } # Task definition - defines container resources, ports, # environment variables, docker image etc. resource "aws_ecs_task_definition" "tunnelbroker" { family = "tunnelbroker-task-def" container_definitions = jsonencode([ { name = local.tunnelbroker_config.container_name image = local.tunnelbroker_docker_image essential = true portMappings = [ { name = "tunnelbroker_ws" containerPort = local.tunnelbroker_config.websocket_port protocol = "tcp" appProtocol = "http" }, { name = local.tunnelbroker_config.grpc_port_name containerPort = local.tunnelbroker_config.grpc_port protocol = "tcp" appProtocol = "grpc" } ] environment = [ { name = "RUST_LOG" value = local.is_staging ? "info,tunnelbroker=debug,comm_lib=debug" : "info" }, { name = "AMQP_URI", value = local.amqp_endpoint }, { name = "AMQP_USERNAME" value = "comm" }, { name = "AMQP_PASSWORD" value = nonsensitive(local.rabbitmq_password) }, { name = "COMM_TUNNELBROKER_IDENTITY_ENDPOINT", value = local.identity_local_url + }, + { + name = "COMM_SERVICES_USE_JSON_LOGS", + value = local.comm_services_use_json_logs } ] secrets = [ { name = "APNS_CONFIG" valueFrom = data.aws_secretsmanager_secret.tunnelbroker_apns.arn }, { name = "FCM_CONFIG" valueFrom = data.aws_secretsmanager_secret.tunnelbroker_fcm.arn }, { name = "WEB_PUSH_CONFIG" valueFrom = data.aws_secretsmanager_secret.tunnelbroker_web_push.arn }, { name = "WNS_CONFIG" valueFrom = data.aws_secretsmanager_secret.tunnelbroker_wns.arn } ] logConfiguration = { "logDriver" = "awslogs" "options" = { "awslogs-create-group" = "true" "awslogs-group" = "/ecs/tunnelbroker-task-def" "awslogs-region" = "us-east-2" "awslogs-stream-prefix" = "ecs" } } } ]) task_role_arn = aws_iam_role.services_ddb_full_access.arn execution_role_arn = aws_iam_role.ecs_task_execution.arn network_mode = "bridge" cpu = "256" memory = "256" requires_compatibilities = ["EC2"] # Set this to true if you want to keep old revisions # when this definition is changed skip_destroy = true } # ECS Service - defines task scaling, load balancer connection, # network configuration etc. resource "aws_ecs_service" "tunnelbroker" { name = "tunnelbroker" cluster = aws_ecs_cluster.comm_services.id launch_type = "EC2" task_definition = aws_ecs_task_definition.tunnelbroker.arn force_new_deployment = true desired_count = 1 # Allow external changes without Terraform plan difference # We can freely specify replica count in AWS Console lifecycle { ignore_changes = [desired_count] } service_connect_configuration { enabled = true service { discovery_name = local.tunnelbroker_config.local_dns_name port_name = local.tunnelbroker_config.grpc_port_name client_alias { port = local.tunnelbroker_config.grpc_port dns_name = local.tunnelbroker_config.local_dns_name } } } # Websocket load_balancer { target_group_arn = aws_lb_target_group.tunnelbroker_ws.arn container_name = local.tunnelbroker_config.container_name container_port = local.tunnelbroker_config.websocket_port } # gRPC dynamic "load_balancer" { for_each = aws_lb_listener.tunnelbroker_grpc content { target_group_arn = aws_lb_target_group.tunnelbroker_grpc.arn container_name = local.tunnelbroker_config.container_name container_port = local.tunnelbroker_config.grpc_port } } deployment_circuit_breaker { enable = true rollback = true } } # Security group to configure access to the service resource "aws_security_group" "tunnelbroker" { name = "tunnelbroker-sg" vpc_id = aws_vpc.default.id # Websocket ingress { from_port = local.tunnelbroker_config.websocket_port to_port = local.tunnelbroker_config.websocket_port protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] description = "Websocket port" } # gRPC ingress { from_port = local.tunnelbroker_config.grpc_port to_port = local.tunnelbroker_config.grpc_port protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] description = "gRPC port" } # Allow all outbound traffic egress { from_port = 0 to_port = 0 protocol = "-1" cidr_blocks = ["0.0.0.0/0"] } lifecycle { create_before_destroy = true } } # Running service instances are registered here # to be accessed by the load balancer resource "aws_lb_target_group" "tunnelbroker_ws" { name = "tunnelbroker-ws-tg" port = local.tunnelbroker_config.websocket_port protocol = "HTTP" protocol_version = "HTTP1" vpc_id = aws_vpc.default.id target_type = "instance" health_check { enabled = true healthy_threshold = 2 unhealthy_threshold = 3 protocol = "HTTP" path = "/health" matcher = "200" } } /* This is generally a dead (empty) resource on prod, i.e. it should not have * any targets registered. We have gRPC listener resource disabled on prod, * which results in the following exception if any targets are registered here: * "The target group "tunnelbroker-grpc-tg" does not have * an associated load balancer." * * See also `aws_lb_listener.tunnelbroker_grpc` and the "dynamic" block in * `aws_ecs_service.tunnelbroker` on how this is disabled. * The `count` or `for_each` isn't added here to avoid complicating things more. */ resource "aws_lb_target_group" "tunnelbroker_grpc" { name = "tunnelbroker-grpc-tg" port = local.tunnelbroker_config.grpc_port protocol = "HTTP" protocol_version = "GRPC" vpc_id = aws_vpc.default.id target_type = "instance" health_check { enabled = true healthy_threshold = 2 unhealthy_threshold = 3 } } # Load Balancer resource "aws_lb" "tunnelbroker" { load_balancer_type = "application" name = "tunnelbroker-lb" internal = false subnets = [ aws_subnet.public_a.id, aws_subnet.public_b.id, aws_subnet.public_c.id, ] } resource "aws_lb_listener" "tunnelbroker_ws" { load_balancer_arn = aws_lb.tunnelbroker.arn port = local.tunnelbroker_config.websocket_port protocol = "HTTPS" ssl_policy = "ELBSecurityPolicy-2016-08" certificate_arn = data.aws_acm_certificate.tunnelbroker.arn default_action { type = "forward" target_group_arn = aws_lb_target_group.tunnelbroker_ws.arn } lifecycle { ignore_changes = [default_action[0].forward[0].stickiness[0].duration] replace_triggered_by = [aws_lb_target_group.tunnelbroker_ws] } } resource "aws_lb_listener" "tunnelbroker_grpc" { count = local.is_staging ? 1 : 0 load_balancer_arn = aws_lb.tunnelbroker.arn port = local.tunnelbroker_config.grpc_port protocol = "HTTPS" ssl_policy = "ELBSecurityPolicy-2016-08" certificate_arn = data.aws_acm_certificate.tunnelbroker.arn default_action { type = "forward" target_group_arn = aws_lb_target_group.tunnelbroker_grpc.arn } lifecycle { ignore_changes = [default_action[0].forward[0].stickiness[0].duration] replace_triggered_by = [aws_lb_target_group.tunnelbroker_grpc] } } # SSL Certificate data "aws_acm_certificate" "tunnelbroker" { domain = local.tunnelbroker_config.domain_name statuses = ["ISSUED"] } output "rabbitmq_console_url" { value = aws_mq_broker.tunnelbroker_rabbitmq.instances[0].console_url } diff --git a/services/tunnelbroker/Cargo.toml b/services/tunnelbroker/Cargo.toml index 10e789805..50cb28882 100644 --- a/services/tunnelbroker/Cargo.toml +++ b/services/tunnelbroker/Cargo.toml @@ -1,41 +1,41 @@ [package] name = "tunnelbroker" version = "0.5.0" links = "tunnelbroker" description = "Tunnelbroker server" edition.workspace = true license.workspace = true homepage.workspace = true [dependencies] anyhow = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } comm-lib = { path = "../../shared/comm-lib", features = [ "aws", "grpc_clients", ] } futures-util = { workspace = true } grpc_clients = { path = "../../shared/grpc_clients" } hyper = { workspace = true } hyper-tungstenite = { workspace = true } once_cell = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } tonic = "0.8" tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } tunnelbroker_messages = { path = "../../shared/tunnelbroker_messages" } derive_more = { workspace = true } lapin = { workspace = true } chrono = { workspace = true } uuid = { workspace = true, features = ["v4"] } jsonwebtoken = "9.3.0" web-push = { version = "0.10", features = [ "hyper-client", ], default-features = false } reqwest = { workspace = true, features = ["json", "native-tls", "rustls-tls"] } serde.workspace = true [build-dependencies] tonic-build = "0.8" diff --git a/services/tunnelbroker/src/constants.rs b/services/tunnelbroker/src/constants.rs index 5b9b7c58d..ca3b163bd 100644 --- a/services/tunnelbroker/src/constants.rs +++ b/services/tunnelbroker/src/constants.rs @@ -1,63 +1,64 @@ use tokio::time::Duration; pub const GRPC_TX_QUEUE_SIZE: usize = 32; pub const GRPC_SERVER_PORT: u16 = 50051; pub const GRPC_KEEP_ALIVE_PING_INTERVAL: Duration = Duration::from_secs(3); pub const GRPC_KEEP_ALIVE_PING_TIMEOUT: Duration = Duration::from_secs(10); pub const SOCKET_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(3); pub const MAX_RMQ_MSG_PRIORITY: u8 = 10; pub const DDB_RMQ_MSG_PRIORITY: u8 = 10; pub const CLIENT_RMQ_MSG_PRIORITY: u8 = 1; pub const RMQ_CONSUMER_TAG: &str = "tunnelbroker"; pub const WS_SESSION_CLOSE_AMQP_MSG: &str = "SessionClose"; pub const ENV_APNS_CONFIG: &str = "APNS_CONFIG"; pub const ENV_FCM_CONFIG: &str = "FCM_CONFIG"; pub const ENV_WEB_PUSH_CONFIG: &str = "WEB_PUSH_CONFIG"; pub const ENV_WNS_CONFIG: &str = "WNS_CONFIG"; +pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; pub const FCM_ACCESS_TOKEN_GENERATION_THRESHOLD: u64 = 5 * 60; pub const PUSH_SERVICE_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); pub mod dynamodb { // This table holds messages which could not be immediately delivered to // a device. // // - (primary key) = (deviceID: Partition Key, createdAt: Sort Key) // - deviceID: The public key of a device's olm identity key // - payload: Message to be delivered. See shared/tunnelbroker_messages. // - messageID = [createdAt]#[clientMessageID] // - createdAd: UNIX timestamp of when the item was inserted. // Timestamp is needed to order the messages correctly to the device. // Timestamp format is ISO 8601 to handle lexicographical sorting. // - clientMessageID: Message ID generated on client using UUID Version 4. pub mod undelivered_messages { pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages"; pub const PARTITION_KEY: &str = "deviceID"; pub const DEVICE_ID: &str = "deviceID"; pub const PAYLOAD: &str = "payload"; pub const MESSAGE_ID: &str = "messageID"; pub const SORT_KEY: &str = "messageID"; } // This table holds a device token associated with a device. // // - (primary key) = (deviceID: Partition Key) // - deviceID: The public key of a device's olm identity key. // - deviceToken: Token to push services uploaded by device. // - tokenInvalid: Information is token is invalid. pub mod device_tokens { pub const TABLE_NAME: &str = "tunnelbroker-device-tokens"; pub const PARTITION_KEY: &str = "deviceID"; pub const DEVICE_ID: &str = "deviceID"; pub const DEVICE_TOKEN: &str = "deviceToken"; pub const TOKEN_INVALID: &str = "tokenInvalid"; pub const PLATFORM: &str = "platform"; pub const DEVICE_TOKEN_INDEX_NAME: &str = "deviceToken-index"; } } diff --git a/services/tunnelbroker/src/main.rs b/services/tunnelbroker/src/main.rs index 3e209c88c..b5fec5296 100644 --- a/services/tunnelbroker/src/main.rs +++ b/services/tunnelbroker/src/main.rs @@ -1,132 +1,148 @@ pub mod amqp; pub mod config; pub mod constants; pub mod database; pub mod error; pub mod grpc; pub mod identity; pub mod notifs; pub mod websockets; use crate::notifs::apns::APNsClient; use crate::notifs::fcm::FCMClient; use crate::notifs::web_push::WebPushClient; use crate::notifs::wns::WNSClient; use crate::notifs::NotifClient; use anyhow::{anyhow, Result}; use config::CONFIG; +use constants::COMM_SERVICES_USE_JSON_LOGS; +use std::env; use tracing::{self, error, info, Level}; use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<()> { + let use_json_logs: bool = env::var(COMM_SERVICES_USE_JSON_LOGS) + .unwrap_or("false".to_string()) + .parse() + .unwrap_or_default(); + let filter = EnvFilter::builder() .with_default_directive(Level::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); - let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); - tracing::subscriber::set_global_default(subscriber) - .expect("Unable to configure tracing"); + if use_json_logs { + let subscriber = tracing_subscriber::fmt() + .json() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Unable to configure tracing"); + } else { + let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Unable to configure tracing"); + } config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; let db_client = database::DatabaseClient::new(&aws_config); let amqp_connection = amqp::connect().await; let apns_config = CONFIG.apns_config.clone(); let apns = match apns_config { Some(config) => match APNsClient::new(&config) { Ok(apns_client) => { info!("APNs client created successfully"); Some(apns_client) } Err(err) => { error!("Error creating APNs client: {}", err); None } }, None => { error!("APNs config is missing"); None } }; let fcm_config = CONFIG.fcm_config.clone(); let fcm = match fcm_config { Some(config) => match FCMClient::new(&config) { Ok(fcm_client) => { info!("FCM client created successfully"); Some(fcm_client) } Err(err) => { error!("Error creating FCM client: {}", err); None } }, None => { error!("FCM config is missing"); None } }; let web_push_config = CONFIG.web_push_config.clone(); let web_push = match web_push_config { Some(config) => match WebPushClient::new(&config) { Ok(web_client) => { info!("Web Push client created successfully"); Some(web_client) } Err(err) => { error!("Error creating Web Push client: {}", err); None } }, None => { error!("Web Push config is missing"); None } }; let wns_config = CONFIG.wns_config.clone(); let wns = match wns_config { Some(config) => match WNSClient::new(&config) { Ok(wns_client) => { info!("WNS client created successfully"); Some(wns_client) } Err(err) => { error!("Error creating WNS client: {}", err); None } }, None => { error!("WNS config is missing"); None } }; let notif_client = NotifClient { apns, fcm, web_push, wns, }; let grpc_server = grpc::run_server(db_client.clone(), &amqp_connection); let websocket_server = websockets::run_server( db_client.clone(), &amqp_connection, notif_client.clone(), ); tokio::select! { Ok(_) = grpc_server => { Ok(()) }, Ok(_) = websocket_server => { Ok(()) }, else => { tracing::error!("A grpc or websocket server crashed."); Err(anyhow!("A grpc or websocket server crashed.")) } } }