Page MenuHomePhabricator

D5768.id19013.diff
No OneTemporary

D5768.id19013.diff

diff --git a/services/blob/old/CMakeLists.txt b/services/blob/old/CMakeLists.txt
deleted file mode 100644
--- a/services/blob/old/CMakeLists.txt
+++ /dev/null
@@ -1,139 +0,0 @@
-PROJECT(blob C CXX)
-
-cmake_minimum_required(VERSION 3.16)
-
-set(CMAKE_CXX_STANDARD 14)
-set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin)
-
-if(COMMAND cmake_policy)
- cmake_policy(SET CMP0003 NEW)
-endif(COMMAND cmake_policy)
-
-set(CMAKE_CXX_STANDARD 17)
-
-# FIND LIBS
-find_package(glog REQUIRED)
-find_package(Protobuf REQUIRED)
-find_package(gRPC REQUIRED)
-find_package(Folly REQUIRED)
-find_package(AWSSDK REQUIRED COMPONENTS s3 core dynamodb)
-find_package(Boost 1.40
- COMPONENTS program_options filesystem context regex system thread
- REQUIRED
-)
-find_package(OpenSSL REQUIRED)
-find_package(double-conversion REQUIRED)
-
-if(${CMAKE_CURRENT_SOURCE_DIR} MATCHES "^\/transferred.*")
- # Inside the docker build contex
- set(_proto_path "grpc")
- set(_lib_path "../lib/src")
-else()
- # Inside repo
- set(_proto_path "../../../shared/protos")
- set(_lib_path "../../lib/src")
-endif()
-
-# Shared Comm protos
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_proto_path}
- ${CMAKE_CURRENT_BINARY_DIR}/protos
- EXCLUDE_FROM_ALL
-)
-
-# Reference native/cpp projects
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_lib_path}
- ${CMAKE_CURRENT_BINARY_DIR}/common # CMake's build directory
- EXCLUDE_FROM_ALL # Don't build everything, just what we need
-)
-
-file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp")
-
-# SERVER
-add_executable(
- blob
-
- ${SOURCE_CODE}
-)
-
-set(
- INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${CMAKE_CURRENT_SOURCE_DIR}/src/server-base-reactors
- ${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors
-
- ${Boost_INCLUDE_DIR}
-)
-
-target_include_directories(
- blob
- PUBLIC
- ${INCLUDE_DIRS}
-)
-
-set(
- LIBS
-
- ${GRPC_LIBS}
- ${AWSSDK_LINK_LIBRARIES}
- ${Boost_LIBRARIES}
- OpenSSL::SSL
- glog::glog
- gRPC::grpc++
- double-conversion::double-conversion
- Folly::folly
-
- comm-blob-grpc
- comm-server-base-reactors
- comm-services-common
-)
-
-target_link_libraries(
- blob
-
- ${LIBS}
-)
-
-install(
- TARGETS blob
- RUNTIME DESTINATION bin/
-)
-
-# TEST
-option(BUILD_TESTING "Turn on tests" ON)
-if (BUILD_TESTING)
- file(GLOB TEST_CODE "./test/*.cpp")
- list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp")
- enable_testing()
-
- find_package(GTest CONFIG REQUIRED)
- include_directories(
- ${GTEST_INCLUDE_DIR}
- ./test
- )
-
- add_executable(
- runTests
-
- ${SOURCE_CODE}
- ${TEST_CODE}
- )
-
- target_include_directories(
- runTests
- PUBLIC
- ${INCLUDE_DIRS}
- )
-
- target_link_libraries(
- runTests
-
- ${LIBS}
- GTest::gtest_main
- )
-
- include(GoogleTest)
- gtest_discover_tests(runTests)
-endif()
diff --git a/services/blob/old/src/AwsS3Bucket.h b/services/blob/old/src/AwsS3Bucket.h
deleted file mode 100644
--- a/services/blob/old/src/AwsS3Bucket.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-
-#include <functional>
-#include <string>
-#include <vector>
-
-namespace comm {
-namespace network {
-
-class AwsS3Bucket {
- const std::string name;
-
-public:
- AwsS3Bucket(const std::string name);
-
- std::vector<std::string> listObjects() const;
- bool isAvailable() const;
- size_t getObjectSize(const std::string &objectName) const;
- void renameObject(const std::string &currentName, const std::string &newName);
- void writeObject(const std::string &objectName, const std::string &data);
- std::string getObjectData(const std::string &objectName) const;
- void getObjectDataChunks(
- const std::string &objectName,
- const std::function<void(const std::string &)> &callback,
- const size_t chunkSize) const;
- void appendToObject(const std::string &objectName, const std::string &data);
- void clearObject(const std::string &objectName);
- void removeObject(const std::string &objectName);
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/AwsS3Bucket.cpp b/services/blob/old/src/AwsS3Bucket.cpp
deleted file mode 100644
--- a/services/blob/old/src/AwsS3Bucket.cpp
+++ /dev/null
@@ -1,224 +0,0 @@
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-#include "GlobalConstants.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "Tools.h"
-
-#include <aws/s3/model/CopyObjectRequest.h>
-#include <aws/s3/model/DeleteObjectRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/HeadBucketRequest.h>
-#include <aws/s3/model/HeadObjectRequest.h>
-#include <aws/s3/model/ListObjectsRequest.h>
-#include <aws/s3/model/Object.h>
-#include <aws/s3/model/PutObjectRequest.h>
-
-#include <boost/interprocess/streams/bufferstream.hpp>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) {
-}
-
-std::vector<std::string> AwsS3Bucket::listObjects() const {
- Aws::S3::Model::ListObjectsRequest request;
- request.SetBucket(this->name);
- std::vector<std::string> result;
-
- Aws::S3::Model::ListObjectsOutcome outcome =
- getS3Client()->ListObjects(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- Aws::Vector<Aws::S3::Model::Object> objects =
- outcome.GetResult().GetContents();
- for (Aws::S3::Model::Object &object : objects) {
- result.push_back(object.GetKey());
- }
- return result;
-}
-
-bool AwsS3Bucket::isAvailable() const {
- Aws::S3::Model::HeadBucketRequest headRequest;
- headRequest.SetBucket(this->name);
- Aws::S3::Model::HeadBucketOutcome outcome =
- getS3Client()->HeadBucket(headRequest);
- return outcome.IsSuccess();
-}
-
-size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const {
- Aws::S3::Model::HeadObjectRequest headRequest;
- headRequest.SetBucket(this->name);
- headRequest.SetKey(objectName);
- Aws::S3::Model::HeadObjectOutcome headOutcome =
- getS3Client()->HeadObject(headRequest);
- if (!headOutcome.IsSuccess()) {
- throw std::runtime_error(headOutcome.GetError().GetMessage());
- }
- return headOutcome.GetResultWithOwnership().GetContentLength();
-}
-
-void AwsS3Bucket::renameObject(
- const std::string &currentName,
- const std::string &newName) {
- Aws::S3::Model::CopyObjectRequest copyRequest;
- copyRequest.SetCopySource(this->name + "/" + currentName);
- copyRequest.SetKey(newName);
- copyRequest.SetBucket(this->name);
-
- Aws::S3::Model::CopyObjectOutcome copyOutcome =
- getS3Client()->CopyObject(copyRequest);
- if (!copyOutcome.IsSuccess()) {
- throw std::runtime_error(copyOutcome.GetError().GetMessage());
- }
-
- this->removeObject(currentName);
-}
-
-void AwsS3Bucket::writeObject(
- const std::string &objectName,
- const std::string &data) {
- // we don't have to handle multiple write here because the GRPC limit is 4MB
- // and minimum size of data to perform multipart upload is 5MB
- Aws::S3::Model::PutObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
-
- std::shared_ptr<Aws::IOStream> body = std::shared_ptr<Aws::IOStream>(
- new boost::interprocess::bufferstream((char *)data.data(), data.size()));
-
- request.SetBody(body);
-
- Aws::S3::Model::PutObjectOutcome outcome = getS3Client()->PutObject(request);
-
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-std::string AwsS3Bucket::getObjectData(const std::string &objectName) const {
- Aws::S3::Model::GetObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
-
- const size_t size = this->getObjectSize(objectName);
- if (size > GRPC_CHUNK_SIZE_LIMIT) {
- throw tools::invalid_argument_error(std::string(
- "The file is too big(" + std::to_string(size) + " bytes, max is " +
- std::to_string(GRPC_CHUNK_SIZE_LIMIT) +
- "bytes), please, use getObjectDataChunks"));
- }
- Aws::S3::Model::GetObjectOutcome outcome = getS3Client()->GetObject(request);
-
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- std::string cpy = result;
-
- return result;
-}
-
-void AwsS3Bucket::getObjectDataChunks(
- const std::string &objectName,
- const std::function<void(const std::string &)> &callback,
- const size_t chunkSize) const {
- const size_t fileSize = this->getObjectSize(objectName);
-
- if (fileSize == 0) {
- return;
- }
-
- Aws::S3::Model::GetObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
- for (size_t offset = 0; offset < fileSize; offset += chunkSize) {
- const size_t nextSize = std::min(chunkSize, fileSize - offset);
-
- std::string range = "bytes=" + std::to_string(offset) + "-" +
- std::to_string(offset + nextSize);
- request.SetRange(range);
-
- Aws::S3::Model::GetObjectOutcome getOutcome =
- getS3Client()->GetObject(request);
- if (!getOutcome.IsSuccess()) {
- throw std::runtime_error(getOutcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile =
- getOutcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- result.resize(nextSize);
- callback(result);
- }
-}
-
-void AwsS3Bucket::appendToObject(
- const std::string &objectName,
- const std::string &data) {
- const size_t objectSize = this->getObjectSize(objectName);
- if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- std::string currentData = this->getObjectData(objectName);
- currentData += data;
- this->writeObject(objectName, currentData);
- return;
- }
- size_t currentSize = 0;
- MultiPartUploader uploader(
- getS3Client(), this->name, objectName + "-multipart");
- std::function<void(const std::string &)> callback =
- [&uploader, &data, &currentSize, objectSize](const std::string &chunk) {
- currentSize += chunk.size();
- if (currentSize < objectSize) {
- uploader.addPart(chunk);
- } else if (currentSize == objectSize) {
- uploader.addPart(std::string(chunk + data));
- } else {
- throw std::runtime_error(
- "size of chunks exceeds the size of the object");
- }
- };
- this->getObjectDataChunks(
- objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE);
- uploader.finishUpload();
- // this will overwrite the target file
- this->renameObject(objectName + "-multipart", objectName);
- const size_t newSize = this->getObjectSize(objectName);
- if (objectSize + data.size() != newSize) {
- throw std::runtime_error(
- "append to object " + objectName +
- " has been performed but the final sizes don't "
- "match, the size is now [" +
- std::to_string(newSize) + "] but should be [" +
- std::to_string(objectSize + data.size()) + "]");
- }
-}
-
-void AwsS3Bucket::clearObject(const std::string &objectName) {
- this->writeObject(objectName, "");
-}
-
-void AwsS3Bucket::removeObject(const std::string &objectName) {
- Aws::S3::Model::DeleteObjectRequest deleteRequest;
- deleteRequest.SetBucket(this->name);
- deleteRequest.SetKey(objectName);
-
- Aws::S3::Model::DeleteObjectOutcome deleteOutcome =
- getS3Client()->DeleteObject(deleteRequest);
- if (!deleteOutcome.IsSuccess()) {
- throw std::runtime_error(deleteOutcome.GetError().GetMessage());
- }
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/BlobServiceImpl.h b/services/blob/old/src/BlobServiceImpl.h
deleted file mode 100644
--- a/services/blob/old/src/BlobServiceImpl.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include "S3Path.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <aws/core/Aws.h>
-
-#include <grpcpp/grpcpp.h>
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-class BlobServiceImpl final : public blob::BlobService::CallbackService {
- void verifyBlobHash(
- const std::string &expectedBlobHash,
- const database::S3Path &s3Path);
- void assignVariableIfEmpty(
- const std::string &label,
- std::string &lvalue,
- const std::string &rvalue);
-
-public:
- BlobServiceImpl();
- virtual ~BlobServiceImpl();
-
- grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
- Put(grpc::CallbackServerContext *context) override;
- grpc::ServerWriteReactor<blob::GetResponse> *
- Get(grpc::CallbackServerContext *context,
- const blob::GetRequest *request) override;
- grpc::ServerUnaryReactor *Remove(
- grpc::CallbackServerContext *context,
- const blob::RemoveRequest *request,
- google::protobuf::Empty *response) override;
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/BlobServiceImpl.cpp b/services/blob/old/src/BlobServiceImpl.cpp
deleted file mode 100644
--- a/services/blob/old/src/BlobServiceImpl.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-#include "BlobServiceImpl.h"
-
-#include "Constants.h"
-#include "DatabaseManager.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "Tools.h"
-
-#include "GetReactor.h"
-#include "PutReactor.h"
-
-#include <glog/logging.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-BlobServiceImpl::BlobServiceImpl() {
- Aws::InitAPI({});
-
- if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) {
- throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available");
- }
-}
-
-BlobServiceImpl::~BlobServiceImpl() {
- Aws::ShutdownAPI({});
-}
-
-void BlobServiceImpl::verifyBlobHash(
- const std::string &expectedBlobHash,
- const database::S3Path &s3Path) {
- const std::string computedBlobHash = tools::computeHashForFile(s3Path);
- if (expectedBlobHash != computedBlobHash) {
- throw std::runtime_error(
- "blob hash mismatch, expected: [" + expectedBlobHash +
- "], computed: [" + computedBlobHash + "]");
- }
-}
-
-void BlobServiceImpl::assignVariableIfEmpty(
- const std::string &label,
- std::string &lvalue,
- const std::string &rvalue) {
- if (!lvalue.empty()) {
- throw std::runtime_error(
- "multiple assignment for variable " + label + " is not allowed");
- }
- lvalue = rvalue;
-}
-
-grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
-BlobServiceImpl::Put(grpc::CallbackServerContext *context) {
- return new reactor::PutReactor();
-}
-
-grpc::ServerWriteReactor<blob::GetResponse> *BlobServiceImpl::Get(
- grpc::CallbackServerContext *context,
- const blob::GetRequest *request) {
-
- reactor::GetReactor *gr = new reactor::GetReactor(request);
- gr->start();
- return gr;
-}
-
-grpc::ServerUnaryReactor *BlobServiceImpl::Remove(
- grpc::CallbackServerContext *context,
- const blob::RemoveRequest *request,
- google::protobuf::Empty *response) {
- grpc::Status status = grpc::Status::OK;
- const std::string holder = request->holder();
- try {
- std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
- database::DatabaseManager::getInstance().findReverseIndexItemByHolder(
- holder);
- if (reverseIndexItem == nullptr) {
- throw std::runtime_error("no item found for holder: " + holder);
- }
- // TODO handle cleanup here properly
- // for now the object's being removed right away
- const std::string blobHash = reverseIndexItem->getBlobHash();
- database::DatabaseManager::getInstance().removeReverseIndexItem(holder);
- if (database::DatabaseManager::getInstance()
- .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash())
- .size() == 0) {
- database::S3Path s3Path = tools::findS3Path(*reverseIndexItem);
- AwsS3Bucket bucket = getBucket(s3Path.getBucketName());
- bucket.removeObject(s3Path.getObjectName());
-
- database::DatabaseManager::getInstance().removeBlobItem(blobHash);
- }
- } catch (std::runtime_error &e) {
- LOG(ERROR) << e.what();
- status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- auto *reactor = context->DefaultReactor();
- reactor->Finish(status);
- return reactor;
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Constants.h b/services/blob/old/src/Constants.h
deleted file mode 100644
--- a/services/blob/old/src/Constants.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#pragma once
-
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-// 5MB limit
-const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024;
-
-const std::string BLOB_BUCKET_NAME = "commapp-blob";
-
-const std::string BLOB_TABLE_NAME =
- tools::decorateTableName("blob-service-blob");
-const std::string REVERSE_INDEX_TABLE_NAME =
- tools::decorateTableName("blob-service-reverse-index");
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/BlobItem.h b/services/blob/old/src/DatabaseEntities/BlobItem.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/BlobItem.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include "Item.h"
-#include "S3Path.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-class BlobItem : public Item {
-
- std::string blobHash;
- S3Path s3Path;
- uint64_t created = 0;
-
- void validate() const override;
-
-public:
- static const std::string TABLE_NAME;
- static const std::string FIELD_BLOB_HASH;
- static const std::string FIELD_S3_PATH;
- static const std::string FIELD_CREATED;
-
- BlobItem() {
- }
- BlobItem(
- const std::string blobHash,
- const S3Path s3Path,
- uint64_t created = 0);
- BlobItem(const AttributeValues &itemFromDB);
-
- void assignItemFromDatabase(const AttributeValues &itemFromDB) override;
-
- std::string getTableName() const override;
- PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override;
- PrimaryKeyValue getPrimaryKeyValue() const override;
-
- std::string getBlobHash() const;
- S3Path getS3Path() const;
- uint64_t getCreated() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/BlobItem.cpp b/services/blob/old/src/DatabaseEntities/BlobItem.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/BlobItem.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-#include "BlobItem.h"
-
-#include "Constants.h"
-
-namespace comm {
-namespace network {
-namespace database {
-
-const std::string BlobItem::FIELD_BLOB_HASH = "blobHash";
-const std::string BlobItem::FIELD_S3_PATH = "s3Path";
-const std::string BlobItem::FIELD_CREATED = "created";
-
-const std::string BlobItem::TABLE_NAME = BLOB_TABLE_NAME;
-
-BlobItem::BlobItem(
- const std::string blobHash,
- const S3Path s3Path,
- uint64_t created)
- : blobHash(blobHash), s3Path(s3Path), created(created) {
- this->validate();
-}
-
-BlobItem::BlobItem(const AttributeValues &itemFromDB) {
- this->assignItemFromDatabase(itemFromDB);
-}
-
-void BlobItem::validate() const {
- if (!this->blobHash.size()) {
- throw std::runtime_error("blobHash empty");
- }
- this->s3Path.validate();
-}
-
-void BlobItem::assignItemFromDatabase(const AttributeValues &itemFromDB) {
- try {
- this->blobHash = itemFromDB.at(BlobItem::FIELD_BLOB_HASH).GetS();
- this->s3Path = S3Path(itemFromDB.at(BlobItem::FIELD_S3_PATH).GetS());
- this->created = std::stoll(
- std::string(itemFromDB.at(BlobItem::FIELD_CREATED).GetS()).c_str());
- } catch (std::logic_error &e) {
- throw std::runtime_error(
- "invalid blob item provided, " + std::string(e.what()));
- }
- this->validate();
-}
-
-std::string BlobItem::getTableName() const {
- return BlobItem::TABLE_NAME;
-}
-
-PrimaryKeyDescriptor BlobItem::getPrimaryKeyDescriptor() const {
- return PrimaryKeyDescriptor(BlobItem::FIELD_BLOB_HASH);
-}
-
-PrimaryKeyValue BlobItem::getPrimaryKeyValue() const {
- return PrimaryKeyValue(this->blobHash);
-}
-
-std::string BlobItem::getBlobHash() const {
- return this->blobHash;
-}
-
-S3Path BlobItem::getS3Path() const {
- return this->s3Path;
-}
-
-uint64_t BlobItem::getCreated() const {
- return this->created;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h b/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#pragma once
-
-#include "Item.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-/**
- * Needs blobHash(pk)-index that projects:
- * blobHash
- * holder
- */
-class ReverseIndexItem : public Item {
-
- std::string holder;
- std::string blobHash;
-
- void validate() const override;
-
-public:
- static const std::string TABLE_NAME;
- static const std::string FIELD_HOLDER;
- static const std::string FIELD_BLOB_HASH;
-
- ReverseIndexItem() {
- }
- ReverseIndexItem(const std::string holder, const std::string blobHash);
- ReverseIndexItem(const AttributeValues &itemFromDB);
-
- void assignItemFromDatabase(const AttributeValues &itemFromDB) override;
-
- std::string getTableName() const override;
- PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override;
- PrimaryKeyValue getPrimaryKeyValue() const override;
-
- std::string getHolder() const;
- std::string getBlobHash() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp b/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-#include "ReverseIndexItem.h"
-
-#include "Constants.h"
-
-namespace comm {
-namespace network {
-namespace database {
-
-const std::string ReverseIndexItem::FIELD_HOLDER = "holder";
-const std::string ReverseIndexItem::FIELD_BLOB_HASH = "blobHash";
-
-const std::string ReverseIndexItem::TABLE_NAME = REVERSE_INDEX_TABLE_NAME;
-
-ReverseIndexItem::ReverseIndexItem(
- const std::string holder,
- const std::string blobHash)
- : holder(holder), blobHash(blobHash) {
- this->validate();
-}
-ReverseIndexItem::ReverseIndexItem(const AttributeValues &itemFromDB) {
- this->assignItemFromDatabase(itemFromDB);
-}
-
-void ReverseIndexItem::validate() const {
- if (!this->holder.size()) {
- throw std::runtime_error("reverse index empty");
- }
- if (!this->blobHash.size()) {
- throw std::runtime_error("blobHash empty");
- }
-}
-
-void ReverseIndexItem::assignItemFromDatabase(
- const AttributeValues &itemFromDB) {
- this->holder = itemFromDB.at(ReverseIndexItem::FIELD_HOLDER).GetS();
- this->blobHash = itemFromDB.at(ReverseIndexItem::FIELD_BLOB_HASH).GetS();
- this->validate();
-}
-
-std::string ReverseIndexItem::getTableName() const {
- return ReverseIndexItem::TABLE_NAME;
-}
-
-PrimaryKeyDescriptor ReverseIndexItem::getPrimaryKeyDescriptor() const {
- return PrimaryKeyDescriptor(ReverseIndexItem::FIELD_HOLDER);
-}
-
-PrimaryKeyValue ReverseIndexItem::getPrimaryKeyValue() const {
- return PrimaryKeyValue(this->holder);
-}
-
-std::string ReverseIndexItem::getHolder() const {
- return this->holder;
-}
-
-std::string ReverseIndexItem::getBlobHash() const {
- return this->blobHash;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseManager.h b/services/blob/old/src/DatabaseManager.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseManager.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#pragma once
-
-#include "BlobItem.h"
-#include "DatabaseEntitiesTools.h"
-#include "DatabaseManagerBase.h"
-#include "DynamoDBTools.h"
-#include "ReverseIndexItem.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/model/AttributeDefinition.h>
-#include <aws/dynamodb/model/DeleteItemRequest.h>
-#include <aws/dynamodb/model/GetItemRequest.h>
-#include <aws/dynamodb/model/PutItemRequest.h>
-
-#include <memory>
-#include <stdexcept>
-#include <string>
-#include <vector>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// this class should be thread-safe in case any shared resources appear
-class DatabaseManager : public DatabaseManagerBase {
-public:
- static DatabaseManager &getInstance();
-
- void putBlobItem(const BlobItem &item);
- std::shared_ptr<BlobItem> findBlobItem(const std::string &blobHash);
- void removeBlobItem(const std::string &blobHash);
-
- void putReverseIndexItem(const ReverseIndexItem &item);
- std::shared_ptr<ReverseIndexItem>
- findReverseIndexItemByHolder(const std::string &holder);
- std::vector<std::shared_ptr<database::ReverseIndexItem>>
- findReverseIndexItemsByHash(const std::string &blobHash);
- void removeReverseIndexItem(const std::string &holder);
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseManager.cpp b/services/blob/old/src/DatabaseManager.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseManager.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-#include "DatabaseManager.h"
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <aws/core/utils/Outcome.h>
-#include <aws/dynamodb/model/QueryRequest.h>
-#include <aws/dynamodb/model/ScanRequest.h>
-
-#include <vector>
-
-namespace comm {
-namespace network {
-namespace database {
-
-DatabaseManager &DatabaseManager::getInstance() {
- static DatabaseManager instance;
- return instance;
-}
-
-void DatabaseManager::putBlobItem(const BlobItem &item) {
- Aws::DynamoDB::Model::PutItemRequest request;
- request.SetTableName(BlobItem::TABLE_NAME);
- request.AddItem(
- BlobItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(item.getBlobHash()));
- request.AddItem(
- BlobItem::FIELD_S3_PATH,
- Aws::DynamoDB::Model::AttributeValue(item.getS3Path().getFullPath()));
- request.AddItem(
- BlobItem::FIELD_CREATED,
- Aws::DynamoDB::Model::AttributeValue(
- std::to_string(tools::getCurrentTimestamp())));
-
- this->innerPutItem(std::make_shared<BlobItem>(item), request);
-}
-
-std::shared_ptr<BlobItem>
-DatabaseManager::findBlobItem(const std::string &blobHash) {
- Aws::DynamoDB::Model::GetItemRequest request;
- request.AddKey(
- BlobItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(blobHash));
- return this->innerFindItem<BlobItem>(request);
-}
-
-void DatabaseManager::removeBlobItem(const std::string &blobHash) {
- std::shared_ptr<BlobItem> item = this->findBlobItem(blobHash);
- if (item == nullptr) {
- return;
- }
- this->innerRemoveItem(*item);
-}
-
-void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) {
- if (this->findReverseIndexItemByHolder(item.getHolder()) != nullptr) {
- throw std::runtime_error(
- "An item for the given holder [" + item.getHolder() +
- "] already exists");
- }
- Aws::DynamoDB::Model::PutItemRequest request;
- request.SetTableName(ReverseIndexItem::TABLE_NAME);
- request.AddItem(
- ReverseIndexItem::FIELD_HOLDER,
- Aws::DynamoDB::Model::AttributeValue(item.getHolder()));
- request.AddItem(
- ReverseIndexItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(item.getBlobHash()));
-
- this->innerPutItem(std::make_shared<ReverseIndexItem>(item), request);
-}
-
-std::shared_ptr<ReverseIndexItem>
-DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) {
- Aws::DynamoDB::Model::GetItemRequest request;
- request.AddKey(
- ReverseIndexItem::FIELD_HOLDER,
- Aws::DynamoDB::Model::AttributeValue(holder));
-
- return this->innerFindItem<ReverseIndexItem>(request);
-}
-
-std::vector<std::shared_ptr<database::ReverseIndexItem>>
-DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) {
- std::vector<std::shared_ptr<database::ReverseIndexItem>> result;
-
- Aws::DynamoDB::Model::QueryRequest req;
- req.SetTableName(ReverseIndexItem::TABLE_NAME);
- req.SetKeyConditionExpression("blobHash = :valueToMatch");
-
- AttributeValues attributeValues;
- attributeValues.emplace(":valueToMatch", blobHash);
-
- req.SetExpressionAttributeValues(attributeValues);
- req.SetIndexName("blobHash-index");
-
- const Aws::DynamoDB::Model::QueryOutcome &outcome =
- getDynamoDBClient()->Query(req);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- const Aws::Vector<AttributeValues> &items = outcome.GetResult().GetItems();
- for (auto &item : items) {
- result.push_back(std::make_shared<database::ReverseIndexItem>(item));
- }
-
- return result;
-}
-
-void DatabaseManager::removeReverseIndexItem(const std::string &holder) {
- std::shared_ptr<database::ReverseIndexItem> item =
- findReverseIndexItemByHolder(holder);
- if (item == nullptr) {
- return;
- }
- this->innerRemoveItem(*item);
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/MultiPartUploader.h b/services/blob/old/src/MultiPartUploader.h
deleted file mode 100644
--- a/services/blob/old/src/MultiPartUploader.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/CompleteMultipartUploadRequest.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-class MultiPartUploader {
- std::shared_ptr<Aws::S3::S3Client> client;
- const std::string bucketName;
- const std::string objectName;
- std::vector<size_t> partsSizes;
-
- Aws::S3::Model::CompleteMultipartUploadRequest completeMultipartUploadRequest;
- Aws::S3::Model::CompletedMultipartUpload completedMultipartUpload;
- std::string uploadId;
-
- size_t partNumber = 1;
-
-public:
- MultiPartUploader(
- std::shared_ptr<Aws::S3::S3Client> client,
- const std::string bucketName,
- const std::string objectName);
- void addPart(const std::string &part);
- void finishUpload();
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/MultiPartUploader.cpp b/services/blob/old/src/MultiPartUploader.cpp
deleted file mode 100644
--- a/services/blob/old/src/MultiPartUploader.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-#include "MultiPartUploader.h"
-#include "Tools.h"
-
-#include <aws/core/utils/HashingUtils.h>
-#include <aws/s3/model/CreateMultipartUploadRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/Object.h>
-#include <aws/s3/model/UploadPartRequest.h>
-
-#include <boost/interprocess/streams/bufferstream.hpp>
-
-namespace comm {
-namespace network {
-
-MultiPartUploader::MultiPartUploader(
- std::shared_ptr<Aws::S3::S3Client> client,
- const std::string bucketName,
- const std::string objectName)
- : client(client), bucketName(bucketName), objectName(objectName) {
- this->completeMultipartUploadRequest.SetBucket(this->bucketName);
- this->completeMultipartUploadRequest.SetKey(this->objectName);
-
- Aws::S3::Model::CreateMultipartUploadRequest createRequest;
- createRequest.SetBucket(this->bucketName);
- createRequest.SetKey(this->objectName);
- createRequest.SetContentType("application/octet-stream");
-
- Aws::S3::Model::CreateMultipartUploadOutcome createOutcome =
- this->client->CreateMultipartUpload(createRequest);
-
- if (!createOutcome.IsSuccess()) {
- throw std::runtime_error(createOutcome.GetError().GetMessage());
- }
- this->uploadId = createOutcome.GetResult().GetUploadId();
- this->completeMultipartUploadRequest.SetUploadId(this->uploadId);
-}
-
-void MultiPartUploader::addPart(const std::string &part) {
- Aws::S3::Model::UploadPartRequest uploadRequest;
- uploadRequest.SetBucket(this->bucketName);
- uploadRequest.SetKey(this->objectName);
- uploadRequest.SetPartNumber(this->partNumber);
- uploadRequest.SetUploadId(this->uploadId);
-
- std::shared_ptr<Aws::IOStream> body = std::shared_ptr<Aws::IOStream>(
- new boost::interprocess::bufferstream((char *)part.data(), part.size()));
-
- uploadRequest.SetBody(body);
-
- Aws::Utils::ByteBuffer partMd5(Aws::Utils::HashingUtils::CalculateMD5(*body));
- uploadRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(partMd5));
-
- uploadRequest.SetContentLength(part.size());
-
- Aws::S3::Model::UploadPartOutcome uploadPartOutcome =
- this->client->UploadPart(uploadRequest);
- Aws::S3::Model::CompletedPart completedPart;
- completedPart.SetPartNumber(this->partNumber);
- std::string eTag = uploadPartOutcome.GetResult().GetETag();
- if (eTag.empty()) {
- throw std::runtime_error("etag empty");
- }
- completedPart.SetETag(eTag);
- this->completedMultipartUpload.AddParts(completedPart);
- ++this->partNumber;
-}
-
-void MultiPartUploader::finishUpload() {
- if (!this->completedMultipartUpload.PartsHasBeenSet()) {
- return;
- }
- this->completeMultipartUploadRequest.SetMultipartUpload(
- this->completedMultipartUpload);
-
- Aws::S3::Model::CompleteMultipartUploadOutcome completeUploadOutcome =
- this->client->CompleteMultipartUpload(
- this->completeMultipartUploadRequest);
-
- if (!completeUploadOutcome.IsSuccess()) {
- throw std::runtime_error(completeUploadOutcome.GetError().GetMessage());
- }
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Reactors/server/GetReactor.h b/services/blob/old/src/Reactors/server/GetReactor.h
deleted file mode 100644
--- a/services/blob/old/src/Reactors/server/GetReactor.h
+++ /dev/null
@@ -1,87 +0,0 @@
-#pragma once
-
-#include "GlobalConstants.h"
-#include "S3Tools.h"
-#include <ServerWriteReactorBase.h>
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <aws/s3/model/GetObjectRequest.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class GetReactor
- : public ServerWriteReactorBase<blob::GetRequest, blob::GetResponse> {
- size_t offset = 0;
- size_t fileSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- database::S3Path s3Path;
- Aws::S3::Model::GetObjectRequest getRequest;
-
-public:
- using ServerWriteReactorBase<blob::GetRequest, blob::GetResponse>::
- ServerWriteReactorBase;
-
- std::unique_ptr<grpc::Status>
- writeResponse(blob::GetResponse *response) override {
- if (this->offset >= this->fileSize) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
-
- const size_t nextSize =
- std::min(this->chunkSize, this->fileSize - this->offset);
-
- std::string range = "bytes=" + std::to_string(this->offset) + "-" +
- std::to_string(this->offset + nextSize - 1);
- this->getRequest.SetRange(range);
-
- Aws::S3::Model::GetObjectOutcome getOutcome =
- getS3Client()->GetObject(this->getRequest);
- if (!getOutcome.IsSuccess()) {
- return std::make_unique<grpc::Status>(
- grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile =
- getOutcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- response->set_datachunk(result);
-
- this->offset += nextSize;
- return nullptr;
- }
-
- void initialize() override {
- this->s3Path = tools::findS3Path(this->request.holder());
-
- AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName());
- if (!bucket.isAvailable()) {
- throw std::runtime_error(
- "bucket [" + this->s3Path.getBucketName() + "] not available");
- }
-
- this->fileSize = bucket.getObjectSize(this->s3Path.getObjectName());
- if (this->fileSize == 0) {
- throw std::runtime_error("object empty");
- }
-
- this->getRequest.SetBucket(this->s3Path.getBucketName());
- this->getRequest.SetKey(this->s3Path.getObjectName());
- };
-
- void doneCallback() override{};
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Reactors/server/PutReactor.h b/services/blob/old/src/Reactors/server/PutReactor.h
deleted file mode 100644
--- a/services/blob/old/src/Reactors/server/PutReactor.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#pragma once
-
-#include "ServerBidiReactorBase.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class PutReactor
- : public ServerBidiReactorBase<blob::PutRequest, blob::PutResponse> {
- std::string holder;
- std::string blobHash;
- std::string currentChunk;
- std::unique_ptr<database::S3Path> s3Path;
- std::shared_ptr<database::BlobItem> blobItem;
- std::unique_ptr<MultiPartUploader> uploader;
- bool dataExists = false;
-
-public:
- std::unique_ptr<ServerBidiReactorStatus> handleRequest(
- blob::PutRequest request,
- blob::PutResponse *response) override {
- if (this->holder.empty()) {
- if (request.holder().empty()) {
- throw std::runtime_error("holder has not been provided");
- }
- this->holder = request.holder();
- return nullptr;
- }
- if (this->blobHash.empty()) {
- if (request.blobhash().empty()) {
- throw std::runtime_error("blob hash has not been provided");
- }
- this->blobHash = request.blobhash();
- this->blobItem =
- database::DatabaseManager::getInstance().findBlobItem(this->blobHash);
- if (this->blobItem != nullptr) {
- this->s3Path =
- std::make_unique<database::S3Path>(this->blobItem->getS3Path());
- response->set_dataexists(true);
- this->dataExists = true;
- return std::make_unique<ServerBidiReactorStatus>(
- grpc::Status::OK, true);
- }
- this->s3Path = std::make_unique<database::S3Path>(
- tools::generateS3Path(BLOB_BUCKET_NAME, this->blobHash));
- this->blobItem =
- std::make_shared<database::BlobItem>(this->blobHash, *s3Path);
- response->set_dataexists(false);
- return nullptr;
- }
- if (request.datachunk().empty()) {
- return std::make_unique<ServerBidiReactorStatus>(grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected"));
- }
- if (this->uploader == nullptr) {
- this->uploader = std::make_unique<MultiPartUploader>(
- getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName());
- }
- this->currentChunk += request.datachunk();
- if (this->currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- this->uploader->addPart(this->currentChunk);
- this->currentChunk.clear();
- }
- return nullptr;
- }
-
- void terminateCallback() override {
- if (this->holder.empty()) {
- throw std::runtime_error("holder has not been provided");
- }
- if (this->blobHash.empty()) {
- throw std::runtime_error("blob hash has not been provided");
- }
- if (!this->status.status.ok()) {
- return;
- }
- const database::ReverseIndexItem reverseIndexItem(
- this->holder, this->blobHash);
- if (this->uploader == nullptr) {
- if (!this->dataExists) {
- throw std::runtime_error("uploader not initialized as expected");
- }
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- return;
- }
- if (!this->readingAborted) {
- return;
- }
- if (!currentChunk.empty()) {
- this->uploader->addPart(this->currentChunk);
- }
- this->uploader->finishUpload();
- database::DatabaseManager::getInstance().putBlobItem(*this->blobItem);
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- }
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Path.h b/services/blob/old/src/S3Path.h
deleted file mode 100644
--- a/services/blob/old/src/S3Path.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include <stdexcept>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// S3 path format: [bucket name]/[object name]
-class S3Path {
- std::string bucketName;
- std::string objectName;
-
-public:
- S3Path();
- S3Path(const S3Path &other);
- S3Path &operator=(const S3Path &other);
- S3Path(const std::string bucketName, const std::string objectName);
- S3Path(const std::string fullPath);
-
- std::string getBucketName() const;
- std::string getObjectName() const;
- std::string getFullPath() const;
- void validate() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Path.cpp b/services/blob/old/src/S3Path.cpp
deleted file mode 100644
--- a/services/blob/old/src/S3Path.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-#include "S3Path.h"
-
-#include <algorithm>
-#include <memory>
-#include <stdexcept>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-S3Path::S3Path() {
-}
-
-S3Path::S3Path(const S3Path &other)
- : bucketName(other.bucketName), objectName(other.objectName) {
- this->validate();
-}
-
-S3Path &S3Path::operator=(const S3Path &other) {
- this->bucketName = other.getBucketName();
- this->objectName = other.getObjectName();
- this->validate();
- return *this;
-}
-
-S3Path::S3Path(const std::string bucketName, const std::string objectName)
- : bucketName(bucketName), objectName(objectName) {
- this->validate();
-}
-
-S3Path::S3Path(const std::string fullPath) {
- if (std::count(fullPath.begin(), fullPath.end(), '/') != 1) {
- throw std::runtime_error(
- "incorrect number of delimiters in S3 path " + fullPath);
- }
- size_t delimiterPos = fullPath.find('/');
- this->bucketName = fullPath.substr(0, delimiterPos);
- this->objectName = fullPath.substr(delimiterPos + 1);
- this->validate();
-}
-
-std::string S3Path::getBucketName() const {
- return this->bucketName;
-}
-
-std::string S3Path::getObjectName() const {
- return this->objectName;
-}
-
-std::string S3Path::getFullPath() const {
- return this->getBucketName() + "/" + this->getObjectName();
-}
-
-void S3Path::validate() const {
- if (!this->bucketName.size()) {
- throw std::runtime_error("referencing S3 path with an empty object name");
- }
- if (!this->bucketName.size()) {
- throw std::runtime_error("referencing S3 path with an empty bucket name");
- }
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Tools.h b/services/blob/old/src/S3Tools.h
deleted file mode 100644
--- a/services/blob/old/src/S3Tools.h
+++ /dev/null
@@ -1,23 +0,0 @@
-#pragma once
-
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-#include <aws/s3/S3Client.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket getBucket(const std::string &bucketName);
-
-std::vector<std::string> listBuckets();
-
-std::unique_ptr<Aws::S3::S3Client> getS3Client();
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Tools.cpp b/services/blob/old/src/S3Tools.cpp
deleted file mode 100644
--- a/services/blob/old/src/S3Tools.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-#include "S3Tools.h"
-#include "Constants.h"
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <aws/s3/model/Bucket.h>
-
-#include <cstdlib>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket getBucket(const std::string &bucketName) {
- return AwsS3Bucket(bucketName);
-}
-
-std::vector<std::string> listBuckets() {
- Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets();
- std::vector<std::string> result;
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- Aws::Vector<Aws::S3::Model::Bucket> buckets =
- outcome.GetResult().GetBuckets();
- for (Aws::S3::Model::Bucket &bucket : buckets) {
- result.push_back(bucket.GetName());
- }
- return result;
-}
-
-std::unique_ptr<Aws::S3::S3Client> getS3Client() {
- Aws::Client::ClientConfiguration config;
- config.region = AWS_REGION;
- if (tools::isSandbox()) {
- config.endpointOverride = Aws::String("localstack:4566");
- config.scheme = Aws::Http::Scheme::HTTP;
- return std::make_unique<Aws::S3::S3Client>(
- config,
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- false);
- }
- return std::make_unique<Aws::S3::S3Client>(config);
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Tools.h b/services/blob/old/src/Tools.h
deleted file mode 100644
--- a/services/blob/old/src/Tools.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#pragma once
-
-#include "DatabaseEntitiesTools.h"
-#include "ReverseIndexItem.h"
-#include "S3Path.h"
-
-namespace comm {
-namespace network {
-namespace tools {
-
-database::S3Path
-generateS3Path(const std::string &bucketName, const std::string &blobHash);
-
-std::string computeHashForFile(const database::S3Path &s3Path);
-
-database::S3Path findS3Path(const std::string &holder);
-
-database::S3Path findS3Path(const database::ReverseIndexItem &reverseIndexItem);
-
-class invalid_argument_error : public std::runtime_error {
-public:
- invalid_argument_error(std::string errorMessage)
- : std::runtime_error(errorMessage) {
- }
-};
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Tools.cpp b/services/blob/old/src/Tools.cpp
deleted file mode 100644
--- a/services/blob/old/src/Tools.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-#include "Tools.h"
-
-#include "Constants.h"
-#include "DatabaseEntitiesTools.h"
-#include "DatabaseManager.h"
-#include "GlobalConstants.h"
-#include "S3Tools.h"
-
-#include <openssl/sha.h>
-
-#include <chrono>
-#include <cstdlib>
-#include <iomanip>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-database::S3Path
-generateS3Path(const std::string &bucketName, const std::string &blobHash) {
- return database::S3Path(bucketName, blobHash);
-}
-
-std::string computeHashForFile(const database::S3Path &s3Path) {
- SHA512_CTX ctx;
- SHA512_Init(&ctx);
- const std::function<void(const std::string &)> callback =
- [&ctx](const std::string &chunk) {
- SHA512_Update(&ctx, chunk.data(), chunk.size());
- };
-
- getBucket(s3Path.getBucketName())
- .getObjectDataChunks(
- s3Path.getObjectName(), callback, GRPC_CHUNK_SIZE_LIMIT);
-
- unsigned char hash[SHA512_DIGEST_LENGTH];
- SHA512_Final(hash, &ctx);
-
- std::ostringstream hashStream;
- for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) {
- hashStream << std::hex << std::setfill('0') << std::setw(2)
- << std::nouppercase << (int)hash[i];
- }
-
- return hashStream.str();
-}
-
-database::S3Path findS3Path(const std::string &holder) {
- std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
- database::DatabaseManager::getInstance().findReverseIndexItemByHolder(
- holder);
- if (reverseIndexItem == nullptr) {
- throw std::runtime_error(
- "provided holder: [" + holder + "] has not been found in the database");
- }
- return findS3Path(*reverseIndexItem);
-}
-
-database::S3Path
-findS3Path(const database::ReverseIndexItem &reverseIndexItem) {
- std::shared_ptr<database::BlobItem> blobItem =
- database::DatabaseManager::getInstance().findBlobItem(
- reverseIndexItem.getBlobHash());
- if (blobItem == nullptr) {
- throw std::runtime_error(
- "no blob found for blobHash: [" + reverseIndexItem.getBlobHash() + "]");
- }
- database::S3Path result = blobItem->getS3Path();
- return result;
-}
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/server.cpp b/services/blob/old/src/server.cpp
deleted file mode 100644
--- a/services/blob/old/src/server.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "BlobServiceImpl.h"
-
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-void RunServer() {
- BlobServiceImpl blobService;
-
- grpc::EnableDefaultHealthCheckService(true);
- grpc::ServerBuilder builder;
- builder.AddListeningPort(
- SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials());
- builder.RegisterService(&blobService);
- std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
- LOG(INFO) << "server listening at :" << SERVER_LISTEN_ADDRESS;
-
- server->Wait();
-}
-
-} // namespace network
-} // namespace comm
-
-int main(int argc, char **argv) {
- comm::network::tools::InitLogging("blob");
- comm::network::RunServer();
-
- return 0;
-}
diff --git a/services/blob/old/test/DatabaseManagerTest.cpp b/services/blob/old/test/DatabaseManagerTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/DatabaseManagerTest.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "DatabaseManager.h"
-#include "S3Path.h"
-
-#include <algorithm>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <vector>
-
-using namespace comm::network::database;
-
-class DatabaseManagerTest : public testing::Test {
-protected:
- virtual void SetUp() {
- Aws::InitAPI({});
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-std::string generateName() {
- std::chrono::milliseconds ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch());
- return std::to_string(ms.count());
-}
-
-TEST_F(DatabaseManagerTest, TestOperationsOnBlobItems) {
- const BlobItem item(generateName(), S3Path(generateName(), generateName()));
-
- DatabaseManager::getInstance().putBlobItem(item);
- std::shared_ptr<BlobItem> foundItem =
- DatabaseManager::getInstance().findBlobItem(item.getBlobHash());
- EXPECT_NE(foundItem->getCreated(), 0);
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- const BlobItem item2(generateName(), S3Path(generateName(), generateName()));
- DatabaseManager::getInstance().putBlobItem(item2);
- DatabaseManager::getInstance().removeBlobItem(item.getBlobHash());
- DatabaseManager::getInstance().removeBlobItem(item2.getBlobHash());
- EXPECT_EQ(
- DatabaseManager::getInstance().findBlobItem(item.getBlobHash()), nullptr);
- EXPECT_EQ(
- DatabaseManager::getInstance().findBlobItem(item2.getBlobHash()),
- nullptr);
-}
-
-TEST_F(DatabaseManagerTest, TestOperationsOnReverseIndexItems) {
- const ReverseIndexItem item(generateName(), generateName());
-
- DatabaseManager::getInstance().putReverseIndexItem(item);
- std::vector<std::shared_ptr<ReverseIndexItem>> foundItems =
- DatabaseManager::getInstance().findReverseIndexItemsByHash(
- item.getBlobHash());
- EXPECT_EQ(foundItems.size(), 1);
- std::shared_ptr<ReverseIndexItem> foundItem = foundItems.at(0);
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- foundItem = std::dynamic_pointer_cast<ReverseIndexItem>(
- DatabaseManager::getInstance().findReverseIndexItemByHolder(
- item.getHolder()));
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- DatabaseManager::getInstance().removeReverseIndexItem(foundItem->getHolder());
-}
diff --git a/services/blob/old/test/MultiPartUploadTest.cpp b/services/blob/old/test/MultiPartUploadTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/MultiPartUploadTest.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "TestTools.h"
-#include "Tools.h"
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-
-#include <string>
-
-using namespace comm::network;
-
-class MultiPartUploadTest : public testing::Test {
-protected:
- std::shared_ptr<Aws::S3::S3Client> s3Client;
- std::unique_ptr<AwsS3Bucket> bucket;
-
- virtual void SetUp() {
- Aws::InitAPI({});
- s3Client = std::move(getS3Client());
- bucket = std::make_unique<AwsS3Bucket>(BLOB_BUCKET_NAME);
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-std::string generateNByes(const size_t n) {
- std::string result;
- result.resize(n);
- memset((char *)result.data(), 'A', n);
- return result;
-}
-
-TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart("xxx");
- mpu.addPart("xxx");
- EXPECT_THROW(mpu.finishUpload(), std::runtime_error);
-}
-
-TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1));
- mpu.addPart("xxx");
- EXPECT_THROW(mpu.finishUpload(), std::runtime_error);
-}
-
-TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE));
- mpu.addPart("xxx");
- mpu.finishUpload();
- EXPECT_THROW(
- bucket->getObjectData(objectName), tools::invalid_argument_error);
- EXPECT_EQ(
- bucket->getObjectSize(objectName),
- AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3);
- bucket->removeObject(objectName);
-}
-
-TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart("xxx");
- mpu.finishUpload();
- EXPECT_EQ(bucket->getObjectSize(objectName), 3);
- bucket->removeObject(objectName);
-}
diff --git a/services/blob/old/test/StorageManagerTest.cpp b/services/blob/old/test/StorageManagerTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/StorageManagerTest.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "Constants.h"
-#include "S3Tools.h"
-#include "TestTools.h"
-
-#include <aws/core/Aws.h>
-
-#include <chrono>
-#include <memory>
-#include <string>
-
-using namespace comm::network;
-
-class StorageManagerTest : public testing::Test {
-public:
-protected:
- const std::string data =
- "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8"
- "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6";
-
- virtual void SetUp() {
- Aws::InitAPI({});
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-TEST_F(StorageManagerTest, ObjectOperationsTest) {
- EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).isAvailable());
- std::string objectName = createObject(getBucket(BLOB_BUCKET_NAME));
-
- getBucket(BLOB_BUCKET_NAME).writeObject(objectName, data);
-
- EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), data.size());
- EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).getObjectData(objectName) == data);
- std::string chunkedData;
- const size_t chunkSize = data.size() / 10;
- std::function<void(const std::string &)> callback =
- [&chunkedData](const std::string &chunk) { chunkedData += chunk; };
- getBucket(BLOB_BUCKET_NAME)
- .getObjectDataChunks(objectName, callback, chunkSize);
- EXPECT_TRUE(data == chunkedData);
-
- getBucket(BLOB_BUCKET_NAME).renameObject(objectName, objectName + "c");
- EXPECT_THROW(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName),
- std::runtime_error);
- EXPECT_TRUE(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName + "c") == data);
- getBucket(BLOB_BUCKET_NAME).renameObject(objectName + "c", objectName);
-
- getBucket(BLOB_BUCKET_NAME).clearObject(objectName);
- EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), 0);
-
- getBucket(BLOB_BUCKET_NAME).removeObject(objectName);
- EXPECT_THROW(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName),
- std::runtime_error);
-}
diff --git a/services/blob/old/test/TestTools.h b/services/blob/old/test/TestTools.h
deleted file mode 100644
--- a/services/blob/old/test/TestTools.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#pragma once
-
-#include "AwsS3Bucket.h"
-
-#include <chrono>
-#include <string>
-
-namespace comm {
-namespace network {
-
-std::string generateObjectName();
-std::string createObject(AwsS3Bucket bucket);
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/test/TestTools.cpp b/services/blob/old/test/TestTools.cpp
deleted file mode 100644
--- a/services/blob/old/test/TestTools.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-#include "TestTools.h"
-#include "AwsS3Bucket.h"
-
-namespace comm {
-namespace network {
-
-std::string generateObjectName() {
- std::chrono::milliseconds ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch());
- return std::to_string(ms.count());
-}
-
-std::string createObject(AwsS3Bucket bucket) {
- std::string objectName;
- std::vector<std::string> presentObjects;
- do {
- objectName = generateObjectName();
- presentObjects = bucket.listObjects();
- } while (
- std::find(presentObjects.begin(), presentObjects.end(), objectName) !=
- presentObjects.end());
- return objectName;
-}
-
-} // namespace network
-} // namespace comm

File Metadata

Mime Type
text/plain
Expires
Sat, Nov 16, 10:10 PM (19 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2502498
Default Alt Text
D5768.id19013.diff (56 KB)

Event Timeline