diff --git a/services/reports/src/http/handlers.rs b/services/reports/src/http/handlers.rs new file mode 100644 --- /dev/null +++ b/services/reports/src/http/handlers.rs @@ -0,0 +1,35 @@ +use actix_web::{post, web, HttpResponse}; +use serde::Deserialize; + +use crate::report_types::ReportInput; +use crate::service::ReportsService; + +/// POST endpoint accepts either a single report Object +/// or an array of reports +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum PostReportsPayload { + Single(ReportInput), + Multiple(Vec), +} +impl PostReportsPayload { + pub fn into_vec(self) -> Vec { + match self { + Self::Single(report) => vec![report], + Self::Multiple(reports) => reports, + } + } +} + +#[post("/reports")] +async fn post_reports( + payload: web::Json, + service: ReportsService, +) -> actix_web::Result { + use serde_json::json; + + let payload = payload.into_inner(); + let ids = service.save_reports(payload.into_vec()).await?; + let response = HttpResponse::Created().json(json!({ "reportIDs": ids })); + Ok(response) +} diff --git a/services/reports/src/http.rs b/services/reports/src/http/mod.rs rename from services/reports/src/http.rs rename to services/reports/src/http/mod.rs --- a/services/reports/src/http.rs +++ b/services/reports/src/http/mod.rs @@ -11,6 +11,8 @@ use crate::constants::REQUEST_BODY_JSON_SIZE_LIMIT; use crate::service::{ReportsService, ReportsServiceError}; +mod handlers; + pub async fn run_http_server(service: ReportsService) -> Result<()> { use actix_web::middleware::{Logger, NormalizePath}; use comm_services_lib::http::cors_config; @@ -32,6 +34,7 @@ .wrap(cors_config(CONFIG.is_dev())) // Health endpoint for load balancers checks .route("/health", web::get().to(HttpResponse::Ok)) + .service(handlers::post_reports) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() diff --git a/services/reports/src/report_types.rs b/services/reports/src/report_types.rs --- a/services/reports/src/report_types.rs +++ b/services/reports/src/report_types.rs @@ -9,7 +9,7 @@ use serde_repr::Deserialize_repr; #[derive(Clone, Debug, Deref, Serialize, Into)] -#[repr(transparent)] +#[serde(transparent)] pub struct ReportID(String); impl Default for ReportID { fn default() -> Self { @@ -89,9 +89,12 @@ { let mut this = Self::deserialize(deserializer)?; if this.time.is_none() { - if !matches!(this.report_type, ReportType::ThreadInconsistency) { + if !matches!( + this.report_type, + ReportType::ThreadInconsistency | ReportType::ErrorReport + ) { return Err(Error::custom( - "The 'time' field is optional only for thread inconsistency reports", + "The 'time' field is optional only for thread inconsistency and error reports", )); } this.time = Some(Utc::now()); diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs --- a/services/reports/src/service.rs +++ b/services/reports/src/service.rs @@ -6,8 +6,12 @@ }; use derive_more::{Display, Error, From}; use std::future::{ready, Ready}; +use tracing::error; -use crate::database::client::DatabaseClient; +use crate::{ + database::{client::DatabaseClient, item::ReportItem}, + report_types::{ReportID, ReportInput}, +}; #[derive(Debug, Display, Error, From)] pub enum ReportsServiceError { @@ -55,6 +59,41 @@ requesting_user_id: Some(user_id), } } + + pub async fn save_reports( + &self, + reports: Vec, + ) -> ServiceResult> { + let mut items = Vec::with_capacity(reports.len()); + let mut tasks = tokio::task::JoinSet::new(); + + // 1. Concurrently upload reports to blob service if needed + for input in reports { + let blob_client = self.blob_client.clone(); + let user_id = self.requesting_user_id.clone(); + tasks.spawn(async move { + let mut item = ReportItem::from_input(input, user_id) + .map_err(ReportsServiceError::SerdeError)?; + item.ensure_size_constraints(&blob_client).await?; + Ok(item) + }); + } + + // 2. Wait for all uploads to complete and collect results + // If any of them failed, abort + while let Some(task) = tasks.join_next().await { + let result: Result<_, ReportsServiceError> = task.map_err(|err| { + error!("Task failed to join: {err}"); + ReportsServiceError::Unexpected + })?; + items.push(result?); + } + + // 3. Store reports in database + let ids = items.iter().map(|item| item.id.clone()).collect(); + self.db.save_reports(items).await?; + Ok(ids) + } } impl FromRequest for ReportsService {