Page MenuHomePhabricator

D8970.diff
No OneTemporary

D8970.diff

diff --git a/services/reports/src/http/handlers.rs b/services/reports/src/http/handlers.rs
new file mode 100644
--- /dev/null
+++ b/services/reports/src/http/handlers.rs
@@ -0,0 +1,35 @@
+use actix_web::{post, web, HttpResponse};
+use serde::Deserialize;
+
+use crate::report_types::ReportInput;
+use crate::service::ReportsService;
+
+/// POST endpoint accepts either a single report Object
+/// or an array of reports
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+enum PostReportsPayload {
+ Single(ReportInput),
+ Multiple(Vec<ReportInput>),
+}
+impl PostReportsPayload {
+ pub fn into_vec(self) -> Vec<ReportInput> {
+ match self {
+ Self::Single(report) => vec![report],
+ Self::Multiple(reports) => reports,
+ }
+ }
+}
+
+#[post("/reports")]
+async fn post_reports(
+ payload: web::Json<PostReportsPayload>,
+ service: ReportsService,
+) -> actix_web::Result<HttpResponse> {
+ use serde_json::json;
+
+ let payload = payload.into_inner();
+ let ids = service.save_reports(payload.into_vec()).await?;
+ let response = HttpResponse::Created().json(json!({ "reportIDs": ids }));
+ Ok(response)
+}
diff --git a/services/reports/src/http.rs b/services/reports/src/http/mod.rs
rename from services/reports/src/http.rs
rename to services/reports/src/http/mod.rs
--- a/services/reports/src/http.rs
+++ b/services/reports/src/http/mod.rs
@@ -11,6 +11,8 @@
use crate::constants::REQUEST_BODY_JSON_SIZE_LIMIT;
use crate::service::{ReportsService, ReportsServiceError};
+mod handlers;
+
pub async fn run_http_server(service: ReportsService) -> Result<()> {
use actix_web::middleware::{Logger, NormalizePath};
use comm_services_lib::http::cors_config;
@@ -32,6 +34,7 @@
.wrap(cors_config(CONFIG.is_dev()))
// Health endpoint for load balancers checks
.route("/health", web::get().to(HttpResponse::Ok))
+ .service(handlers::post_reports)
})
.bind(("0.0.0.0", CONFIG.http_port))?
.run()
diff --git a/services/reports/src/report_types.rs b/services/reports/src/report_types.rs
--- a/services/reports/src/report_types.rs
+++ b/services/reports/src/report_types.rs
@@ -9,7 +9,7 @@
use serde_repr::Deserialize_repr;
#[derive(Clone, Debug, Deref, Serialize, Into)]
-#[repr(transparent)]
+#[serde(transparent)]
pub struct ReportID(String);
impl Default for ReportID {
fn default() -> Self {
@@ -89,9 +89,12 @@
{
let mut this = Self::deserialize(deserializer)?;
if this.time.is_none() {
- if !matches!(this.report_type, ReportType::ThreadInconsistency) {
+ if !matches!(
+ this.report_type,
+ ReportType::ThreadInconsistency | ReportType::ErrorReport
+ ) {
return Err(Error::custom(
- "The 'time' field is optional only for thread inconsistency reports",
+ "The 'time' field is optional only for thread inconsistency and error reports",
));
}
this.time = Some(Utc::now());
diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs
--- a/services/reports/src/service.rs
+++ b/services/reports/src/service.rs
@@ -6,8 +6,12 @@
};
use derive_more::{Display, Error, From};
use std::future::{ready, Ready};
+use tracing::error;
-use crate::database::client::DatabaseClient;
+use crate::{
+ database::{client::DatabaseClient, item::ReportItem},
+ report_types::{ReportID, ReportInput},
+};
#[derive(Debug, Display, Error, From)]
pub enum ReportsServiceError {
@@ -55,6 +59,41 @@
requesting_user_id: Some(user_id),
}
}
+
+ pub async fn save_reports(
+ &self,
+ reports: Vec<ReportInput>,
+ ) -> ServiceResult<Vec<ReportID>> {
+ let mut items = Vec::with_capacity(reports.len());
+ let mut tasks = tokio::task::JoinSet::new();
+
+ // 1. Concurrently upload reports to blob service if needed
+ for input in reports {
+ let blob_client = self.blob_client.clone();
+ let user_id = self.requesting_user_id.clone();
+ tasks.spawn(async move {
+ let mut item = ReportItem::from_input(input, user_id)
+ .map_err(ReportsServiceError::SerdeError)?;
+ item.ensure_size_constraints(&blob_client).await?;
+ Ok(item)
+ });
+ }
+
+ // 2. Wait for all uploads to complete and collect results
+ // If any of them failed, abort
+ while let Some(task) = tasks.join_next().await {
+ let result: Result<_, ReportsServiceError> = task.map_err(|err| {
+ error!("Task failed to join: {err}");
+ ReportsServiceError::Unexpected
+ })?;
+ items.push(result?);
+ }
+
+ // 3. Store reports in database
+ let ids = items.iter().map(|item| item.id.clone()).collect();
+ self.db.save_reports(items).await?;
+ Ok(ids)
+ }
}
impl FromRequest for ReportsService {

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 23, 4:32 AM (18 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2568359
Default Alt Text
D8970.diff (4 KB)

Event Timeline