diff --git a/services/blob/Cargo.toml b/services/blob/Cargo.toml
index 9531f9a71..c29f7b392 100644
--- a/services/blob/Cargo.toml
+++ b/services/blob/Cargo.toml
@@ -1,37 +1,37 @@
 [package]
 name = "blob"
 version = "1.0.0"
 description = "Blob service"
 edition.workspace = true
 license.workspace = true
 homepage.workspace = true
 
 [dependencies]
 actix-multipart = { workspace = true }
 actix-web = { workspace = true }
 anyhow = { workspace = true }
 async-stream = { workspace = true }
 aws-config = { workspace = true }
 aws-sdk-dynamodb = { workspace = true }
 aws-sdk-s3 = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true, features = ["derive", "env"] }
 comm-lib = { path = "../../shared/comm-lib", features = [
 "http",
 "aws",
 "grpc_clients",
 ] }
 derive_more = { workspace = true }
 http = { workspace = true }
 once_cell = { workspace = true }
 prost = { workspace = true }
 regex = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, features = ["rt-multi-thread"] }
 tokio-stream = { workspace = true }
 tonic = "0.8"
 tracing = { workspace = true }
 tracing-actix-web = { workspace = true }
 tracing-futures = { workspace = true, features = ["futures-03"] }
-tracing-subscriber = { workspace = true, features = ["env-filter"] }
+tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
 serde_json = { workspace = true }
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
index fdb2a298c..4cf3e0bdc 100644
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -1,43 +1,44 @@ // Assorted constants pub const DEFAULT_HTTP_PORT: u16 = 50053; pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1; // HTTP constants pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // DynamoDB constants pub mod db { /// Reserved holder value that indicates the row is a blob item pub const BLOB_ITEM_ROW_HOLDER_VALUE: &str = "_"; pub const BLOB_TABLE_NAME: &str = "blob-service-blobs"; pub const BLOB_PARTITION_KEY: &str = ATTR_BLOB_HASH; pub const BLOB_SORT_KEY: &str = ATTR_HOLDER; pub const UNCHECKED_INDEX_NAME: &str = "unchecked-index"; pub const UNCHECKED_INDEX_PARTITION_KEY: &str = ATTR_UNCHECKED; pub const UNCHECKED_INDEX_SORT_KEY: &str = ATTR_LAST_MODIFIED; /// attribute names pub const ATTR_BLOB_HASH: &str = "blob_hash"; pub const ATTR_HOLDER: &str = "holder"; pub const ATTR_CREATED_AT: &str = "created_at"; pub const ATTR_LAST_MODIFIED: &str = "last_modified"; pub const ATTR_S3_PATH: &str = "s3_path"; pub const ATTR_UNCHECKED: &str = "unchecked"; } // Environment variables +pub const COMM_SERVICES_USE_JSON_LOGS: &str = "COMM_SERVICES_USE_JSON_LOGS"; pub const LOG_LEVEL_ENV_VAR: &str = tracing_subscriber::filter::EnvFilter::DEFAULT_ENV; // S3 constants pub const S3_BUCKET_ENV_VAR: &str = "BLOB_S3_BUCKET_NAME"; pub const DEFAULT_S3_BUCKET_NAME: &str = "commapp-blob"; pub const S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE: u64 = 5 * 1024 * 1024; pub const INVITE_LINK_BLOB_HASH_PREFIX: &str = "invite_"; diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs index 9e8fde369..91e3b9eeb 100644 --- a/services/blob/src/main.rs +++ b/services/blob/src/main.rs 
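Review bot: The new `COMM_SERVICES_USE_JSON_LOGS` constant has no doc comment and departs from the file's naming pattern for environment-variable names (`LOG_LEVEL_ENV_VAR` right below it, `S3_BUCKET_ENV_VAR` further down), so a reader of this file cannot tell that it is an env var name or which values it honours. main.rs parses the value as a `bool`, so only the literal `true` switches on JSON output and anything else (or unset) yields plain-text logs. I would add `/// Env var name; set to "true" to emit JSON log lines (any other value, or unset: plain text).` above the constant, and consider renaming it to `USE_JSON_LOGS_ENV_VAR` for consistency, which also touches the import in main.rs.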
@@ -1,55 +1,71 @@ pub mod config; pub mod constants; pub mod database; pub mod http; pub mod s3; pub mod service; pub mod tools; use anyhow::Result; use comm_lib::auth::AuthService; use config::Command; +use constants::COMM_SERVICES_USE_JSON_LOGS; +use std::env; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use crate::service::BlobServiceConfig; fn configure_logging() -> Result<()> { + let use_json_logs: bool = env::var(COMM_SERVICES_USE_JSON_LOGS) + .unwrap_or("false".to_string()) + .parse() + .unwrap_or_default(); + let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); - let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); - tracing::subscriber::set_global_default(subscriber)?; + if use_json_logs { + let subscriber = tracing_subscriber::fmt() + .json() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber)?; + } else { + let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); + tracing::subscriber::set_global_default(subscriber)?; + } + Ok(()) } #[tokio::main] async fn main() -> Result<()> { configure_logging()?; let config = config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; let db = database::DatabaseClient::new(&aws_config); let s3 = s3::S3Client::new(&aws_config); let auth_service = AuthService::new(&aws_config, &config.identity_endpoint); let blob_service = service::BlobService::new( db, s3, BlobServiceConfig { instant_delete_orphaned_blobs: config.instant_delete, // orphan_protection_period: chrono::Duration::milliseconds(1), ..Default::default() }, ); match &config.command { Some(Command::Cleanup) => blob_service.perform_cleanup().await?, None | Some(Command::Server) => { crate::http::run_http_server(blob_service, auth_service).await? } }; Ok(()) } 
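Review bot: In `configure_logging`, a misspelled or non-canonical value of `COMM_SERVICES_USE_JSON_LOGS` (`1`, `TRUE`, `yes`) is silently turned into `false` by `.parse().unwrap_or_default()`, so a deployment that intends JSON logs quietly emits plain-text lines to whatever pipeline expects JSON, and no diagnostic can be logged because the subscriber is not installed yet at that point. I would accept only the two literals and treat anything else as a configuration error via `anyhow::bail!`, which also removes the eager `"false".to_string()` allocation passed to `unwrap_or` (clippy `or_fun_call`). In the sketch below a non-Unicode value falls into the `Err(_)` arm and reads as `false`; if that should also fail, match `env::VarError::NotUnicode` separately. The duplicated `set_global_default` call in the two branches is acceptable because the two subscriber types differ, but a one-line comment saying so would save the next reader a refactoring attempt.
```rust
fn configure_logging() -> Result<()> {
  let use_json_logs = match env::var(COMM_SERVICES_USE_JSON_LOGS).as_deref() {
    Ok("true") => true,
    Ok("false") | Err(_) => false,
    Ok(other) => anyhow::bail!(
      "{} must be \"true\" or \"false\", got {:?}",
      COMM_SERVICES_USE_JSON_LOGS,
      other
    ),
  };
  // … unchanged: EnvFilter construction and subscriber installation …
```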
diff --git a/services/terraform/remote/service_blob.tf b/services/terraform/remote/service_blob.tf index 6bf170519..1d0627094 100644 --- a/services/terraform/remote/service_blob.tf +++ b/services/terraform/remote/service_blob.tf 
@@ -1,201 +1,205 @@ locals { blob_service_image_tag = local.is_staging ? "1.1.2-staging" : "1.1.2" blob_service_container_name = "blob-service-server" blob_service_server_image = "commapp/blob-server:${local.blob_service_image_tag}" # HTTP port & configuration for ECS Service Connect blob_service_container_http_port = 50053 blob_sc_port_name = "blob-service-ecs-http" blob_sc_dns_name = "blob-service" # URL accessible by other services in the same Service Connect namespace # This renders to 'http://blob-service:50053' blob_local_url = "http://${local.blob_sc_dns_name}:${local.blob_service_container_http_port}" blob_service_container_grpc_port = 50051 blob_service_grpc_public_port = 50053 blob_service_domain_name = "blob.${local.root_domain}" blob_service_s3_bucket = "commapp-blob${local.s3_bucket_name_suffix}" } resource "aws_ecs_task_definition" "blob_service" { family = "blob-service-task-def" container_definitions = jsonencode([ { name = local.blob_service_container_name image = local.blob_service_server_image essential = true portMappings = [ { name = local.blob_sc_port_name containerPort = local.blob_service_container_http_port protocol = "tcp" appProtocol = "http" } ] environment = [ { name = "RUST_LOG" value = local.is_staging ? "info,blob=debug,comm_lib=debug" : "info" }, { name = "BLOB_S3_BUCKET_NAME", value = local.blob_service_s3_bucket }, { name = "IDENTITY_SERVICE_ENDPOINT", value = local.identity_local_url }, { name = "COMM_SERVICES_DISABLE_CSAT_VERIFICATION", value = local.is_staging ? 
"false" : "true" + }, + { + name = "COMM_SERVICES_USE_JSON_LOGS", + value = local.comm_services_use_json_logs } ] logConfiguration = { "logDriver" = "awslogs" "options" = { "awslogs-create-group" = "true" "awslogs-group" = "/ecs/blob-service-task-def" "awslogs-region" = "us-east-2" "awslogs-stream-prefix" = "ecs" } } } ]) task_role_arn = aws_iam_role.services_ddb_full_access.arn execution_role_arn = aws_iam_role.ecs_task_execution.arn network_mode = "bridge" cpu = "512" memory = "512" requires_compatibilities = ["EC2"] # Set this to true if you want to keep old revisions # when this definition is changed skip_destroy = false } resource "aws_ecs_service" "blob_service" { name = "blob-service" cluster = aws_ecs_cluster.comm_services.id launch_type = "EC2" task_definition = aws_ecs_task_definition.blob_service.arn force_new_deployment = true desired_count = 1 lifecycle { ignore_changes = [desired_count] } # Expose Blob service to other services in the cluster service_connect_configuration { enabled = true service { discovery_name = local.blob_sc_dns_name port_name = local.blob_sc_port_name client_alias { port = local.blob_service_container_http_port dns_name = local.blob_sc_dns_name } } } # HTTP load_balancer { target_group_arn = aws_lb_target_group.blob_service_http.arn container_name = local.blob_service_container_name container_port = local.blob_service_container_http_port } deployment_circuit_breaker { enable = true rollback = true } } # Security group to configure access to the service resource "aws_security_group" "blob_service" { name = "blob-service-ecs-sg" vpc_id = aws_vpc.default.id ingress { from_port = local.blob_service_container_http_port to_port = local.blob_service_container_http_port protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] description = "HTTP port" } # Allow all outbound traffic egress { from_port = 0 to_port = 0 protocol = "-1" cidr_blocks = ["0.0.0.0/0"] } lifecycle { create_before_destroy = true } } resource "aws_lb_target_group" 
"blob_service_http" { name = "blob-service-ecs-http-tg" port = local.blob_service_container_http_port protocol = "HTTP" vpc_id = aws_vpc.default.id # ECS Fargate requires target type set to IP target_type = "instance" health_check { enabled = true healthy_threshold = 2 unhealthy_threshold = 3 protocol = "HTTP" path = "/health" matcher = "200-499" } } # Load Balancer resource "aws_lb" "blob_service" { load_balancer_type = "application" name = "blob-service-lb" internal = false #security_groups = [aws_security_group.blob_service.id] subnets = [ aws_subnet.public_a.id, aws_subnet.public_b.id, aws_subnet.public_c.id, ] } resource "aws_lb_listener" "blob_service_https" { load_balancer_arn = aws_lb.blob_service.arn port = "443" protocol = "HTTPS" ssl_policy = "ELBSecurityPolicy-TLS13-1-2-2021-06" certificate_arn = data.aws_acm_certificate.blob_service.arn default_action { type = "forward" target_group_arn = aws_lb_target_group.blob_service_http.arn } lifecycle { # Required only for existing resources to avoid plan difference ignore_changes = [default_action[0].forward[0].stickiness[0].duration] # Target group cannot be destroyed if it is used replace_triggered_by = [aws_lb_target_group.blob_service_http] } } # SSL Certificate data "aws_acm_certificate" "blob_service" { domain = local.blob_service_domain_name statuses = ["ISSUED"] }
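Review bot: The new `COMM_SERVICES_USE_JSON_LOGS` entry in the task definition's `environment` list takes its value from `local.comm_services_use_json_logs`, which is not declared in this file's `locals` block, so its type cannot be verified here. ECS container environment values must be strings and the Rust side only recognises the literal strings `true`/`false`; if that local is declared as a Terraform bool, `jsonencode` emits a JSON boolean and the task definition is likely to be rejected at plan or apply time rather than producing the literal the service parses. Writing `value = tostring(local.comm_services_use_json_logs)` makes the entry correct for either declaration and documents the expectation at the point of use. The rest of the `aws_ecs_task_definition` resource is context unchanged by this commit.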