diff --git a/services/blob/CMakeLists.txt b/services/blob/CMakeLists.txt --- a/services/blob/CMakeLists.txt +++ b/services/blob/CMakeLists.txt @@ -30,24 +30,11 @@ endif() file(GLOB GENERATED_CODE "./_generated/*.cc") -set(DEV_SOURCE_CODE "") -set(DEV_HEADERS_PATH "") if ($ENV{COMM_SERVICES_DEV_MODE} MATCHES 1) add_compile_definitions(COMM_SERVICES_DEV_MODE) - file(GLOB DEV_SOURCE_CODE "./dev/*.cpp" "./src/*.dev.cpp" "./src/**/*.dev.cpp") - set(DEV_HEADERS_PATH "./dev") endif() file(GLOB SOURCE_CODE "./src/*.cpp" "./src/**/*.cpp") -list(FILTER SOURCE_CODE EXCLUDE REGEX ".*.dev.cpp$") - -foreach (ITEM ${DEV_SOURCE_CODE}) - string(REPLACE "/" ";" SPLIT_ITEM ${ITEM}) - list(GET SPLIT_ITEM -1 FILE_FULL_NAME) - string(REPLACE ".dev.cpp" ".cpp" FILE_NAME ${FILE_FULL_NAME}) - list(FILTER SOURCE_CODE EXCLUDE REGEX ".*${FILE_NAME}$") - list(APPEND SOURCE_CODE "${ITEM}") -endforeach() include_directories( ./src @@ -56,7 +43,6 @@ ${FOLLY_INCLUDES} ./lib/double-conversion ${Boost_INCLUDE_DIR} - ${DEV_HEADERS_PATH} ) # SERVER diff --git a/services/blob/dev/DatabaseSimulator.h b/services/blob/dev/DatabaseSimulator.h deleted file mode 100644 --- a/services/blob/dev/DatabaseSimulator.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include "DatabaseEntitiesTools.h" - -#include - -#include -#include - -namespace comm { -namespace network { -namespace database { - -// thread-safe in-memory database -struct DatabaseSimulator { - // hash -> item - folly::ConcurrentHashMap> blob; - // holder -> item - folly::ConcurrentHashMap> - reverseIndex; -}; - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/blob/dev/DevTools.h b/services/blob/dev/DevTools.h deleted file mode 100644 --- a/services/blob/dev/DevTools.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include - -namespace comm { -namespace network { - -const std::string commFilesystemPath = "/tmp/comm"; - -std::string createCommPath(const std::string &path); - -} // namespace network -} // namespace comm diff --git a/services/blob/dev/DevTools.cpp b/services/blob/dev/DevTools.cpp deleted file mode 100644 --- a/services/blob/dev/DevTools.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "DevTools.h" - -namespace comm { -namespace network { - -std::string createCommPath(const std::string &path) { - if (path.substr(0, commFilesystemPath.size()) == commFilesystemPath) { - return path; - } - return commFilesystemPath + "/" + path; -} - -} // namespace network -} // namespace comm diff --git a/services/blob/src/AwsS3Bucket.dev.cpp b/services/blob/src/AwsS3Bucket.dev.cpp deleted file mode 100644 --- a/services/blob/src/AwsS3Bucket.dev.cpp +++ /dev/null @@ -1,109 +0,0 @@ -#include "AwsS3Bucket.h" -#include "DevTools.h" -#include "Tools.h" - -#include -#include -#include - -namespace comm { -namespace network { - -AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) { - std::filesystem::create_directories(commFilesystemPath); -} - -std::vector AwsS3Bucket::listObjects() const { - std::vector result; - for (const auto &entry : - std::filesystem::directory_iterator(commFilesystemPath)) { - result.push_back(entry.path()); - } - return result; -} - -bool AwsS3Bucket::isAvailable() const { - return std::filesystem::exists(commFilesystemPath); -} - -size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const { - return std::filesystem::file_size(createCommPath(objectName)); -} - -void AwsS3Bucket::renameObject( - const std::string ¤tName, - const std::string &newName) { - std::filesystem::rename(createCommPath(currentName), createCommPath(newName)); -} - -void AwsS3Bucket::writeObject( - const std::string &objectName, - const std::string &data) { - if (std::filesystem::exists(createCommPath(objectName))) { - this->clearObject(createCommPath(objectName)); - } - std::ofstream ofs(createCommPath(objectName)); - ofs << data; -} - -std::string AwsS3Bucket::getObjectData(const std::string &objectName) const { - std::ifstream ifs( - createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); - - std::ifstream::pos_type fileSize = ifs.tellg(); - ifs.seekg(0, std::ios::beg); - if (fileSize > GRPC_CHUNK_SIZE_LIMIT) { - throw invalid_argument_error(std::string( - "The file is too big(" + std::to_string(fileSize) + " bytes, max is " + - std::to_string(GRPC_CHUNK_SIZE_LIMIT) + - "bytes), please, use getObjectDataChunks")); - } - - std::string bytes; - bytes.resize(fileSize); - ifs.read((char *)bytes.data(), fileSize); - - return bytes; -} - -void AwsS3Bucket::getObjectDataChunks( - const std::string &objectName, - const std::function &callback, - const size_t chunkSize) const { - std::ifstream ifs( - createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); - - std::ifstream::pos_type fileSize = ifs.tellg(); - - size_t filePos = 0; - while (filePos < fileSize) { - const size_t nextSize = std::min(chunkSize, (size_t)fileSize - filePos); - ifs.seekg(filePos, std::ios::beg); - std::string bytes; - bytes.resize(nextSize); - ifs.read((char *)bytes.data(), nextSize); - filePos += bytes.size(); - callback(bytes); - } -} - -void AwsS3Bucket::appendToObject( - const std::string &objectName, - const std::string &data) { - std::ofstream ofs; - ofs.open(createCommPath(objectName), std::ios_base::app); - ofs << data; -} - -void AwsS3Bucket::clearObject(const std::string &objectName) { - std::filesystem::resize_file(createCommPath(objectName), 0); -} - -void AwsS3Bucket::removeObject(const std::string &objectName) { - std::filesystem::remove(createCommPath(objectName)); -} - -} // namespace network -} // namespace comm diff --git a/services/blob/src/AwsTools.cpp b/services/blob/src/AwsTools.cpp --- a/services/blob/src/AwsTools.cpp +++ b/services/blob/src/AwsTools.cpp @@ -28,13 +28,24 @@ std::unique_ptr getDynamoDBClient() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; +#ifdef COMM_SERVICES_DEV_MODE + config.endpointOverride = Aws::String("localstack:4566"); + config.scheme = Aws::Http::Scheme::HTTP; +#endif return std::make_unique(config); } std::unique_ptr getS3Client() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; +#ifdef COMM_SERVICES_DEV_MODE + config.endpointOverride = Aws::String("localstack:4566"); + config.scheme = Aws::Http::Scheme::HTTP; + return std::make_unique( + config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); +#else return std::make_unique(config); +#endif } } // namespace network diff --git a/services/blob/src/DatabaseManager.h b/services/blob/src/DatabaseManager.h --- a/services/blob/src/DatabaseManager.h +++ b/services/blob/src/DatabaseManager.h @@ -14,19 +14,12 @@ #include #include -#ifdef COMM_SERVICES_DEV_MODE -#include "DatabaseSimulator.h" -#endif - namespace comm { namespace network { namespace database { // this class should be thread-safe in case any shared resources appear class DatabaseManager { -#ifdef COMM_SERVICES_DEV_MODE - DatabaseSimulator dbSimulator; -#endif void innerPutItem( std::shared_ptr item, diff --git a/services/blob/src/DatabaseManager.dev.cpp b/services/blob/src/DatabaseManager.dev.cpp deleted file mode 100644 --- a/services/blob/src/DatabaseManager.dev.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include "DatabaseManager.h" -#include "Tools.h" - -#include -#include - -namespace comm { -namespace network { -namespace database { - -DatabaseManager &DatabaseManager::getInstance() { - static DatabaseManager instance; - return instance; -} - -void DatabaseManager::putBlobItem(const BlobItem &item) { - std::shared_ptr blobItem = std::make_shared( - item.getBlobHash(), item.getS3Path(), getCurrentTimestamp()); - this->dbSimulator.blob.insert(item.getBlobHash(), std::move(blobItem)); -} - -std::shared_ptr -DatabaseManager::findBlobItem(const std::string &blobHash) { - if (this->dbSimulator.blob.find(blobHash) == this->dbSimulator.blob.end()) { - return nullptr; - } - return this->dbSimulator.blob.at(blobHash); -} - -void DatabaseManager::removeBlobItem(const std::string &blobHash) { - this->dbSimulator.blob.erase(blobHash); -} - -void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) { - this->dbSimulator.reverseIndex.insert( - item.getHolder(), - std::move(std::make_shared( - item.getHolder(), item.getBlobHash()))); -} - -std::shared_ptr -DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) { - if (this->dbSimulator.reverseIndex.find(holder) == - this->dbSimulator.reverseIndex.end()) { - return nullptr; - } - return this->dbSimulator.reverseIndex.at(holder); -} - -std::vector> -DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) { - std::vector> items = {}; - for (auto it = this->dbSimulator.reverseIndex.begin(); - it != this->dbSimulator.reverseIndex.end(); - ++it) { - if (it->second->getBlobHash() == blobHash) { - items.push_back(it->second); - } - } - return items; -} - -bool DatabaseManager::removeReverseIndexItem(const std::string &holder) { - return this->dbSimulator.reverseIndex.erase(holder); -} - -} // namespace database -} // namespace network -} // namespace comm diff --git a/services/blob/src/MultiPartUploader.dev.cpp b/services/blob/src/MultiPartUploader.dev.cpp deleted file mode 100644 --- a/services/blob/src/MultiPartUploader.dev.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "MultiPartUploader.h" -#include "AwsS3Bucket.h" -#include "DevTools.h" -#include "Tools.h" - -#include - -namespace comm { -namespace network { - -std::unique_ptr bucket; - -MultiPartUploader::MultiPartUploader( - std::shared_ptr client, - const std::string bucketName, - const std::string objectName) - : client(nullptr), bucketName(bucketName), objectName(objectName) { - bucket->writeObject(createCommPath(this->objectName), ""); -} - -void MultiPartUploader::addPart(const std::string &part) { - AwsS3Bucket(bucketName) - .appendToObject(createCommPath(this->objectName + "mpu"), part); - this->partsSizes.push_back(part.size()); - ++this->partNumber; -} - -void MultiPartUploader::finishUpload() { - AwsS3Bucket bucket(bucketName); - for (size_t i = 0; i < this->partsSizes.size() - 1; ++i) { - if (this->partsSizes.at(i) < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { - bucket.removeObject(createCommPath(this->objectName + "mpu")); - throw std::runtime_error("too small part detected"); - } - } - bucket.renameObject( - createCommPath(this->objectName + "mpu"), - createCommPath(this->objectName)); -} - -} // namespace network -} // namespace comm diff --git a/services/blob/test/MultiPartUploadTest.cpp b/services/blob/test/MultiPartUploadTest.cpp --- a/services/blob/test/MultiPartUploadTest.cpp +++ b/services/blob/test/MultiPartUploadTest.cpp @@ -1,6 +1,7 @@ #include #include "AwsS3Bucket.h" +#include "AwsTools.h" #include "MultiPartUploader.h" #include "TestTools.h" #include "Tools.h" @@ -20,9 +21,7 @@ virtual void SetUp() { Aws::InitAPI({}); - Aws::Client::ClientConfiguration config; - config.region = "us-east-2"; - s3Client = std::make_shared(config); + s3Client = std::move(getS3Client()); bucket = std::make_unique(bucketName); } diff --git a/services/docker-compose.yml b/services/docker-compose.yml --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -49,6 +49,8 @@ - "${COMM_SERVICES_PORT_BLOB}:50051" volumes: - $HOME/.aws/credentials:/root/.aws/credentials:ro + networks: + - services-net # localstack localstack: image: localstack/localstack