diff --git a/services/backup/src/Constants.h b/services/backup/src/Constants.h
index 3f57908b0..5aea75fae 100644
--- a/services/backup/src/Constants.h
+++ b/services/backup/src/Constants.h
@@ -1,39 +1,22 @@
 #pragma once
 
 #include "Tools.h"
 
 #include <string>
 
 namespace comm {
 namespace network {
 
-// 4MB limit
-// WARNING: use keeping in mind that grpc adds its own headers to messages
-// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-// so the message that actually is being sent over the network looks like this
-// [Compressed-Flag] [Message-Length] [Message]
-// [Compressed-Flag] 1 byte - added by grpc
-// [Message-Length] 4 bytes - added by grpc
-// [Message] N bytes - actual data
-// so for every message we get 5 additional bytes of data
-// as mentioned here
-// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
-// grpc stream may contain more than one message
-const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
-const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
-
-const std::string AWS_REGION = "us-east-2";
-
 const std::string LOG_TABLE_NAME = decorateTableName("backup-service-log");
 const std::string BACKUP_TABLE_NAME =
     decorateTableName("backup-service-backup");
 
 // This has to be smaller than GRPC_CHUNK_SIZE_LIMIT because we need to
 // recognize if we may receive multiple chunks or just one. If it was larger
 // than the chunk limit, once we get the amount of data of size equal to the
 // limit, we wouldn't know if we should put this in the database right away or
 // wait for more data.
 const size_t LOG_DATA_SIZE_DATABASE_LIMIT = 1 * 1024 * 1024;
 
 } // namespace network
 } // namespace comm
diff --git a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
index f487d2316..7fffbc886 100644
--- a/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
+++ b/services/backup/src/Reactors/client/blob/BlobPutClientReactor.h
@@ -1,53 +1,54 @@
 #pragma once
 
 #include "Constants.h"
+#include "GlobalConstants.h"
 
 #include "../_generated/blob.grpc.pb.h"
 #include "../_generated/blob.pb.h"
 
 #include "ClientBidiReactorBase.h"
 
 #include <folly/MPMCQueue.h>
 #include <grpcpp/grpcpp.h>
 
 #include <condition_variable>
 #include <memory>
 #include <string>
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 class BlobPutClientReactor
     : public ClientBidiReactorBase<blob::PutRequest, blob::PutResponse> {
   enum class State {
     SEND_HOLDER = 0,
     SEND_HASH = 1,
     SEND_CHUNKS = 2,
   };
 
   State state = State::SEND_HOLDER;
   const std::string hash;
   const std::string holder;
   size_t currentDataSize = 0;
   const size_t chunkSize =
       GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
   folly::MPMCQueue<std::unique_ptr<std::string>> dataChunks;
 
   std::condition_variable *terminationNotifier;
 
 public:
   BlobPutClientReactor(
       const std::string &holder,
       const std::string &hash,
       std::condition_variable *terminationNotifier);
 
   void scheduleSendingDataChunk(std::unique_ptr<std::string> dataChunk);
   std::unique_ptr<grpc::Status> prepareRequest(
       blob::PutRequest &request,
       std::shared_ptr<blob::PutResponse> previousResponse) override;
   void doneCallback() override;
 };
 
 } // namespace reactor
 } // namespace network
 } // namespace comm
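Note: the reactor's chunkSize subtracts the 5 bytes of gRPC framing from the 4MB hard limit, so a full chunk plus its framing still fits in one message. A minimal sketch of the splitting this enables — splitIntoChunks is a hypothetical helper, not part of this change:

#include <algorithm>
#include <memory>
#include <string>
#include <vector>

constexpr size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
constexpr size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;

// Split a payload so that each piece plus the 5-byte gRPC framing
// (1-byte compressed flag + 4-byte length) stays within the 4MB limit.
std::vector<std::unique_ptr<std::string>>
splitIntoChunks(const std::string &data) {
  const size_t chunkSize =
      GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
  std::vector<std::unique_ptr<std::string>> chunks;
  for (size_t offset = 0; offset < data.size(); offset += chunkSize) {
    chunks.push_back(std::make_unique<std::string>(
        data.substr(offset, std::min(chunkSize, data.size() - offset))));
  }
  return chunks;
}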
diff --git a/services/blob/src/AwsS3Bucket.cpp b/services/blob/src/AwsS3Bucket.cpp
index e232523bd..141745974 100644
--- a/services/blob/src/AwsS3Bucket.cpp
+++ b/services/blob/src/AwsS3Bucket.cpp
@@ -1,223 +1,224 @@
 #include "AwsS3Bucket.h"
 #include "Constants.h"
+#include "GlobalConstants.h"
 #include "MultiPartUploader.h"
 #include "S3Tools.h"
 #include "Tools.h"
 
 #include <aws/core/Aws.h>
 #include <aws/s3/model/CopyObjectRequest.h>
 #include <aws/s3/model/DeleteObjectRequest.h>
 #include <aws/s3/model/GetObjectRequest.h>
 #include <aws/s3/model/HeadBucketRequest.h>
 #include <aws/s3/model/HeadObjectRequest.h>
 #include <aws/s3/model/ListObjectsRequest.h>
 #include <aws/s3/model/PutObjectRequest.h>
 
 #include <boost/interprocess/streams/bufferstream.hpp>
 
 namespace comm {
 namespace network {
 
 AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) {
 }
 
 std::vector<std::string> AwsS3Bucket::listObjects() const {
   Aws::S3::Model::ListObjectsRequest request;
   request.SetBucket(this->name);
   std::vector<std::string> result;
 
   Aws::S3::Model::ListObjectsOutcome outcome =
       getS3Client()->ListObjects(request);
   if (!outcome.IsSuccess()) {
     throw std::runtime_error(outcome.GetError().GetMessage());
   }
   Aws::Vector<Aws::S3::Model::Object> objects =
       outcome.GetResult().GetContents();
   for (Aws::S3::Model::Object &object : objects) {
     result.push_back(object.GetKey());
   }
   return result;
 }
 
 bool AwsS3Bucket::isAvailable() const {
   Aws::S3::Model::HeadBucketRequest headRequest;
   headRequest.SetBucket(this->name);
   Aws::S3::Model::HeadBucketOutcome outcome =
       getS3Client()->HeadBucket(headRequest);
   return outcome.IsSuccess();
 }
 
 size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const {
   Aws::S3::Model::HeadObjectRequest headRequest;
   headRequest.SetBucket(this->name);
   headRequest.SetKey(objectName);
   Aws::S3::Model::HeadObjectOutcome headOutcome =
       getS3Client()->HeadObject(headRequest);
   if (!headOutcome.IsSuccess()) {
     throw std::runtime_error(headOutcome.GetError().GetMessage());
   }
   return headOutcome.GetResultWithOwnership().GetContentLength();
 }
 
 void AwsS3Bucket::renameObject(
     const std::string &currentName,
     const std::string &newName) {
   Aws::S3::Model::CopyObjectRequest copyRequest;
   copyRequest.SetCopySource(this->name + "/" + currentName);
   copyRequest.SetKey(newName);
   copyRequest.SetBucket(this->name);
 
   Aws::S3::Model::CopyObjectOutcome copyOutcome =
       getS3Client()->CopyObject(copyRequest);
   if (!copyOutcome.IsSuccess()) {
     throw std::runtime_error(copyOutcome.GetError().GetMessage());
   }
 
   this->removeObject(currentName);
 }
 
 void AwsS3Bucket::writeObject(
     const std::string &objectName,
     const std::string &data) {
   // we don't have to handle multiple write here because the GRPC limit is 4MB
   // and minimum size of data to perform multipart upload is 5MB
   Aws::S3::Model::PutObjectRequest request;
   request.SetBucket(this->name);
   request.SetKey(objectName);
 
   std::shared_ptr<Aws::IOStream> body = std::shared_ptr<Aws::IOStream>(
       new boost::interprocess::bufferstream((char *)data.data(), data.size()));
 
   request.SetBody(body);
 
   Aws::S3::Model::PutObjectOutcome outcome = getS3Client()->PutObject(request);
   if (!outcome.IsSuccess()) {
     throw std::runtime_error(outcome.GetError().GetMessage());
   }
 }
 
 std::string AwsS3Bucket::getObjectData(const std::string &objectName) const {
   Aws::S3::Model::GetObjectRequest request;
   request.SetBucket(this->name);
   request.SetKey(objectName);
 
   const size_t size = this->getObjectSize(objectName);
   if (size > GRPC_CHUNK_SIZE_LIMIT) {
     throw invalid_argument_error(std::string(
         "The file is too big(" + std::to_string(size) + " bytes, max is " +
         std::to_string(GRPC_CHUNK_SIZE_LIMIT) +
         "bytes), please, use getObjectDataChunks"));
   }
 
   Aws::S3::Model::GetObjectOutcome outcome = getS3Client()->GetObject(request);
   if (!outcome.IsSuccess()) {
     throw std::runtime_error(outcome.GetError().GetMessage());
   }
 
   Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody();
   std::stringstream buffer;
   buffer << retrievedFile.rdbuf();
   std::string result(buffer.str());
   std::string cpy = result;
 
   return result;
 }
 
 void AwsS3Bucket::getObjectDataChunks(
     const std::string &objectName,
     const std::function<void(const std::string &)> &callback,
     const size_t chunkSize) const {
   const size_t fileSize = this->getObjectSize(objectName);
 
   if (fileSize == 0) {
     return;
   }
 
   Aws::S3::Model::GetObjectRequest request;
   request.SetBucket(this->name);
   request.SetKey(objectName);
 
   for (size_t offset = 0; offset < fileSize; offset += chunkSize) {
     const size_t nextSize = std::min(chunkSize, fileSize - offset);
 
     std::string range = "bytes=" + std::to_string(offset) + "-" +
         std::to_string(offset + nextSize);
     request.SetRange(range);
 
     Aws::S3::Model::GetObjectOutcome getOutcome =
         getS3Client()->GetObject(request);
     if (!getOutcome.IsSuccess()) {
       throw std::runtime_error(getOutcome.GetError().GetMessage());
     }
 
     Aws::IOStream &retrievedFile =
         getOutcome.GetResultWithOwnership().GetBody();
     std::stringstream buffer;
     buffer << retrievedFile.rdbuf();
     std::string result(buffer.str());
     result.resize(nextSize);
 
     callback(result);
   }
 }
 
 void AwsS3Bucket::appendToObject(
     const std::string &objectName,
     const std::string &data) {
   const size_t objectSize = this->getObjectSize(objectName);
   if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) {
     std::string currentData = this->getObjectData(objectName);
     currentData += data;
     this->writeObject(objectName, currentData);
     return;
   }
   size_t currentSize = 0;
   MultiPartUploader uploader(
       getS3Client(), this->name, objectName + "-multipart");
   std::function<void(const std::string &)> callback =
       [&uploader, &data, &currentSize, objectSize](const std::string &chunk) {
         currentSize += chunk.size();
         if (currentSize < objectSize) {
           uploader.addPart(chunk);
         } else if (currentSize == objectSize) {
           uploader.addPart(std::string(chunk + data));
         } else {
           throw std::runtime_error(
               "size of chunks exceeds the size of the object");
         }
       };
   this->getObjectDataChunks(
       objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE);
   uploader.finishUpload();
   // this will overwrite the target file
   this->renameObject(objectName + "-multipart", objectName);
   const size_t newSize = this->getObjectSize(objectName);
   if (objectSize + data.size() != newSize) {
     throw std::runtime_error(
         "append to object " + objectName +
         " has been performed but the final sizes don't "
         "match, the size is now [" +
         std::to_string(newSize) + "] but should be [" +
         std::to_string(objectSize + data.size()) + "]");
   }
 }
 
 void AwsS3Bucket::clearObject(const std::string &objectName) {
   this->writeObject(objectName, "");
 }
 
 void AwsS3Bucket::removeObject(const std::string &objectName) {
   Aws::S3::Model::DeleteObjectRequest deleteRequest;
   deleteRequest.SetBucket(this->name);
   deleteRequest.SetKey(objectName);
 
   Aws::S3::Model::DeleteObjectOutcome deleteOutcome =
       getS3Client()->DeleteObject(deleteRequest);
   if (!deleteOutcome.IsSuccess()) {
     throw std::runtime_error(deleteOutcome.GetError().GetMessage());
   }
 }
 
 } // namespace network
 } // namespace comm
diff --git a/services/blob/src/Constants.h b/services/blob/src/Constants.h
index daaeb5251..947feebc2 100644
--- a/services/blob/src/Constants.h
+++ b/services/blob/src/Constants.h
@@ -1,35 +1,20 @@
 #pragma once
 
 #include "Tools.h"
 
 #include <string>
 
 namespace comm {
 namespace network {
 
-// 4MB limit
-// WARNING: use keeping in mind that grpc adds its own headers to messages
-// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
-// so the message that actually is being sent over the network looks like this
-// [Compressed-Flag] [Message-Length] [Message]
-// [Compressed-Flag] 1 byte - added by grpc
-// [Message-Length] 4 bytes - added by grpc
-// [Message] N bytes - actual data
-// so for every message we get 5 additional bytes of data
-// as mentioned here
-// https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
-// grpc stream may contain more than one message
-const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
-const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
-
 // 5MB limit
 const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024;
 
-const std::string AWS_REGION = "us-east-2";
-
 const std::string BLOB_BUCKET_NAME = "commapp-blob";
 
 const std::string BLOB_TABLE_NAME = decorateTableName("blob-service-blob");
 const std::string REVERSE_INDEX_TABLE_NAME =
     decorateTableName("blob-service-reverse-index");
 
 } // namespace network
 } // namespace comm
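Note: getObjectDataChunks is what lets callers handle objects larger than the 4MB gRPC limit without buffering them whole. A minimal usage sketch — the key string and the measureObject wrapper are hypothetical:

#include <cstddef>
#include <string>

// Stream an object in 5MB pieces, accumulating only its total size;
// the callback sees each chunk exactly once, in order.
size_t measureObject(const AwsS3Bucket &bucket, const std::string &key) {
  size_t total = 0;
  bucket.getObjectDataChunks(
      key,
      [&total](const std::string &chunk) { total += chunk.size(); },
      AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE);
  return total;
}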
diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h
index bf15bfcf0..1a4689774 100644
--- a/services/blob/src/Reactors/server/GetReactor.h
+++ b/services/blob/src/Reactors/server/GetReactor.h
@@ -1,88 +1,89 @@
 #pragma once
 
+#include "GlobalConstants.h"
 #include "S3Tools.h"
 #include "ServerWriteReactorBase.h"
 
 #include "../_generated/blob.grpc.pb.h"
 #include "../_generated/blob.pb.h"
 
 #include <aws/s3/model/GetObjectRequest.h>
 #include <grpcpp/grpcpp.h>
 
 #include <memory>
 #include <string>
 
 namespace comm {
 namespace network {
 namespace reactor {
 
 class GetReactor
     : public ServerWriteReactorBase<blob::GetRequest, blob::GetResponse> {
   size_t offset = 0;
   size_t fileSize = 0;
   const size_t chunkSize =
       GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE;
   database::S3Path s3Path;
   Aws::S3::Model::GetObjectRequest getRequest;
 
 public:
   using ServerWriteReactorBase<blob::GetRequest, blob::GetResponse>::
       ServerWriteReactorBase;
 
   std::unique_ptr<grpc::Status>
   writeResponse(blob::GetResponse *response) override {
     if (this->offset >= this->fileSize) {
       return std::make_unique<grpc::Status>(grpc::Status::OK);
     }
     const size_t nextSize =
         std::min(this->chunkSize, this->fileSize - this->offset);
 
     std::string range = "bytes=" + std::to_string(this->offset) + "-" +
         std::to_string(this->offset + nextSize - 1);
     this->getRequest.SetRange(range);
 
     Aws::S3::Model::GetObjectOutcome getOutcome =
         getS3Client()->GetObject(this->getRequest);
     if (!getOutcome.IsSuccess()) {
       return std::make_unique<grpc::Status>(
           grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage());
     }
 
     Aws::IOStream &retrievedFile =
         getOutcome.GetResultWithOwnership().GetBody();
     std::stringstream buffer;
     buffer << retrievedFile.rdbuf();
     std::string result(buffer.str());
 
     response->set_datachunk(result);
     this->offset += nextSize;
     return nullptr;
   }
 
   void initialize() override {
     this->s3Path = findS3Path(this->request.holder());
     this->fileSize = getBucket(s3Path.getBucketName())
                          .getObjectSize(s3Path.getObjectName());
     this->getRequest.SetBucket(this->s3Path.getBucketName());
     this->getRequest.SetKey(this->s3Path.getObjectName());
 
     AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName());
     if (!bucket.isAvailable()) {
       throw std::runtime_error(
           "bucket [" + this->s3Path.getBucketName() + "] not available");
     }
     const size_t fileSize = bucket.getObjectSize(this->s3Path.getObjectName());
     if (this->fileSize == 0) {
       throw std::runtime_error("object empty");
     }
   };
 
   void doneCallback() override{};
 };
 
 } // namespace reactor
 } // namespace network
 } // namespace comm
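Note: GetReactor ends its Range header at offset + nextSize - 1 because HTTP byte ranges are inclusive on both ends; AwsS3Bucket::getObjectDataChunks above omits the - 1 and instead trims the extra byte with result.resize(nextSize). A small sketch of the inclusive-range math — makeRangeHeader is illustrative only:

#include <cstddef>
#include <string>

// A Range of "bytes=A-B" returns bytes A through B inclusive, so a window
// of nextSize bytes starting at offset ends at offset + nextSize - 1.
std::string makeRangeHeader(size_t offset, size_t nextSize) {
  return "bytes=" + std::to_string(offset) + "-" +
      std::to_string(offset + nextSize - 1);
}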
diff --git a/services/blob/src/S3Tools.cpp b/services/blob/src/S3Tools.cpp
index 222c6197b..96dc14a6e 100644
--- a/services/blob/src/S3Tools.cpp
+++ b/services/blob/src/S3Tools.cpp
@@ -1,45 +1,46 @@
 #include "S3Tools.h"
 #include "Constants.h"
+#include "GlobalConstants.h"
 #include "Tools.h"
 
 #include <aws/core/Aws.h>
 #include <aws/s3/model/Bucket.h>
 
 namespace comm {
 namespace network {
 
 AwsS3Bucket getBucket(const std::string &bucketName) {
   return AwsS3Bucket(bucketName);
 }
 
 std::vector<std::string> listBuckets() {
   Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets();
   std::vector<std::string> result;
   if (!outcome.IsSuccess()) {
     throw std::runtime_error(outcome.GetError().GetMessage());
   }
   Aws::Vector<Aws::S3::Model::Bucket> buckets =
       outcome.GetResult().GetBuckets();
   for (Aws::S3::Model::Bucket &bucket : buckets) {
     result.push_back(bucket.GetName());
   }
   return result;
 }
 
 std::unique_ptr<Aws::S3::S3Client> getS3Client() {
   Aws::Client::ClientConfiguration config;
   config.region = AWS_REGION;
   if (isDevMode()) {
     config.endpointOverride = Aws::String("localstack:4566");
     config.scheme = Aws::Http::Scheme::HTTP;
     return std::make_unique<Aws::S3::S3Client>(
         config,
         Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
         false);
   }
   return std::make_unique<Aws::S3::S3Client>(config);
 }
 
 } // namespace network
 } // namespace comm
diff --git a/services/blob/src/Tools.cpp b/services/blob/src/Tools.cpp
index f6bb36576..83e0c585a 100644
--- a/services/blob/src/Tools.cpp
+++ b/services/blob/src/Tools.cpp
@@ -1,94 +1,95 @@
 #include "Tools.h"
 #include "Constants.h"
 #include "DatabaseEntitiesTools.h"
 #include "DatabaseManager.h"
+#include "GlobalConstants.h"
 #include "S3Tools.h"
 
 #include <openssl/sha.h>
 
 #include <chrono>
 #include <iomanip>
 #include <memory>
 #include <sstream>
 #include <string>
 
 namespace comm {
 namespace network {
 
 database::S3Path
 generateS3Path(const std::string &bucketName, const std::string &blobHash) {
   return database::S3Path(bucketName, blobHash);
 }
 
 std::string computeHashForFile(const database::S3Path &s3Path) {
   SHA512_CTX ctx;
   SHA512_Init(&ctx);
   const std::function<void(const std::string &)> callback =
       [&ctx](const std::string &chunk) {
         SHA512_Update(&ctx, chunk.data(), chunk.size());
       };
 
   getBucket(s3Path.getBucketName())
       .getObjectDataChunks(
           s3Path.getObjectName(), callback, GRPC_CHUNK_SIZE_LIMIT);
 
   unsigned char hash[SHA512_DIGEST_LENGTH];
   SHA512_Final(hash, &ctx);
 
   std::ostringstream hashStream;
   for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) {
     hashStream << std::hex << std::setfill('0') << std::setw(2)
                << std::nouppercase << (int)hash[i];
   }
   return hashStream.str();
 }
 
 database::S3Path findS3Path(const std::string &holder) {
   std::shared_ptr<database::ReverseIndexItem> reverseIndexItem =
       database::DatabaseManager::getInstance().findReverseIndexItemByHolder(
           holder);
   if (reverseIndexItem == nullptr) {
     throw std::runtime_error(
         "provided holder: [" + holder + "] has not been found in the database");
   }
   return findS3Path(*reverseIndexItem);
 }
 
 database::S3Path
 findS3Path(const database::ReverseIndexItem &reverseIndexItem) {
   std::shared_ptr<database::BlobItem> blobItem =
       database::DatabaseManager::getInstance().findBlobItem(
           reverseIndexItem.getBlobHash());
   if (blobItem == nullptr) {
     throw std::runtime_error(
         "no blob found for blobHash: [" + reverseIndexItem.getBlobHash() + "]");
   }
   database::S3Path result = blobItem->getS3Path();
   return result;
 }
 
 uint64_t getCurrentTimestamp() {
   using namespace std::chrono;
   return duration_cast<milliseconds>(system_clock::now().time_since_epoch())
       .count();
 }
 
 std::string decorateTableName(const std::string &baseName) {
   std::string suffix = "";
   if (std::getenv("COMM_TEST_SERVICES") != nullptr &&
       std::string(std::getenv("COMM_TEST_SERVICES")) == "1") {
     suffix = "-test";
   }
   return baseName + suffix;
 }
 
 bool isDevMode() {
   if (std::getenv("COMM_SERVICES_DEV_MODE") == nullptr) {
     return false;
   }
   return std::string(std::getenv("COMM_SERVICES_DEV_MODE")) == "1";
 }
 
 } // namespace network
 } // namespace comm
diff --git a/services/lib/src/DynamoDBTools.cpp b/services/lib/src/DynamoDBTools.cpp
index 83f48c699..73cfa7db5 100644
--- a/services/lib/src/DynamoDBTools.cpp
+++ b/services/lib/src/DynamoDBTools.cpp
@@ -1,18 +1,19 @@
 #include "DynamoDBTools.h"
 #include "Constants.h"
+#include "GlobalConstants.h"
 
 namespace comm {
 namespace network {
 
 std::unique_ptr<Aws::DynamoDB::DynamoDBClient> getDynamoDBClient() {
   Aws::Client::ClientConfiguration config;
   config.region = AWS_REGION;
   if (isDevMode()) {
     config.endpointOverride = Aws::String("localstack:4566");
     config.scheme = Aws::Http::Scheme::HTTP;
   }
   return std::make_unique<Aws::DynamoDB::DynamoDBClient>(config);
 }
 
 } // namespace network
 } // namespace comm
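Note: computeHashForFile relies on SHA-512's incremental interface — feeding the object chunk by chunk through SHA512_Update yields the same digest as hashing it in one pass, so the object never has to fit in memory. A self-contained sketch of the same pattern (the sha512Hex helper and its chunk size are illustrative, not part of this change):

#include <openssl/sha.h>

#include <algorithm>
#include <iomanip>
#include <sstream>
#include <string>

// Hash a buffer in fixed-size pieces; the digest matches a single-shot hash.
std::string sha512Hex(const std::string &data, size_t chunkSize) {
  SHA512_CTX ctx;
  SHA512_Init(&ctx);
  for (size_t offset = 0; offset < data.size(); offset += chunkSize) {
    SHA512_Update(
        &ctx, data.data() + offset, std::min(chunkSize, data.size() - offset));
  }
  unsigned char hash[SHA512_DIGEST_LENGTH];
  SHA512_Final(hash, &ctx);
  std::ostringstream hashStream;
  for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) {
    hashStream << std::hex << std::setfill('0') << std::setw(2)
               << (int)hash[i];
  }
  return hashStream.str();
}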
diff --git a/services/blob/src/Constants.h b/services/lib/src/GlobalConstants.h
similarity index 72%
copy from services/blob/src/Constants.h
copy to services/lib/src/GlobalConstants.h
index daaeb5251..cb5f2328d 100644
--- a/services/blob/src/Constants.h
+++ b/services/lib/src/GlobalConstants.h
@@ -1,35 +1,26 @@
 #pragma once
 
-#include "Tools.h"
-
 #include <string>
 
 namespace comm {
 namespace network {
 
 // 4MB limit
 // WARNING: use keeping in mind that grpc adds its own headers to messages
 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
 // so the message that actually is being sent over the network looks like this
 // [Compressed-Flag] [Message-Length] [Message]
 // [Compressed-Flag] 1 byte - added by grpc
 // [Message-Length] 4 bytes - added by grpc
 // [Message] N bytes - actual data
 // so for every message we get 5 additional bytes of data
 // as mentioned here
 // https://github.com/grpc/grpc/issues/15734#issuecomment-396962671
 // grpc stream may contain more than one message
 const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
 const size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;
 
-// 5MB limit
-const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024;
 
 const std::string AWS_REGION = "us-east-2";
 
-const std::string BLOB_BUCKET_NAME = "commapp-blob";
-
-const std::string BLOB_TABLE_NAME = decorateTableName("blob-service-blob");
-const std::string REVERSE_INDEX_TABLE_NAME =
-    decorateTableName("blob-service-reverse-index");
 } // namespace network
 } // namespace comm
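Note: with the gRPC constants now shared in GlobalConstants.h, both services size their chunks from the same arithmetic — a 4MB hard limit minus the 1-byte compressed flag and 4-byte length prefix that gRPC adds to every message. A quick compile-time check of that math:

#include <cstddef>

constexpr size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024;
constexpr size_t GRPC_METADATA_SIZE_PER_MESSAGE = 5;

// 4 * 1024 * 1024 = 4194304 bytes; subtracting the 5 framing bytes
// leaves 4194299 bytes of payload per message.
static_assert(
    GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE == 4194299,
    "payload per message is the 4MB limit minus 5 framing bytes");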