diff --git a/services/reports/src/database/item.rs b/services/reports/src/database/item.rs index 6d8527fc6..fdcde749f 100644 --- a/services/reports/src/database/item.rs +++ b/services/reports/src/database/item.rs @@ -1,365 +1,327 @@ use aws_sdk_dynamodb::{primitives::Blob, types::AttributeValue}; use chrono::{DateTime, Utc}; use comm_services_lib::{ blob::{ client::{BlobServiceClient, BlobServiceError}, types::BlobInfo, }, bytes::Bytes, constants::DDB_ITEM_SIZE_LIMIT, database::{ self, AttributeExtractor, AttributeMap, DBItemError, TryFromAttribute, }, }; use hex::ToHex; use num_traits::FromPrimitive; use sha2::{Digest, Sha256}; use tokio_stream::StreamExt; use tracing::debug; use super::constants::*; use crate::report_types::*; /// Represents a report item row in DynamoDB /// This is serializable to display a list of reports #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct ReportItem { pub id: ReportID, #[serde(rename = "userID")] pub user_id: String, pub report_type: ReportType, pub platform: ReportPlatform, pub creation_time: DateTime, #[serde(skip_serializing)] pub content: ReportContent, #[serde(skip_serializing)] pub encryption_key: Option, } /// contains some redundancy as not all keys are always present static REPORT_ITEM_KEYS_SIZE: usize = { let mut size: usize = 0; size += ATTR_REPORT_ID.as_bytes().len(); size += ATTR_REPORT_TYPE.as_bytes().len(); size += ATTR_USER_ID.as_bytes().len(); size += ATTR_PLATFORM.as_bytes().len(); size += ATTR_CREATION_TIME.as_bytes().len(); size += ATTR_ENCRYPTION_KEY.as_bytes().len(); size += ATTR_BLOB_INFO.as_bytes().len(); size += ATTR_REPORT_CONTENT.as_bytes().len(); size }; impl ReportItem { pub fn into_attrs(self) -> AttributeMap { let creation_time = self .creation_time .to_rfc3339_opts(chrono::SecondsFormat::Millis, true); let mut attrs = AttributeMap::from([ (ATTR_REPORT_ID.to_string(), self.id.into()), (ATTR_USER_ID.to_string(), AttributeValue::S(self.user_id)), (ATTR_REPORT_TYPE.to_string(), self.report_type.into()), (ATTR_PLATFORM.to_string(), self.platform.into()), ( ATTR_CREATION_TIME.to_string(), AttributeValue::S(creation_time), ), ]); let (content_attr_name, content_attr) = self.content.into_attr_pair(); attrs.insert(content_attr_name, content_attr); if let Some(key) = self.encryption_key { attrs.insert(ATTR_ENCRYPTION_KEY.to_string(), AttributeValue::S(key)); } attrs } pub async fn ensure_size_constraints( &mut self, blob_client: &BlobServiceClient, ) -> Result<(), BlobServiceError> { if self.total_size() < DDB_ITEM_SIZE_LIMIT { return Ok(()); }; debug!( report_id = ?self.id, "Report content exceeds DDB item size limit, moving to blob storage" ); self.content.move_to_blob(blob_client).await } fn total_size(&self) -> usize { let mut size = REPORT_ITEM_KEYS_SIZE; size += self.id.as_bytes().len(); size += self.user_id.as_bytes().len(); size += self.platform.to_string().as_bytes().len(); size += (self.report_type as u8).to_string().as_bytes().len(); size += match &self.content { ReportContent::Database(data) => data.len(), ReportContent::Blob(info) => { let mut blob_size = 0; blob_size += "holder".as_bytes().len(); blob_size += "blob_hash".as_bytes().len(); blob_size += info.holder.as_bytes().len(); blob_size += info.blob_hash.as_bytes().len(); blob_size } }; if let Some(key) = self.encryption_key.as_ref() { size += key.as_bytes().len(); } size } - - /// Creates a report item from a report input payload - /// - /// WARN: Note that this method stores content as [`ReportStorage::Database`] - /// regardless of its size. Use [`ensure_size_constraints`] to move content to - /// blob storage if necessary. - pub fn from_input( - payload: ReportInput, - user_id: Option, - ) -> Result { - let ReportInput { - platform_details, - report_type, - time, - mut report_content, - } = payload; - - let platform = platform_details.platform.clone(); - - // Add "platformDetails" back to report content - let platform_details_value = serde_json::to_value(platform_details)?; - report_content - .insert("platformDetails".to_string(), platform_details_value); - - // serialize report JSON to bytes - let content_bytes = serde_json::to_vec(&report_content)?; - let content = ReportContent::Database(content_bytes); - - Ok(ReportItem { - id: ReportID::default(), - user_id: user_id.unwrap_or("[null]".to_string()), - platform, - report_type, - creation_time: time.unwrap_or_else(Utc::now), - encryption_key: None, - content, - }) - } } impl TryFrom for ReportItem { type Error = DBItemError; fn try_from(mut row: AttributeMap) -> Result { let id = row.remove(ATTR_REPORT_ID).try_into()?; let user_id = row.take_attr(ATTR_USER_ID)?; let report_type = row.take_attr(ATTR_REPORT_TYPE)?; let platform = row.take_attr(ATTR_PLATFORM)?; let creation_time = row.take_attr(ATTR_CREATION_TIME)?; let content = ReportContent::parse_from_attrs(&mut row)?; let encryption_key = row .remove(ATTR_ENCRYPTION_KEY) .map(|attr| String::try_from_attr(ATTR_ENCRYPTION_KEY, Some(attr))) .transpose()?; Ok(ReportItem { id, user_id, report_type, platform, content, encryption_key, creation_time, }) } } /// Represents the content of a report item stored in DynamoDB #[derive(Clone, Debug)] pub enum ReportContent { Blob(BlobInfo), Database(Vec), } impl ReportContent { /// Returns a tuple of attribute name and value for this content fn into_attr_pair(self) -> (String, AttributeValue) { match self { Self::Blob(blob_info) => (ATTR_BLOB_INFO.to_string(), blob_info.into()), Self::Database(data) => ( ATTR_REPORT_CONTENT.to_string(), AttributeValue::B(Blob::new(data)), ), } } fn parse_from_attrs(attrs: &mut AttributeMap) -> Result { if let Some(blob_info_attr) = attrs.remove(ATTR_BLOB_INFO) { let blob_info = BlobInfo::try_from_attr(ATTR_BLOB_INFO, Some(blob_info_attr))?; return Ok(ReportContent::Blob(blob_info)); } let content_data = attrs.take_attr(ATTR_REPORT_CONTENT)?; Ok(ReportContent::Database(content_data)) } /// Moves report content to blob storage: /// - Switches `self` from [`ReportStorage::Database`] to [`ReportStorage::Blob`] /// - No-op for [`ReportStorage::Blob`] async fn move_to_blob( &mut self, blob_client: &BlobServiceClient, ) -> Result<(), BlobServiceError> { let Self::Database(ref mut contents) = self else { return Ok(()); }; let data = std::mem::take(contents); let blob_hash: String = Sha256::digest(&data).encode_hex(); let holder = uuid::Uuid::new_v4().to_string(); // NOTE: We send the data as a single chunk. This shouldn't be a problem // unless we start receiving very large reports. In that case, we should // consider splitting the data into chunks and sending them as a stream. let data_stream = tokio_stream::once(Result::<_, std::io::Error>::Ok(data)); blob_client .simple_put(&blob_hash, &holder, data_stream) .await?; let new_blob_info = BlobInfo::new(blob_hash, holder); *self = Self::Blob(new_blob_info); Ok(()) } /// Fetches report content bytes pub async fn fetch_bytes( self, blob_client: &BlobServiceClient, ) -> Result, BlobServiceError> { match self { ReportContent::Database(data) => Ok(data), ReportContent::Blob(BlobInfo { blob_hash, .. }) => { let stream = blob_client.get(&blob_hash).await?; let chunks: Vec = stream.collect::>().await?; let data = chunks.into_iter().flatten().collect(); Ok(data) } } } } // DB conversions for report types // ReportID impl From for AttributeValue { fn from(value: ReportID) -> Self { AttributeValue::S(value.into()) } } impl From<&ReportID> for AttributeValue { fn from(value: &ReportID) -> Self { AttributeValue::S(value.clone().into()) } } impl TryFrom> for ReportID { type Error = database::DBItemError; fn try_from(value: Option) -> Result { let raw = String::try_from_attr(ATTR_REPORT_ID, value)?; Ok(ReportID::from(raw)) } } // ReportType impl From for AttributeValue { fn from(value: ReportType) -> Self { let num = value as u8; AttributeValue::N(num.to_string()) } } impl TryFromAttribute for ReportType { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { let attr_name = attribute_name.into(); let num: u8 = database::parse_int_attribute(&attr_name, attribute)?; ::from_u8(num).ok_or_else(|| { database::DBItemError::new( attr_name, database::Value::String(num.to_string()), database::DBItemAttributeError::IncorrectType, ) }) } } // ReportPlatform impl From for AttributeValue { fn from(value: ReportPlatform) -> Self { let raw = value.to_string().to_lowercase(); AttributeValue::S(raw) } } impl TryFromAttribute for ReportPlatform { fn try_from_attr( attribute_name: impl Into, attribute: Option, ) -> Result { let attr_name = attribute_name.into(); let raw = String::try_from_attr(&attr_name, attribute)?; // serde_json understands only quoted strings let quoted = format!("\"{raw}\""); serde_json::from_str("ed).map_err(|err| { debug!("Failed to deserialize ReportPlatform: {}", err); DBItemError::new( attr_name, database::Value::String(raw), database::DBItemAttributeError::IncorrectType, ) }) } } #[cfg(test)] mod tests { use comm_services_lib::database::AttributeTryInto; use super::*; #[test] fn test_platform_conversions() -> anyhow::Result<()> { let platform = ReportPlatform::MacOS; let attribute: AttributeValue = platform.into(); assert_eq!(attribute, AttributeValue::S("macos".to_string())); let converted_back: ReportPlatform = Some(attribute).attr_try_into("foo")?; assert!(matches!(converted_back, ReportPlatform::MacOS)); Ok(()) } #[test] fn test_type_conversions() -> anyhow::Result<()> { let report_type = ReportType::MediaMission; let numeric_type = (report_type as u8).to_string(); let attr: AttributeValue = report_type.into(); assert_eq!(attr, AttributeValue::N(numeric_type.to_string())); let converted_back: ReportType = Some(attr).attr_try_into("foo")?; assert!(matches!(converted_back, ReportType::MediaMission)); Ok(()) } } diff --git a/services/reports/src/service.rs b/services/reports/src/service.rs index 34bb2e3c4..4d536463e 100644 --- a/services/reports/src/service.rs +++ b/services/reports/src/service.rs @@ -1,228 +1,281 @@ use actix_web::FromRequest; +use chrono::Utc; use comm_services_lib::{ auth::UserIdentity, blob::client::{BlobServiceClient, BlobServiceError}, database, }; use derive_more::{Display, Error, From}; use std::{ collections::HashMap, future::{ready, Ready}, sync::Arc, }; use tracing::error; use crate::{ database::{ client::{DatabaseClient, ReportsPage}, - item::ReportItem, + item::{ReportContent, ReportItem}, }, - email::config::EmailConfig, + email::{config::EmailConfig, ReportEmail}, report_types::{ReportID, ReportInput, ReportOutput, ReportType}, }; #[derive(Debug, Display, Error, From)] pub enum ReportsServiceError { DatabaseError(database::Error), BlobError(BlobServiceError), /// Error during parsing user input /// Usually this indicates user error #[from(ignore)] ParseError(serde_json::Error), /// Error during serializing/deserializing internal data /// This is usually a service bug / data inconsistency #[from(ignore)] SerdeError(serde_json::Error), /// Unsupported report type /// Returned when trying to perform an operation on an incompatible report type /// e.g. create a Redux Devtools import from a media mission report UnsupportedReportType, /// Unexpected error Unexpected, } type ServiceResult = Result; #[derive(Clone)] pub struct ReportsService { db: DatabaseClient, blob_client: BlobServiceClient, requesting_user_id: Option, email_config: Option>, } impl ReportsService { pub fn new( db: DatabaseClient, blob_client: BlobServiceClient, email_config: Option, ) -> Self { Self { db, blob_client, requesting_user_id: None, email_config: email_config.map(Arc::new), } } pub fn authenticated(&self, user: UserIdentity) -> Self { let user_id = user.user_id.to_string(); Self { db: self.db.clone(), email_config: self.email_config.clone(), blob_client: self.blob_client.with_user_identity(user), requesting_user_id: Some(user_id), } } pub async fn save_reports( &self, - reports: Vec, + inputs: Vec, ) -> ServiceResult> { - let mut items = Vec::with_capacity(reports.len()); + let mut reports = Vec::with_capacity(inputs.len()); let mut tasks = tokio::task::JoinSet::new(); - // 1. Concurrently upload reports to blob service if needed - for input in reports { + // 1. Concurrently prepare reports. Upload them to blob service if needed + for input in inputs { let blob_client = self.blob_client.clone(); let user_id = self.requesting_user_id.clone(); tasks.spawn(async move { - let mut item = ReportItem::from_input(input, user_id) + let mut report = process_report(input, user_id) .map_err(ReportsServiceError::SerdeError)?; - item.ensure_size_constraints(&blob_client).await?; - Ok(item) + report.db_item.ensure_size_constraints(&blob_client).await?; + Ok(report) }); } // 2. Wait for all uploads to complete and collect results // If any of them failed, abort while let Some(task) = tasks.join_next().await { let result: Result<_, ReportsServiceError> = task.map_err(|err| { error!("Task failed to join: {err}"); ReportsServiceError::Unexpected })?; - items.push(result?); + reports.push(result?); } - // 3. Store reports in database - let ids = items.iter().map(|item| item.id.clone()).collect(); - self.db.save_reports(items).await?; + let (ids, (db_items, emails)): (Vec<_>, (Vec<_>, Vec<_>)) = reports + .into_iter() + .map(|ProcessedReport { id, db_item, email }| (id, (db_item, email))) + .unzip(); + + // 3. Store the reports in database + self.db.save_reports(db_items).await?; + + // 4. Send e-mails asynchronously + tokio::spawn(async move { + if let Err(err) = crate::email::send_emails(emails).await { + error!("Failed to send e-mails: {err}"); + } + }); Ok(ids) } pub async fn get_report( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report_item) = self.db.get_report(&report_id).await? else { return Ok(None); }; let ReportItem { user_id, report_type, platform, creation_time, content, .. } = report_item; let report_data = content.fetch_bytes(&self.blob_client).await?; let report_json = serde_json::from_slice(report_data.as_slice()) .map_err(ReportsServiceError::SerdeError)?; let output = ReportOutput { id: report_id, user_id, platform, report_type, creation_time, content: report_json, }; Ok(Some(output)) } pub async fn get_redux_devtools_import( &self, report_id: ReportID, ) -> ServiceResult> { let Some(report) = self.get_report(report_id).await? else { return Ok(None); }; if !matches!(report.report_type, ReportType::ErrorReport) { return Err(ReportsServiceError::UnsupportedReportType); }; let redux_devtools_payload = prepare_redux_devtools_import(report.content) .map_err(ReportsServiceError::SerdeError)?; Ok(Some(redux_devtools_payload)) } pub async fn list_reports( &self, cursor: Option, page_size: Option, ) -> ServiceResult { let page = self.db.scan_reports(cursor, page_size).await?; Ok(page) } } impl FromRequest for ReportsService { type Error = actix_web::Error; type Future = Ready>; #[inline] fn from_request( req: &actix_web::HttpRequest, _payload: &mut actix_web::dev::Payload, ) -> Self::Future { use actix_web::HttpMessage; let Some(service) = req.app_data::() else { tracing::error!( "FATAL! Failed to extract ReportsService from actix app_data. \ Check HTTP server configuration" ); return ready(Err(actix_web::error::ErrorInternalServerError("Internal server error"))); }; let auth_service = if let Some(user_identity) = req.extensions().get::() { tracing::trace!("Found user identity. Creating authenticated service"); service.authenticated(user_identity.clone()) } else { tracing::trace!( "No user identity found. Leaving unauthenticated service" ); service.clone() }; ready(Ok(auth_service)) } } +struct ProcessedReport { + id: ReportID, + db_item: ReportItem, + email: ReportEmail, +} + +fn process_report( + input: ReportInput, + user_id: Option, +) -> Result { + let id = ReportID::default(); + let email = crate::email::prepare_email(&input, &id, user_id.as_deref()); + + let ReportInput { + platform_details, + report_type, + time, + mut report_content, + } = input; + + // Add "platformDetails" back to report content. + // It was deserialized into a separate field. + let platform_details_value = serde_json::to_value(&platform_details)?; + report_content.insert("platformDetails".to_string(), platform_details_value); + + // serialize report JSON to bytes + let content_bytes = serde_json::to_vec(&report_content)?; + + let db_item = ReportItem { + id: id.clone(), + user_id: user_id.unwrap_or("[null]".to_string()), + platform: platform_details.platform.clone(), + report_type, + creation_time: time.unwrap_or_else(Utc::now), + encryption_key: None, + content: ReportContent::Database(content_bytes), + }; + + Ok(ProcessedReport { id, db_item, email }) +} + /// Transforms report content JSON into format that can be /// imported into Redux DevTools. fn prepare_redux_devtools_import( mut error_report: HashMap, ) -> Result { use serde_json::{json, map::Map, Value}; let nav_state = error_report.remove("navState"); let actions = error_report.remove("actions"); let mut preloaded_state = error_report .remove("preloadedState") .unwrap_or_else(|| Value::Object(Map::new())); preloaded_state["navState"] = nav_state.into(); preloaded_state["frozen"] = true.into(); preloaded_state["_persist"]["rehydrated"] = false.into(); let preload_state_str = serde_json::to_string(&preloaded_state)?; let payload_str = serde_json::to_string(&actions)?; Ok(json!({ "preloadedState": preload_state_str, "payload": payload_str, })) }