diff --git a/services/reports/src/constants.rs b/services/reports/src/constants.rs index 5b34c98bf..fdf75364e 100644 --- a/services/reports/src/constants.rs +++ b/services/reports/src/constants.rs @@ -1,2 +1,2 @@ -pub const REPORT_LIST_PAGE_SIZE: i32 = 20; +pub const REPORT_LIST_DEFAULT_PAGE_SIZE: u32 = 20; pub const REQUEST_BODY_JSON_SIZE_LIMIT: usize = 10 * 1024 * 1024; // 10MB diff --git a/services/reports/src/database/client.rs b/services/reports/src/database/client.rs index 72064a72b..27f70adce 100644 --- a/services/reports/src/database/client.rs +++ b/services/reports/src/database/client.rs @@ -1,75 +1,127 @@ +use aws_sdk_dynamodb::types::AttributeValue; use comm_services_lib::database::{ self, batch_operations::ExponentialBackoffConfig, }; +use crate::constants::REPORT_LIST_DEFAULT_PAGE_SIZE; use crate::report_types::ReportID; use super::constants::*; use super::item::ReportItem; #[derive(serde::Serialize)] pub struct ReportsPage { pub reports: Vec, /// Report ID that can be used as a cursor to retrieve the next page #[serde(rename(serialize = "nextPage"))] pub last_evaluated_report: Option, } #[derive(Clone)] pub struct DatabaseClient { ddb: aws_sdk_dynamodb::Client, } impl DatabaseClient { pub fn new(aws_config: &aws_config::SdkConfig) -> Self { DatabaseClient { ddb: aws_sdk_dynamodb::Client::new(aws_config), } } /// Gets a single [`ReportItem`] given its [`ReportID`] pub async fn get_report( &self, report_id: &ReportID, ) -> Result, database::Error> { let response = self .ddb .get_item() .table_name(TABLE_NAME) .key(ATTR_REPORT_ID, report_id.into()) .send() .await .map_err(|err| database::Error::AwsSdk(err.into()))?; response .item .map(ReportItem::try_from) .transpose() .map_err(database::Error::from) } + /// Performs a scan operation to get reports, returns 20 items and a cursor + /// that can be used to get next 20 items + pub async fn scan_reports( + &self, + cusror: Option, + page_size: Option, + ) -> Result { + let query = self + .ddb + .scan() + .table_name(TABLE_NAME) + .limit(page_size.unwrap_or(REPORT_LIST_DEFAULT_PAGE_SIZE) as i32); + + let request = if let Some(last_evaluated_item) = cusror { + query.exclusive_start_key( + ATTR_REPORT_ID, + AttributeValue::S(last_evaluated_item), + ) + } else { + query + }; + + let output = request + .send() + .await + .map_err(|err| database::Error::AwsSdk(err.into()))?; + + let last_evaluated_report = output + .last_evaluated_key + .map(|mut attrs| ReportID::try_from(attrs.remove(ATTR_REPORT_ID))) + .transpose()?; + + let Some(items) = output.items else { + return Ok(ReportsPage { + reports: Vec::new(), + last_evaluated_report, + }); + }; + + let reports = items + .into_iter() + .map(ReportItem::try_from) + .collect::, _>>()?; + + Ok(ReportsPage { + reports, + last_evaluated_report, + }) + } + /// Saves multiple reports to DB in batch pub async fn save_reports( &self, reports: impl IntoIterator, ) -> Result<(), database::Error> { use aws_sdk_dynamodb::types::{PutRequest, WriteRequest}; let requests = reports .into_iter() .map(|item| { let attrs = item.into_attrs(); let put_request = PutRequest::builder().set_item(Some(attrs)).build(); WriteRequest::builder().put_request(put_request).build() }) .collect::>(); database::batch_operations::batch_write( &self.ddb, TABLE_NAME, requests, ExponentialBackoffConfig::default(), ) .await } } diff --git a/services/reports/src/http/handlers.rs b/services/reports/src/http/handlers.rs index 65895c175..3865e48a8 100644 --- a/services/reports/src/http/handlers.rs +++ b/services/reports/src/http/handlers.rs @@ -1,72 +1,91 @@ use actix_web::{get, post, web, HttpResponse}; use http::header; use serde::Deserialize; use super::NotFoundHandler; use crate::report_types::ReportInput; use crate::service::ReportsService; /// POST endpoint accepts either a single report Object /// or an array of reports #[derive(Debug, Deserialize)] #[serde(untagged)] enum PostReportsPayload { Single(ReportInput), Multiple(Vec), } impl PostReportsPayload { pub fn into_vec(self) -> Vec { match self { Self::Single(report) => vec![report], Self::Multiple(reports) => reports, } } } #[post("")] async fn post_reports( payload: web::Json, service: ReportsService, ) -> actix_web::Result { use serde_json::json; let payload = payload.into_inner(); let ids = service.save_reports(payload.into_vec()).await?; let response = HttpResponse::Created().json(json!({ "reportIDs": ids })); Ok(response) } +#[derive(Debug, Deserialize)] +struct QueryOptions { + cursor: Option, + page_size: Option, + // there can be more options here in the future + // e.g. filter by platform, report type, user, etc. +} + +#[get("")] +async fn query_reports( + query: web::Query, + service: ReportsService, +) -> actix_web::Result { + let QueryOptions { cursor, page_size } = query.into_inner(); + let page = service.list_reports(cursor, page_size).await?; + let response = HttpResponse::Ok().json(page); + Ok(response) +} + #[get("/{report_id}")] async fn get_single_report( path: web::Path, service: ReportsService, ) -> actix_web::Result { let report_id = path.into_inner(); let report = service .get_report(report_id.into()) .await? .unwrap_or_404()?; let response = HttpResponse::Ok().json(report); Ok(response) } #[get("/{report_id}/redux-devtools.json")] async fn redux_devtools_import( path: web::Path, service: ReportsService, ) -> actix_web::Result { let report_id = path.into_inner(); let devtools_json = service .get_redux_devtools_import(report_id.clone().into()) .await? .unwrap_or_404()?; let response = HttpResponse::Ok() .insert_header(( header::CONTENT_DISPOSITION, format!("attachment; filename=report-{}.json", report_id), )) .json(devtools_json); Ok(response) } diff --git a/services/reports/src/http/mod.rs b/services/reports/src/http/mod.rs index fc2a0433d..dce8d847b 100644 --- a/services/reports/src/http/mod.rs +++ b/services/reports/src/http/mod.rs @@ -1,118 +1,119 @@ use actix_web::error::{ ErrorBadRequest, ErrorInternalServerError, ErrorNotFound, ErrorServiceUnavailable, ErrorUnsupportedMediaType, }; use actix_web::{web, App, HttpResponse, HttpServer, ResponseError}; use anyhow::Result; use http::StatusCode; use tracing::{debug, error, info, trace, warn}; use crate::config::CONFIG; use crate::constants::REQUEST_BODY_JSON_SIZE_LIMIT; use crate::service::{ReportsService, ReportsServiceError}; mod handlers; pub async fn run_http_server(service: ReportsService) -> Result<()> { use actix_web::middleware::{Logger, NormalizePath}; use comm_services_lib::http::cors_config; use tracing_actix_web::TracingLogger; info!( "Starting HTTP server listening at port {}", CONFIG.http_port ); HttpServer::new(move || { let json_cfg = web::JsonConfig::default().limit(REQUEST_BODY_JSON_SIZE_LIMIT); App::new() .app_data(json_cfg) .app_data(service.to_owned()) .wrap(Logger::default()) .wrap(TracingLogger::default()) .wrap(NormalizePath::trim()) .wrap(cors_config(CONFIG.is_dev())) // Health endpoint for load balancers checks .route("/health", web::get().to(HttpResponse::Ok)) .service( web::scope("/reports") .service(handlers::post_reports) + .service(handlers::query_reports) .service(handlers::get_single_report) .service(handlers::redux_devtools_import), ) }) .bind(("0.0.0.0", CONFIG.http_port))? .run() .await?; Ok(()) } fn handle_reports_service_error(err: &ReportsServiceError) -> actix_web::Error { use aws_sdk_dynamodb::Error as DynamoDBError; use comm_services_lib::database::Error as DBError; trace!("Handling reports service error: {:?}", err); match err { ReportsServiceError::UnsupportedReportType => { ErrorUnsupportedMediaType("unsupported report type") } ReportsServiceError::SerdeError(err) => { error!("Serde error: {0:?} - {0}", err); ErrorInternalServerError("internal error") } ReportsServiceError::ParseError(err) => { debug!("Parse error: {0:?} - {0}", err); ErrorBadRequest("invalid input format") } ReportsServiceError::BlobError(err) => { error!("Blob Service error: {0:?} - {0}", err); ErrorInternalServerError("internal error") } ReportsServiceError::DatabaseError(db_err) => match db_err { // retriable errors DBError::MaxRetriesExceeded | DBError::AwsSdk( DynamoDBError::InternalServerError(_) | DynamoDBError::ProvisionedThroughputExceededException(_) | DynamoDBError::RequestLimitExceeded(_), ) => { warn!("AWS transient error occurred"); ErrorServiceUnavailable("please retry") } err => { error!("Unexpected database error: {0:?} - {0}", err); ErrorInternalServerError("internal error") } }, #[allow(unreachable_patterns)] err => { error!("Received an unexpected error: {0:?} - {0}", err); ErrorInternalServerError("server error") } } } /// This allow us to `await?` blob service calls in HTTP handlers impl ResponseError for ReportsServiceError { fn error_response(&self) -> HttpResponse { handle_reports_service_error(self).error_response() } fn status_code(&self) -> StatusCode { handle_reports_service_error(self) .as_response_error() .status_code() } } trait NotFoundHandler { /// Returns `Ok(T)` if `self` is `Some(T)`, /// otherwise returns a `404 Not Found` error. fn unwrap_or_404(self) -> actix_web::Result; } impl NotFoundHandler for Option { fn unwrap_or_404(self) -> actix_web::Result { self.ok_or_else(|| ErrorNotFound("not found")) } } diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs index de421197f..17f2d46e6 100644 --- a/services/reports/src/service.rs +++ b/services/reports/src/service.rs @@ -1,207 +1,219 @@ use actix_web::FromRequest; use comm_services_lib::{ auth::UserIdentity, blob::client::{BlobServiceClient, BlobServiceError}, database, }; use derive_more::{Display, Error, From}; use std::{ collections::HashMap, future::{ready, Ready}, }; use tracing::error; use crate::{ - database::{client::DatabaseClient, item::ReportItem}, + database::{ + client::{DatabaseClient, ReportsPage}, + item::ReportItem, + }, report_types::{ReportID, ReportInput, ReportOutput, ReportType}, }; #[derive(Debug, Display, Error, From)] pub enum ReportsServiceError { DatabaseError(database::Error), BlobError(BlobServiceError), /// Error during parsing user input /// Usually this indicates user error #[from(ignore)] ParseError(serde_json::Error), /// Error during serializing/deserializing internal data /// This is usually a service bug / data inconsistency #[from(ignore)] SerdeError(serde_json::Error), /// Unsupported report type /// Returned when trying to perform an operation on an incompatible report type /// e.g. create a Redux Devtools import from a media mission report UnsupportedReportType, /// Unexpected error Unexpected, } type ServiceResult = Result; #[derive(Clone)] pub struct ReportsService { db: DatabaseClient, blob_client: BlobServiceClient, requesting_user_id: Option, } impl ReportsService { pub fn new(db: DatabaseClient, blob_client: BlobServiceClient) -> Self { Self { db, blob_client, requesting_user_id: None, } } pub fn authenticated(&self, user: UserIdentity) -> Self { let user_id = user.user_id.to_string(); Self { db: self.db.clone(), blob_client: self.blob_client.with_user_identity(user), requesting_user_id: Some(user_id), } } pub async fn save_reports( &self, reports: Vec, ) -> ServiceResult> { let mut items = Vec::with_capacity(reports.len()); let mut tasks = tokio::task::JoinSet::new(); // 1. Concurrently upload reports to blob service if needed for input in reports { let blob_client = self.blob_client.clone(); let user_id = self.requesting_user_id.clone(); tasks.spawn(async move { let mut item = ReportItem::from_input(input, user_id) .map_err(ReportsServiceError::SerdeError)?; item.ensure_size_constraints(&blob_client).await?; Ok(item) }); } // 2. Wait for all uploads to complete and collect results // If any of them failed, abort while let Some(task) = tasks.join_next().await { let result: Result<_, ReportsServiceError> = task.map_err(|err| { error!("Task failed to join: {err}"); ReportsServiceError::Unexpected })?; items.push(result?); } // 3. Store reports in database let ids = items.iter().map(|item| item.id.clone()).collect(); self.db.save_reports(items).await?; Ok(ids) } pub async fn get_report( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report_item) = self.db.get_report(&report_id).await? else { return Ok(None); }; let ReportItem { user_id, report_type, platform, creation_time, content, .. } = report_item; let report_data = content.fetch_bytes(&self.blob_client).await?; let report_json = serde_json::from_slice(report_data.as_slice()) .map_err(ReportsServiceError::SerdeError)?; let output = ReportOutput { id: report_id, user_id, platform, report_type, creation_time, content: report_json, }; Ok(Some(output)) } pub async fn get_redux_devtools_import( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report) = self.get_report(report_id).await? else { return Ok(None); }; if !matches!(report.report_type, ReportType::ErrorReport) { return Err(ReportsServiceError::UnsupportedReportType); }; let redux_devtools_payload = prepare_redux_devtools_import(report.content) .map_err(ReportsServiceError::SerdeError)?; Ok(Some(redux_devtools_payload)) } + + pub async fn list_reports( + &self, + cursor: Option, + page_size: Option, + ) -> ServiceResult { + let page = self.db.scan_reports(cursor, page_size).await?; + Ok(page) + } } impl FromRequest for ReportsService { type Error = actix_web::Error; type Future = Ready>; #[inline] fn from_request( req: &actix_web::HttpRequest, _payload: &mut actix_web::dev::Payload, ) -> Self::Future { use actix_web::HttpMessage; let Some(service) = req.app_data::() else { tracing::error!( "FATAL! Failed to extract ReportsService from actix app_data. \ Check HTTP server configuration" ); return ready(Err(actix_web::error::ErrorInternalServerError("Internal server error"))); }; let auth_service = if let Some(user_identity) = req.extensions().get::() { tracing::trace!("Found user identity. Creating authenticated service"); service.authenticated(user_identity.clone()) } else { tracing::trace!( "No user identity found. Leaving unauthenticated service" ); service.clone() }; ready(Ok(auth_service)) } } /// Transforms report content JSON into format that can be /// imported into Redux DevTools. fn prepare_redux_devtools_import( mut error_report: HashMap, ) -> Result { use serde_json::{json, map::Map, Value}; let nav_state = error_report.remove("navState"); let actions = error_report.remove("actions"); let mut preloaded_state = error_report .remove("preloadedState") .unwrap_or_else(|| Value::Object(Map::new())); preloaded_state["navState"] = nav_state.into(); preloaded_state["frozen"] = true.into(); preloaded_state["_persist"]["rehydrated"] = false.into(); let preload_state_str = serde_json::to_string(&preloaded_state)?; let payload_str = serde_json::to_string(&actions)?; Ok(json!({ "preloadedState": preload_state_str, "payload": payload_str, })) }