diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs
--- a/services/blob/src/service.rs
+++ b/services/blob/src/service.rs
@@ -1,5 +1,5 @@
 #![allow(unused)]
-use std::ops::RangeInclusive;
+use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
 use std::sync::Arc;
 
 use async_stream::try_stream;
@@ -74,4 +74,133 @@
       config,
     }
   }
+
+  /// Retrieves blob object metadata and returns a download object
+  /// that can be used to download the blob data.
+  ///
+  /// # Errors
+  ///
+  /// - [`BlobServiceError::BlobNotFound`] if the database has no item
+  ///   for `blob_hash`.
+  /// - [`BlobServiceError::DB`] if the database lookup fails.
+  /// - [`BlobServiceError::InvalidState`] if the S3 content length does
+  ///   not fit in a `u64`.
+  /// - Any error returned while fetching the S3 object metadata.
+  pub async fn create_download(
+    &self,
+    blob_hash: impl Into<String>,
+  ) -> BlobServiceResult<BlobDownloadObject> {
+    // 1. Get S3 path
+    let s3_path = match self.db.get_blob_item(blob_hash.into()).await {
+      Ok(Some(BlobItemRow { s3_path, .. })) => Ok(s3_path),
+      Ok(None) => {
+        debug!("Blob not found");
+        Err(BlobServiceError::BlobNotFound)
+      }
+      Err(err) => Err(BlobServiceError::DB(err)),
+    }?;
+    debug!("S3 path: {:?}", s3_path);
+
+    // 2. Get S3 Object metadata
+    trace!("Getting S3 object metadata...");
+    let object_metadata = self.s3.get_object_metadata(&s3_path).await?;
+    let blob_size: u64 =
+      object_metadata.content_length().try_into().map_err(|err| {
+        error!("Failed to parse S3 object content length: {:?}", err);
+        BlobServiceError::InvalidState
+      })?;
+    debug!("S3 object size: {} bytes", blob_size);
+
+    // 3. Create download session
+    let session = BlobDownloadObject {
+      s3_path,
+      blob_size,
+      byte_range: 0..blob_size,
+      chunk_size: self.config.download_chunk_size as u64,
+      s3_client: self.s3.clone(),
+    };
+    Ok(session)
+  }
+}
+
+/// Describes a pending blob download: its S3 path and the byte range
+/// that will be streamed.
+pub struct BlobDownloadObject {
+  /// Size of the whole blob object in bytes.
+  pub blob_size: u64,
+  /// Range of bytes to be downloaded (exclusive end).
+  byte_range: Range<u64>,
+  /// Upper bound on the number of bytes requested per stream item.
+  chunk_size: u64,
+  s3_client: S3Client,
+  s3_path: S3Path,
+}
+
+impl BlobDownloadObject {
+  /// Restricts the download to `range`, clamped to `0..blob_size`.
+  ///
+  /// A range that is inverted, or that starts at or past the end of the
+  /// blob, becomes an empty range: [`Self::download_size`] then returns 0.
+  pub fn set_byte_range(&mut self, range: impl RangeBounds<u64>) {
+    // Saturate so that a bound of `u64::MAX` cannot overflow.
+    let range_start = match range.start_bound() {
+      Bound::Included(&start) => start,
+      Bound::Excluded(&start) => start.saturating_add(1),
+      Bound::Unbounded => 0,
+    };
+    let range_end = match range.end_bound() {
+      Bound::Included(&end) => end.saturating_add(1),
+      Bound::Excluded(&end) => end,
+      Bound::Unbounded => self.blob_size,
+    };
+    // Clamp the end to the blob size, then the start to the end, so that
+    // `start <= end_exclusive <= blob_size` always holds.
+    let end_exclusive = std::cmp::min(range_end, self.blob_size);
+    let start = std::cmp::min(range_start, end_exclusive);
+
+    self.byte_range = start..end_exclusive;
+    debug!("Requested byte range: {}..{}", start, end_exclusive);
+  }
+
+  /// Size of the data to be downloaded in bytes.
+  pub fn download_size(&self) -> u64 {
+    self.byte_range.end.saturating_sub(self.byte_range.start)
+  }
+
+  /// Consumes the object and streams the selected byte range in order,
+  /// requesting at most `chunk_size` bytes per item.
+  ///
+  /// A zero `chunk_size` with a non-empty range yields
+  /// [`BlobServiceError::InvalidState`] instead of requesting empty
+  /// ranges forever.
+  pub fn into_stream(self) -> impl Stream<Item = BlobServiceResult<Vec<u8>>> {
+    let BlobDownloadObject {
+      byte_range,
+      chunk_size,
+      s3_path,
+      s3_client,
+      ..
+    } = self;
+
+    try_stream! {
+      trace!("Starting download stream");
+      if chunk_size == 0 && byte_range.start < byte_range.end {
+        error!("Download chunk size must not be zero");
+        Err::<(), BlobServiceError>(BlobServiceError::InvalidState)?;
+      }
+
+      let mut offset: u64 = byte_range.start;
+      while offset < byte_range.end {
+        let next_size = std::cmp::min(chunk_size, byte_range.end - offset);
+        let range = offset..(offset + next_size);
+        trace!(?range, "Getting {} bytes of data", next_size);
+
+        yield s3_client
+          .get_object_bytes(&s3_path, range)
+          .await?;
+
+        offset += next_size;
+      }
+    }
+  }
 }