diff --git a/services/reports/src/http/handlers.rs b/services/reports/src/http/handlers.rs
new file mode 100644
index 000000000..63020b909
--- /dev/null
+++ b/services/reports/src/http/handlers.rs
@@ -0,0 +1,41 @@
+use actix_web::{post, web, HttpResponse};
+use serde::Deserialize;
+
+use crate::report_types::ReportInput;
+use crate::service::ReportsService;
+
+/// Request body of the `POST /reports` endpoint.
+///
+/// Clients may send either a single report object or an array of reports.
+/// The enum is untagged, so serde tries the variants in declaration order.
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+enum PostReportsPayload {
+  Single(ReportInput),
+  Multiple(Vec<ReportInput>),
+}
+impl PostReportsPayload {
+  /// Flattens both payload shapes into a list of reports
+  pub fn into_vec(self) -> Vec<ReportInput> {
+    match self {
+      Self::Single(report) => vec![report],
+      Self::Multiple(reports) => reports,
+    }
+  }
+}
+
+/// Saves the posted report(s) and responds with `201 Created`
+/// and a `{ "reportIDs": [...] }` JSON body.
+/// Service errors are propagated with `?` as HTTP error responses
+#[post("/reports")]
+async fn post_reports(
+  payload: web::Json<PostReportsPayload>,
+  service: ReportsService,
+) -> actix_web::Result<HttpResponse> {
+  use serde_json::json;
+
+  let payload = payload.into_inner();
+  let ids = service.save_reports(payload.into_vec()).await?;
+  let response = HttpResponse::Created().json(json!({ "reportIDs": ids }));
+  Ok(response)
+}
diff --git a/services/reports/src/http.rs b/services/reports/src/http/mod.rs
similarity index 98%
rename from services/reports/src/http.rs
rename to services/reports/src/http/mod.rs
index f09d2e2bf..b0e0ec7d5 100644
--- a/services/reports/src/http.rs
+++ b/services/reports/src/http/mod.rs
@@ -1,99 +1,102 @@
 use actix_web::error::{
   ErrorBadRequest, ErrorInternalServerError, ErrorServiceUnavailable,
   ErrorUnsupportedMediaType,
 };
 use actix_web::{web, App, HttpResponse, HttpServer, ResponseError};
 use anyhow::Result;
 use http::StatusCode;
 use tracing::{debug, error, info, trace, warn};
 
 use crate::config::CONFIG;
 use crate::constants::REQUEST_BODY_JSON_SIZE_LIMIT;
 use crate::service::{ReportsService, ReportsServiceError};
 
+mod handlers;
+
 pub async fn run_http_server(service: ReportsService) -> Result<()> {
   use actix_web::middleware::{Logger, NormalizePath};
   use comm_services_lib::http::cors_config;
   use tracing_actix_web::TracingLogger;
 
   info!(
     "Starting HTTP server listening at port {}",
     CONFIG.http_port
   );
   HttpServer::new(move || {
     let json_cfg =
       web::JsonConfig::default().limit(REQUEST_BODY_JSON_SIZE_LIMIT);
     App::new()
       .app_data(json_cfg)
       .app_data(service.to_owned())
       .wrap(Logger::default())
       .wrap(TracingLogger::default())
       .wrap(NormalizePath::trim())
       .wrap(cors_config(CONFIG.is_dev()))
       // Health endpoint for load balancers checks
       .route("/health", web::get().to(HttpResponse::Ok))
+      .service(handlers::post_reports)
   })
   .bind(("0.0.0.0", CONFIG.http_port))?
   .run()
   .await?;
 
   Ok(())
 }
 
 fn handle_reports_service_error(err: &ReportsServiceError) -> actix_web::Error {
   use aws_sdk_dynamodb::Error as DynamoDBError;
   use comm_services_lib::database::Error as DBError;
 
   trace!("Handling reports service error: {:?}", err);
   match err {
     ReportsServiceError::UnsupportedReportType => {
       ErrorUnsupportedMediaType("unsupported report type")
     }
     ReportsServiceError::SerdeError(err) => {
       error!("Serde error: {0:?} - {0}", err);
       ErrorInternalServerError("internal error")
     }
     ReportsServiceError::ParseError(err) => {
       debug!("Parse error: {0:?} - {0}", err);
       ErrorBadRequest("invalid input format")
     }
     ReportsServiceError::BlobError(err) => {
       error!("Blob Service error: {0:?} - {0}", err);
       ErrorInternalServerError("internal error")
     }
     ReportsServiceError::DatabaseError(db_err) => match db_err {
       // retriable errors
       DBError::MaxRetriesExceeded
       | DBError::AwsSdk(
         DynamoDBError::InternalServerError(_)
         | DynamoDBError::ProvisionedThroughputExceededException(_)
         | DynamoDBError::RequestLimitExceeded(_),
       ) => {
         warn!("AWS transient error occurred");
         ErrorServiceUnavailable("please retry")
       }
       err => {
         error!("Unexpected database error: {0:?} - {0}", err);
         ErrorInternalServerError("internal error")
       }
     },
     #[allow(unreachable_patterns)]
     err => {
       error!("Received an unexpected error: {0:?} - {0}", err);
       ErrorInternalServerError("server error")
     }
   }
 }
 
-/// This allow us to `await?` blob service calls in HTTP handlers
+/// This allows us to `await?` reports service calls in HTTP handlers
 impl ResponseError for ReportsServiceError {
   fn error_response(&self) -> HttpResponse {
     handle_reports_service_error(self).error_response()
   }
 
   fn status_code(&self) -> StatusCode {
     handle_reports_service_error(self)
       .as_response_error()
       .status_code()
   }
 }
diff --git a/services/reports/src/report_types.rs b/services/reports/src/report_types.rs
index 899b77a51..1ebbf13bd 100644
--- a/services/reports/src/report_types.rs
+++ b/services/reports/src/report_types.rs
@@ -1,114 +1,117 @@
 // Report ID
 
 use std::collections::HashMap;
 
 use chrono::{serde::ts_milliseconds_option, DateTime, Utc};
 use derive_more::{Deref, Display, Into};
 use num_derive::FromPrimitive;
 use serde::{de::Error, Deserialize, Serialize};
 use serde_repr::Deserialize_repr;
 
 #[derive(Clone, Debug, Deref, Serialize, Into)]
-#[repr(transparent)]
+#[serde(transparent)]
 pub struct ReportID(String);
 impl Default for ReportID {
   fn default() -> Self {
     let uuid = uuid::Uuid::new_v4();
     ReportID(uuid.to_string())
   }
 }
 impl From<String> for ReportID {
   fn from(value: String) -> Self {
     ReportID(value)
   }
 }
 
 /// Serialized / deserialized report type.
 /// We receive report type from clients as a number,
 /// but want to display it as a string.
 #[derive(
   Copy, Clone, Debug, Default, FromPrimitive, Serialize, Deserialize_repr,
 )]
 #[repr(u8)]
 #[serde(rename_all(serialize = "snake_case"))]
 pub enum ReportType {
   // NOTE: Keep these in sync with `reportTypes` in lib/types/report-types.js
   #[default]
   ErrorReport = 0,
   ThreadInconsistency = 1,
   EntryInconsistency = 2,
   MediaMission = 3,
   UserInconsistency = 4,
 }
 
 /// Report platform
 #[derive(Clone, Debug, Serialize, Deserialize, Display)]
 #[serde(rename_all = "lowercase")]
 pub enum ReportPlatform {
   Android,
   IOS,
   Web,
   Windows,
   MacOS,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PlatformDetails {
   pub platform: ReportPlatform,
   code_version: Option<u16>,
   state_version: Option<u16>,
 }
 
 /// Input report payload - this is the JSON we receive from clients
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 #[serde(remote = "Self")] // we delegate to our custom validation trait
 pub struct ReportInput {
   pub platform_details: PlatformDetails,
 
   #[serde(rename = "type")]
   #[serde(default)]
   pub report_type: ReportType,
 
   #[serde(default)]
   #[serde(with = "ts_milliseconds_option")]
   pub time: Option<DateTime<Utc>>,
 
   // we usually don't care about the rest of the fields
   // so we just keep them as a JSON object
   #[serde(flatten)]
   pub report_content: HashMap<String, serde_json::Value>,
 }
 
 // We can do additional validation here
 impl<'de> serde::de::Deserialize<'de> for ReportInput {
   fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
   where
     D: serde::Deserializer<'de>,
   {
     let mut this = Self::deserialize(deserializer)?;
     if this.time.is_none() {
-      if !matches!(this.report_type, ReportType::ThreadInconsistency) {
+      if !matches!(
+        this.report_type,
+        ReportType::ThreadInconsistency | ReportType::ErrorReport
+      ) {
         return Err(Error::custom(
-          "The 'time' field is optional only for thread inconsistency reports",
+          "The 'time' field is optional only for thread inconsistency and error reports",
         ));
       }
       this.time = Some(Utc::now());
     }
     Ok(this)
   }
 }
 
 /// Report output payload - this is used to view the report
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ReportOutput {
   pub id: ReportID,
   #[serde(rename = "userID")]
   pub user_id: String,
   pub platform: ReportPlatform,
   pub report_type: ReportType,
   pub creation_time: DateTime<Utc>,
   pub content: HashMap<String, serde_json::Value>,
 }
diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs
index 32fbff1a8..cd92dc279 100644
--- a/services/reports/src/service.rs
+++ b/services/reports/src/service.rs
@@ -1,92 +1,154 @@
 use actix_web::FromRequest;
 use comm_services_lib::{
   auth::UserIdentity,
   blob::client::{BlobServiceClient, BlobServiceError},
   database,
 };
 use derive_more::{Display, Error, From};
 use std::future::{ready, Ready};
+use tracing::error;
 
-use crate::database::client::DatabaseClient;
+use crate::{
+  database::{client::DatabaseClient, item::ReportItem},
+  report_types::{ReportID, ReportInput},
+};
 
 #[derive(Debug, Display, Error, From)]
 pub enum ReportsServiceError {
   DatabaseError(database::Error),
   BlobError(BlobServiceError),
   /// Error during parsing user input
   /// Usually this indicates user error
   #[from(ignore)]
   ParseError(serde_json::Error),
   /// Error during serializing/deserializing internal data
   /// This is usually a service bug / data inconsistency
   #[from(ignore)]
   SerdeError(serde_json::Error),
   /// Unsupported report type
   /// Returned when trying to perform an operation on an incompatible report type
   /// e.g. create a Redux Devtools import from a media mission report
   UnsupportedReportType,
   /// Unexpected error
   Unexpected,
 }
 
 type ServiceResult<T> = Result<T, ReportsServiceError>;
 
 #[derive(Clone)]
 pub struct ReportsService {
   db: DatabaseClient,
   blob_client: BlobServiceClient,
   requesting_user_id: Option<String>,
 }
 
 impl ReportsService {
   pub fn new(db: DatabaseClient, blob_client: BlobServiceClient) -> Self {
     Self {
       db,
       blob_client,
       requesting_user_id: None,
     }
   }
 
   pub fn authenticated(&self, user: UserIdentity) -> Self {
     let user_id = user.user_id.to_string();
     Self {
       db: self.db.clone(),
       blob_client: self.blob_client.with_user_identity(user),
       requesting_user_id: Some(user_id),
     }
   }
+
+  /// Saves the given reports and returns their IDs.
+  ///
+  /// Reports are converted to database items concurrently, uploading
+  /// their content to the blob service if needed, and are then stored
+  /// in the database with a single call.
+  ///
+  /// The returned IDs are in the same order as the input `reports`,
+  /// regardless of the order in which the concurrent tasks complete.
+  ///
+  /// If any report fails to be processed, the remaining tasks are
+  /// aborted and nothing is written to the database.
+  pub async fn save_reports(
+    &self,
+    reports: Vec<ReportInput>,
+  ) -> ServiceResult<Vec<ReportID>> {
+    // Nothing to upload or store
+    if reports.is_empty() {
+      return Ok(Vec::new());
+    }
+
+    let mut tasks = tokio::task::JoinSet::new();
+
+    // 1. Concurrently upload reports to blob service if needed
+    // Tasks complete in arbitrary order, so each one carries
+    // the position of its report in the input
+    for (index, input) in reports.into_iter().enumerate() {
+      let blob_client = self.blob_client.clone();
+      let user_id = self.requesting_user_id.clone();
+      tasks.spawn(async move {
+        let mut item = ReportItem::from_input(input, user_id)
+          .map_err(ReportsServiceError::SerdeError)?;
+        item.ensure_size_constraints(&blob_client).await?;
+        Ok((index, item))
+      });
+    }
+
+    // 2. Wait for all uploads to complete and collect results
+    // If any of them failed, abort
+    let mut indexed_items = Vec::with_capacity(tasks.len());
+    while let Some(task) = tasks.join_next().await {
+      let result: Result<_, ReportsServiceError> = task.map_err(|err| {
+        error!("Task failed to join: {err}");
+        ReportsServiceError::Unexpected
+      })?;
+      indexed_items.push(result?);
+    }
+
+    // 3. Restore the input order
+    indexed_items.sort_unstable_by_key(|(index, _)| *index);
+    let items: Vec<ReportItem> =
+      indexed_items.into_iter().map(|(_, item)| item).collect();
+
+    // 4. Store reports in database
+    let ids = items.iter().map(|item| item.id.clone()).collect();
+    self.db.save_reports(items).await?;
+    Ok(ids)
+  }
 }
 
 impl FromRequest for ReportsService {
   type Error = actix_web::Error;
   type Future = Ready<Result<Self, actix_web::Error>>;
 
   #[inline]
   fn from_request(
     req: &actix_web::HttpRequest,
     _payload: &mut actix_web::dev::Payload,
   ) -> Self::Future {
     use actix_web::HttpMessage;
 
     let Some(service) = req.app_data::<ReportsService>() else {
       tracing::error!(
         "FATAL! Failed to extract ReportsService from actix app_data. \
         Check HTTP server configuration"
       );
       return ready(Err(actix_web::error::ErrorInternalServerError("Internal server error")));
     };
 
     let auth_service =
       if let Some(user_identity) = req.extensions().get::<UserIdentity>() {
         tracing::trace!("Found user identity. Creating authenticated service");
         service.authenticated(user_identity.clone())
       } else {
         tracing::trace!(
           "No user identity found. Leaving unauthenticated service"
         );
         service.clone()
       };
 
     ready(Ok(auth_service))
   }
 }