diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs index 2f7437ee5..a05d65769 100644 --- a/services/blob/src/main.rs +++ b/services/blob/src/main.rs @@ -1,36 +1,37 @@ pub mod config; pub mod constants; pub mod database; pub mod grpc; pub mod http; pub mod s3; +pub mod service; pub mod tools; use anyhow::Result; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; fn configure_logging() -> Result<()> { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .with_env_var(constants::LOG_LEVEL_ENV_VAR) .from_env_lossy(); let subscriber = tracing_subscriber::fmt().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { configure_logging()?; config::parse_cmdline_args()?; let aws_config = config::load_aws_config().await; let db = database::old::DatabaseClient::new(&aws_config); let s3 = s3::S3Client::new(&aws_config); tokio::select! { http_result = crate::http::run_http_server(db.clone(), s3.clone()) => http_result, grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result, } } diff --git a/services/blob/src/service.rs b/services/blob/src/service.rs new file mode 100644 index 000000000..8b720a08e --- /dev/null +++ b/services/blob/src/service.rs @@ -0,0 +1,77 @@ +#![allow(unused)] +use std::ops::RangeInclusive; +use std::sync::Arc; + +use async_stream::try_stream; +use chrono::Duration; +use tokio_stream::StreamExt; +use tonic::codegen::futures_core::Stream; +use tracing::{debug, error, trace, warn}; + +use crate::constants::S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE; +use crate::database::types::{ + BlobItemInput, BlobItemRow, PrimaryKey, UncheckedKind, +}; +use crate::database::DBError; +use crate::s3::{Error as S3Error, S3Client, S3Path}; +use crate::tools::{BoxedError, MemOps}; +use crate::{constants::BLOB_DOWNLOAD_CHUNK_SIZE, database::DatabaseClient}; + +#[derive( + Debug, derive_more::Display, derive_more::From, derive_more::Error, +)] +pub enum BlobServiceError { + BlobNotFound, + BlobAlreadyExists, + InvalidState, + DB(DBError), + S3(S3Error), + InputError(#[error(ignore)] BoxedError), +} + +type BlobServiceResult = Result; + +#[derive(Clone, Debug)] +pub struct BlobServiceConfig { + /// Blob data is streamed from S3 in chunks of this size. + pub download_chunk_size: usize, + /// If enabled, orphaned blobs will be deleted immediately after + /// last holder is removed. This option should be enabled + /// if maintenance garbage collection tasks are not run. + pub instant_delete_orphaned_blobs: bool, + /// Minimum age that a orphan must stay unmodified + /// before it can be deleted by a garbage collection task + /// This option is ignored if `instant_delete_orphaned_blobs` is `true` + pub orphan_protection_period: chrono::Duration, +} + +impl Default for BlobServiceConfig { + fn default() -> Self { + BlobServiceConfig { + download_chunk_size: BLOB_DOWNLOAD_CHUNK_SIZE as usize, + instant_delete_orphaned_blobs: false, + orphan_protection_period: Duration::hours(1), + } + } +} + +#[derive(Clone)] +pub struct BlobService { + db: Arc, + s3: S3Client, + config: BlobServiceConfig, +} + +impl BlobService { + pub fn new( + db: DatabaseClient, + s3: S3Client, + config: BlobServiceConfig, + ) -> Self { + Self { + db: Arc::new(db), + s3, + config, + } + } +} diff --git a/services/blob/src/tools.rs b/services/blob/src/tools.rs index 74aaf333e..6569f9e81 100644 --- a/services/blob/src/tools.rs +++ b/services/blob/src/tools.rs @@ -1,59 +1,61 @@ +use std::{env, error::Error as StdError}; + use crate::constants; -use std::env; fn is_env_flag_set(env_var_name: &str) -> bool { let flag_value = env::var(env_var_name).unwrap_or_default().to_lowercase(); return ["1", "true"].contains(&flag_value.as_str()); } /// Returns true if the `COMM_SERVICES_SANDBOX` environment variable is set pub fn is_sandbox_env() -> bool { return is_env_flag_set(constants::SANDBOX_ENV_VAR); } +pub type BoxedError = Box; pub trait MemOps { fn take_out(&mut self) -> Self; } impl MemOps for Vec { /// Moves all the elements of `self` into a new [`Vec`] instance, /// leaving `self` empty. **No copying is performed.** /// The memory capacity of `self` stays unchanged. /// /// In fact, this is the same as [`std::mem::take`] but maintains capacity. /// /// # Example /// ``` /// let mut a = vec![1,2,3,4]; /// let b = a.take_out(); /// assert_eq!(b.len(), 4); /// assert!(a.is_empty()); /// assert_eq!(a.capacity(), b.capacity()); /// ``` fn take_out(&mut self) -> Self { let mut new_vec = Vec::with_capacity(self.capacity()); std::mem::swap(self, &mut new_vec); new_vec } } #[cfg(test)] mod tests { use super::*; #[test] fn test_memops_move_and_clear() { let mut a = vec![1, 2, 3, 4]; let a_ptr_before = a.as_ptr(); let b = a.take_out(); let a_ptr_after = a.as_ptr(); let b_ptr_after = b.as_ptr(); assert_ne!(a_ptr_before, a_ptr_after, "Old ptr didn't change"); assert_eq!(a_ptr_before, b_ptr_after, "Ptr addresses don't match"); assert_eq!(a.capacity(), b.capacity(), "Capacities don't match"); assert!(a.is_empty(), "Original vec isn't empty after move"); assert_eq!(b.len(), 4, "Moved length don't match"); } }