diff --git a/services/reports/src/database/item.rs b/services/reports/src/database/item.rs
index f93627747..6aa516fed 100644
--- a/services/reports/src/database/item.rs
+++ b/services/reports/src/database/item.rs
@@ -1,324 +1,337 @@
use aws_sdk_dynamodb::{primitives::Blob, types::AttributeValue};
use chrono::{DateTime, Utc};
use comm_services_lib::{
  blob::{
    client::{BlobServiceClient, BlobServiceError},
    types::BlobInfo,
  },
  constants::DDB_ITEM_SIZE_LIMIT,
  database::{
    self, AttributeExtractor, AttributeMap, DBItemError, TryFromAttribute,
  },
};
use num_traits::FromPrimitive;
use tracing::debug;

use super::constants::*;
use crate::report_types::*;

/// Represents a report item row in DynamoDB.
/// This is serializable to display a list of reports.
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportItem {
  pub id: ReportID,
  #[serde(rename = "userID")]
  pub user_id: String,
  pub report_type: ReportType,
  pub platform: ReportPlatform,
  pub creation_time: DateTime<Utc>,
  #[serde(skip_serializing)]
  pub content: ReportContent,
  #[serde(skip_serializing)]
  pub encryption_key: Option<String>,
}

/// Contains some redundancy, as not all keys are always present
static REPORT_ITEM_KEYS_SIZE: usize = {
  let mut size: usize = 0;
  size += ATTR_REPORT_ID.as_bytes().len();
  size += ATTR_REPORT_TYPE.as_bytes().len();
  size += ATTR_USER_ID.as_bytes().len();
  size += ATTR_PLATFORM.as_bytes().len();
  size += ATTR_CREATION_TIME.as_bytes().len();
  size += ATTR_ENCRYPTION_KEY.as_bytes().len();
  size += ATTR_BLOB_INFO.as_bytes().len();
  size += ATTR_REPORT_CONTENT.as_bytes().len();
  size
};

impl ReportItem {
  pub fn into_attrs(self) -> AttributeMap {
    let creation_time = self
      .creation_time
      .to_rfc3339_opts(chrono::SecondsFormat::Millis, true);

    let mut attrs = AttributeMap::from([
      (ATTR_REPORT_ID.to_string(), self.id.into()),
      (ATTR_USER_ID.to_string(), AttributeValue::S(self.user_id)),
      (ATTR_REPORT_TYPE.to_string(), self.report_type.into()),
      (ATTR_PLATFORM.to_string(), self.platform.into()),
      (
        ATTR_CREATION_TIME.to_string(),
        AttributeValue::S(creation_time),
      ),
    ]);

    let (content_attr_name, content_attr) = self.content.into_attr_pair();
    attrs.insert(content_attr_name, content_attr);

    if let Some(key) = self.encryption_key {
      attrs.insert(ATTR_ENCRYPTION_KEY.to_string(), AttributeValue::S(key));
    }
    attrs
  }

  pub async fn ensure_size_constraints(
    &mut self,
    blob_client: &BlobServiceClient,
  ) -> Result<(), BlobServiceError> {
    if self.total_size() < DDB_ITEM_SIZE_LIMIT {
      return Ok(());
    };

    self.content.move_to_blob(blob_client).await
  }

  fn total_size(&self) -> usize {
    let mut size = REPORT_ITEM_KEYS_SIZE;
    size += self.id.as_bytes().len();
    size += self.user_id.as_bytes().len();
    size += self.platform.to_string().as_bytes().len();
    size += (self.report_type as u8).to_string().as_bytes().len();
    size += match &self.content {
      ReportContent::Database(data) => data.len(),
      ReportContent::Blob(info) => {
        let mut blob_size = 0;
        blob_size += "holder".as_bytes().len();
        blob_size += "blob_hash".as_bytes().len();
        blob_size += info.holder.as_bytes().len();
        blob_size += info.blob_hash.as_bytes().len();
        blob_size
      }
    };
    if let Some(key) = self.encryption_key.as_ref() {
      size += key.as_bytes().len();
    }
    size
  }

  /// Creates a report item from a report input payload.
  ///
  /// WARN: Note that this method stores content as [`ReportContent::Database`]
  /// regardless of its size. Use [`ensure_size_constraints`] to move content to
  /// blob storage if necessary.
  pub fn from_input(
    payload: ReportInput,
    user_id: Option<String>,
  ) -> Result<Self, serde_json::Error> {
    let ReportInput {
      platform_details,
      report_type,
      time,
      mut report_content,
    } = payload;

    let platform = platform_details.platform.clone();

    // Add "platformDetails" back to report content
    let platform_details_value = serde_json::to_value(platform_details)?;
    report_content
      .insert("platformDetails".to_string(), platform_details_value);

    // serialize report JSON to bytes
    let content_bytes = serde_json::to_vec(&report_content)?;
    let content = ReportContent::Database(content_bytes);

    Ok(ReportItem {
      id: ReportID::default(),
      user_id: user_id.unwrap_or("[null]".to_string()),
      platform,
      report_type,
      creation_time: time.unwrap_or_else(Utc::now),
      encryption_key: None,
      content,
    })
  }
}

impl TryFrom<AttributeMap> for ReportItem {
  type Error = DBItemError;

  fn try_from(mut row: AttributeMap) -> Result<Self, Self::Error> {
    let id = row.remove(ATTR_REPORT_ID).try_into()?;
    let user_id = row.take_attr(ATTR_USER_ID)?;
    let report_type = row.take_attr(ATTR_REPORT_TYPE)?;
    let platform = row.take_attr(ATTR_PLATFORM)?;
    let creation_time = row.take_attr(ATTR_CREATION_TIME)?;

    let content = ReportContent::parse_from_attrs(&mut row)?;
    let encryption_key = row
      .remove(ATTR_ENCRYPTION_KEY)
      .map(|attr| String::try_from_attr(ATTR_ENCRYPTION_KEY, Some(attr)))
      .transpose()?;

    Ok(ReportItem {
      id,
      user_id,
      report_type,
      platform,
      content,
      encryption_key,
      creation_time,
    })
  }
}

/// Represents the content of a report item stored in DynamoDB
#[derive(Clone, Debug)]
pub enum ReportContent {
  Blob(BlobInfo),
  Database(Vec<u8>),
}

impl ReportContent {
  /// Returns a tuple of attribute name and value for this content
  fn into_attr_pair(self) -> (String, AttributeValue) {
    match self {
      Self::Blob(blob_info) => (ATTR_BLOB_INFO.to_string(), blob_info.into()),
      Self::Database(data) => (
        ATTR_REPORT_CONTENT.to_string(),
        AttributeValue::B(Blob::new(data)),
      ),
    }
  }

  fn parse_from_attrs(attrs: &mut AttributeMap) -> Result<Self, DBItemError> {
    if let Some(blob_info_attr) = attrs.remove(ATTR_BLOB_INFO) {
      let blob_info =
        BlobInfo::try_from_attr(ATTR_BLOB_INFO, Some(blob_info_attr))?;
      return Ok(ReportContent::Blob(blob_info));
    }

    let content_data = attrs.take_attr(ATTR_REPORT_CONTENT)?;
    Ok(ReportContent::Database(content_data))
  }

  /// Moves report content to blob storage:
  /// - Switches `self` from [`ReportContent::Database`] to [`ReportContent::Blob`]
  /// - No-op for [`ReportContent::Blob`]
  async fn move_to_blob(
    &mut self,
    blob_client: &BlobServiceClient,
  ) -> Result<(), BlobServiceError> {
    todo!()
  }
+
+  /// Fetches report content bytes
+  pub async fn fetch_bytes(
+    self,
+    blob_client: &BlobServiceClient,
+  ) -> Result<Vec<u8>, BlobServiceError> {
+    match self {
+      ReportContent::Database(data) => Ok(data),
+      ReportContent::Blob(BlobInfo { blob_hash, .. }) => {
+        todo!()
+      }
+    }
+  }
}

// DB conversions for report types

// ReportID
impl From<ReportID> for AttributeValue {
  fn from(value: ReportID) -> Self {
    AttributeValue::S(value.into())
  }
}
impl From<&ReportID> for AttributeValue {
  fn from(value: &ReportID) -> Self {
    AttributeValue::S(value.clone().into())
  }
}
impl TryFrom<Option<AttributeValue>> for ReportID {
  type Error = database::DBItemError;

  fn try_from(value: Option<AttributeValue>) -> Result<Self, Self::Error> {
    let raw = String::try_from_attr(ATTR_REPORT_ID, value)?;
    Ok(ReportID::from(raw))
  }
}

// ReportType
impl From<ReportType> for AttributeValue {
  fn from(value: ReportType) -> Self {
    let num = value as u8;
    AttributeValue::N(num.to_string())
  }
}
impl TryFromAttribute for ReportType {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    let attr_name = attribute_name.into();
    let num: u8 = database::parse_int_attribute(&attr_name, attribute)?;
    <ReportType as FromPrimitive>::from_u8(num).ok_or_else(|| {
      database::DBItemError::new(
        attr_name,
        database::Value::String(num.to_string()),
        database::DBItemAttributeError::IncorrectType,
      )
    })
  }
}

// ReportPlatform
impl From<ReportPlatform> for AttributeValue {
  fn from(value: ReportPlatform) -> Self {
    let raw = value.to_string().to_lowercase();
    AttributeValue::S(raw)
  }
}
impl TryFromAttribute for ReportPlatform {
  fn try_from_attr(
    attribute_name: impl Into<String>,
    attribute: Option<AttributeValue>,
  ) -> Result<Self, DBItemError> {
    let attr_name = attribute_name.into();
    let raw = String::try_from_attr(&attr_name, attribute)?;
    // serde_json understands only quoted strings
    let quoted = format!("\"{raw}\"");
    serde_json::from_str(&quoted).map_err(|err| {
      debug!("Failed to deserialize ReportPlatform: {}", err);
      DBItemError::new(
        attr_name,
        database::Value::String(raw),
        database::DBItemAttributeError::IncorrectType,
      )
    })
  }
}

#[cfg(test)]
mod tests {
  use comm_services_lib::database::AttributeTryInto;

  use super::*;

  #[test]
  fn test_platform_conversions() -> anyhow::Result<()> {
    let platform = ReportPlatform::MacOS;

    let attribute: AttributeValue = platform.into();
    assert_eq!(attribute, AttributeValue::S("macos".to_string()));

    let converted_back: ReportPlatform = Some(attribute).attr_try_into("foo")?;
    assert!(matches!(converted_back, ReportPlatform::MacOS));

    Ok(())
  }

  #[test]
  fn test_type_conversions() -> anyhow::Result<()> {
    let report_type = ReportType::MediaMission;
    let numeric_type = (report_type as u8).to_string();
    let attr: AttributeValue = report_type.into();
    assert_eq!(attr, AttributeValue::N(numeric_type.to_string()));

    let converted_back: ReportType = Some(attr).attr_try_into("foo")?;
    assert!(matches!(converted_back, ReportType::MediaMission));
    Ok(())
  }
}
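Both `move_to_blob` and the newly added `fetch_bytes` are still `todo!()` stubs at this point in the patch. Sketch only, not part of this diff: one possible shape for those two bodies. The two free functions below are assumed placeholders for whatever upload/download API `BlobServiceClient` actually exposes, the hash/holder derivation is equally a placeholder, and constructing `BlobInfo` with a struct literal assumes its two fields are public.

use comm_services_lib::blob::{
  client::{BlobServiceClient, BlobServiceError},
  types::BlobInfo,
};

use crate::database::item::ReportContent;

// Placeholder: stands in for the real blob upload call (assumed helper).
async fn upload_report_blob(
  _client: &BlobServiceClient,
  _info: &BlobInfo,
  _data: Vec<u8>,
) -> Result<(), BlobServiceError> {
  unimplemented!("replace with the actual BlobServiceClient upload method")
}

// Placeholder: stands in for the real blob download call (assumed helper).
async fn download_report_blob(
  _client: &BlobServiceClient,
  _blob_hash: &str,
) -> Result<Vec<u8>, BlobServiceError> {
  unimplemented!("replace with the actual BlobServiceClient download method")
}

impl ReportContent {
  // Sketch of `move_to_blob`: upload the inline payload, then swap the enum
  // variant so the DynamoDB row only stores a blob reference.
  async fn move_to_blob_sketch(
    &mut self,
    blob_client: &BlobServiceClient,
  ) -> Result<(), BlobServiceError> {
    let Self::Database(data) = self else {
      // Already a blob reference - nothing to do.
      return Ok(());
    };
    let data = std::mem::take(data);

    // How the hash and holder are derived is an assumption here.
    let blob_info = BlobInfo {
      blob_hash: "<hash of data goes here>".to_string(), // placeholder
      holder: "<new holder value>".to_string(),          // placeholder
    };
    upload_report_blob(blob_client, &blob_info, data).await?;

    *self = Self::Blob(blob_info);
    Ok(())
  }

  // Sketch of `fetch_bytes`: inline content is returned as-is, blob content
  // is downloaded by hash.
  async fn fetch_bytes_sketch(
    self,
    blob_client: &BlobServiceClient,
  ) -> Result<Vec<u8>, BlobServiceError> {
    match self {
      Self::Database(data) => Ok(data),
      Self::Blob(BlobInfo { blob_hash, .. }) => {
        download_report_blob(blob_client, &blob_hash).await
      }
    }
  }
}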
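Sketch only, not part of this patch: a round-trip check for the `into_attrs` / `TryFrom<AttributeMap>` pair, written as if it lived in the test module above so that everything it uses is already in scope. The particular `ReportType` and `ReportPlatform` variants are arbitrary.

  #[test]
  fn test_report_item_attr_round_trip() -> anyhow::Result<()> {
    let item = ReportItem {
      id: ReportID::default(),
      user_id: "user-123".to_string(),
      report_type: ReportType::MediaMission,
      platform: ReportPlatform::MacOS,
      creation_time: Utc::now(),
      content: ReportContent::Database(b"{}".to_vec()),
      encryption_key: None,
    };

    // Serialize to a DynamoDB attribute map, then parse it back.
    let attrs = item.clone().into_attrs();
    let restored = ReportItem::try_from(attrs)?;

    assert_eq!(restored.user_id, item.user_id);
    assert!(matches!(restored.content, ReportContent::Database(_)));
    // creation_time is stored with millisecond precision, so exact equality
    // with the original `Utc::now()` value is intentionally not asserted.
    Ok(())
  }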
diff --git a/services/reports/src/http/handlers.rs b/services/reports/src/http/handlers.rs
index 63020b909..5f55f700d 100644
--- a/services/reports/src/http/handlers.rs
+++ b/services/reports/src/http/handlers.rs
@@ -1,35 +1,51 @@
-use actix_web::{post, web, HttpResponse};
+use actix_web::{get, post, web, HttpResponse};
use serde::Deserialize;

+use super::NotFoundHandler;
+
use crate::report_types::ReportInput;
use crate::service::ReportsService;

/// POST endpoint accepts either a single report object
/// or an array of reports
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum PostReportsPayload {
  Single(ReportInput),
  Multiple(Vec<ReportInput>),
}

impl PostReportsPayload {
  pub fn into_vec(self) -> Vec<ReportInput> {
    match self {
      Self::Single(report) => vec![report],
      Self::Multiple(reports) => reports,
    }
  }
}

-#[post("/reports")]
+#[post("")]
async fn post_reports(
  payload: web::Json<PostReportsPayload>,
  service: ReportsService,
) -> actix_web::Result<HttpResponse> {
  use serde_json::json;

  let payload = payload.into_inner();
  let ids = service.save_reports(payload.into_vec()).await?;
  let response = HttpResponse::Created().json(json!({ "reportIDs": ids }));
  Ok(response)
}
+
+#[get("/{report_id}")]
+async fn get_single_report(
+  path: web::Path<String>,
+  service: ReportsService,
+) -> actix_web::Result<HttpResponse> {
+  let report_id = path.into_inner();
+  let report = service
+    .get_report(report_id.into())
+    .await?
+    .unwrap_or_404()?;
+  let response = HttpResponse::Ok().json(report);
+  Ok(response)
+}
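The `#[serde(untagged)]` payload above is what lets the same endpoint accept either one report object or an array of them; the behavior is standard serde. A self-contained illustration, sketch only, using a trivial stand-in type because `ReportInput`'s fields are not part of this diff:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Item {
  name: String,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Payload {
  Single(Item),
  Multiple(Vec<Item>),
}

#[test]
fn untagged_accepts_object_or_array() -> serde_json::Result<()> {
  // A single JSON object deserializes into the `Single` variant...
  let one: Payload = serde_json::from_str(r#"{ "name": "a" }"#)?;
  assert!(matches!(one, Payload::Single(_)));

  // ...and an array of objects into the `Multiple` variant,
  // so both request shapes reach the same handler.
  let many: Payload =
    serde_json::from_str(r#"[{ "name": "a" }, { "name": "b" }]"#)?;
  assert!(matches!(many, Payload::Multiple(ref v) if v.len() == 2));
  Ok(())
}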
diff --git a/services/reports/src/http/mod.rs b/services/reports/src/http/mod.rs
index b0e0ec7d5..75e5046ea 100644
--- a/services/reports/src/http/mod.rs
+++ b/services/reports/src/http/mod.rs
@@ -1,102 +1,117 @@
use actix_web::error::{
-  ErrorBadRequest, ErrorInternalServerError, ErrorServiceUnavailable,
-  ErrorUnsupportedMediaType,
+  ErrorBadRequest, ErrorInternalServerError, ErrorNotFound,
+  ErrorServiceUnavailable, ErrorUnsupportedMediaType,
};
use actix_web::{web, App, HttpResponse, HttpServer, ResponseError};
use anyhow::Result;
use http::StatusCode;
use tracing::{debug, error, info, trace, warn};

use crate::config::CONFIG;
use crate::constants::REQUEST_BODY_JSON_SIZE_LIMIT;
use crate::service::{ReportsService, ReportsServiceError};

mod handlers;

pub async fn run_http_server(service: ReportsService) -> Result<()> {
  use actix_web::middleware::{Logger, NormalizePath};
  use comm_services_lib::http::cors_config;
  use tracing_actix_web::TracingLogger;

  info!(
    "Starting HTTP server listening at port {}",
    CONFIG.http_port
  );
  HttpServer::new(move || {
    let json_cfg =
      web::JsonConfig::default().limit(REQUEST_BODY_JSON_SIZE_LIMIT);
    App::new()
      .app_data(json_cfg)
      .app_data(service.to_owned())
      .wrap(Logger::default())
      .wrap(TracingLogger::default())
      .wrap(NormalizePath::trim())
      .wrap(cors_config(CONFIG.is_dev()))
      // Health endpoint for load balancer checks
      .route("/health", web::get().to(HttpResponse::Ok))
-      .service(handlers::post_reports)
+      .service(
+        web::scope("/reports")
+          .service(handlers::post_reports)
+          .service(handlers::get_single_report),
+      )
  })
  .bind(("0.0.0.0", CONFIG.http_port))?
  .run()
  .await?;

  Ok(())
}

fn handle_reports_service_error(err: &ReportsServiceError) -> actix_web::Error {
  use aws_sdk_dynamodb::Error as DynamoDBError;
  use comm_services_lib::database::Error as DBError;

  trace!("Handling reports service error: {:?}", err);
  match err {
    ReportsServiceError::UnsupportedReportType => {
      ErrorUnsupportedMediaType("unsupported report type")
    }
    ReportsServiceError::SerdeError(err) => {
      error!("Serde error: {0:?} - {0}", err);
      ErrorInternalServerError("internal error")
    }
    ReportsServiceError::ParseError(err) => {
      debug!("Parse error: {0:?} - {0}", err);
      ErrorBadRequest("invalid input format")
    }
    ReportsServiceError::BlobError(err) => {
      error!("Blob Service error: {0:?} - {0}", err);
      ErrorInternalServerError("internal error")
    }
    ReportsServiceError::DatabaseError(db_err) => match db_err {
      // retriable errors
      DBError::MaxRetriesExceeded
      | DBError::AwsSdk(
        DynamoDBError::InternalServerError(_)
        | DynamoDBError::ProvisionedThroughputExceededException(_)
        | DynamoDBError::RequestLimitExceeded(_),
      ) => {
        warn!("AWS transient error occurred");
        ErrorServiceUnavailable("please retry")
      }
      err => {
        error!("Unexpected database error: {0:?} - {0}", err);
        ErrorInternalServerError("internal error")
      }
    },
    #[allow(unreachable_patterns)]
    err => {
      error!("Received an unexpected error: {0:?} - {0}", err);
      ErrorInternalServerError("server error")
    }
  }
}

/// This allows us to `await?` service calls in HTTP handlers
impl ResponseError for ReportsServiceError {
  fn error_response(&self) -> HttpResponse {
    handle_reports_service_error(self).error_response()
  }

  fn status_code(&self) -> StatusCode {
    handle_reports_service_error(self)
      .as_response_error()
      .status_code()
  }
}
+
+trait NotFoundHandler<T> {
+  /// Returns `Ok(T)` if `self` is `Some(T)`,
+  /// otherwise returns a `404 Not Found` error.
+  fn unwrap_or_404(self) -> actix_web::Result<T>;
+}
+impl<T> NotFoundHandler<T> for Option<T> {
+  fn unwrap_or_404(self) -> actix_web::Result<T> {
+    self.ok_or_else(|| ErrorNotFound("not found"))
+  }
+}
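Sketch only, not part of this patch: a small unit test for the new `NotFoundHandler` helper, written as if it sat at the bottom of this module so it can use the private trait plus the `StatusCode` import already present above.

#[cfg(test)]
mod not_found_handler_tests {
  use super::*;

  #[test]
  fn some_passes_through_none_becomes_404() {
    // Some(value) is passed through unchanged.
    assert_eq!(Some(42).unwrap_or_404().unwrap(), 42);

    // None is converted into an actix error with a 404 status.
    let missing: Option<u32> = None;
    let err = missing.unwrap_or_404().unwrap_err();
    assert_eq!(err.as_response_error().status_code(), StatusCode::NOT_FOUND);
  }
}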
diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs
index cd92dc279..0fc59b24e 100644
--- a/services/reports/src/service.rs
+++ b/services/reports/src/service.rs
@@ -1,131 +1,162 @@
use actix_web::FromRequest;
use comm_services_lib::{
  auth::UserIdentity,
  blob::client::{BlobServiceClient, BlobServiceError},
  database,
};
use derive_more::{Display, Error, From};
use std::future::{ready, Ready};
use tracing::error;

use crate::{
  database::{client::DatabaseClient, item::ReportItem},
-  report_types::{ReportID, ReportInput},
+  report_types::{ReportID, ReportInput, ReportOutput},
};

#[derive(Debug, Display, Error, From)]
pub enum ReportsServiceError {
  DatabaseError(database::Error),
  BlobError(BlobServiceError),
  /// Error during parsing user input
  /// Usually this indicates user error
  #[from(ignore)]
  ParseError(serde_json::Error),
  /// Error during serializing/deserializing internal data
  /// This is usually a service bug / data inconsistency
  #[from(ignore)]
  SerdeError(serde_json::Error),
  /// Unsupported report type
  /// Returned when trying to perform an operation on an incompatible report type,
  /// e.g. creating a Redux DevTools import from a media mission report
  UnsupportedReportType,
  /// Unexpected error
  Unexpected,
}

type ServiceResult<T> = Result<T, ReportsServiceError>;

#[derive(Clone)]
pub struct ReportsService {
  db: DatabaseClient,
  blob_client: BlobServiceClient,
  requesting_user_id: Option<String>,
}

impl ReportsService {
  pub fn new(db: DatabaseClient, blob_client: BlobServiceClient) -> Self {
    Self {
      db,
      blob_client,
      requesting_user_id: None,
    }
  }

  pub fn authenticated(&self, user: UserIdentity) -> Self {
    let user_id = user.user_id.to_string();
    Self {
      db: self.db.clone(),
      blob_client: self.blob_client.with_user_identity(user),
      requesting_user_id: Some(user_id),
    }
  }

  pub async fn save_reports(
    &self,
    reports: Vec<ReportInput>,
  ) -> ServiceResult<Vec<ReportID>> {
    let mut items = Vec::with_capacity(reports.len());
    let mut tasks = tokio::task::JoinSet::new();

    // 1. Concurrently upload reports to blob service if needed
    for input in reports {
      let blob_client = self.blob_client.clone();
      let user_id = self.requesting_user_id.clone();
      tasks.spawn(async move {
        let mut item = ReportItem::from_input(input, user_id)
          .map_err(ReportsServiceError::SerdeError)?;
        item.ensure_size_constraints(&blob_client).await?;
        Ok(item)
      });
    }

    // 2. Wait for all uploads to complete and collect results
    // If any of them fails, abort
    while let Some(task) = tasks.join_next().await {
      let result: Result<_, ReportsServiceError> = task.map_err(|err| {
        error!("Task failed to join: {err}");
        ReportsServiceError::Unexpected
      })?;
      items.push(result?);
    }

    // 3. Store reports in database
    let ids = items.iter().map(|item| item.id.clone()).collect();
    self.db.save_reports(items).await?;
    Ok(ids)
  }
+
+  pub async fn get_report(
+    &self,
+    report_id: ReportID,
+  ) -> ServiceResult<Option<ReportOutput>> {
+    let Some(report_item) = self.db.get_report(&report_id).await? else {
+      return Ok(None);
+    };
+    let ReportItem {
+      user_id,
+      report_type,
+      platform,
+      creation_time,
+      content,
+      ..
+    } = report_item;
+
+    let report_data = content.fetch_bytes(&self.blob_client).await?;
+    let report_json = serde_json::from_slice(report_data.as_slice())
+      .map_err(ReportsServiceError::SerdeError)?;
+
+    let output = ReportOutput {
+      id: report_id,
+      user_id,
+      platform,
+      report_type,
+      creation_time,
+      content: report_json,
+    };
+    Ok(Some(output))
+  }
}

impl FromRequest for ReportsService {
  type Error = actix_web::Error;
  type Future = Ready<Result<Self, Self::Error>>;

  #[inline]
  fn from_request(
    req: &actix_web::HttpRequest,
    _payload: &mut actix_web::dev::Payload,
  ) -> Self::Future {
    use actix_web::HttpMessage;

    let Some(service) = req.app_data::<Self>() else {
      tracing::error!(
        "FATAL! Failed to extract ReportsService from actix app_data. \
        Check HTTP server configuration"
      );
      return ready(Err(actix_web::error::ErrorInternalServerError(
        "Internal server error",
      )));
    };

    let auth_service =
      if let Some(user_identity) = req.extensions().get::<UserIdentity>() {
        tracing::trace!("Found user identity. Creating authenticated service");
        service.authenticated(user_identity.clone())
      } else {
        tracing::trace!(
          "No user identity found. Leaving unauthenticated service"
        );
        service.clone()
      };

    ready(Ok(auth_service))
  }
}
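Taken together, the patch exposes `POST /reports` and `GET /reports/{report_id}`. A client-side sketch of the resulting flow is below; it assumes a separate crate with `tokio`, `reqwest` (with its `json` feature) and `serde_json`, none of which are dependencies of this service, and an assumed local address (the real port comes from `CONFIG.http_port`). The `reportIDs` field comes from the `post_reports` handler above.

use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Assumed address; the service binds to 0.0.0.0 on CONFIG.http_port.
  let base = "http://localhost:8080/reports";
  let client = reqwest::Client::new();

  // POST /reports accepts a single report object or an array of them.
  let created: Value = client
    .post(base)
    .json(&json!([/* one or more report objects go here */]))
    .send()
    .await?
    .json()
    .await?;
  let report_id = created["reportIDs"][0].as_str().unwrap_or_default();

  // GET /reports/{report_id} returns the stored report, or 404 if unknown.
  let report: Value = client
    .get(format!("{base}/{report_id}"))
    .send()
    .await?
    .json()
    .await?;
  println!("fetched report: {report}");
  Ok(())
}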