diff --git a/services/reports/Cargo.lock b/services/reports/Cargo.lock --- a/services/reports/Cargo.lock +++ b/services/reports/Cargo.lock @@ -382,6 +382,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-trait" +version = "0.1.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -1101,9 +1112,9 @@ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", "miniz_oxide", @@ -1907,6 +1918,7 @@ "serde", "serde_json", "serde_repr", + "sha256", "tokio", "tokio-stream", "tracing", @@ -1918,9 +1930,9 @@ [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ "base64 0.21.3", "bytes", @@ -2177,6 +2189,19 @@ "digest", ] +[[package]] +name = "sha256" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7895c8ae88588ccead14ff438b939b0c569cd619116f14b4d13fdff7b8333386" +dependencies = [ + "async-trait", + "bytes", + "hex", + "sha2", + "tokio", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -2272,9 +2297,9 @@ [[package]] name = "tempfile" -version = "3.8.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", @@ -2540,9 +2565,9 @@ [[package]] name = "unicase" -version = "2.7.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" dependencies = [ "version_check", ] @@ -2719,9 +2744,9 @@ [[package]] name = "wasm-streams" -version = "0.3.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" dependencies = [ "futures-util", "js-sys", @@ -2849,12 +2874,11 @@ [[package]] name = "winreg" -version = "0.50.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ - "cfg-if", - "windows-sys", + "winapi", ] [[package]] diff --git a/services/reports/Cargo.toml b/services/reports/Cargo.toml --- a/services/reports/Cargo.toml +++ b/services/reports/Cargo.toml @@ -25,6 +25,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_repr = "0.1" +sha256 = "1.4" tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1" tracing = "0.1" diff --git a/services/reports/src/database/item.rs b/services/reports/src/database/item.rs --- a/services/reports/src/database/item.rs +++ b/services/reports/src/database/item.rs @@ -5,12 +5,14 @@ client::{BlobServiceClient, BlobServiceError}, types::BlobInfo, }, + bytes::Bytes, constants::DDB_ITEM_SIZE_LIMIT, database::{ self, 
AttributeExtractor, AttributeMap, DBItemError, TryFromAttribute, }, }; use num_traits::FromPrimitive; +use tokio_stream::StreamExt; use tracing::debug; use super::constants::*; @@ -82,6 +84,10 @@ return Ok(()); }; + debug!( + report_id = ?self.id, + "Report content exceeds DDB item size limit, moving to blob storage" + ); self.content.move_to_blob(blob_client).await } @@ -211,7 +217,24 @@ &mut self, blob_client: &BlobServiceClient, ) -> Result<(), BlobServiceError> { - todo!() + let Self::Database(ref mut contents) = self else { return Ok(()); }; + let data = std::mem::take(contents); + + let blob_hash = sha256::digest(&data); + let holder = uuid::Uuid::new_v4().to_string(); + + // NOTE: We send the data as a single chunk. This shouldn't be a problem + // unless we start receiving very large reports. In that case, we should + // consider splitting the data into chunks and sending them as a stream. + let data_stream = tokio_stream::once(Result::<_, std::io::Error>::Ok(data)); + + blob_client + .simple_put(&blob_hash, &holder, data_stream) + .await?; + + let new_blob_info = BlobInfo::new(blob_hash, holder); + *self = Self::Blob(new_blob_info); + Ok(()) } /// Fetches report content bytes @@ -222,7 +245,10 @@ match self { ReportContent::Database(data) => Ok(data), ReportContent::Blob(BlobInfo { blob_hash, .. }) => { - todo!() + let stream = blob_client.get(&blob_hash).await?; + let chunks: Vec<Bytes> = stream.collect::<Result<_, _>>().await?; + let data = chunks.into_iter().flatten().collect(); + Ok(data) } } }