diff --git a/services/blob/CMakeLists.txt b/services/blob/CMakeLists.txt index 5c5a24171..c254808dc 100644 --- a/services/blob/CMakeLists.txt +++ b/services/blob/CMakeLists.txt @@ -1,130 +1,116 @@ PROJECT(blob C CXX) cmake_minimum_required(VERSION 3.16) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY bin) if(COMMAND cmake_policy) cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) set(CMAKE_CXX_STANDARD 17) set(BUILD_TESTING OFF CACHE BOOL "Turn off tests" FORCE) set(WITH_GTEST "Use Google Test" OFF) # FIND LIBS include(./cmake-components/grpc.cmake) include(./cmake-components/folly.cmake) add_subdirectory(./lib/glog) find_package(AWSSDK REQUIRED COMPONENTS s3 core dynamodb) find_package(Boost 1.40 COMPONENTS program_options REQUIRED) find_package(OpenSSL REQUIRED) # FIND FILES file(GLOB DOUBLE_CONVERSION_SOURCES "./lib/double-conversion/double-conversion/*.cc") if ($ENV{COMM_TEST_SERVICES} MATCHES 1) add_compile_definitions(COMM_TEST_SERVICES) endif() file(GLOB GENERATED_CODE "./_generated/*.cc") -set(DEV_SOURCE_CODE "") -set(DEV_HEADERS_PATH "") if ($ENV{COMM_SERVICES_DEV_MODE} MATCHES 1) add_compile_definitions(COMM_SERVICES_DEV_MODE) - file(GLOB DEV_SOURCE_CODE "./dev/*.cpp" "./src/*.dev.cpp" "./src/**/*.dev.cpp") - set(DEV_HEADERS_PATH "./dev") endif() file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") -list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") - -foreach (ITEM ${DEV_SOURCE_CODE}) - string(REPLACE "/" ";" SPLIT_ITEM ${ITEM}) - list(GET SPLIT_ITEM -1 FILE_FULL_NAME) - string(REPLACE ".dev.cpp" ".cpp" FILE_NAME ${FILE_FULL_NAME}) - list(FILTER SOURCE_CODE EXCLUDE REGEX ".*${FILE_NAME}$") - list(APPEND SOURCE_CODE "${ITEM}") -endforeach() include_directories( ./src ./src/DatabaseEntities ./src/Reactors/ ./src/Reactors/server ./src/Reactors/server/base-reactors ./_generated ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} - ${DEV_HEADERS_PATH} ) # SERVER add_executable( blob ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ) set( LIBS ${GRPC_LIBS} ${AWSSDK_LINK_LIBRARIES} ${Boost_LIBRARIES} OpenSSL::SSL glog::glog ) target_link_libraries( blob ${LIBS} ) install( TARGETS blob RUNTIME DESTINATION bin/ ) # TEST if ($ENV{COMM_TEST_SERVICES} MATCHES 1) file(GLOB TEST_CODE "./test/*.cpp") list(FILTER SOURCE_CODE EXCLUDE REGEX "./src/server.cpp") enable_testing() find_package(GTest REQUIRED) include_directories( ${GTEST_INCLUDE_DIR} ./test ) add_executable( runTests ${GENERATED_CODE} ${DOUBLE_CONVERSION_SOURCES} ${FOLLY_SOURCES} ${SOURCE_CODE} ${TEST_CODE} ) target_link_libraries( runTests ${LIBS} gtest gtest_main ) add_test( NAME runTests COMMAND runTests ) endif() diff --git a/services/blob/dev/DatabaseSimulator.h b/services/blob/dev/DatabaseSimulator.h deleted file mode 100644 index 972b660b9..000000000 --- a/services/blob/dev/DatabaseSimulator.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include "DatabaseEntitiesTools.h" - -#include - -#include -#include - -namespace comm { -namespace network { -namespace database { - -// thread-safe in-memory database -struct DatabaseSimulator { - // hash -> item - folly::ConcurrentHashMap> blob; - // holder -> item - folly::ConcurrentHashMap> - reverseIndex; -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/blob/dev/DevTools.cpp b/services/blob/dev/DevTools.cpp deleted file mode 100644 index 88086b1fa..000000000 --- a/services/blob/dev/DevTools.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "DevTools.h" - -namespace comm { -namespace network { - -std::string createCommPath(const std::string &path) { - if (path.substr(0, commFilesystemPath.size()) == commFilesystemPath) { - return path; - } - return commFilesystemPath + "/" + path; -} - -} // namespace network -} // namespace comm diff --git a/services/blob/dev/DevTools.h b/services/blob/dev/DevTools.h deleted file mode 100644 index ffcaa3428..000000000 --- a/services/blob/dev/DevTools.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include - -namespace comm { -namespace network { - -const std::string commFilesystemPath = "/tmp/comm"; - -std::string createCommPath(const std::string &path); - -} // namespace network -} // namespace comm diff --git a/services/blob/src/AwsS3Bucket.dev.cpp b/services/blob/src/AwsS3Bucket.dev.cpp deleted file mode 100644 index b62098444..000000000 --- a/services/blob/src/AwsS3Bucket.dev.cpp +++ /dev/null @@ -1,109 +0,0 @@ -#include "AwsS3Bucket.h" -#include "DevTools.h" -#include "Tools.h" - -#include -#include -#include - -namespace comm { -namespace network { - -AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) { - std::filesystem::create_directories(commFilesystemPath); -} - -std::vector AwsS3Bucket::listObjects() const { - std::vector result; - for (const auto &entry : - std::filesystem::directory_iterator(commFilesystemPath)) { - result.push_back(entry.path()); - } - return result; -} - -bool AwsS3Bucket::isAvailable() const { - return std::filesystem::exists(commFilesystemPath); -} - -size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const { - return std::filesystem::file_size(createCommPath(objectName)); -} - -void AwsS3Bucket::renameObject( - const std::string ¤tName, - const std::string &newName) { - std::filesystem::rename(createCommPath(currentName), createCommPath(newName)); -} - -void AwsS3Bucket::writeObject( - const std::string &objectName, - const std::string &data) { - if (std::filesystem::exists(createCommPath(objectName))) { - this->clearObject(createCommPath(objectName)); - } - std::ofstream ofs(createCommPath(objectName)); - ofs << data; -} - -std::string AwsS3Bucket::getObjectData(const std::string &objectName) const { - std::ifstream ifs( - createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); - - std::ifstream::pos_type fileSize = ifs.tellg(); - ifs.seekg(0, std::ios::beg); - if (fileSize > GRPC_CHUNK_SIZE_LIMIT) { - throw invalid_argument_error(std::string( - "The file is too big(" + std::to_string(fileSize) + " bytes, max is " + - std::to_string(GRPC_CHUNK_SIZE_LIMIT) + - "bytes), please, use getObjectDataChunks")); - } - - std::string bytes; - bytes.resize(fileSize); - ifs.read((char *)bytes.data(), fileSize); - - return bytes; -} - -void AwsS3Bucket::getObjectDataChunks( - const std::string &objectName, - const std::function &callback, - const size_t chunkSize) const { - std::ifstream ifs( - createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); - - std::ifstream::pos_type fileSize = ifs.tellg(); - - size_t filePos = 0; - while (filePos < fileSize) { - const size_t nextSize = std::min(chunkSize, (size_t)fileSize - filePos); - ifs.seekg(filePos, std::ios::beg); - std::string bytes; - bytes.resize(nextSize); - ifs.read((char *)bytes.data(), nextSize); - filePos += bytes.size(); - callback(bytes); - } -} - -void AwsS3Bucket::appendToObject( - const std::string &objectName, - const std::string &data) { - std::ofstream ofs; - ofs.open(createCommPath(objectName), std::ios_base::app); - ofs << data; -} - -void AwsS3Bucket::clearObject(const std::string &objectName) { - std::filesystem::resize_file(createCommPath(objectName), 0); -} - -void AwsS3Bucket::removeObject(const std::string &objectName) { - std::filesystem::remove(createCommPath(objectName)); -} - -} // namespace network -} // namespace comm diff --git a/services/blob/src/AwsTools.cpp b/services/blob/src/AwsTools.cpp index 66945b0d0..8d5737a89 100644 --- a/services/blob/src/AwsTools.cpp +++ b/services/blob/src/AwsTools.cpp @@ -1,41 +1,52 @@ #include "AwsTools.h" #include "Constants.h" #include "Tools.h" #include namespace comm { namespace network { AwsS3Bucket getBucket(const std::string &bucketName) { return AwsS3Bucket(bucketName); } std::vector listBuckets() { Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets(); std::vector result; if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector buckets = outcome.GetResult().GetBuckets(); for (Aws::S3::Model::Bucket &bucket : buckets) { result.push_back(bucket.GetName()); } return result; } std::unique_ptr getDynamoDBClient() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; +#ifdef COMM_SERVICES_DEV_MODE + config.endpointOverride = Aws::String("localstack:4566"); + config.scheme = Aws::Http::Scheme::HTTP; +#endif return std::make_unique(config); } std::unique_ptr getS3Client() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; +#ifdef COMM_SERVICES_DEV_MODE + config.endpointOverride = Aws::String("localstack:4566"); + config.scheme = Aws::Http::Scheme::HTTP; + return std::make_unique( + config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); +#else return std::make_unique(config); +#endif } } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseManager.dev.cpp b/services/blob/src/DatabaseManager.dev.cpp deleted file mode 100644 index c454fc5ac..000000000 --- a/services/blob/src/DatabaseManager.dev.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include "DatabaseManager.h" -#include "Tools.h" - -#include -#include - -namespace comm { -namespace network { -namespace database { - -DatabaseManager &DatabaseManager::getInstance() { - static DatabaseManager instance; - return instance; -} - -void DatabaseManager::putBlobItem(const BlobItem &item) { - std::shared_ptr blobItem = std::make_shared( - item.getBlobHash(), item.getS3Path(), getCurrentTimestamp()); - this->dbSimulator.blob.insert(item.getBlobHash(), std::move(blobItem)); -} - -std::shared_ptr -DatabaseManager::findBlobItem(const std::string &blobHash) { - if (this->dbSimulator.blob.find(blobHash) == this->dbSimulator.blob.end()) { - return nullptr; - } - return this->dbSimulator.blob.at(blobHash); -} - -void DatabaseManager::removeBlobItem(const std::string &blobHash) { - this->dbSimulator.blob.erase(blobHash); -} - -void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) { - this->dbSimulator.reverseIndex.insert( - item.getHolder(), - std::move(std::make_shared( - item.getHolder(), item.getBlobHash()))); -} - -std::shared_ptr -DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) { - if (this->dbSimulator.reverseIndex.find(holder) == - this->dbSimulator.reverseIndex.end()) { - return nullptr; - } - return this->dbSimulator.reverseIndex.at(holder); -} - -std::vector> -DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) { - std::vector> items = {}; - for (auto it = this->dbSimulator.reverseIndex.begin(); - it != this->dbSimulator.reverseIndex.end(); - ++it) { - if (it->second->getBlobHash() == blobHash) { - items.push_back(it->second); - } - } - return items; -} - -bool DatabaseManager::removeReverseIndexItem(const std::string &holder) { - return this->dbSimulator.reverseIndex.erase(holder); -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/blob/src/DatabaseManager.h b/services/blob/src/DatabaseManager.h index fbd80bc4a..ff527fd8c 100644 --- a/services/blob/src/DatabaseManager.h +++ b/services/blob/src/DatabaseManager.h @@ -1,75 +1,68 @@ #pragma once #include "AwsTools.h" #include "DatabaseEntitiesTools.h" #include #include #include #include #include #include #include #include #include -#ifdef COMM_SERVICES_DEV_MODE -#include "DatabaseSimulator.h" -#endif - namespace comm { namespace network { namespace database { // this class should be thread-safe in case any shared resources appear class DatabaseManager { -#ifdef COMM_SERVICES_DEV_MODE - DatabaseSimulator dbSimulator; -#endif void innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request); template std::shared_ptr innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); void innerRemoveItem(const Item &item); public: static DatabaseManager &getInstance(); void putBlobItem(const BlobItem &item); std::shared_ptr findBlobItem(const std::string &blobHash); void removeBlobItem(const std::string &blobHash); void putReverseIndexItem(const ReverseIndexItem &item); std::shared_ptr findReverseIndexItemByHolder(const std::string &holder); std::vector> findReverseIndexItemsByHash(const std::string &blobHash); bool removeReverseIndexItem(const std::string &holder); }; template std::shared_ptr DatabaseManager::innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request) { std::shared_ptr item = createItemByType(); request.SetTableName(item->getTableName()); const Aws::DynamoDB::Model::GetItemOutcome &outcome = getDynamoDBClient()->GetItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); if (!outcomeItem.size()) { return nullptr; } item->assignItemFromDatabase(outcomeItem); return std::move(item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/blob/src/MultiPartUploader.dev.cpp b/services/blob/src/MultiPartUploader.dev.cpp deleted file mode 100644 index 292e525de..000000000 --- a/services/blob/src/MultiPartUploader.dev.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "MultiPartUploader.h" -#include "AwsS3Bucket.h" -#include "DevTools.h" -#include "Tools.h" - -#include - -namespace comm { -namespace network { - -std::unique_ptr bucket; - -MultiPartUploader::MultiPartUploader( - std::shared_ptr client, - const std::string bucketName, - const std::string objectName) - : client(nullptr), bucketName(bucketName), objectName(objectName) { - bucket->writeObject(createCommPath(this->objectName), ""); -} - -void MultiPartUploader::addPart(const std::string &part) { - AwsS3Bucket(bucketName) - .appendToObject(createCommPath(this->objectName + "mpu"), part); - this->partsSizes.push_back(part.size()); - ++this->partNumber; -} - -void MultiPartUploader::finishUpload() { - AwsS3Bucket bucket(bucketName); - for (size_t i = 0; i < this->partsSizes.size() - 1; ++i) { - if (this->partsSizes.at(i) < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { - bucket.removeObject(createCommPath(this->objectName + "mpu")); - throw std::runtime_error("too small part detected"); - } - } - bucket.renameObject( - createCommPath(this->objectName + "mpu"), - createCommPath(this->objectName)); -} - -} // namespace network -} // namespace comm diff --git a/services/blob/test/MultiPartUploadTest.cpp b/services/blob/test/MultiPartUploadTest.cpp index e1a42d9ff..64ec9d934 100644 --- a/services/blob/test/MultiPartUploadTest.cpp +++ b/services/blob/test/MultiPartUploadTest.cpp @@ -1,77 +1,76 @@ #include #include "AwsS3Bucket.h" +#include "AwsTools.h" #include "MultiPartUploader.h" #include "TestTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class MultiPartUploadTest : public testing::Test { protected: std::shared_ptr s3Client; const std::string bucketName = "commapp-test"; std::unique_ptr bucket; virtual void SetUp() { Aws::InitAPI({}); - Aws::Client::ClientConfiguration config; - config.region = "us-east-2"; - s3Client = std::make_shared(config); + s3Client = std::move(getS3Client()); bucket = std::make_unique(bucketName); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateNByes(const size_t n) { std::string result; result.resize(n); memset((char *)result.data(), 'A', n); return result; } TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1)); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE)); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_THROW(bucket->getObjectData(objectName), invalid_argument_error); EXPECT_EQ( bucket->getObjectSize(objectName), AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3); bucket->removeObject(objectName); } TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_EQ(bucket->getObjectSize(objectName), 3); bucket->removeObject(objectName); }