D8959.diff
diff --git a/services/blob/Dockerfile b/services/blob/Dockerfile
--- a/services/blob/Dockerfile
+++ b/services/blob/Dockerfile
@@ -21,7 +21,6 @@
# Copy actual application sources
COPY services/blob .
-COPY shared/protos/blob.proto ../../shared/protos/
# Remove the previously-built binary so that only the application itself is
# rebuilt
diff --git a/services/blob/build.rs b/services/blob/build.rs
deleted file mode 100644
--- a/services/blob/build.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-fn main() {
- println!("cargo:rerun-if-changed=src/main.rs");
-
- println!("cargo:rerun-if-changed=../../shared/protos/blob.proto");
- tonic_build::compile_protos("../../shared/protos/blob.proto")
- .expect("Failed to compile protobuf file");
-}
diff --git a/services/blob/src/config.rs b/services/blob/src/config.rs
--- a/services/blob/src/config.rs
+++ b/services/blob/src/config.rs
@@ -1,19 +1,15 @@
-use anyhow::{ensure, Result};
+use anyhow::Result;
use clap::Parser;
use once_cell::sync::Lazy;
use tracing::info;
use crate::constants::{
- DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME,
- S3_BUCKET_ENV_VAR,
+ DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, S3_BUCKET_ENV_VAR,
};
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct AppConfig {
- /// gRPC server listening port
- #[arg(long, default_value_t = DEFAULT_GRPC_PORT)]
- pub grpc_port: u16,
/// HTTP server listening port
#[arg(long, default_value_t = DEFAULT_HTTP_PORT)]
pub http_port: u16,
@@ -32,21 +28,14 @@
/// Processes the command-line arguments and environment variables.
/// Should be called at the beginning of the `main()` function.
-pub(super) fn parse_cmdline_args() -> Result<()> {
+pub(super) fn parse_cmdline_args() -> Result<&'static AppConfig> {
// force evaluation of the lazy initialized config
let cfg = Lazy::force(&CONFIG);
- // Perform some additional validation for CLI args
- ensure!(
- cfg.grpc_port != cfg.http_port,
- "gRPC and HTTP ports cannot be the same: {}",
- cfg.grpc_port
- );
-
if cfg.s3_bucket_name != DEFAULT_S3_BUCKET_NAME {
info!("Using custom S3 bucket: {}", &cfg.s3_bucket_name);
}
- Ok(())
+ Ok(cfg)
}
/// Provides region/credentials configuration for AWS SDKs
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -1,28 +1,8 @@
// Assorted constants
-pub const DEFAULT_GRPC_PORT: u16 = 50051;
-pub const DEFAULT_HTTP_PORT: u16 = 51001;
+pub const DEFAULT_HTTP_PORT: u16 = 50053;
pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
-/// 4MB limit
-///
-/// WARNING: use keeping in mind that grpc adds its own headers to messages
-/// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-/// so the message that actually is being sent over the network looks like this
-/// ```
-/// [Compressed-Flag] [Message-Length] [Message]
-/// [Compressed-Flag] 1 byte - added by grpc
-/// [Message-Length] 4 bytes - added by grpc
-/// [Message] N bytes - actual data
-/// ```
-/// so for every message we get 5 additional bytes of data
-/// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671),
-/// gRPC stream may contain more than one message
-pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
-
-/// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details
-pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;
-
// HTTP constants
pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
@@ -49,18 +29,6 @@
pub const ATTR_UNCHECKED: &str = "unchecked";
}
-// old DynamoDB constants
-
-pub const BLOB_TABLE_NAME: &str = "blob-service-blob";
-pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
-pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path";
-pub const BLOB_TABLE_CREATED_FIELD: &str = "created";
-
-pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index";
-pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder";
-pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
-pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index";
-
// Environment variables
pub const LOG_LEVEL_ENV_VAR: &str =
diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs
--- a/services/blob/src/database/mod.rs
+++ b/services/blob/src/database/mod.rs
@@ -1,6 +1,5 @@
pub mod client;
pub mod errors;
-pub mod old;
pub mod types;
pub use client::DatabaseClient;
diff --git a/services/blob/src/database/old.rs b/services/blob/src/database/old.rs
deleted file mode 100644
--- a/services/blob/src/database/old.rs
+++ /dev/null
@@ -1,308 +0,0 @@
-#![allow(deprecated)]
-
-use aws_sdk_dynamodb::{
- operation::get_item::GetItemOutput, types::AttributeValue,
-};
-use chrono::{DateTime, Utc};
-use comm_services_lib::database;
-use std::{collections::HashMap, sync::Arc};
-use tracing::error;
-
-use crate::{
- config::CONFIG,
- constants::{
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME,
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME,
- BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME,
- BLOB_TABLE_S3_PATH_FIELD,
- },
- s3::S3Path,
-};
-
-use super::errors::{BlobDBError, Error};
-
-#[derive(Clone, Debug)]
-pub struct BlobItem {
- pub blob_hash: String,
- pub s3_path: S3Path,
- pub created: DateTime<Utc>,
-}
-
-impl BlobItem {
- pub fn new(blob_hash: impl Into<String>) -> Self {
- let hash_str = blob_hash.into();
- BlobItem {
- blob_hash: hash_str.clone(),
- s3_path: S3Path {
- bucket_name: CONFIG.s3_bucket_name.clone(),
- object_name: hash_str,
- },
- created: Utc::now(),
- }
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct ReverseIndexItem {
- pub holder: String,
- pub blob_hash: String,
-}
-
-#[derive(Clone)]
-pub struct DatabaseClient {
- client: Arc<aws_sdk_dynamodb::Client>,
-}
-
-impl DatabaseClient {
- pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
- DatabaseClient {
- client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
- }
- }
-
- // Blob item
-
- pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> {
- let item = HashMap::from([
- (
- BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(blob_item.blob_hash),
- ),
- (
- BLOB_TABLE_S3_PATH_FIELD.to_string(),
- AttributeValue::S(blob_item.s3_path.to_full_path()),
- ),
- (
- BLOB_TABLE_CREATED_FIELD.to_string(),
- AttributeValue::S(blob_item.created.to_rfc3339()),
- ),
- ]);
-
- self
- .client
- .put_item()
- .table_name(BLOB_TABLE_NAME)
- .set_item(Some(item))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to put blob item");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- pub async fn find_blob_item(
- &self,
- blob_hash: &str,
- ) -> Result<Option<BlobItem>, Error> {
- let item_key = HashMap::from([(
- BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(blob_hash.to_string()),
- )]);
- match self
- .client
- .get_item()
- .table_name(BLOB_TABLE_NAME)
- .set_key(Some(item_key))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find blob item");
- Error::AwsSdk(e.into())
- })? {
- GetItemOutput {
- item: Some(mut item),
- ..
- } => {
- let blob_hash = database::parse_string_attribute(
- BLOB_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_TABLE_BLOB_HASH_FIELD),
- )?;
- let s3_path = database::parse_string_attribute(
- BLOB_TABLE_S3_PATH_FIELD,
- item.remove(BLOB_TABLE_S3_PATH_FIELD),
- )?;
- let created = database::parse_datetime_attribute(
- BLOB_TABLE_CREATED_FIELD,
- item.remove(BLOB_TABLE_CREATED_FIELD),
- )?;
- Ok(Some(BlobItem {
- blob_hash,
- s3_path: S3Path::from_full_path(&s3_path)
- .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?,
- created,
- }))
- }
- _ => Ok(None),
- }
- }
-
- pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> {
- self
- .client
- .delete_item()
- .table_name(BLOB_TABLE_NAME)
- .key(
- BLOB_TABLE_BLOB_HASH_FIELD,
- AttributeValue::S(blob_hash.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to remove blob item");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- // Reverse index item
-
- pub async fn put_reverse_index_item(
- &self,
- reverse_index_item: ReverseIndexItem,
- ) -> Result<(), Error> {
- let holder = &reverse_index_item.holder;
- if self.find_reverse_index_by_holder(holder).await?.is_some() {
- error!("Failed to put reverse index. Holder already exists.");
- return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
- holder.to_string(),
- )));
- }
-
- let item = HashMap::from([
- (
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
- AttributeValue::S(reverse_index_item.holder),
- ),
- (
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(reverse_index_item.blob_hash),
- ),
- ]);
- self
- .client
- .put_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .set_item(Some(item))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to put reverse index");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- pub async fn find_reverse_index_by_holder(
- &self,
- holder: &str,
- ) -> Result<Option<ReverseIndexItem>, Error> {
- let item_key = HashMap::from([(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
- AttributeValue::S(holder.to_string()),
- )]);
- match self
- .client
- .get_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .set_key(Some(item_key))
- .consistent_read(true)
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find reverse index by holder");
- Error::AwsSdk(e.into())
- })? {
- GetItemOutput {
- item: Some(mut item),
- ..
- } => {
- let holder = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
- )?;
- let blob_hash = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
- )?;
-
- Ok(Some(ReverseIndexItem { holder, blob_hash }))
- }
- _ => Ok(None),
- }
- }
-
- pub async fn find_reverse_index_by_hash(
- &self,
- blob_hash: &str,
- ) -> Result<Vec<ReverseIndexItem>, Error> {
- let response = self
- .client
- .query()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME)
- .key_condition_expression("#blobHash = :valueToMatch")
- .expression_attribute_names(
- "#blobHash",
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- )
- .expression_attribute_values(
- ":valueToMatch",
- AttributeValue::S(blob_hash.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find reverse indexes by hash");
- Error::AwsSdk(e.into())
- })?;
-
- if response.count == 0 {
- return Ok(vec![]);
- }
-
- let mut results: Vec<ReverseIndexItem> =
- Vec::with_capacity(response.count() as usize);
- for mut item in response.items.unwrap_or_default() {
- let holder = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
- )?;
- let blob_hash = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
- )?;
-
- results.push(ReverseIndexItem { holder, blob_hash });
- }
-
- return Ok(results);
- }
-
- pub async fn remove_reverse_index_item(
- &self,
- holder: &str,
- ) -> Result<(), Error> {
- self
- .client
- .delete_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .key(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- AttributeValue::S(holder.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to remove reverse index");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-}
diff --git a/services/blob/src/grpc.rs b/services/blob/src/grpc.rs
deleted file mode 100644
--- a/services/blob/src/grpc.rs
+++ /dev/null
@@ -1,493 +0,0 @@
-use anyhow::Result;
-use aws_sdk_dynamodb::Error as DynamoDBError;
-use blob::blob_service_server::BlobService;
-use std::{net::SocketAddr, pin::Pin};
-use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
-use tonic::{transport::Server, Request, Response, Status};
-use tracing::{debug, error, info, instrument, trace, warn, Instrument};
-
-use crate::{
- config::CONFIG,
- constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
- },
- database::errors::Error as DBError,
- database::old::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::{MultiPartUploadSession, S3Client, S3Path},
- tools::MemOps,
-};
-
-mod blob {
- tonic::include_proto!("blob");
-}
-use blob::blob_service_server::BlobServiceServer;
-
-pub async fn run_grpc_server(
- db_client: DatabaseClient,
- s3_client: S3Client,
-) -> Result<()> {
- let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?;
- let blob_service = MyBlobService::new(db_client, s3_client);
-
- info!("Starting gRPC server listening at {}", CONFIG.grpc_port);
- Server::builder()
- .add_service(BlobServiceServer::new(blob_service))
- .serve(addr)
- .await?;
-
- Ok(())
-}
-
-struct MyBlobService {
- db: DatabaseClient,
- s3: S3Client,
-}
-
-impl MyBlobService {
- pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self {
- MyBlobService {
- db: db_client,
- s3: s3_client,
- }
- }
-
- async fn find_s3_path_by_reverse_index(
- &self,
- reverse_index_item: &ReverseIndexItem,
- ) -> Result<S3Path, Status> {
- let blob_hash = &reverse_index_item.blob_hash;
- match self.db.find_blob_item(&blob_hash).await {
- Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
- Ok(None) => {
- debug!("No blob found for {:?}", reverse_index_item);
- Err(Status::not_found("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }
- }
-
- async fn find_s3_path_by_holder(
- &self,
- holder: &str,
- ) -> Result<S3Path, Status> {
- match self.db.find_reverse_index_by_holder(holder).await {
- Ok(Some(reverse_index)) => {
- self.find_s3_path_by_reverse_index(&reverse_index).await
- }
- Ok(None) => {
- debug!("No db entry found for holder {:?}", holder);
- Err(Status::not_found("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }
- }
-}
-
-// gRPC implementation
-#[tonic::async_trait]
-impl BlobService for MyBlobService {
- type PutStream =
- Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
-
- #[instrument(skip_all, fields(holder))]
- async fn put(
- &self,
- request: Request<tonic::Streaming<blob::PutRequest>>,
- ) -> Result<Response<Self::PutStream>, Status> {
- info!("Put blob request: {:?}", request);
- let mut in_stream = request.into_inner();
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let db = self.db.clone();
- let s3 = self.s3.clone();
- let worker = async move {
- let mut put_handler = PutHandler::new(&db, &s3);
-
- while let Some(message) = in_stream.next().await {
- let response = match message {
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::Holder(new_holder)),
- }) => put_handler.handle_holder(new_holder).await,
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::BlobHash(new_hash)),
- }) => put_handler.handle_blob_hash(new_hash).await,
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::DataChunk(new_data)),
- }) => put_handler.handle_data_chunk(new_data).await,
- unexpected => {
- error!("Received an unexpected Result: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
- trace!("Sending response: {:?}", response);
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if put_handler.should_close_stream {
- trace!("Put handler requested to close stream");
- break;
- }
- }
-
- if let Err(status) = put_handler.finish().await {
- trace!("Sending error response: {:?}", status);
- let _ = tx.send(Err(status)).await;
- }
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
- }
-
- type GetStream =
- Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
-
- #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))]
- async fn get(
- &self,
- request: Request<blob::GetRequest>,
- ) -> Result<Response<Self::GetStream>, Status> {
- info!("Get blob request: {:?}", request);
- let message: blob::GetRequest = request.into_inner();
- let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
- tracing::Span::current().record("s3_path", s3_path.to_full_path());
-
- let object_metadata =
- self.s3.get_object_metadata(&s3_path).await.map_err(|err| {
- error!("Failed to get S3 object metadata: {:?}", err);
- Status::aborted("server error")
- })?;
-
- let file_size: u64 =
- object_metadata.content_length().try_into().map_err(|err| {
- error!("Failed to get S3 object content length: {:?}", err);
- Status::aborted("server error")
- })?;
- let chunk_size: u64 =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
-
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let s3 = self.s3.clone();
-
- let worker = async move {
- let mut offset: u64 = 0;
- while offset < file_size {
- let next_size = std::cmp::min(chunk_size, file_size - offset);
- let range = offset..(offset + next_size);
- trace!(?range, "Getting {} bytes of data", next_size);
-
- let response = match s3.get_object_bytes(&s3_path, range).await {
- Ok(data) => Ok(blob::GetResponse { data_chunk: data }),
- Err(err) => {
- error!("Failed to download data chunk: {:?}", err);
- Err(Status::aborted("download failed"))
- }
- };
-
- let should_abort = response.is_err();
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if should_abort {
- trace!("Error response, aborting");
- break;
- }
-
- offset += chunk_size;
- }
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
- }
-
- #[instrument(skip_all, fields(holder = %request.get_ref().holder))]
- async fn remove(
- &self,
- request: Request<blob::RemoveRequest>,
- ) -> Result<Response<()>, Status> {
- info!("Remove blob request: {:?}", request);
- let message = request.into_inner();
- let holder = message.holder.as_str();
- let reverse_index_item = self
- .db
- .find_reverse_index_by_holder(holder)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Blob not found");
- Status::not_found("Blob not found")
- })?;
- let blob_hash = &reverse_index_item.blob_hash;
-
- self
- .db
- .remove_reverse_index_item(holder)
- .await
- .map_err(handle_db_error)?;
-
- // TODO handle cleanup here properly
- // for now the object's being removed right away
- // after the last holder was removed
- if self
- .db
- .find_reverse_index_by_hash(blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_empty()
- {
- let s3_path = self
- .find_s3_path_by_reverse_index(&reverse_index_item)
- .await?;
-
- self.s3.delete_object(&s3_path).await.map_err(|err| {
- error!("Failed to delete S3 object: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- self
- .db
- .remove_blob_item(blob_hash)
- .await
- .map_err(handle_db_error)?;
- }
-
- Ok(Response::new(()))
- }
-}
-
-type PutResult = Result<blob::PutResponse, Status>;
-
-enum PutAction {
- AssignHolder,
- UploadNewBlob(BlobItem),
-}
-
-/// A helper for handling Put RPC requests
-struct PutHandler {
- /// Should the stream be closed by server
- pub should_close_stream: bool,
- action: Option<PutAction>,
-
- holder: Option<String>,
- blob_hash: Option<String>,
- current_chunk: Vec<u8>,
-
- uploader: Option<MultiPartUploadSession>,
- db: DatabaseClient,
- s3: S3Client,
-}
-
-impl PutHandler {
- fn new(db: &DatabaseClient, s3: &S3Client) -> Self {
- PutHandler {
- should_close_stream: false,
- action: None,
- holder: None,
- blob_hash: None,
- current_chunk: Vec::new(),
- uploader: None,
- db: db.clone(),
- s3: s3.clone(),
- }
- }
-
- pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
- if self.holder.is_some() {
- warn!("Holder already provided");
- return Err(Status::invalid_argument("Holder already provided"));
- }
- tracing::Span::current().record("holder", &new_holder);
- self.holder = Some(new_holder);
- self.determine_action().await
- }
-
- pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
- if self.blob_hash.is_some() {
- warn!("Blob hash already provided");
- return Err(Status::invalid_argument("Blob hash already provided"));
- }
- debug!("Blob hash: {}", new_hash);
- self.blob_hash = Some(new_hash);
- self.determine_action().await
- }
-
- /// private helper function to determine purpose of this RPC call
- async fn determine_action(&mut self) -> PutResult {
- // this should be called only if action isn't determined yet
- // this error should actually never happen
- if self.action.is_some() {
- error!("Put action is already started");
- return Err(Status::failed_precondition("Put action is already started"));
- }
-
- // holder and hash need both to be set in order to continue
- // otherwise we send a standard response
- if self.holder.is_none() || self.blob_hash.is_none() {
- return Ok(blob::PutResponse { data_exists: false });
- }
- let blob_hash = self
- .blob_hash
- .as_ref()
- .ok_or_else(|| Status::failed_precondition("Internal error"))?;
-
- match self.db.find_blob_item(blob_hash).await {
- // Hash already exists, so we're only assigning a new holder to it
- Ok(Some(_)) => {
- debug!("Blob found, assigning holder");
- self.action = Some(PutAction::AssignHolder);
- self.should_close_stream = true;
- Ok(blob::PutResponse { data_exists: true })
- }
- // Hash doesn't exist, so we're starting a new upload session
- Ok(None) => {
- debug!("Blob not found, starting upload action");
- self.action = Some(PutAction::UploadNewBlob(BlobItem::new(blob_hash)));
- Ok(blob::PutResponse { data_exists: false })
- }
- Err(db_err) => {
- self.should_close_stream = true;
- Err(handle_db_error(db_err))
- }
- }
- }
-
- pub async fn handle_data_chunk(
- &mut self,
- mut new_data: Vec<u8>,
- ) -> PutResult {
- let blob_item = match &self.action {
- Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
- _ => {
- self.should_close_stream = true;
- error!("Data chunk sent before upload action is started");
- return Err(Status::invalid_argument(
- "Holder and hash should be provided before data",
- ));
- }
- };
- trace!("Received {} bytes of data", new_data.len());
-
- // create upload session if it doesn't already exist
- if self.uploader.is_none() {
- debug!("Uploader doesn't exist, starting new session");
- self.uploader =
- match self.s3.start_upload_session(&blob_item.s3_path).await {
- Ok(session) => Some(session),
- Err(err) => {
- self.should_close_stream = true;
- error!("Failed to create upload session: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
- }
- let uploader = self.uploader.as_mut().unwrap();
-
- // New parts should be added to AWS only if they exceed minimum part size,
- // Otherwise AWS returns error
- self.current_chunk.append(&mut new_data);
- if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
- {
- trace!("Chunk size exceeded, adding new S3 part");
- if let Err(err) = uploader.add_part(self.current_chunk.take_out()).await {
- self.should_close_stream = true;
- error!("Failed to upload S3 part: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
-
- Ok(blob::PutResponse { data_exists: false })
- }
-
- /// This function should be called after the input stream is finished.
- /// This consumes `self` so this put handler instance cannot be used
- /// after this is called.
- pub async fn finish(self) -> Result<(), Status> {
- if self.action.is_none() {
- debug!("No action to perform, finishing now");
- return Ok(());
- }
- let holder = self.holder.ok_or_else(|| {
- error!("Cannot finish action. No holder provided!");
- Status::aborted("Internal error")
- })?;
- let blob_hash = self.blob_hash.ok_or_else(|| {
- error!("Cannot finish action. No blob hash provided!");
- Status::aborted("Internal error")
- })?;
- let blob_item = match self.action {
- None => return Ok(()),
- Some(PutAction::AssignHolder) => {
- return assign_holder_to_blob(&self.db, holder, blob_hash).await;
- }
- Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
- };
-
- let mut uploader = self.uploader.ok_or_else(|| {
- // This also happens when client cancels before sending any data chunk
- warn!("No uploader was created, finishing now");
- Status::aborted("Internal error")
- })?;
-
- if !self.current_chunk.is_empty() {
- if let Err(err) = uploader.add_part(self.current_chunk).await {
- error!("Failed to upload final part: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
-
- if let Err(err) = uploader.finish_upload().await {
- error!("Failed to finish upload session: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
-
- self
- .db
- .put_blob_item(blob_item)
- .await
- .map_err(handle_db_error)?;
-
- assign_holder_to_blob(&self.db, holder, blob_hash).await?;
-
- debug!("Upload finished successfully");
- Ok(())
- }
-}
-
-async fn assign_holder_to_blob(
- db: &DatabaseClient,
- holder: String,
- blob_hash: String,
-) -> Result<(), Status> {
- let reverse_index_item = ReverseIndexItem { holder, blob_hash };
-
- db.put_reverse_index_item(reverse_index_item)
- .await
- .map_err(handle_db_error)
-}
-
-fn handle_db_error(db_error: DBError) -> Status {
- match db_error {
- DBError::AwsSdk(DynamoDBError::InternalServerError(_))
- | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
- _,
- ))
- | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
- warn!("AWS transient error occurred");
- Status::unavailable("please retry")
- }
- DBError::Blob(e) => {
- error!("Encountered Blob database error: {}", e);
- Status::failed_precondition("Internal error")
- }
- e => {
- error!("Encountered an unexpected error: {}", e);
- Status::failed_precondition("unexpected error")
- }
- }
-}
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -1,7 +1,6 @@
pub mod config;
pub mod constants;
pub mod database;
-pub mod grpc;
pub mod http;
pub mod s3;
pub mod service;
@@ -29,21 +28,17 @@
config::parse_cmdline_args()?;
let aws_config = config::load_aws_config().await;
- let db = database::old::DatabaseClient::new(&aws_config);
+ let db = database::DatabaseClient::new(&aws_config);
let s3 = s3::S3Client::new(&aws_config);
- let new_db = database::DatabaseClient::new(&aws_config);
let service = service::BlobService::new(
- new_db,
- s3.clone(),
+ db,
+ s3,
BlobServiceConfig {
instant_delete_orphaned_blobs: true,
..Default::default()
},
);
- tokio::select! {
- http_result = crate::http::run_http_server(service) => http_result,
- grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
- }
+ crate::http::run_http_server(service).await
}
diff --git a/services/docker-compose.yml b/services/docker-compose.yml
--- a/services/docker-compose.yml
+++ b/services/docker-compose.yml
@@ -44,7 +44,7 @@
- COMM_SERVICES_SANDBOX=${COMM_SERVICES_SANDBOX}
image: commapp/blob-server:0.1
ports:
- - '${COMM_SERVICES_PORT_BLOB}:50051'
+ - '${COMM_SERVICES_PORT_BLOB}:50053'
volumes:
- $HOME/.aws/config:/home/comm/.aws/config:ro
- $HOME/.aws/credentials:/home/comm/.aws/credentials:ro
diff --git a/shared/protos/blob.proto b/shared/protos/blob.proto
deleted file mode 100644
--- a/shared/protos/blob.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-syntax = "proto3";
-
-package blob;
-
-import "google/protobuf/empty.proto";
-
-service BlobService {
- rpc Put(stream PutRequest) returns (stream PutResponse) {}
- rpc Get(GetRequest) returns (stream GetResponse) {}
- rpc Remove(RemoveRequest) returns (google.protobuf.Empty) {}
-}
-
-// Put
-
-message PutRequest {
- oneof data {
- string holder = 1;
- string blobHash = 2;
- bytes dataChunk = 3;
- }
-}
-
-message PutResponse {
- bool dataExists = 1;
-}
-
-// Get
-
-message GetRequest {
- string holder = 1;
-}
-
-message GetResponse {
- bytes dataChunk = 1;
-}
-
-// Remove
-
-message RemoveRequest {
- string holder = 1;
-}
Attached To: D8959: [blob-service] Remove gRPC service, change default port