D8959.diff

diff --git a/services/blob/Dockerfile b/services/blob/Dockerfile
--- a/services/blob/Dockerfile
+++ b/services/blob/Dockerfile
@@ -21,7 +21,6 @@
# Copy actual application sources
COPY services/blob .
-COPY shared/protos/blob.proto ../../shared/protos/
# Remove the previously-built binary so that only the application itself is
# rebuilt
diff --git a/services/blob/build.rs b/services/blob/build.rs
deleted file mode 100644
--- a/services/blob/build.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-fn main() {
- println!("cargo:rerun-if-changed=src/main.rs");
-
- println!("cargo:rerun-if-changed=../../shared/protos/blob.proto");
- tonic_build::compile_protos("../../shared/protos/blob.proto")
- .expect("Failed to compile protobuf file");
-}
diff --git a/services/blob/src/config.rs b/services/blob/src/config.rs
--- a/services/blob/src/config.rs
+++ b/services/blob/src/config.rs
@@ -1,19 +1,15 @@
-use anyhow::{ensure, Result};
+use anyhow::Result;
use clap::Parser;
use once_cell::sync::Lazy;
use tracing::info;
use crate::constants::{
- DEFAULT_GRPC_PORT, DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME,
- S3_BUCKET_ENV_VAR,
+ DEFAULT_HTTP_PORT, DEFAULT_S3_BUCKET_NAME, S3_BUCKET_ENV_VAR,
};
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct AppConfig {
- /// gRPC server listening port
- #[arg(long, default_value_t = DEFAULT_GRPC_PORT)]
- pub grpc_port: u16,
/// HTTP server listening port
#[arg(long, default_value_t = DEFAULT_HTTP_PORT)]
pub http_port: u16,
@@ -32,21 +28,14 @@
/// Processes the command-line arguments and environment variables.
/// Should be called at the beginning of the `main()` function.
-pub(super) fn parse_cmdline_args() -> Result<()> {
+pub(super) fn parse_cmdline_args() -> Result<&'static AppConfig> {
// force evaluation of the lazy initialized config
let cfg = Lazy::force(&CONFIG);
- // Perform some additional validation for CLI args
- ensure!(
- cfg.grpc_port != cfg.http_port,
- "gRPC and HTTP ports cannot be the same: {}",
- cfg.grpc_port
- );
-
if cfg.s3_bucket_name != DEFAULT_S3_BUCKET_NAME {
info!("Using custom S3 bucket: {}", &cfg.s3_bucket_name);
}
- Ok(())
+ Ok(cfg)
}
/// Provides region/credentials configuration for AWS SDKs
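
The updated `parse_cmdline_args` now hands callers a `&'static AppConfig` borrowed from the lazily initialized `CONFIG` instead of `()`. A minimal standalone sketch of that pattern, assuming clap 4 (with the `derive` feature), once_cell 1, and anyhow 1; the only field shown is the HTTP port kept by this diff:

use anyhow::Result;
use clap::Parser;
use once_cell::sync::Lazy;

#[derive(Parser)]
struct AppConfig {
  /// HTTP server listening port
  #[arg(long, default_value_t = 50053)]
  http_port: u16,
}

// Parsed exactly once, on first access.
static CONFIG: Lazy<AppConfig> = Lazy::new(AppConfig::parse);

fn parse_cmdline_args() -> Result<&'static AppConfig> {
  // Forcing the Lazy parses argv now and returns a 'static reference.
  Ok(Lazy::force(&CONFIG))
}

fn main() -> Result<()> {
  let cfg = parse_cmdline_args()?;
  println!("HTTP port: {}", cfg.http_port);
  Ok(())
}
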
diff --git a/services/blob/src/constants.rs b/services/blob/src/constants.rs
--- a/services/blob/src/constants.rs
+++ b/services/blob/src/constants.rs
@@ -1,28 +1,8 @@
// Assorted constants
-pub const DEFAULT_GRPC_PORT: u16 = 50051;
-pub const DEFAULT_HTTP_PORT: u16 = 51001;
+pub const DEFAULT_HTTP_PORT: u16 = 50053;
pub const MPSC_CHANNEL_BUFFER_CAPACITY: usize = 1;
-/// 4MB limit
-///
-/// WARNING: use keeping in mind that grpc adds its own headers to messages
-/// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-/// so the message that actually is being sent over the network looks like this
-/// ```
-/// [Compressed-Flag] [Message-Length] [Message]
-/// [Compressed-Flag] 1 byte - added by grpc
-/// [Message-Length] 4 bytes - added by grpc
-/// [Message] N bytes - actual data
-/// ```
-/// so for every message we get 5 additional bytes of data
-/// as [mentioned here](https://github.com/grpc/grpc/issues/15734#issuecomment-396962671),
-/// gRPC stream may contain more than one message
-pub const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
-
-/// See [`GRPC_CHUNK_SIZE_LIMIT`] description for details
-pub const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5;
-
// HTTP constants
pub const BLOB_DOWNLOAD_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
@@ -49,18 +29,6 @@
pub const ATTR_UNCHECKED: &str = "unchecked";
}
-// old DynamoDB constants
-
-pub const BLOB_TABLE_NAME: &str = "blob-service-blob";
-pub const BLOB_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
-pub const BLOB_TABLE_S3_PATH_FIELD: &str = "s3Path";
-pub const BLOB_TABLE_CREATED_FIELD: &str = "created";
-
-pub const BLOB_REVERSE_INDEX_TABLE_NAME: &str = "blob-service-reverse-index";
-pub const BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD: &str = "holder";
-pub const BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD: &str = "blobHash";
-pub const BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME: &str = "blobHash-index";
-
// Environment variables
pub const LOG_LEVEL_ENV_VAR: &str =
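
The deleted doc comment above records why the old gRPC path subtracted a fixed overhead from its 4 MiB frame limit: gRPC prefixes every message with a 1-byte compressed flag and a 4-byte length. A tiny Rust sketch of that arithmetic, mirroring how the grpc.rs module (removed below) computed its download chunk size:

const GRPC_CHUNK_SIZE_LIMIT: u64 = 4 * 1024 * 1024;
const GRPC_METADATA_SIZE_PER_MESSAGE: u64 = 5; // 1-byte flag + 4-byte length

fn main() {
  // Usable payload per streamed message once the gRPC prefix is accounted for.
  let chunk_size = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
  assert_eq!(chunk_size, 4_194_299);
  println!("payload bytes per gRPC message: {chunk_size}");
}
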
diff --git a/services/blob/src/database/mod.rs b/services/blob/src/database/mod.rs
--- a/services/blob/src/database/mod.rs
+++ b/services/blob/src/database/mod.rs
@@ -1,6 +1,5 @@
pub mod client;
pub mod errors;
-pub mod old;
pub mod types;
pub use client::DatabaseClient;
diff --git a/services/blob/src/database/old.rs b/services/blob/src/database/old.rs
deleted file mode 100644
--- a/services/blob/src/database/old.rs
+++ /dev/null
@@ -1,308 +0,0 @@
-#![allow(deprecated)]
-
-use aws_sdk_dynamodb::{
- operation::get_item::GetItemOutput, types::AttributeValue,
-};
-use chrono::{DateTime, Utc};
-use comm_services_lib::database;
-use std::{collections::HashMap, sync::Arc};
-use tracing::error;
-
-use crate::{
- config::CONFIG,
- constants::{
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME,
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD, BLOB_REVERSE_INDEX_TABLE_NAME,
- BLOB_TABLE_BLOB_HASH_FIELD, BLOB_TABLE_CREATED_FIELD, BLOB_TABLE_NAME,
- BLOB_TABLE_S3_PATH_FIELD,
- },
- s3::S3Path,
-};
-
-use super::errors::{BlobDBError, Error};
-
-#[derive(Clone, Debug)]
-pub struct BlobItem {
- pub blob_hash: String,
- pub s3_path: S3Path,
- pub created: DateTime<Utc>,
-}
-
-impl BlobItem {
- pub fn new(blob_hash: impl Into<String>) -> Self {
- let hash_str = blob_hash.into();
- BlobItem {
- blob_hash: hash_str.clone(),
- s3_path: S3Path {
- bucket_name: CONFIG.s3_bucket_name.clone(),
- object_name: hash_str,
- },
- created: Utc::now(),
- }
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct ReverseIndexItem {
- pub holder: String,
- pub blob_hash: String,
-}
-
-#[derive(Clone)]
-pub struct DatabaseClient {
- client: Arc<aws_sdk_dynamodb::Client>,
-}
-
-impl DatabaseClient {
- pub fn new(aws_config: &aws_types::SdkConfig) -> Self {
- DatabaseClient {
- client: Arc::new(aws_sdk_dynamodb::Client::new(aws_config)),
- }
- }
-
- // Blob item
-
- pub async fn put_blob_item(&self, blob_item: BlobItem) -> Result<(), Error> {
- let item = HashMap::from([
- (
- BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(blob_item.blob_hash),
- ),
- (
- BLOB_TABLE_S3_PATH_FIELD.to_string(),
- AttributeValue::S(blob_item.s3_path.to_full_path()),
- ),
- (
- BLOB_TABLE_CREATED_FIELD.to_string(),
- AttributeValue::S(blob_item.created.to_rfc3339()),
- ),
- ]);
-
- self
- .client
- .put_item()
- .table_name(BLOB_TABLE_NAME)
- .set_item(Some(item))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to put blob item");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- pub async fn find_blob_item(
- &self,
- blob_hash: &str,
- ) -> Result<Option<BlobItem>, Error> {
- let item_key = HashMap::from([(
- BLOB_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(blob_hash.to_string()),
- )]);
- match self
- .client
- .get_item()
- .table_name(BLOB_TABLE_NAME)
- .set_key(Some(item_key))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find blob item");
- Error::AwsSdk(e.into())
- })? {
- GetItemOutput {
- item: Some(mut item),
- ..
- } => {
- let blob_hash = database::parse_string_attribute(
- BLOB_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_TABLE_BLOB_HASH_FIELD),
- )?;
- let s3_path = database::parse_string_attribute(
- BLOB_TABLE_S3_PATH_FIELD,
- item.remove(BLOB_TABLE_S3_PATH_FIELD),
- )?;
- let created = database::parse_datetime_attribute(
- BLOB_TABLE_CREATED_FIELD,
- item.remove(BLOB_TABLE_CREATED_FIELD),
- )?;
- Ok(Some(BlobItem {
- blob_hash,
- s3_path: S3Path::from_full_path(&s3_path)
- .map_err(|e| Error::Blob(BlobDBError::InvalidS3Path(e)))?,
- created,
- }))
- }
- _ => Ok(None),
- }
- }
-
- pub async fn remove_blob_item(&self, blob_hash: &str) -> Result<(), Error> {
- self
- .client
- .delete_item()
- .table_name(BLOB_TABLE_NAME)
- .key(
- BLOB_TABLE_BLOB_HASH_FIELD,
- AttributeValue::S(blob_hash.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to remove blob item");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- // Reverse index item
-
- pub async fn put_reverse_index_item(
- &self,
- reverse_index_item: ReverseIndexItem,
- ) -> Result<(), Error> {
- let holder = &reverse_index_item.holder;
- if self.find_reverse_index_by_holder(holder).await?.is_some() {
- error!("Failed to put reverse index. Holder already exists.");
- return Err(Error::Blob(BlobDBError::HolderAlreadyExists(
- holder.to_string(),
- )));
- }
-
- let item = HashMap::from([
- (
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
- AttributeValue::S(reverse_index_item.holder),
- ),
- (
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD.to_string(),
- AttributeValue::S(reverse_index_item.blob_hash),
- ),
- ]);
- self
- .client
- .put_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .set_item(Some(item))
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to put reverse index");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-
- pub async fn find_reverse_index_by_holder(
- &self,
- holder: &str,
- ) -> Result<Option<ReverseIndexItem>, Error> {
- let item_key = HashMap::from([(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD.to_string(),
- AttributeValue::S(holder.to_string()),
- )]);
- match self
- .client
- .get_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .set_key(Some(item_key))
- .consistent_read(true)
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find reverse index by holder");
- Error::AwsSdk(e.into())
- })? {
- GetItemOutput {
- item: Some(mut item),
- ..
- } => {
- let holder = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
- )?;
- let blob_hash = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
- )?;
-
- Ok(Some(ReverseIndexItem { holder, blob_hash }))
- }
- _ => Ok(None),
- }
- }
-
- pub async fn find_reverse_index_by_hash(
- &self,
- blob_hash: &str,
- ) -> Result<Vec<ReverseIndexItem>, Error> {
- let response = self
- .client
- .query()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .index_name(BLOB_REVERSE_INDEX_TABLE_HASH_INDEX_NAME)
- .key_condition_expression("#blobHash = :valueToMatch")
- .expression_attribute_names(
- "#blobHash",
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- )
- .expression_attribute_values(
- ":valueToMatch",
- AttributeValue::S(blob_hash.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to find reverse indexes by hash");
- Error::AwsSdk(e.into())
- })?;
-
- if response.count == 0 {
- return Ok(vec![]);
- }
-
- let mut results: Vec<ReverseIndexItem> =
- Vec::with_capacity(response.count() as usize);
- for mut item in response.items.unwrap_or_default() {
- let holder = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD),
- )?;
- let blob_hash = database::parse_string_attribute(
- BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD,
- item.remove(BLOB_REVERSE_INDEX_TABLE_BLOB_HASH_FIELD),
- )?;
-
- results.push(ReverseIndexItem { holder, blob_hash });
- }
-
- return Ok(results);
- }
-
- pub async fn remove_reverse_index_item(
- &self,
- holder: &str,
- ) -> Result<(), Error> {
- self
- .client
- .delete_item()
- .table_name(BLOB_REVERSE_INDEX_TABLE_NAME)
- .key(
- BLOB_REVERSE_INDEX_TABLE_HOLDER_FIELD,
- AttributeValue::S(holder.to_string()),
- )
- .send()
- .await
- .map_err(|e| {
- error!("DynamoDB client failed to remove reverse index");
- Error::AwsSdk(e.into())
- })?;
-
- Ok(())
- }
-}
diff --git a/services/blob/src/grpc.rs b/services/blob/src/grpc.rs
deleted file mode 100644
--- a/services/blob/src/grpc.rs
+++ /dev/null
@@ -1,493 +0,0 @@
-use anyhow::Result;
-use aws_sdk_dynamodb::Error as DynamoDBError;
-use blob::blob_service_server::BlobService;
-use std::{net::SocketAddr, pin::Pin};
-use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
-use tonic::{transport::Server, Request, Response, Status};
-use tracing::{debug, error, info, instrument, trace, warn, Instrument};
-
-use crate::{
- config::CONFIG,
- constants::{
- GRPC_CHUNK_SIZE_LIMIT, GRPC_METADATA_SIZE_PER_MESSAGE,
- MPSC_CHANNEL_BUFFER_CAPACITY, S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE,
- },
- database::errors::Error as DBError,
- database::old::{BlobItem, DatabaseClient, ReverseIndexItem},
- s3::{MultiPartUploadSession, S3Client, S3Path},
- tools::MemOps,
-};
-
-mod blob {
- tonic::include_proto!("blob");
-}
-use blob::blob_service_server::BlobServiceServer;
-
-pub async fn run_grpc_server(
- db_client: DatabaseClient,
- s3_client: S3Client,
-) -> Result<()> {
- let addr: SocketAddr = format!("[::]:{}", CONFIG.grpc_port).parse()?;
- let blob_service = MyBlobService::new(db_client, s3_client);
-
- info!("Starting gRPC server listening at {}", CONFIG.grpc_port);
- Server::builder()
- .add_service(BlobServiceServer::new(blob_service))
- .serve(addr)
- .await?;
-
- Ok(())
-}
-
-struct MyBlobService {
- db: DatabaseClient,
- s3: S3Client,
-}
-
-impl MyBlobService {
- pub fn new(db_client: DatabaseClient, s3_client: S3Client) -> Self {
- MyBlobService {
- db: db_client,
- s3: s3_client,
- }
- }
-
- async fn find_s3_path_by_reverse_index(
- &self,
- reverse_index_item: &ReverseIndexItem,
- ) -> Result<S3Path, Status> {
- let blob_hash = &reverse_index_item.blob_hash;
- match self.db.find_blob_item(&blob_hash).await {
- Ok(Some(BlobItem { s3_path, .. })) => Ok(s3_path),
- Ok(None) => {
- debug!("No blob found for {:?}", reverse_index_item);
- Err(Status::not_found("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }
- }
-
- async fn find_s3_path_by_holder(
- &self,
- holder: &str,
- ) -> Result<S3Path, Status> {
- match self.db.find_reverse_index_by_holder(holder).await {
- Ok(Some(reverse_index)) => {
- self.find_s3_path_by_reverse_index(&reverse_index).await
- }
- Ok(None) => {
- debug!("No db entry found for holder {:?}", holder);
- Err(Status::not_found("blob not found"))
- }
- Err(err) => Err(handle_db_error(err)),
- }
- }
-}
-
-// gRPC implementation
-#[tonic::async_trait]
-impl BlobService for MyBlobService {
- type PutStream =
- Pin<Box<dyn Stream<Item = Result<blob::PutResponse, Status>> + Send>>;
-
- #[instrument(skip_all, fields(holder))]
- async fn put(
- &self,
- request: Request<tonic::Streaming<blob::PutRequest>>,
- ) -> Result<Response<Self::PutStream>, Status> {
- info!("Put blob request: {:?}", request);
- let mut in_stream = request.into_inner();
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let db = self.db.clone();
- let s3 = self.s3.clone();
- let worker = async move {
- let mut put_handler = PutHandler::new(&db, &s3);
-
- while let Some(message) = in_stream.next().await {
- let response = match message {
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::Holder(new_holder)),
- }) => put_handler.handle_holder(new_holder).await,
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::BlobHash(new_hash)),
- }) => put_handler.handle_blob_hash(new_hash).await,
- Ok(blob::PutRequest {
- data: Some(blob::put_request::Data::DataChunk(new_data)),
- }) => put_handler.handle_data_chunk(new_data).await,
- unexpected => {
- error!("Received an unexpected Result: {:?}", unexpected);
- Err(Status::unknown("unknown error"))
- }
- };
- trace!("Sending response: {:?}", response);
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if put_handler.should_close_stream {
- trace!("Put handler requested to close stream");
- break;
- }
- }
-
- if let Err(status) = put_handler.finish().await {
- trace!("Sending error response: {:?}", status);
- let _ = tx.send(Err(status)).await;
- }
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream) as Self::PutStream))
- }
-
- type GetStream =
- Pin<Box<dyn Stream<Item = Result<blob::GetResponse, Status>> + Send>>;
-
- #[instrument(skip_all, fields(holder = %request.get_ref().holder, s3_path))]
- async fn get(
- &self,
- request: Request<blob::GetRequest>,
- ) -> Result<Response<Self::GetStream>, Status> {
- info!("Get blob request: {:?}", request);
- let message: blob::GetRequest = request.into_inner();
- let s3_path = self.find_s3_path_by_holder(&message.holder).await?;
- tracing::Span::current().record("s3_path", s3_path.to_full_path());
-
- let object_metadata =
- self.s3.get_object_metadata(&s3_path).await.map_err(|err| {
- error!("Failed to get S3 object metadata: {:?}", err);
- Status::aborted("server error")
- })?;
-
- let file_size: u64 =
- object_metadata.content_length().try_into().map_err(|err| {
- error!("Failed to get S3 object content length: {:?}", err);
- Status::aborted("server error")
- })?;
- let chunk_size: u64 =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
-
- let (tx, rx) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
- let s3 = self.s3.clone();
-
- let worker = async move {
- let mut offset: u64 = 0;
- while offset < file_size {
- let next_size = std::cmp::min(chunk_size, file_size - offset);
- let range = offset..(offset + next_size);
- trace!(?range, "Getting {} bytes of data", next_size);
-
- let response = match s3.get_object_bytes(&s3_path, range).await {
- Ok(data) => Ok(blob::GetResponse { data_chunk: data }),
- Err(err) => {
- error!("Failed to download data chunk: {:?}", err);
- Err(Status::aborted("download failed"))
- }
- };
-
- let should_abort = response.is_err();
- if let Err(e) = tx.send(response).await {
- error!("Response was dropped: {}", e);
- break;
- }
- if should_abort {
- trace!("Error response, aborting");
- break;
- }
-
- offset += chunk_size;
- }
- };
- tokio::spawn(worker.in_current_span());
-
- let out_stream = ReceiverStream::new(rx);
- Ok(Response::new(Box::pin(out_stream) as Self::GetStream))
- }
-
- #[instrument(skip_all, fields(holder = %request.get_ref().holder))]
- async fn remove(
- &self,
- request: Request<blob::RemoveRequest>,
- ) -> Result<Response<()>, Status> {
- info!("Remove blob request: {:?}", request);
- let message = request.into_inner();
- let holder = message.holder.as_str();
- let reverse_index_item = self
- .db
- .find_reverse_index_by_holder(holder)
- .await
- .map_err(handle_db_error)?
- .ok_or_else(|| {
- debug!("Blob not found");
- Status::not_found("Blob not found")
- })?;
- let blob_hash = &reverse_index_item.blob_hash;
-
- self
- .db
- .remove_reverse_index_item(holder)
- .await
- .map_err(handle_db_error)?;
-
- // TODO handle cleanup here properly
- // for now the object's being removed right away
- // after the last holder was removed
- if self
- .db
- .find_reverse_index_by_hash(blob_hash)
- .await
- .map_err(handle_db_error)?
- .is_empty()
- {
- let s3_path = self
- .find_s3_path_by_reverse_index(&reverse_index_item)
- .await?;
-
- self.s3.delete_object(&s3_path).await.map_err(|err| {
- error!("Failed to delete S3 object: {:?}", err);
- Status::aborted("Internal error")
- })?;
-
- self
- .db
- .remove_blob_item(blob_hash)
- .await
- .map_err(handle_db_error)?;
- }
-
- Ok(Response::new(()))
- }
-}
-
-type PutResult = Result<blob::PutResponse, Status>;
-
-enum PutAction {
- AssignHolder,
- UploadNewBlob(BlobItem),
-}
-
-/// A helper for handling Put RPC requests
-struct PutHandler {
- /// Should the stream be closed by server
- pub should_close_stream: bool,
- action: Option<PutAction>,
-
- holder: Option<String>,
- blob_hash: Option<String>,
- current_chunk: Vec<u8>,
-
- uploader: Option<MultiPartUploadSession>,
- db: DatabaseClient,
- s3: S3Client,
-}
-
-impl PutHandler {
- fn new(db: &DatabaseClient, s3: &S3Client) -> Self {
- PutHandler {
- should_close_stream: false,
- action: None,
- holder: None,
- blob_hash: None,
- current_chunk: Vec::new(),
- uploader: None,
- db: db.clone(),
- s3: s3.clone(),
- }
- }
-
- pub async fn handle_holder(&mut self, new_holder: String) -> PutResult {
- if self.holder.is_some() {
- warn!("Holder already provided");
- return Err(Status::invalid_argument("Holder already provided"));
- }
- tracing::Span::current().record("holder", &new_holder);
- self.holder = Some(new_holder);
- self.determine_action().await
- }
-
- pub async fn handle_blob_hash(&mut self, new_hash: String) -> PutResult {
- if self.blob_hash.is_some() {
- warn!("Blob hash already provided");
- return Err(Status::invalid_argument("Blob hash already provided"));
- }
- debug!("Blob hash: {}", new_hash);
- self.blob_hash = Some(new_hash);
- self.determine_action().await
- }
-
- /// private helper function to determine purpose of this RPC call
- async fn determine_action(&mut self) -> PutResult {
- // this should be called only if action isn't determined yet
- // this error should actually never happen
- if self.action.is_some() {
- error!("Put action is already started");
- return Err(Status::failed_precondition("Put action is already started"));
- }
-
- // holder and hash need both to be set in order to continue
- // otherwise we send a standard response
- if self.holder.is_none() || self.blob_hash.is_none() {
- return Ok(blob::PutResponse { data_exists: false });
- }
- let blob_hash = self
- .blob_hash
- .as_ref()
- .ok_or_else(|| Status::failed_precondition("Internal error"))?;
-
- match self.db.find_blob_item(blob_hash).await {
- // Hash already exists, so we're only assigning a new holder to it
- Ok(Some(_)) => {
- debug!("Blob found, assigning holder");
- self.action = Some(PutAction::AssignHolder);
- self.should_close_stream = true;
- Ok(blob::PutResponse { data_exists: true })
- }
- // Hash doesn't exist, so we're starting a new upload session
- Ok(None) => {
- debug!("Blob not found, starting upload action");
- self.action = Some(PutAction::UploadNewBlob(BlobItem::new(blob_hash)));
- Ok(blob::PutResponse { data_exists: false })
- }
- Err(db_err) => {
- self.should_close_stream = true;
- Err(handle_db_error(db_err))
- }
- }
- }
-
- pub async fn handle_data_chunk(
- &mut self,
- mut new_data: Vec<u8>,
- ) -> PutResult {
- let blob_item = match &self.action {
- Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
- _ => {
- self.should_close_stream = true;
- error!("Data chunk sent before upload action is started");
- return Err(Status::invalid_argument(
- "Holder and hash should be provided before data",
- ));
- }
- };
- trace!("Received {} bytes of data", new_data.len());
-
- // create upload session if it doesn't already exist
- if self.uploader.is_none() {
- debug!("Uploader doesn't exist, starting new session");
- self.uploader =
- match self.s3.start_upload_session(&blob_item.s3_path).await {
- Ok(session) => Some(session),
- Err(err) => {
- self.should_close_stream = true;
- error!("Failed to create upload session: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
- }
- let uploader = self.uploader.as_mut().unwrap();
-
- // New parts should be added to AWS only if they exceed minimum part size,
- // Otherwise AWS returns error
- self.current_chunk.append(&mut new_data);
- if self.current_chunk.len() as u64 > S3_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE
- {
- trace!("Chunk size exceeded, adding new S3 part");
- if let Err(err) = uploader.add_part(self.current_chunk.take_out()).await {
- self.should_close_stream = true;
- error!("Failed to upload S3 part: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
-
- Ok(blob::PutResponse { data_exists: false })
- }
-
- /// This function should be called after the input stream is finished.
- /// This consumes `self` so this put handler instance cannot be used
- /// after this is called.
- pub async fn finish(self) -> Result<(), Status> {
- if self.action.is_none() {
- debug!("No action to perform, finishing now");
- return Ok(());
- }
- let holder = self.holder.ok_or_else(|| {
- error!("Cannot finish action. No holder provided!");
- Status::aborted("Internal error")
- })?;
- let blob_hash = self.blob_hash.ok_or_else(|| {
- error!("Cannot finish action. No blob hash provided!");
- Status::aborted("Internal error")
- })?;
- let blob_item = match self.action {
- None => return Ok(()),
- Some(PutAction::AssignHolder) => {
- return assign_holder_to_blob(&self.db, holder, blob_hash).await;
- }
- Some(PutAction::UploadNewBlob(blob_item)) => blob_item,
- };
-
- let mut uploader = self.uploader.ok_or_else(|| {
- // This also happens when client cancels before sending any data chunk
- warn!("No uploader was created, finishing now");
- Status::aborted("Internal error")
- })?;
-
- if !self.current_chunk.is_empty() {
- if let Err(err) = uploader.add_part(self.current_chunk).await {
- error!("Failed to upload final part: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
- }
-
- if let Err(err) = uploader.finish_upload().await {
- error!("Failed to finish upload session: {:?}", err);
- return Err(Status::aborted("Internal error"));
- }
-
- self
- .db
- .put_blob_item(blob_item)
- .await
- .map_err(handle_db_error)?;
-
- assign_holder_to_blob(&self.db, holder, blob_hash).await?;
-
- debug!("Upload finished successfully");
- Ok(())
- }
-}
-
-async fn assign_holder_to_blob(
- db: &DatabaseClient,
- holder: String,
- blob_hash: String,
-) -> Result<(), Status> {
- let reverse_index_item = ReverseIndexItem { holder, blob_hash };
-
- db.put_reverse_index_item(reverse_index_item)
- .await
- .map_err(handle_db_error)
-}
-
-fn handle_db_error(db_error: DBError) -> Status {
- match db_error {
- DBError::AwsSdk(DynamoDBError::InternalServerError(_))
- | DBError::AwsSdk(DynamoDBError::ProvisionedThroughputExceededException(
- _,
- ))
- | DBError::AwsSdk(DynamoDBError::RequestLimitExceeded(_)) => {
- warn!("AWS transient error occurred");
- Status::unavailable("please retry")
- }
- DBError::Blob(e) => {
- error!("Encountered Blob database error: {}", e);
- Status::failed_precondition("Internal error")
- }
- e => {
- error!("Encountered an unexpected error: {}", e);
- Status::failed_precondition("unexpected error")
- }
- }
-}
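
The removed `PutHandler` buffers incoming data chunks and only adds an S3 multipart part once the buffer crosses the minimum part size, flushing whatever remains when the stream ends. A standalone sketch of that buffering logic, with a closure standing in for the real `MultiPartUploadSession::add_part` call and the threshold assumed to be S3's documented 5 MiB minimum:

const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

struct PartBuffer {
  current: Vec<u8>,
}

impl PartBuffer {
  fn new() -> Self {
    Self { current: Vec::new() }
  }

  // Append a chunk; once the buffer exceeds the minimum part size,
  // hand the accumulated bytes to `flush_part` and start over
  // (the role `MemOps::take_out` played in the removed handler).
  fn push(&mut self, mut chunk: Vec<u8>, flush_part: &mut impl FnMut(Vec<u8>)) {
    self.current.append(&mut chunk);
    if self.current.len() > MIN_PART_SIZE {
      flush_part(std::mem::take(&mut self.current));
    }
  }

  // Flush whatever is left once the input stream is finished.
  fn finish(self, flush_part: &mut impl FnMut(Vec<u8>)) {
    if !self.current.is_empty() {
      flush_part(self.current);
    }
  }
}

fn main() {
  let mut part_sizes: Vec<usize> = Vec::new();
  let mut flush = |part: Vec<u8>| part_sizes.push(part.len());

  let mut buf = PartBuffer::new();
  for _ in 0..3 {
    buf.push(vec![0u8; 4 * 1024 * 1024], &mut flush);
  }
  buf.finish(&mut flush);

  // The first two 4 MiB chunks flush together as one 8 MiB part;
  // the third becomes the final 4 MiB part.
  println!("uploaded part sizes: {part_sizes:?}");
}
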
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -1,7 +1,6 @@
pub mod config;
pub mod constants;
pub mod database;
-pub mod grpc;
pub mod http;
pub mod s3;
pub mod service;
@@ -29,21 +28,17 @@
config::parse_cmdline_args()?;
let aws_config = config::load_aws_config().await;
- let db = database::old::DatabaseClient::new(&aws_config);
+ let db = database::DatabaseClient::new(&aws_config);
let s3 = s3::S3Client::new(&aws_config);
- let new_db = database::DatabaseClient::new(&aws_config);
let service = service::BlobService::new(
- new_db,
- s3.clone(),
+ db,
+ s3,
BlobServiceConfig {
instant_delete_orphaned_blobs: true,
..Default::default()
},
);
- tokio::select! {
- http_result = crate::http::run_http_server(service) => http_result,
- grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
- }
+ crate::http::run_http_server(service).await
}
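
Before this change, `main()` raced both servers with `tokio::select!`, so whichever future finished first (normally on error) took the process down with it; now the single HTTP server future is simply awaited. A runnable sketch of the pattern being retired, assuming tokio 1 (macros, rt, time features) and anyhow 1, with a sleeping stub in place of each real server:

use std::time::Duration;

async fn run_server(name: &str, secs: u64) -> anyhow::Result<()> {
  // Stand-in for run_http_server / run_grpc_server.
  tokio::time::sleep(Duration::from_secs(secs)).await;
  println!("{name} server exited");
  Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
  // Returns the result of whichever branch completes first.
  tokio::select! {
    http_result = run_server("http", 1) => http_result,
    grpc_result = run_server("grpc", 2) => grpc_result,
  }
}
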
diff --git a/services/docker-compose.yml b/services/docker-compose.yml
--- a/services/docker-compose.yml
+++ b/services/docker-compose.yml
@@ -44,7 +44,7 @@
- COMM_SERVICES_SANDBOX=${COMM_SERVICES_SANDBOX}
image: commapp/blob-server:0.1
ports:
- - '${COMM_SERVICES_PORT_BLOB}:50051'
+ - '${COMM_SERVICES_PORT_BLOB}:50053'
volumes:
- $HOME/.aws/config:/home/comm/.aws/config:ro
- $HOME/.aws/credentials:/home/comm/.aws/credentials:ro
diff --git a/shared/protos/blob.proto b/shared/protos/blob.proto
deleted file mode 100644
--- a/shared/protos/blob.proto
+++ /dev/null
@@ -1,41 +0,0 @@
-syntax = "proto3";
-
-package blob;
-
-import "google/protobuf/empty.proto";
-
-service BlobService {
- rpc Put(stream PutRequest) returns (stream PutResponse) {}
- rpc Get(GetRequest) returns (stream GetResponse) {}
- rpc Remove(RemoveRequest) returns (google.protobuf.Empty) {}
-}
-
-// Put
-
-message PutRequest {
- oneof data {
- string holder = 1;
- string blobHash = 2;
- bytes dataChunk = 3;
- }
-}
-
-message PutResponse {
- bool dataExists = 1;
-}
-
-// Get
-
-message GetRequest {
- string holder = 1;
-}
-
-message GetResponse {
- bytes dataChunk = 1;
-}
-
-// Remove
-
-message RemoveRequest {
- string holder = 1;
-}
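
The deleted proto defined a bidirectional streaming `Put` (holder and blob hash first, then raw data chunks, with `dataExists` telling the client whether it still needs to stream data), a server-streaming `Get`, and a unary `Remove`. For illustration only, a plain Rust enum mirroring the `PutRequest` oneof and the message order the removed handler expected; the holder value here is hypothetical:

enum PutRequestData {
  Holder(String),
  BlobHash(String),
  DataChunk(Vec<u8>),
}

fn describe(stream: &[PutRequestData]) {
  for msg in stream {
    match msg {
      PutRequestData::Holder(h) => println!("holder: {h}"),
      PutRequestData::BlobHash(h) => println!("blob hash: {h}"),
      PutRequestData::DataChunk(c) => println!("data chunk: {} bytes", c.len()),
    }
  }
}

fn main() {
  // Holder and hash arrive before any data chunks, matching the flow
  // the removed PutHandler enforced.
  describe(&[
    PutRequestData::Holder("device:1234".into()),
    PutRequestData::BlobHash("deadbeef".into()),
    PutRequestData::DataChunk(vec![0u8; 1024]),
  ]);
}
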
