diff --git a/services/reports/Cargo.lock b/services/reports/Cargo.lock --- a/services/reports/Cargo.lock +++ b/services/reports/Cargo.lock @@ -1089,9 +1089,9 @@ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", "miniz_oxide", @@ -1888,6 +1888,7 @@ "clap", "comm-services-lib", "derive_more", + "hex", "http", "num-derive", "num-traits", @@ -1895,6 +1896,7 @@ "serde", "serde_json", "serde_repr", + "sha2", "tokio", "tokio-stream", "tracing", @@ -1906,9 +1908,9 @@ [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ "base64 0.21.3", "bytes", @@ -2260,9 +2262,9 @@ [[package]] name = "tempfile" -version = "3.8.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", @@ -2528,9 +2530,9 @@ [[package]] name = "unicase" -version = "2.7.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" dependencies = [ "version_check", ] @@ -2707,9 +2709,9 @@ [[package]] name = "wasm-streams" -version = "0.3.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" dependencies = [ "futures-util", "js-sys", @@ -2837,12 +2839,11 @@ [[package]] name = "winreg" -version = "0.50.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ - "cfg-if", - "windows-sys", + "winapi", ] [[package]] diff --git a/services/reports/Cargo.toml b/services/reports/Cargo.toml --- a/services/reports/Cargo.toml +++ b/services/reports/Cargo.toml @@ -18,6 +18,7 @@ "http", ] } derive_more = "0.99" +hex = "0.4" http = "0.2" num-traits = "0.2" num-derive = "0.4" @@ -25,6 +26,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_repr = "0.1" +sha2 = "0.10" tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1" tracing = "0.1" diff --git a/services/reports/src/database/item.rs b/services/reports/src/database/item.rs --- a/services/reports/src/database/item.rs +++ b/services/reports/src/database/item.rs @@ -5,12 +5,16 @@ client::{BlobServiceClient, BlobServiceError}, types::BlobInfo, }, + bytes::Bytes, constants::DDB_ITEM_SIZE_LIMIT, database::{ self, AttributeExtractor, AttributeMap, DBItemError, TryFromAttribute, }, }; +use hex::ToHex; use num_traits::FromPrimitive; +use sha2::{Digest, Sha256}; +use tokio_stream::StreamExt; use tracing::debug; use super::constants::*; @@ -82,6 +86,10 @@ return Ok(()); }; + debug!( + report_id = ?self.id, + "Report content exceeds DDB item size limit, moving to blob storage" + ); self.content.move_to_blob(blob_client).await } @@ -211,7 +219,24 @@ &mut self, blob_client: &BlobServiceClient, ) -> Result<(), BlobServiceError> { - todo!() + let Self::Database(ref mut contents) = self else { return Ok(()); }; + let data 
= std::mem::take(contents); + + let blob_hash: String = Sha256::digest(&data).encode_hex(); + let holder = uuid::Uuid::new_v4().to_string(); + + // NOTE: We send the data as a single chunk. This shouldn't be a problem + // unless we start receiving very large reports. In that case, we should + // consider splitting the data into chunks and sending them as a stream. + let data_stream = tokio_stream::once(Result::<_, std::io::Error>::Ok(data)); + + blob_client + .simple_put(&blob_hash, &holder, data_stream) + .await?; + + let new_blob_info = BlobInfo::new(blob_hash, holder); + *self = Self::Blob(new_blob_info); + Ok(()) } /// Fetches report content bytes @@ -222,7 +247,10 @@ match self { ReportContent::Database(data) => Ok(data), ReportContent::Blob(BlobInfo { blob_hash, .. }) => { - todo!() + let stream = blob_client.get(&blob_hash).await?; + let chunks: Vec<Bytes> = stream.collect::<Result<_, _>>().await?; + let data = chunks.into_iter().flatten().collect(); + Ok(data) } } }