diff --git a/services/reports/src/config.rs b/services/reports/src/config.rs index 3ee1a096e..c88d04d60 100644 --- a/services/reports/src/config.rs +++ b/services/reports/src/config.rs @@ -1,53 +1,78 @@ use anyhow::Result; use clap::Parser; use comm_services_lib::blob::client::Url; use once_cell::sync::Lazy; -use tracing::info; +use tracing::{info, warn}; -// environment variabl names +use crate::email::config::{EmailArgs, EmailConfig}; + +// environment variable names const ENV_LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; const ENV_BLOB_SERVICE_URL: &str = "BLOB_SERVICE_URL"; #[derive(Parser)] #[command(version, about, long_about = None)] pub struct AppConfig { /// HTTP server listening port #[arg(long, default_value_t = 50056)] pub http_port: u16, #[arg(env = ENV_BLOB_SERVICE_URL)] #[arg(long, default_value = "http://localhost:50053")] pub blob_service_url: Url, /// AWS Localstack service URL #[arg(env = ENV_LOCALSTACK_ENDPOINT)] #[arg(long)] localstack_endpoint: Option, + + /// This config shouldn't be used directly. It's used for parsing purposes + /// only. Use [`AppConfig::email_config()`] instead. + #[command(flatten)] + email_args: EmailArgs, } impl AppConfig { pub fn is_dev(&self) -> bool { self.localstack_endpoint.is_some() } + + pub fn email_config(&self) -> Option { + // we return None in case of error because this should've already been + // checked by parse_cmdline_args() + self.email_args.parse().ok().flatten() + } } /// Stores configuration parsed from command-line arguments /// and environment variables pub static CONFIG: Lazy = Lazy::new(AppConfig::parse); /// Processes the command-line arguments and environment variables. /// Should be called at the beginning of the `main()` function. pub(super) fn parse_cmdline_args() -> Result<&'static AppConfig> { // force evaluation of the lazy initialized config - Ok(Lazy::force(&CONFIG)) + let cfg = Lazy::force(&CONFIG); + + // initialize e-mail config to check for errors + match cfg.email_args.parse()? { + Some(_) => { + info!("E-mail config found. E-mail notifications are enabled."); + } + None => { + warn!("E-mail config is disabled or missing! E-mails will not be sent."); + } + } + + Ok(cfg) } /// Provides region/credentials configuration for AWS SDKs pub async fn load_aws_config() -> aws_config::SdkConfig { let mut config_builder = aws_config::from_env(); if let Some(endpoint) = &CONFIG.localstack_endpoint { info!("Using Localstack. AWS Endpoint URL: {}", endpoint); config_builder = config_builder.endpoint_url(endpoint); } config_builder.load().await } diff --git a/services/reports/src/main.rs b/services/reports/src/main.rs index fefba5ca6..558232a40 100644 --- a/services/reports/src/main.rs +++ b/services/reports/src/main.rs @@ -1,40 +1,41 @@ pub mod config; pub mod constants; pub mod database; pub mod email; pub mod http; pub mod report_types; pub mod service; use anyhow::Result; use comm_services_lib::blob::client::BlobServiceClient; use service::ReportsService; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .with_env_var(EnvFilter::DEFAULT_ENV) .from_env_lossy(); // init HTTP logger - it relies on 'log' instead of 'tracing' // so we have to initialize a polyfill tracing_log::LogTracer::init()?; let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { configure_logging()?; let cfg = config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; + let email_config = cfg.email_config(); let db = database::client::DatabaseClient::new(&aws_config); let blob_client = BlobServiceClient::new(cfg.blob_service_url.clone()); - let service = ReportsService::new(db, blob_client); + let service = ReportsService::new(db, blob_client, email_config); crate::http::run_http_server(service).await } diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs index 17f2d46e6..34bb2e3c4 100644 --- a/services/reports/src/service.rs +++ b/services/reports/src/service.rs @@ -1,219 +1,228 @@ use actix_web::FromRequest; use comm_services_lib::{ auth::UserIdentity, blob::client::{BlobServiceClient, BlobServiceError}, database, }; use derive_more::{Display, Error, From}; use std::{ collections::HashMap, future::{ready, Ready}, + sync::Arc, }; use tracing::error; use crate::{ database::{ client::{DatabaseClient, ReportsPage}, item::ReportItem, }, + email::config::EmailConfig, report_types::{ReportID, ReportInput, ReportOutput, ReportType}, }; #[derive(Debug, Display, Error, From)] pub enum ReportsServiceError { DatabaseError(database::Error), BlobError(BlobServiceError), /// Error during parsing user input /// Usually this indicates user error #[from(ignore)] ParseError(serde_json::Error), /// Error during serializing/deserializing internal data /// This is usually a service bug / data inconsistency #[from(ignore)] SerdeError(serde_json::Error), /// Unsupported report type /// Returned when trying to perform an operation on an incompatible report type /// e.g. create a Redux Devtools import from a media mission report UnsupportedReportType, /// Unexpected error Unexpected, } type ServiceResult = Result; #[derive(Clone)] pub struct ReportsService { db: DatabaseClient, blob_client: BlobServiceClient, requesting_user_id: Option, + email_config: Option>, } impl ReportsService { - pub fn new(db: DatabaseClient, blob_client: BlobServiceClient) -> Self { + pub fn new( + db: DatabaseClient, + blob_client: BlobServiceClient, + email_config: Option, + ) -> Self { Self { db, blob_client, requesting_user_id: None, + email_config: email_config.map(Arc::new), } } pub fn authenticated(&self, user: UserIdentity) -> Self { let user_id = user.user_id.to_string(); Self { db: self.db.clone(), + email_config: self.email_config.clone(), blob_client: self.blob_client.with_user_identity(user), requesting_user_id: Some(user_id), } } pub async fn save_reports( &self, reports: Vec, ) -> ServiceResult> { let mut items = Vec::with_capacity(reports.len()); let mut tasks = tokio::task::JoinSet::new(); // 1. Concurrently upload reports to blob service if needed for input in reports { let blob_client = self.blob_client.clone(); let user_id = self.requesting_user_id.clone(); tasks.spawn(async move { let mut item = ReportItem::from_input(input, user_id) .map_err(ReportsServiceError::SerdeError)?; item.ensure_size_constraints(&blob_client).await?; Ok(item) }); } // 2. Wait for all uploads to complete and collect results // If any of them failed, abort while let Some(task) = tasks.join_next().await { let result: Result<_, ReportsServiceError> = task.map_err(|err| { error!("Task failed to join: {err}"); ReportsServiceError::Unexpected })?; items.push(result?); } // 3. Store reports in database let ids = items.iter().map(|item| item.id.clone()).collect(); self.db.save_reports(items).await?; Ok(ids) } pub async fn get_report( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report_item) = self.db.get_report(&report_id).await? else { return Ok(None); }; let ReportItem { user_id, report_type, platform, creation_time, content, .. } = report_item; let report_data = content.fetch_bytes(&self.blob_client).await?; let report_json = serde_json::from_slice(report_data.as_slice()) .map_err(ReportsServiceError::SerdeError)?; let output = ReportOutput { id: report_id, user_id, platform, report_type, creation_time, content: report_json, }; Ok(Some(output)) } pub async fn get_redux_devtools_import( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report) = self.get_report(report_id).await? else { return Ok(None); }; if !matches!(report.report_type, ReportType::ErrorReport) { return Err(ReportsServiceError::UnsupportedReportType); }; let redux_devtools_payload = prepare_redux_devtools_import(report.content) .map_err(ReportsServiceError::SerdeError)?; Ok(Some(redux_devtools_payload)) } pub async fn list_reports( &self, cursor: Option, page_size: Option, ) -> ServiceResult { let page = self.db.scan_reports(cursor, page_size).await?; Ok(page) } } impl FromRequest for ReportsService { type Error = actix_web::Error; type Future = Ready>; #[inline] fn from_request( req: &actix_web::HttpRequest, _payload: &mut actix_web::dev::Payload, ) -> Self::Future { use actix_web::HttpMessage; let Some(service) = req.app_data::() else { tracing::error!( "FATAL! Failed to extract ReportsService from actix app_data. \ Check HTTP server configuration" ); return ready(Err(actix_web::error::ErrorInternalServerError("Internal server error"))); }; let auth_service = if let Some(user_identity) = req.extensions().get::() { tracing::trace!("Found user identity. Creating authenticated service"); service.authenticated(user_identity.clone()) } else { tracing::trace!( "No user identity found. Leaving unauthenticated service" ); service.clone() }; ready(Ok(auth_service)) } } /// Transforms report content JSON into format that can be /// imported into Redux DevTools. fn prepare_redux_devtools_import( mut error_report: HashMap, ) -> Result { use serde_json::{json, map::Map, Value}; let nav_state = error_report.remove("navState"); let actions = error_report.remove("actions"); let mut preloaded_state = error_report .remove("preloadedState") .unwrap_or_else(|| Value::Object(Map::new())); preloaded_state["navState"] = nav_state.into(); preloaded_state["frozen"] = true.into(); preloaded_state["_persist"]["rehydrated"] = false.into(); let preload_state_str = serde_json::to_string(&preloaded_state)?; let payload_str = serde_json::to_string(&actions)?; Ok(json!({ "preloadedState": preload_state_str, "payload": payload_str, })) }