Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F3259538
D5768.id19013.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
56 KB
Referenced Files
None
Subscribers
None
D5768.id19013.diff
View Options
diff --git a/services/blob/old/CMakeLists.txt b/services/blob/old/CMakeLists.txt
deleted file mode 100644
--- a/services/blob/old/CMakeLists.txt
+++ /dev/null
@@ -1,139 +0,0 @@
-PROJECT(blob C CXX)
-
-cmake_minimum_required(VERSION 3.16)
-
-set(CMAKE_CXX_STANDARD 14)
-set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin)
-
-if(COMMAND cmake_policy)
- cmake_policy(SET CMP0003 NEW)
-endif(COMMAND cmake_policy)
-
-set(CMAKE_CXX_STANDARD 17)
-
-# FIND LIBS
-find_package(glog REQUIRED)
-find_package(Protobuf REQUIRED)
-find_package(gRPC REQUIRED)
-find_package(Folly REQUIRED)
-find_package(AWSSDK REQUIRED COMPONENTS s3 core dynamodb)
-find_package(Boost 1.40
- COMPONENTS program_options filesystem context regex system thread
- REQUIRED
-)
-find_package(OpenSSL REQUIRED)
-find_package(double-conversion REQUIRED)
-
-if(${CMAKE_CURRENT_SOURCE_DIR} MATCHES "^\/transferred.*")
- # Inside the docker build contex
- set(_proto_path "grpc")
- set(_lib_path "../lib/src")
-else()
- # Inside repo
- set(_proto_path "../../../shared/protos")
- set(_lib_path "../../lib/src")
-endif()
-
-# Shared Comm protos
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_proto_path}
- ${CMAKE_CURRENT_BINARY_DIR}/protos
- EXCLUDE_FROM_ALL
-)
-
-# Reference native/cpp projects
-add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/${_lib_path}
- ${CMAKE_CURRENT_BINARY_DIR}/common # CMake's build directory
- EXCLUDE_FROM_ALL # Don't build everything, just what we need
-)
-
-file(GLOB_RECURSE SOURCE_CODE "./src/*.cpp")
-
-# SERVER
-add_executable(
- blob
-
- ${SOURCE_CODE}
-)
-
-set(
- INCLUDE_DIRS
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${CMAKE_CURRENT_SOURCE_DIR}/src/server-base-reactors
- ${CMAKE_CURRENT_SOURCE_DIR}/src/DatabaseEntities
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server
- ${CMAKE_CURRENT_SOURCE_DIR}/src/Reactors/server/base-reactors
-
- ${Boost_INCLUDE_DIR}
-)
-
-target_include_directories(
- blob
- PUBLIC
- ${INCLUDE_DIRS}
-)
-
-set(
- LIBS
-
- ${GRPC_LIBS}
- ${AWSSDK_LINK_LIBRARIES}
- ${Boost_LIBRARIES}
- OpenSSL::SSL
- glog::glog
- gRPC::grpc++
- double-conversion::double-conversion
- Folly::folly
-
- comm-blob-grpc
- comm-server-base-reactors
- comm-services-common
-)
-
-target_link_libraries(
- blob
-
- ${LIBS}
-)
-
-install(
- TARGETS blob
- RUNTIME DESTINATION bin/
-)
-
-# TEST
-option(BUILD_TESTING "Turn on tests" ON)
-if (BUILD_TESTING)
- file(GLOB TEST_CODE "./test/*.cpp")
- list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp")
- enable_testing()
-
- find_package(GTest CONFIG REQUIRED)
- include_directories(
- ${GTEST_INCLUDE_DIR}
- ./test
- )
-
- add_executable(
- runTests
-
- ${SOURCE_CODE}
- ${TEST_CODE}
- )
-
- target_include_directories(
- runTests
- PUBLIC
- ${INCLUDE_DIRS}
- )
-
- target_link_libraries(
- runTests
-
- ${LIBS}
- GTest::gtest_main
- )
-
- include(GoogleTest)
- gtest_discover_tests(runTests)
-endif()
diff --git a/services/blob/old/src/AwsS3Bucket.h b/services/blob/old/src/AwsS3Bucket.h
deleted file mode 100644
--- a/services/blob/old/src/AwsS3Bucket.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-
-#include <functional>
-#include <string>
-#include <vector>
-
-namespace comm {
-namespace network {
-
-class AwsS3Bucket {
- const std::string name;
-
-public:
- AwsS3Bucket(const std::string name);
-
- std::vector<std::string> listObjects() const;
- bool isAvailable() const;
- size_t getObjectSize(const std::string &objectName) const;
- void renameObject(const std::string ¤tName, const std::string &newName);
- void writeObject(const std::string &objectName, const std::string &data);
- std::string getObjectData(const std::string &objectName) const;
- void getObjectDataChunks(
- const std::string &objectName,
- const std::function<void(const std::string &)> &callback,
- const size_t chunkSize) const;
- void appendToObject(const std::string &objectName, const std::string &data);
- void clearObject(const std::string &objectName);
- void removeObject(const std::string &objectName);
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/AwsS3Bucket.cpp b/services/blob/old/src/AwsS3Bucket.cpp
deleted file mode 100644
--- a/services/blob/old/src/AwsS3Bucket.cpp
+++ /dev/null
@@ -1,224 +0,0 @@
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-#include "GlobalConstants.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "Tools.h"
-
-#include <aws/s3/model/CopyObjectRequest.h>
-#include <aws/s3/model/DeleteObjectRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/HeadBucketRequest.h>
-#include <aws/s3/model/HeadObjectRequest.h>
-#include <aws/s3/model/ListObjectsRequest.h>
-#include <aws/s3/model/Object.h>
-#include <aws/s3/model/PutObjectRequest.h>
-
-#include <boost/interprocess/streams/bufferstream.hpp>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) {
-}
-
-std::vector<std::string> AwsS3Bucket::listObjects() const {
- Aws::S3::Model::ListObjectsRequest request;
- request.SetBucket(this->name);
- std::vector<std::string> result;
-
- Aws::S3::Model::ListObjectsOutcome outcome =
- getS3Client()->ListObjects(request);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- Aws::Vector<Aws::S3::Model::Object> objects =
- outcome.GetResult().GetContents();
- for (Aws::S3::Model::Object &object : objects) {
- result.push_back(object.GetKey());
- }
- return result;
-}
-
-bool AwsS3Bucket::isAvailable() const {
- Aws::S3::Model::HeadBucketRequest headRequest;
- headRequest.SetBucket(this->name);
- Aws::S3::Model::HeadBucketOutcome outcome =
- getS3Client()->HeadBucket(headRequest);
- return outcome.IsSuccess();
-}
-
-size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const {
- Aws::S3::Model::HeadObjectRequest headRequest;
- headRequest.SetBucket(this->name);
- headRequest.SetKey(objectName);
- Aws::S3::Model::HeadObjectOutcome headOutcome =
- getS3Client()->HeadObject(headRequest);
- if (!headOutcome.IsSuccess()) {
- throw std::runtime_error(headOutcome.GetError().GetMessage());
- }
- return headOutcome.GetResultWithOwnership().GetContentLength();
-}
-
-void AwsS3Bucket::renameObject(
- const std::string ¤tName,
- const std::string &newName) {
- Aws::S3::Model::CopyObjectRequest copyRequest;
- copyRequest.SetCopySource(this->name + "/" + currentName);
- copyRequest.SetKey(newName);
- copyRequest.SetBucket(this->name);
-
- Aws::S3::Model::CopyObjectOutcome copyOutcome =
- getS3Client()->CopyObject(copyRequest);
- if (!copyOutcome.IsSuccess()) {
- throw std::runtime_error(copyOutcome.GetError().GetMessage());
- }
-
- this->removeObject(currentName);
-}
-
-void AwsS3Bucket::writeObject(
- const std::string &objectName,
- const std::string &data) {
- // we don't have to handle multiple write here because the GRPC limit is 4MB
- // and minimum size of data to perform multipart upload is 5MB
- Aws::S3::Model::PutObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
-
- std::shared_ptr<Aws::IOStream> body = std::shared_ptr<Aws::IOStream>(
- new boost::interprocess::bufferstream((char *)data.data(), data.size()));
-
- request.SetBody(body);
-
- Aws::S3::Model::PutObjectOutcome outcome = getS3Client()->PutObject(request);
-
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-}
-
-std::string AwsS3Bucket::getObjectData(const std::string &objectName) const {
- Aws::S3::Model::GetObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
-
- const size_t size = this->getObjectSize(objectName);
- if (size > GRPC_CHUNK_SIZE_LIMIT) {
- throw tools::invalid_argument_error(std::string(
- "The file is too big(" + std::to_string(size) + " bytes, max is " +
- std::to_string(GRPC_CHUNK_SIZE_LIMIT) +
- "bytes), please, use getObjectDataChunks"));
- }
- Aws::S3::Model::GetObjectOutcome outcome = getS3Client()->GetObject(request);
-
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- std::string cpy = result;
-
- return result;
-}
-
-void AwsS3Bucket::getObjectDataChunks(
- const std::string &objectName,
- const std::function<void(const std::string &)> &callback,
- const size_t chunkSize) const {
- const size_t fileSize = this->getObjectSize(objectName);
-
- if (fileSize == 0) {
- return;
- }
-
- Aws::S3::Model::GetObjectRequest request;
- request.SetBucket(this->name);
- request.SetKey(objectName);
- for (size_t offset = 0; offset < fileSize; offset += chunkSize) {
- const size_t nextSize = std::min(chunkSize, fileSize - offset);
-
- std::string range = "bytes=" + std::to_string(offset) + "-" +
- std::to_string(offset + nextSize);
- request.SetRange(range);
-
- Aws::S3::Model::GetObjectOutcome getOutcome =
- getS3Client()->GetObject(request);
- if (!getOutcome.IsSuccess()) {
- throw std::runtime_error(getOutcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile =
- getOutcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- result.resize(nextSize);
- callback(result);
- }
-}
-
-void AwsS3Bucket::appendToObject(
- const std::string &objectName,
- const std::string &data) {
- const size_t objectSize = this->getObjectSize(objectName);
- if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- std::string currentData = this->getObjectData(objectName);
- currentData += data;
- this->writeObject(objectName, currentData);
- return;
- }
- size_t currentSize = 0;
- MultiPartUploader uploader(
- getS3Client(), this->name, objectName + "-multipart");
- std::function<void(const std::string &)> callback =
- [&uploader, &data, ¤tSize, objectSize](const std::string &chunk) {
- currentSize += chunk.size();
- if (currentSize < objectSize) {
- uploader.addPart(chunk);
- } else if (currentSize == objectSize) {
- uploader.addPart(std::string(chunk + data));
- } else {
- throw std::runtime_error(
- "size of chunks exceeds the size of the object");
- }
- };
- this->getObjectDataChunks(
- objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE);
- uploader.finishUpload();
- // this will overwrite the target file
- this->renameObject(objectName + "-multipart", objectName);
- const size_t newSize = this->getObjectSize(objectName);
- if (objectSize + data.size() != newSize) {
- throw std::runtime_error(
- "append to object " + objectName +
- " has been performed but the final sizes don't "
- "match, the size is now [" +
- std::to_string(newSize) + "] but should be [" +
- std::to_string(objectSize + data.size()) + "]");
- }
-}
-
-void AwsS3Bucket::clearObject(const std::string &objectName) {
- this->writeObject(objectName, "");
-}
-
-void AwsS3Bucket::removeObject(const std::string &objectName) {
- Aws::S3::Model::DeleteObjectRequest deleteRequest;
- deleteRequest.SetBucket(this->name);
- deleteRequest.SetKey(objectName);
-
- Aws::S3::Model::DeleteObjectOutcome deleteOutcome =
- getS3Client()->DeleteObject(deleteRequest);
- if (!deleteOutcome.IsSuccess()) {
- throw std::runtime_error(deleteOutcome.GetError().GetMessage());
- }
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/BlobServiceImpl.h b/services/blob/old/src/BlobServiceImpl.h
deleted file mode 100644
--- a/services/blob/old/src/BlobServiceImpl.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include "S3Path.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <aws/core/Aws.h>
-
-#include <grpcpp/grpcpp.h>
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-class BlobServiceImpl final : public blob::BlobService::CallbackService {
- void verifyBlobHash(
- const std::string &expectedBlobHash,
- const database::S3Path &s3Path);
- void assignVariableIfEmpty(
- const std::string &label,
- std::string &lvalue,
- const std::string &rvalue);
-
-public:
- BlobServiceImpl();
- virtual ~BlobServiceImpl();
-
- grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
- Put(grpc::CallbackServerContext *context) override;
- grpc::ServerWriteReactor<blob::GetResponse> *
- Get(grpc::CallbackServerContext *context,
- const blob::GetRequest *request) override;
- grpc::ServerUnaryReactor *Remove(
- grpc::CallbackServerContext *context,
- const blob::RemoveRequest *request,
- google::protobuf::Empty *response) override;
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/BlobServiceImpl.cpp b/services/blob/old/src/BlobServiceImpl.cpp
deleted file mode 100644
--- a/services/blob/old/src/BlobServiceImpl.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-#include "BlobServiceImpl.h"
-
-#include "Constants.h"
-#include "DatabaseManager.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "Tools.h"
-
-#include "GetReactor.h"
-#include "PutReactor.h"
-
-#include <glog/logging.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-BlobServiceImpl::BlobServiceImpl() {
- Aws::InitAPI({});
-
- if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) {
- throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available");
- }
-}
-
-BlobServiceImpl::~BlobServiceImpl() {
- Aws::ShutdownAPI({});
-}
-
-void BlobServiceImpl::verifyBlobHash(
- const std::string &expectedBlobHash,
- const database::S3Path &s3Path) {
- const std::string computedBlobHash = tools::computeHashForFile(s3Path);
- if (expectedBlobHash != computedBlobHash) {
- throw std::runtime_error(
- "blob hash mismatch, expected: [" + expectedBlobHash +
- "], computed: [" + computedBlobHash + "]");
- }
-}
-
-void BlobServiceImpl::assignVariableIfEmpty(
- const std::string &label,
- std::string &lvalue,
- const std::string &rvalue) {
- if (!lvalue.empty()) {
- throw std::runtime_error(
- "multiple assignment for variable " + label + " is not allowed");
- }
- lvalue = rvalue;
-}
-
-grpc::ServerBidiReactor<blob::PutRequest, blob::PutResponse> *
-BlobServiceImpl::Put(grpc::CallbackServerContext *context) {
- return new reactor::PutReactor();
-}
-
-grpc::ServerWriteReactor<blob::GetResponse> *BlobServiceImpl::Get(
- grpc::CallbackServerContext *context,
- const blob::GetRequest *request) {
-
- reactor::GetReactor *gr = new reactor::GetReactor(request);
- gr->start();
- return gr;
-}
-
-grpc::ServerUnaryReactor *BlobServiceImpl::Remove(
- grpc::CallbackServerContext *context,
- const blob::RemoveRequest *request,
- google::protobuf::Empty *response) {
- grpc::Status status = grpc::Status::OK;
- const std::string holder = request->holder();
- try {
- std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
- database::DatabaseManager::getInstance().findReverseIndexItemByHolder(
- holder);
- if (reverseIndexItem == nullptr) {
- throw std::runtime_error("no item found for holder: " + holder);
- }
- // TODO handle cleanup here properly
- // for now the object's being removed right away
- const std::string blobHash = reverseIndexItem->getBlobHash();
- database::DatabaseManager::getInstance().removeReverseIndexItem(holder);
- if (database::DatabaseManager::getInstance()
- .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash())
- .size() == 0) {
- database::S3Path s3Path = tools::findS3Path(*reverseIndexItem);
- AwsS3Bucket bucket = getBucket(s3Path.getBucketName());
- bucket.removeObject(s3Path.getObjectName());
-
- database::DatabaseManager::getInstance().removeBlobItem(blobHash);
- }
- } catch (std::runtime_error &e) {
- LOG(ERROR) << e.what();
- status = grpc::Status(grpc::StatusCode::INTERNAL, e.what());
- }
- auto *reactor = context->DefaultReactor();
- reactor->Finish(status);
- return reactor;
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Constants.h b/services/blob/old/src/Constants.h
deleted file mode 100644
--- a/services/blob/old/src/Constants.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#pragma once
-
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-
-// 5MB limit
-const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024;
-
-const std::string BLOB_BUCKET_NAME = "commapp-blob";
-
-const std::string BLOB_TABLE_NAME =
- tools::decorateTableName("blob-service-blob");
-const std::string REVERSE_INDEX_TABLE_NAME =
- tools::decorateTableName("blob-service-reverse-index");
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/BlobItem.h b/services/blob/old/src/DatabaseEntities/BlobItem.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/BlobItem.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include "Item.h"
-#include "S3Path.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-class BlobItem : public Item {
-
- std::string blobHash;
- S3Path s3Path;
- uint64_t created = 0;
-
- void validate() const override;
-
-public:
- static const std::string TABLE_NAME;
- static const std::string FIELD_BLOB_HASH;
- static const std::string FIELD_S3_PATH;
- static const std::string FIELD_CREATED;
-
- BlobItem() {
- }
- BlobItem(
- const std::string blobHash,
- const S3Path s3Path,
- uint64_t created = 0);
- BlobItem(const AttributeValues &itemFromDB);
-
- void assignItemFromDatabase(const AttributeValues &itemFromDB) override;
-
- std::string getTableName() const override;
- PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override;
- PrimaryKeyValue getPrimaryKeyValue() const override;
-
- std::string getBlobHash() const;
- S3Path getS3Path() const;
- uint64_t getCreated() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/BlobItem.cpp b/services/blob/old/src/DatabaseEntities/BlobItem.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/BlobItem.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-#include "BlobItem.h"
-
-#include "Constants.h"
-
-namespace comm {
-namespace network {
-namespace database {
-
-const std::string BlobItem::FIELD_BLOB_HASH = "blobHash";
-const std::string BlobItem::FIELD_S3_PATH = "s3Path";
-const std::string BlobItem::FIELD_CREATED = "created";
-
-const std::string BlobItem::TABLE_NAME = BLOB_TABLE_NAME;
-
-BlobItem::BlobItem(
- const std::string blobHash,
- const S3Path s3Path,
- uint64_t created)
- : blobHash(blobHash), s3Path(s3Path), created(created) {
- this->validate();
-}
-
-BlobItem::BlobItem(const AttributeValues &itemFromDB) {
- this->assignItemFromDatabase(itemFromDB);
-}
-
-void BlobItem::validate() const {
- if (!this->blobHash.size()) {
- throw std::runtime_error("blobHash empty");
- }
- this->s3Path.validate();
-}
-
-void BlobItem::assignItemFromDatabase(const AttributeValues &itemFromDB) {
- try {
- this->blobHash = itemFromDB.at(BlobItem::FIELD_BLOB_HASH).GetS();
- this->s3Path = S3Path(itemFromDB.at(BlobItem::FIELD_S3_PATH).GetS());
- this->created = std::stoll(
- std::string(itemFromDB.at(BlobItem::FIELD_CREATED).GetS()).c_str());
- } catch (std::logic_error &e) {
- throw std::runtime_error(
- "invalid blob item provided, " + std::string(e.what()));
- }
- this->validate();
-}
-
-std::string BlobItem::getTableName() const {
- return BlobItem::TABLE_NAME;
-}
-
-PrimaryKeyDescriptor BlobItem::getPrimaryKeyDescriptor() const {
- return PrimaryKeyDescriptor(BlobItem::FIELD_BLOB_HASH);
-}
-
-PrimaryKeyValue BlobItem::getPrimaryKeyValue() const {
- return PrimaryKeyValue(this->blobHash);
-}
-
-std::string BlobItem::getBlobHash() const {
- return this->blobHash;
-}
-
-S3Path BlobItem::getS3Path() const {
- return this->s3Path;
-}
-
-uint64_t BlobItem::getCreated() const {
- return this->created;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h b/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#pragma once
-
-#include "Item.h"
-
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-/**
- * Needs blobHash(pk)-index that projects:
- * blobHash
- * holder
- */
-class ReverseIndexItem : public Item {
-
- std::string holder;
- std::string blobHash;
-
- void validate() const override;
-
-public:
- static const std::string TABLE_NAME;
- static const std::string FIELD_HOLDER;
- static const std::string FIELD_BLOB_HASH;
-
- ReverseIndexItem() {
- }
- ReverseIndexItem(const std::string holder, const std::string blobHash);
- ReverseIndexItem(const AttributeValues &itemFromDB);
-
- void assignItemFromDatabase(const AttributeValues &itemFromDB) override;
-
- std::string getTableName() const override;
- PrimaryKeyDescriptor getPrimaryKeyDescriptor() const override;
- PrimaryKeyValue getPrimaryKeyValue() const override;
-
- std::string getHolder() const;
- std::string getBlobHash() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp b/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseEntities/ReverseIndexItem.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-#include "ReverseIndexItem.h"
-
-#include "Constants.h"
-
-namespace comm {
-namespace network {
-namespace database {
-
-const std::string ReverseIndexItem::FIELD_HOLDER = "holder";
-const std::string ReverseIndexItem::FIELD_BLOB_HASH = "blobHash";
-
-const std::string ReverseIndexItem::TABLE_NAME = REVERSE_INDEX_TABLE_NAME;
-
-ReverseIndexItem::ReverseIndexItem(
- const std::string holder,
- const std::string blobHash)
- : holder(holder), blobHash(blobHash) {
- this->validate();
-}
-ReverseIndexItem::ReverseIndexItem(const AttributeValues &itemFromDB) {
- this->assignItemFromDatabase(itemFromDB);
-}
-
-void ReverseIndexItem::validate() const {
- if (!this->holder.size()) {
- throw std::runtime_error("reverse index empty");
- }
- if (!this->blobHash.size()) {
- throw std::runtime_error("blobHash empty");
- }
-}
-
-void ReverseIndexItem::assignItemFromDatabase(
- const AttributeValues &itemFromDB) {
- this->holder = itemFromDB.at(ReverseIndexItem::FIELD_HOLDER).GetS();
- this->blobHash = itemFromDB.at(ReverseIndexItem::FIELD_BLOB_HASH).GetS();
- this->validate();
-}
-
-std::string ReverseIndexItem::getTableName() const {
- return ReverseIndexItem::TABLE_NAME;
-}
-
-PrimaryKeyDescriptor ReverseIndexItem::getPrimaryKeyDescriptor() const {
- return PrimaryKeyDescriptor(ReverseIndexItem::FIELD_HOLDER);
-}
-
-PrimaryKeyValue ReverseIndexItem::getPrimaryKeyValue() const {
- return PrimaryKeyValue(this->holder);
-}
-
-std::string ReverseIndexItem::getHolder() const {
- return this->holder;
-}
-
-std::string ReverseIndexItem::getBlobHash() const {
- return this->blobHash;
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseManager.h b/services/blob/old/src/DatabaseManager.h
deleted file mode 100644
--- a/services/blob/old/src/DatabaseManager.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#pragma once
-
-#include "BlobItem.h"
-#include "DatabaseEntitiesTools.h"
-#include "DatabaseManagerBase.h"
-#include "DynamoDBTools.h"
-#include "ReverseIndexItem.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/model/AttributeDefinition.h>
-#include <aws/dynamodb/model/DeleteItemRequest.h>
-#include <aws/dynamodb/model/GetItemRequest.h>
-#include <aws/dynamodb/model/PutItemRequest.h>
-
-#include <memory>
-#include <stdexcept>
-#include <string>
-#include <vector>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// this class should be thread-safe in case any shared resources appear
-class DatabaseManager : public DatabaseManagerBase {
-public:
- static DatabaseManager &getInstance();
-
- void putBlobItem(const BlobItem &item);
- std::shared_ptr<BlobItem> findBlobItem(const std::string &blobHash);
- void removeBlobItem(const std::string &blobHash);
-
- void putReverseIndexItem(const ReverseIndexItem &item);
- std::shared_ptr<ReverseIndexItem>
- findReverseIndexItemByHolder(const std::string &holder);
- std::vector<std::shared_ptr<database::ReverseIndexItem>>
- findReverseIndexItemsByHash(const std::string &blobHash);
- void removeReverseIndexItem(const std::string &holder);
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/DatabaseManager.cpp b/services/blob/old/src/DatabaseManager.cpp
deleted file mode 100644
--- a/services/blob/old/src/DatabaseManager.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-#include "DatabaseManager.h"
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <aws/core/utils/Outcome.h>
-#include <aws/dynamodb/model/QueryRequest.h>
-#include <aws/dynamodb/model/ScanRequest.h>
-
-#include <vector>
-
-namespace comm {
-namespace network {
-namespace database {
-
-DatabaseManager &DatabaseManager::getInstance() {
- static DatabaseManager instance;
- return instance;
-}
-
-void DatabaseManager::putBlobItem(const BlobItem &item) {
- Aws::DynamoDB::Model::PutItemRequest request;
- request.SetTableName(BlobItem::TABLE_NAME);
- request.AddItem(
- BlobItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(item.getBlobHash()));
- request.AddItem(
- BlobItem::FIELD_S3_PATH,
- Aws::DynamoDB::Model::AttributeValue(item.getS3Path().getFullPath()));
- request.AddItem(
- BlobItem::FIELD_CREATED,
- Aws::DynamoDB::Model::AttributeValue(
- std::to_string(tools::getCurrentTimestamp())));
-
- this->innerPutItem(std::make_shared<BlobItem>(item), request);
-}
-
-std::shared_ptr<BlobItem>
-DatabaseManager::findBlobItem(const std::string &blobHash) {
- Aws::DynamoDB::Model::GetItemRequest request;
- request.AddKey(
- BlobItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(blobHash));
- return this->innerFindItem<BlobItem>(request);
-}
-
-void DatabaseManager::removeBlobItem(const std::string &blobHash) {
- std::shared_ptr<BlobItem> item = this->findBlobItem(blobHash);
- if (item == nullptr) {
- return;
- }
- this->innerRemoveItem(*item);
-}
-
-void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) {
- if (this->findReverseIndexItemByHolder(item.getHolder()) != nullptr) {
- throw std::runtime_error(
- "An item for the given holder [" + item.getHolder() +
- "] already exists");
- }
- Aws::DynamoDB::Model::PutItemRequest request;
- request.SetTableName(ReverseIndexItem::TABLE_NAME);
- request.AddItem(
- ReverseIndexItem::FIELD_HOLDER,
- Aws::DynamoDB::Model::AttributeValue(item.getHolder()));
- request.AddItem(
- ReverseIndexItem::FIELD_BLOB_HASH,
- Aws::DynamoDB::Model::AttributeValue(item.getBlobHash()));
-
- this->innerPutItem(std::make_shared<ReverseIndexItem>(item), request);
-}
-
-std::shared_ptr<ReverseIndexItem>
-DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) {
- Aws::DynamoDB::Model::GetItemRequest request;
- request.AddKey(
- ReverseIndexItem::FIELD_HOLDER,
- Aws::DynamoDB::Model::AttributeValue(holder));
-
- return this->innerFindItem<ReverseIndexItem>(request);
-}
-
-std::vector<std::shared_ptr<database::ReverseIndexItem>>
-DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) {
- std::vector<std::shared_ptr<database::ReverseIndexItem>> result;
-
- Aws::DynamoDB::Model::QueryRequest req;
- req.SetTableName(ReverseIndexItem::TABLE_NAME);
- req.SetKeyConditionExpression("blobHash = :valueToMatch");
-
- AttributeValues attributeValues;
- attributeValues.emplace(":valueToMatch", blobHash);
-
- req.SetExpressionAttributeValues(attributeValues);
- req.SetIndexName("blobHash-index");
-
- const Aws::DynamoDB::Model::QueryOutcome &outcome =
- getDynamoDBClient()->Query(req);
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- const Aws::Vector<AttributeValues> &items = outcome.GetResult().GetItems();
- for (auto &item : items) {
- result.push_back(std::make_shared<database::ReverseIndexItem>(item));
- }
-
- return result;
-}
-
-void DatabaseManager::removeReverseIndexItem(const std::string &holder) {
- std::shared_ptr<database::ReverseIndexItem> item =
- findReverseIndexItemByHolder(holder);
- if (item == nullptr) {
- return;
- }
- this->innerRemoveItem(*item);
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/MultiPartUploader.h b/services/blob/old/src/MultiPartUploader.h
deleted file mode 100644
--- a/services/blob/old/src/MultiPartUploader.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/CompleteMultipartUploadRequest.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-class MultiPartUploader {
- std::shared_ptr<Aws::S3::S3Client> client;
- const std::string bucketName;
- const std::string objectName;
- std::vector<size_t> partsSizes;
-
- Aws::S3::Model::CompleteMultipartUploadRequest completeMultipartUploadRequest;
- Aws::S3::Model::CompletedMultipartUpload completedMultipartUpload;
- std::string uploadId;
-
- size_t partNumber = 1;
-
-public:
- MultiPartUploader(
- std::shared_ptr<Aws::S3::S3Client> client,
- const std::string bucketName,
- const std::string objectName);
- void addPart(const std::string &part);
- void finishUpload();
-};
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/MultiPartUploader.cpp b/services/blob/old/src/MultiPartUploader.cpp
deleted file mode 100644
--- a/services/blob/old/src/MultiPartUploader.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-#include "MultiPartUploader.h"
-#include "Tools.h"
-
-#include <aws/core/utils/HashingUtils.h>
-#include <aws/s3/model/CreateMultipartUploadRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/Object.h>
-#include <aws/s3/model/UploadPartRequest.h>
-
-#include <boost/interprocess/streams/bufferstream.hpp>
-
-namespace comm {
-namespace network {
-
-MultiPartUploader::MultiPartUploader(
- std::shared_ptr<Aws::S3::S3Client> client,
- const std::string bucketName,
- const std::string objectName)
- : client(client), bucketName(bucketName), objectName(objectName) {
- this->completeMultipartUploadRequest.SetBucket(this->bucketName);
- this->completeMultipartUploadRequest.SetKey(this->objectName);
-
- Aws::S3::Model::CreateMultipartUploadRequest createRequest;
- createRequest.SetBucket(this->bucketName);
- createRequest.SetKey(this->objectName);
- createRequest.SetContentType("application/octet-stream");
-
- Aws::S3::Model::CreateMultipartUploadOutcome createOutcome =
- this->client->CreateMultipartUpload(createRequest);
-
- if (!createOutcome.IsSuccess()) {
- throw std::runtime_error(createOutcome.GetError().GetMessage());
- }
- this->uploadId = createOutcome.GetResult().GetUploadId();
- this->completeMultipartUploadRequest.SetUploadId(this->uploadId);
-}
-
-void MultiPartUploader::addPart(const std::string &part) {
- Aws::S3::Model::UploadPartRequest uploadRequest;
- uploadRequest.SetBucket(this->bucketName);
- uploadRequest.SetKey(this->objectName);
- uploadRequest.SetPartNumber(this->partNumber);
- uploadRequest.SetUploadId(this->uploadId);
-
- std::shared_ptr<Aws::IOStream> body = std::shared_ptr<Aws::IOStream>(
- new boost::interprocess::bufferstream((char *)part.data(), part.size()));
-
- uploadRequest.SetBody(body);
-
- Aws::Utils::ByteBuffer partMd5(Aws::Utils::HashingUtils::CalculateMD5(*body));
- uploadRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(partMd5));
-
- uploadRequest.SetContentLength(part.size());
-
- Aws::S3::Model::UploadPartOutcome uploadPartOutcome =
- this->client->UploadPart(uploadRequest);
- Aws::S3::Model::CompletedPart completedPart;
- completedPart.SetPartNumber(this->partNumber);
- std::string eTag = uploadPartOutcome.GetResult().GetETag();
- if (eTag.empty()) {
- throw std::runtime_error("etag empty");
- }
- completedPart.SetETag(eTag);
- this->completedMultipartUpload.AddParts(completedPart);
- ++this->partNumber;
-}
-
-void MultiPartUploader::finishUpload() {
- if (!this->completedMultipartUpload.PartsHasBeenSet()) {
- return;
- }
- this->completeMultipartUploadRequest.SetMultipartUpload(
- this->completedMultipartUpload);
-
- Aws::S3::Model::CompleteMultipartUploadOutcome completeUploadOutcome =
- this->client->CompleteMultipartUpload(
- this->completeMultipartUploadRequest);
-
- if (!completeUploadOutcome.IsSuccess()) {
- throw std::runtime_error(completeUploadOutcome.GetError().GetMessage());
- }
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Reactors/server/GetReactor.h b/services/blob/old/src/Reactors/server/GetReactor.h
deleted file mode 100644
--- a/services/blob/old/src/Reactors/server/GetReactor.h
+++ /dev/null
@@ -1,87 +0,0 @@
-#pragma once
-
-#include "GlobalConstants.h"
-#include "S3Tools.h"
-#include <ServerWriteReactorBase.h>
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <aws/s3/model/GetObjectRequest.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class GetReactor
- : public ServerWriteReactorBase<blob::GetRequest, blob::GetResponse> {
- size_t offset = 0;
- size_t fileSize = 0;
- const size_t chunkSize =
- GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
- database::S3Path s3Path;
- Aws::S3::Model::GetObjectRequest getRequest;
-
-public:
- using ServerWriteReactorBase<blob::GetRequest, blob::GetResponse>::
- ServerWriteReactorBase;
-
- std::unique_ptr<grpc::Status>
- writeResponse(blob::GetResponse *response) override {
- if (this->offset >= this->fileSize) {
- return std::make_unique<grpc::Status>(grpc::Status::OK);
- }
-
- const size_t nextSize =
- std::min(this->chunkSize, this->fileSize - this->offset);
-
- std::string range = "bytes=" + std::to_string(this->offset) + "-" +
- std::to_string(this->offset + nextSize - 1);
- this->getRequest.SetRange(range);
-
- Aws::S3::Model::GetObjectOutcome getOutcome =
- getS3Client()->GetObject(this->getRequest);
- if (!getOutcome.IsSuccess()) {
- return std::make_unique<grpc::Status>(
- grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage());
- }
-
- Aws::IOStream &retrievedFile =
- getOutcome.GetResultWithOwnership().GetBody();
-
- std::stringstream buffer;
- buffer << retrievedFile.rdbuf();
- std::string result(buffer.str());
- response->set_datachunk(result);
-
- this->offset += nextSize;
- return nullptr;
- }
-
- void initialize() override {
- this->s3Path = tools::findS3Path(this->request.holder());
-
- AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName());
- if (!bucket.isAvailable()) {
- throw std::runtime_error(
- "bucket [" + this->s3Path.getBucketName() + "] not available");
- }
-
- this->fileSize = bucket.getObjectSize(this->s3Path.getObjectName());
- if (this->fileSize == 0) {
- throw std::runtime_error("object empty");
- }
-
- this->getRequest.SetBucket(this->s3Path.getBucketName());
- this->getRequest.SetKey(this->s3Path.getObjectName());
- };
-
- void doneCallback() override{};
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Reactors/server/PutReactor.h b/services/blob/old/src/Reactors/server/PutReactor.h
deleted file mode 100644
--- a/services/blob/old/src/Reactors/server/PutReactor.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#pragma once
-
-#include "ServerBidiReactorBase.h"
-
-#include <blob.grpc.pb.h>
-#include <blob.pb.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace reactor {
-
-class PutReactor
- : public ServerBidiReactorBase<blob::PutRequest, blob::PutResponse> {
- std::string holder;
- std::string blobHash;
- std::string currentChunk;
- std::unique_ptr<database::S3Path> s3Path;
- std::shared_ptr<database::BlobItem> blobItem;
- std::unique_ptr<MultiPartUploader> uploader;
- bool dataExists = false;
-
-public:
- std::unique_ptr<ServerBidiReactorStatus> handleRequest(
- blob::PutRequest request,
- blob::PutResponse *response) override {
- if (this->holder.empty()) {
- if (request.holder().empty()) {
- throw std::runtime_error("holder has not been provided");
- }
- this->holder = request.holder();
- return nullptr;
- }
- if (this->blobHash.empty()) {
- if (request.blobhash().empty()) {
- throw std::runtime_error("blob hash has not been provided");
- }
- this->blobHash = request.blobhash();
- this->blobItem =
- database::DatabaseManager::getInstance().findBlobItem(this->blobHash);
- if (this->blobItem != nullptr) {
- this->s3Path =
- std::make_unique<database::S3Path>(this->blobItem->getS3Path());
- response->set_dataexists(true);
- this->dataExists = true;
- return std::make_unique<ServerBidiReactorStatus>(
- grpc::Status::OK, true);
- }
- this->s3Path = std::make_unique<database::S3Path>(
- tools::generateS3Path(BLOB_BUCKET_NAME, this->blobHash));
- this->blobItem =
- std::make_shared<database::BlobItem>(this->blobHash, *s3Path);
- response->set_dataexists(false);
- return nullptr;
- }
- if (request.datachunk().empty()) {
- return std::make_unique<ServerBidiReactorStatus>(grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT, "data chunk expected"));
- }
- if (this->uploader == nullptr) {
- this->uploader = std::make_unique<MultiPartUploader>(
- getS3Client(), BLOB_BUCKET_NAME, s3Path->getObjectName());
- }
- this->currentChunk += request.datachunk();
- if (this->currentChunk.size() > AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
- this->uploader->addPart(this->currentChunk);
- this->currentChunk.clear();
- }
- return nullptr;
- }
-
- void terminateCallback() override {
- if (this->holder.empty()) {
- throw std::runtime_error("holder has not been provided");
- }
- if (this->blobHash.empty()) {
- throw std::runtime_error("blob hash has not been provided");
- }
- if (!this->status.status.ok()) {
- return;
- }
- const database::ReverseIndexItem reverseIndexItem(
- this->holder, this->blobHash);
- if (this->uploader == nullptr) {
- if (!this->dataExists) {
- throw std::runtime_error("uploader not initialized as expected");
- }
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- return;
- }
- if (!this->readingAborted) {
- return;
- }
- if (!currentChunk.empty()) {
- this->uploader->addPart(this->currentChunk);
- }
- this->uploader->finishUpload();
- database::DatabaseManager::getInstance().putBlobItem(*this->blobItem);
- database::DatabaseManager::getInstance().putReverseIndexItem(
- reverseIndexItem);
- }
-};
-
-} // namespace reactor
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Path.h b/services/blob/old/src/S3Path.h
deleted file mode 100644
--- a/services/blob/old/src/S3Path.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#pragma once
-
-#include <stdexcept>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-// S3 path format: [bucket name]/[object name]
-class S3Path {
- std::string bucketName;
- std::string objectName;
-
-public:
- S3Path();
- S3Path(const S3Path &other);
- S3Path &operator=(const S3Path &other);
- S3Path(const std::string bucketName, const std::string objectName);
- S3Path(const std::string fullPath);
-
- std::string getBucketName() const;
- std::string getObjectName() const;
- std::string getFullPath() const;
- void validate() const;
-};
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Path.cpp b/services/blob/old/src/S3Path.cpp
deleted file mode 100644
--- a/services/blob/old/src/S3Path.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-#include "S3Path.h"
-
-#include <algorithm>
-#include <memory>
-#include <stdexcept>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace database {
-
-S3Path::S3Path() {
-}
-
-S3Path::S3Path(const S3Path &other)
- : bucketName(other.bucketName), objectName(other.objectName) {
- this->validate();
-}
-
-S3Path &S3Path::operator=(const S3Path &other) {
- this->bucketName = other.getBucketName();
- this->objectName = other.getObjectName();
- this->validate();
- return *this;
-}
-
-S3Path::S3Path(const std::string bucketName, const std::string objectName)
- : bucketName(bucketName), objectName(objectName) {
- this->validate();
-}
-
-S3Path::S3Path(const std::string fullPath) {
- if (std::count(fullPath.begin(), fullPath.end(), '/') != 1) {
- throw std::runtime_error(
- "incorrect number of delimiters in S3 path " + fullPath);
- }
- size_t delimiterPos = fullPath.find('/');
- this->bucketName = fullPath.substr(0, delimiterPos);
- this->objectName = fullPath.substr(delimiterPos + 1);
- this->validate();
-}
-
-std::string S3Path::getBucketName() const {
- return this->bucketName;
-}
-
-std::string S3Path::getObjectName() const {
- return this->objectName;
-}
-
-std::string S3Path::getFullPath() const {
- return this->getBucketName() + "/" + this->getObjectName();
-}
-
-void S3Path::validate() const {
- if (!this->bucketName.size()) {
- throw std::runtime_error("referencing S3 path with an empty object name");
- }
- if (!this->bucketName.size()) {
- throw std::runtime_error("referencing S3 path with an empty bucket name");
- }
-}
-
-} // namespace database
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Tools.h b/services/blob/old/src/S3Tools.h
deleted file mode 100644
--- a/services/blob/old/src/S3Tools.h
+++ /dev/null
@@ -1,23 +0,0 @@
-#pragma once
-
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-
-#include <aws/core/Aws.h>
-#include <aws/dynamodb/DynamoDBClient.h>
-#include <aws/s3/S3Client.h>
-
-#include <memory>
-#include <string>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket getBucket(const std::string &bucketName);
-
-std::vector<std::string> listBuckets();
-
-std::unique_ptr<Aws::S3::S3Client> getS3Client();
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/S3Tools.cpp b/services/blob/old/src/S3Tools.cpp
deleted file mode 100644
--- a/services/blob/old/src/S3Tools.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-#include "S3Tools.h"
-#include "Constants.h"
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-#include "Tools.h"
-
-#include <aws/s3/model/Bucket.h>
-
-#include <cstdlib>
-
-namespace comm {
-namespace network {
-
-AwsS3Bucket getBucket(const std::string &bucketName) {
- return AwsS3Bucket(bucketName);
-}
-
-std::vector<std::string> listBuckets() {
- Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets();
- std::vector<std::string> result;
- if (!outcome.IsSuccess()) {
- throw std::runtime_error(outcome.GetError().GetMessage());
- }
- Aws::Vector<Aws::S3::Model::Bucket> buckets =
- outcome.GetResult().GetBuckets();
- for (Aws::S3::Model::Bucket &bucket : buckets) {
- result.push_back(bucket.GetName());
- }
- return result;
-}
-
-std::unique_ptr<Aws::S3::S3Client> getS3Client() {
- Aws::Client::ClientConfiguration config;
- config.region = AWS_REGION;
- if (tools::isSandbox()) {
- config.endpointOverride = Aws::String("localstack:4566");
- config.scheme = Aws::Http::Scheme::HTTP;
- return std::make_unique<Aws::S3::S3Client>(
- config,
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- false);
- }
- return std::make_unique<Aws::S3::S3Client>(config);
-}
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Tools.h b/services/blob/old/src/Tools.h
deleted file mode 100644
--- a/services/blob/old/src/Tools.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#pragma once
-
-#include "DatabaseEntitiesTools.h"
-#include "ReverseIndexItem.h"
-#include "S3Path.h"
-
-namespace comm {
-namespace network {
-namespace tools {
-
-database::S3Path
-generateS3Path(const std::string &bucketName, const std::string &blobHash);
-
-std::string computeHashForFile(const database::S3Path &s3Path);
-
-database::S3Path findS3Path(const std::string &holder);
-
-database::S3Path findS3Path(const database::ReverseIndexItem &reverseIndexItem);
-
-class invalid_argument_error : public std::runtime_error {
-public:
- invalid_argument_error(std::string errorMessage)
- : std::runtime_error(errorMessage) {
- }
-};
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/Tools.cpp b/services/blob/old/src/Tools.cpp
deleted file mode 100644
--- a/services/blob/old/src/Tools.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-#include "Tools.h"
-
-#include "Constants.h"
-#include "DatabaseEntitiesTools.h"
-#include "DatabaseManager.h"
-#include "GlobalConstants.h"
-#include "S3Tools.h"
-
-#include <openssl/sha.h>
-
-#include <chrono>
-#include <cstdlib>
-#include <iomanip>
-#include <string>
-
-namespace comm {
-namespace network {
-namespace tools {
-
-database::S3Path
-generateS3Path(const std::string &bucketName, const std::string &blobHash) {
- return database::S3Path(bucketName, blobHash);
-}
-
-std::string computeHashForFile(const database::S3Path &s3Path) {
- SHA512_CTX ctx;
- SHA512_Init(&ctx);
- const std::function<void(const std::string &)> callback =
- [&ctx](const std::string &chunk) {
- SHA512_Update(&ctx, chunk.data(), chunk.size());
- };
-
- getBucket(s3Path.getBucketName())
- .getObjectDataChunks(
- s3Path.getObjectName(), callback, GRPC_CHUNK_SIZE_LIMIT);
-
- unsigned char hash[SHA512_DIGEST_LENGTH];
- SHA512_Final(hash, &ctx);
-
- std::ostringstream hashStream;
- for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) {
- hashStream << std::hex << std::setfill('0') << std::setw(2)
- << std::nouppercase << (int)hash[i];
- }
-
- return hashStream.str();
-}
-
-database::S3Path findS3Path(const std::string &holder) {
- std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
- database::DatabaseManager::getInstance().findReverseIndexItemByHolder(
- holder);
- if (reverseIndexItem == nullptr) {
- throw std::runtime_error(
- "provided holder: [" + holder + "] has not been found in the database");
- }
- return findS3Path(*reverseIndexItem);
-}
-
-database::S3Path
-findS3Path(const database::ReverseIndexItem &reverseIndexItem) {
- std::shared_ptr<database::BlobItem> blobItem =
- database::DatabaseManager::getInstance().findBlobItem(
- reverseIndexItem.getBlobHash());
- if (blobItem == nullptr) {
- throw std::runtime_error(
- "no blob found for blobHash: [" + reverseIndexItem.getBlobHash() + "]");
- }
- database::S3Path result = blobItem->getS3Path();
- return result;
-}
-
-} // namespace tools
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/src/server.cpp b/services/blob/old/src/server.cpp
deleted file mode 100644
--- a/services/blob/old/src/server.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "BlobServiceImpl.h"
-
-#include "GlobalConstants.h"
-#include "GlobalTools.h"
-
-#include <glog/logging.h>
-#include <grpcpp/grpcpp.h>
-
-#include <memory>
-
-namespace comm {
-namespace network {
-
-void RunServer() {
- BlobServiceImpl blobService;
-
- grpc::EnableDefaultHealthCheckService(true);
- grpc::ServerBuilder builder;
- builder.AddListeningPort(
- SERVER_LISTEN_ADDRESS, grpc::InsecureServerCredentials());
- builder.RegisterService(&blobService);
- std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
- LOG(INFO) << "server listening at :" << SERVER_LISTEN_ADDRESS;
-
- server->Wait();
-}
-
-} // namespace network
-} // namespace comm
-
-int main(int argc, char **argv) {
- comm::network::tools::InitLogging("blob");
- comm::network::RunServer();
-
- return 0;
-}
diff --git a/services/blob/old/test/DatabaseManagerTest.cpp b/services/blob/old/test/DatabaseManagerTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/DatabaseManagerTest.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "DatabaseManager.h"
-#include "S3Path.h"
-
-#include <algorithm>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <vector>
-
-using namespace comm::network::database;
-
-class DatabaseManagerTest : public testing::Test {
-protected:
- virtual void SetUp() {
- Aws::InitAPI({});
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-std::string generateName() {
- std::chrono::milliseconds ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch());
- return std::to_string(ms.count());
-}
-
-TEST_F(DatabaseManagerTest, TestOperationsOnBlobItems) {
- const BlobItem item(generateName(), S3Path(generateName(), generateName()));
-
- DatabaseManager::getInstance().putBlobItem(item);
- std::shared_ptr<BlobItem> foundItem =
- DatabaseManager::getInstance().findBlobItem(item.getBlobHash());
- EXPECT_NE(foundItem->getCreated(), 0);
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- const BlobItem item2(generateName(), S3Path(generateName(), generateName()));
- DatabaseManager::getInstance().putBlobItem(item2);
- DatabaseManager::getInstance().removeBlobItem(item.getBlobHash());
- DatabaseManager::getInstance().removeBlobItem(item2.getBlobHash());
- EXPECT_EQ(
- DatabaseManager::getInstance().findBlobItem(item.getBlobHash()), nullptr);
- EXPECT_EQ(
- DatabaseManager::getInstance().findBlobItem(item2.getBlobHash()),
- nullptr);
-}
-
-TEST_F(DatabaseManagerTest, TestOperationsOnReverseIndexItems) {
- const ReverseIndexItem item(generateName(), generateName());
-
- DatabaseManager::getInstance().putReverseIndexItem(item);
- std::vector<std::shared_ptr<ReverseIndexItem>> foundItems =
- DatabaseManager::getInstance().findReverseIndexItemsByHash(
- item.getBlobHash());
- EXPECT_EQ(foundItems.size(), 1);
- std::shared_ptr<ReverseIndexItem> foundItem = foundItems.at(0);
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- foundItem = std::dynamic_pointer_cast<ReverseIndexItem>(
- DatabaseManager::getInstance().findReverseIndexItemByHolder(
- item.getHolder()));
- EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash());
- DatabaseManager::getInstance().removeReverseIndexItem(foundItem->getHolder());
-}
diff --git a/services/blob/old/test/MultiPartUploadTest.cpp b/services/blob/old/test/MultiPartUploadTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/MultiPartUploadTest.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "AwsS3Bucket.h"
-#include "Constants.h"
-#include "MultiPartUploader.h"
-#include "S3Tools.h"
-#include "TestTools.h"
-#include "Tools.h"
-
-#include <aws/core/Aws.h>
-#include <aws/s3/S3Client.h>
-
-#include <string>
-
-using namespace comm::network;
-
-class MultiPartUploadTest : public testing::Test {
-protected:
- std::shared_ptr<Aws::S3::S3Client> s3Client;
- std::unique_ptr<AwsS3Bucket> bucket;
-
- virtual void SetUp() {
- Aws::InitAPI({});
- s3Client = std::move(getS3Client());
- bucket = std::make_unique<AwsS3Bucket>(BLOB_BUCKET_NAME);
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-std::string generateNByes(const size_t n) {
- std::string result;
- result.resize(n);
- memset((char *)result.data(), 'A', n);
- return result;
-}
-
-TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart("xxx");
- mpu.addPart("xxx");
- EXPECT_THROW(mpu.finishUpload(), std::runtime_error);
-}
-
-TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1));
- mpu.addPart("xxx");
- EXPECT_THROW(mpu.finishUpload(), std::runtime_error);
-}
-
-TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE));
- mpu.addPart("xxx");
- mpu.finishUpload();
- EXPECT_THROW(
- bucket->getObjectData(objectName), tools::invalid_argument_error);
- EXPECT_EQ(
- bucket->getObjectSize(objectName),
- AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3);
- bucket->removeObject(objectName);
-}
-
-TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) {
- std::string objectName = createObject(*bucket);
- MultiPartUploader mpu(s3Client, BLOB_BUCKET_NAME, objectName);
- mpu.addPart("xxx");
- mpu.finishUpload();
- EXPECT_EQ(bucket->getObjectSize(objectName), 3);
- bucket->removeObject(objectName);
-}
diff --git a/services/blob/old/test/StorageManagerTest.cpp b/services/blob/old/test/StorageManagerTest.cpp
deleted file mode 100644
--- a/services/blob/old/test/StorageManagerTest.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-#include <gtest/gtest.h>
-
-#include "Constants.h"
-#include "S3Tools.h"
-#include "TestTools.h"
-
-#include <aws/core/Aws.h>
-
-#include <chrono>
-#include <memory>
-#include <string>
-
-using namespace comm::network;
-
-class StorageManagerTest : public testing::Test {
-public:
-protected:
- const std::string data =
- "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8"
- "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6";
-
- virtual void SetUp() {
- Aws::InitAPI({});
- }
-
- virtual void TearDown() {
- Aws::ShutdownAPI({});
- }
-};
-
-TEST_F(StorageManagerTest, ObjectOperationsTest) {
- EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).isAvailable());
- std::string objectName = createObject(getBucket(BLOB_BUCKET_NAME));
-
- getBucket(BLOB_BUCKET_NAME).writeObject(objectName, data);
-
- EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), data.size());
- EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).getObjectData(objectName) == data);
- std::string chunkedData;
- const size_t chunkSize = data.size() / 10;
- std::function<void(const std::string &)> callback =
- [&chunkedData](const std::string &chunk) { chunkedData += chunk; };
- getBucket(BLOB_BUCKET_NAME)
- .getObjectDataChunks(objectName, callback, chunkSize);
- EXPECT_TRUE(data == chunkedData);
-
- getBucket(BLOB_BUCKET_NAME).renameObject(objectName, objectName + "c");
- EXPECT_THROW(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName),
- std::runtime_error);
- EXPECT_TRUE(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName + "c") == data);
- getBucket(BLOB_BUCKET_NAME).renameObject(objectName + "c", objectName);
-
- getBucket(BLOB_BUCKET_NAME).clearObject(objectName);
- EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), 0);
-
- getBucket(BLOB_BUCKET_NAME).removeObject(objectName);
- EXPECT_THROW(
- getBucket(BLOB_BUCKET_NAME).getObjectData(objectName),
- std::runtime_error);
-}
diff --git a/services/blob/old/test/TestTools.h b/services/blob/old/test/TestTools.h
deleted file mode 100644
--- a/services/blob/old/test/TestTools.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#pragma once
-
-#include "AwsS3Bucket.h"
-
-#include <chrono>
-#include <string>
-
-namespace comm {
-namespace network {
-
-std::string generateObjectName();
-std::string createObject(AwsS3Bucket bucket);
-
-} // namespace network
-} // namespace comm
diff --git a/services/blob/old/test/TestTools.cpp b/services/blob/old/test/TestTools.cpp
deleted file mode 100644
--- a/services/blob/old/test/TestTools.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-#include "TestTools.h"
-#include "AwsS3Bucket.h"
-
-namespace comm {
-namespace network {
-
-std::string generateObjectName() {
- std::chrono::milliseconds ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch());
- return std::to_string(ms.count());
-}
-
-std::string createObject(AwsS3Bucket bucket) {
- std::string objectName;
- std::vector<std::string> presentObjects;
- do {
- objectName = generateObjectName();
- presentObjects = bucket.listObjects();
- } while (
- std::find(presentObjects.begin(), presentObjects.end(), objectName) !=
- presentObjects.end());
- return objectName;
-}
-
-} // namespace network
-} // namespace comm
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 16, 10:10 PM (19 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2502498
Default Alt Text
D5768.id19013.diff (56 KB)
Attached To
Mode
D5768: [services][blob] Remove old C++ implementation
Attached
Detach File
Event Timeline
Log In to Comment