diff --git a/services/backup/src/DatabaseEntities/LogItem.cpp b/services/backup/src/DatabaseEntities/LogItem.cpp index da0dbae26..861d619b5 100644 --- a/services/backup/src/DatabaseEntities/LogItem.cpp +++ b/services/backup/src/DatabaseEntities/LogItem.cpp @@ -1,110 +1,109 @@ #include "LogItem.h" -#include "AwsTools.h" #include "Constants.h" #include namespace comm { namespace network { namespace database { const std::string LogItem::FIELD_BACKUP_ID = "backupID"; const std::string LogItem::FIELD_LOG_ID = "logID"; const std::string LogItem::FIELD_PERSISTED_IN_BLOB = "persistedInBlob"; const std::string LogItem::FIELD_VALUE = "value"; const std::string LogItem::FIELD_ATTACHMENT_HOLDERS = "attachmentHolders"; std::string LogItem::tableName = LOG_TABLE_NAME; LogItem::LogItem( const std::string backupID, const std::string logID, const bool persistedInBlob, const std::string value, std::vector attachmentHolders) : backupID(backupID), logID(logID), persistedInBlob(persistedInBlob), value(value), attachmentHolders(attachmentHolders) { this->validate(); } LogItem::LogItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void LogItem::validate() const { if (!this->backupID.size()) { throw std::runtime_error("backupID empty"); } if (!this->logID.size()) { throw std::runtime_error("logID empty"); } if (!this->value.size()) { throw std::runtime_error("value empty"); } if (!this->persistedInBlob && this->value.size() > LOG_DATA_SIZE_DATABASE_LIMIT) { throw std::runtime_error( "the value of this log is too big to be stored in the database, it " "should be stored in the blob instead"); } } void LogItem::assignItemFromDatabase(const AttributeValues &itemFromDB) { try { this->backupID = itemFromDB.at(LogItem::FIELD_BACKUP_ID).GetS(); this->logID = itemFromDB.at(LogItem::FIELD_LOG_ID).GetS(); this->persistedInBlob = std::stoi( std::string(itemFromDB.at(LogItem::FIELD_PERSISTED_IN_BLOB).GetS()) .c_str()); this->value = itemFromDB.at(LogItem::FIELD_VALUE).GetS(); auto attachmentsHolders = itemFromDB.find(LogItem::FIELD_ATTACHMENT_HOLDERS); if (attachmentsHolders != itemFromDB.end()) { this->attachmentHolders = attachmentsHolders->second.GetSS(); } } catch (std::logic_error &e) { throw std::runtime_error( "invalid log item provided, " + std::string(e.what())); } this->validate(); } std::string LogItem::getTableName() const { return LogItem::tableName; } PrimaryKey LogItem::getPrimaryKey() const { return PrimaryKey(LogItem::FIELD_BACKUP_ID, LogItem::FIELD_LOG_ID); } PrimaryKeyValue LogItem::getPrimaryKeyValue() const { return PrimaryKeyValue(this->backupID, this->logID); } std::string LogItem::getBackupID() const { return this->backupID; } std::string LogItem::getLogID() const { return this->logID; } bool LogItem::getPersistedInBlob() const { return this->persistedInBlob; } std::string LogItem::getValue() const { return this->value; } std::vector LogItem::getAttachmentHolders() const { return this->attachmentHolders; } } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/DatabaseManager.h b/services/backup/src/DatabaseManager.h index baa7b897a..84187853d 100644 --- a/services/backup/src/DatabaseManager.h +++ b/services/backup/src/DatabaseManager.h @@ -1,64 +1,64 @@ #pragma once -#include "AwsTools.h" #include "DatabaseEntitiesTools.h" +#include "DynamoDBTools.h" #include #include #include #include #include #include namespace comm { namespace network { namespace database { // this class should be thread-safe in case any shared resources appear class DatabaseManager { void innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request); template std::shared_ptr innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); void innerRemoveItem(const Item &item); public: static DatabaseManager &getInstance(); void putBackupItem(const BackupItem &item); std::shared_ptr findBackupItem(const std::string &userID, const std::string &backupID); std::shared_ptr findLastBackupItem(const std::string &userID); void removeBackupItem(std::shared_ptr item); void putLogItem(const LogItem &item); std::vector> findLogItemsForBackup(const std::string &backupID); void removeLogItem(std::shared_ptr item); }; template std::shared_ptr DatabaseManager::innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request) { std::shared_ptr item = createItemByType(); request.SetTableName(item->getTableName()); const Aws::DynamoDB::Model::GetItemOutcome &outcome = getDynamoDBClient()->GetItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); if (!outcomeItem.size()) { return nullptr; } item->assignItemFromDatabase(outcomeItem); return item; } } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/AwsTools.cpp b/services/backup/src/DynamoDBTools.cpp similarity index 88% copy from services/backup/src/AwsTools.cpp copy to services/backup/src/DynamoDBTools.cpp index 04b553381..83f48c699 100644 --- a/services/backup/src/AwsTools.cpp +++ b/services/backup/src/DynamoDBTools.cpp @@ -1,21 +1,18 @@ -#include "AwsTools.h" +#include "DynamoDBTools.h" #include "Constants.h" -#include "Tools.h" - -#include namespace comm { namespace network { std::unique_ptr getDynamoDBClient() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; if (isDevMode()) { config.endpointOverride = Aws::String("localstack:4566"); config.scheme = Aws::Http::Scheme::HTTP; } return std::make_unique(config); } } // namespace network } // namespace comm diff --git a/services/backup/src/AwsTools.h b/services/backup/src/DynamoDBTools.h similarity index 91% copy from services/backup/src/AwsTools.h copy to services/backup/src/DynamoDBTools.h index ff2ce3d45..f32a84372 100644 --- a/services/backup/src/AwsTools.h +++ b/services/backup/src/DynamoDBTools.h @@ -1,14 +1,16 @@ #pragma once +#include "Constants.h" + #include #include #include namespace comm { namespace network { std::unique_ptr getDynamoDBClient(); } // namespace network } // namespace comm diff --git a/services/blob/src/AwsS3Bucket.cpp b/services/blob/src/AwsS3Bucket.cpp index c22cbda00..e232523bd 100644 --- a/services/blob/src/AwsS3Bucket.cpp +++ b/services/blob/src/AwsS3Bucket.cpp @@ -1,223 +1,223 @@ #include "AwsS3Bucket.h" -#include "AwsTools.h" #include "Constants.h" #include "MultiPartUploader.h" +#include "S3Tools.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include namespace comm { namespace network { AwsS3Bucket::AwsS3Bucket(const std::string name) : name(name) { } std::vector AwsS3Bucket::listObjects() const { Aws::S3::Model::ListObjectsRequest request; request.SetBucket(this->name); std::vector result; Aws::S3::Model::ListObjectsOutcome outcome = getS3Client()->ListObjects(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector objects = outcome.GetResult().GetContents(); for (Aws::S3::Model::Object &object : objects) { result.push_back(object.GetKey()); } return result; } bool AwsS3Bucket::isAvailable() const { Aws::S3::Model::HeadBucketRequest headRequest; headRequest.SetBucket(this->name); Aws::S3::Model::HeadBucketOutcome outcome = getS3Client()->HeadBucket(headRequest); return outcome.IsSuccess(); } size_t AwsS3Bucket::getObjectSize(const std::string &objectName) const { Aws::S3::Model::HeadObjectRequest headRequest; headRequest.SetBucket(this->name); headRequest.SetKey(objectName); Aws::S3::Model::HeadObjectOutcome headOutcome = getS3Client()->HeadObject(headRequest); if (!headOutcome.IsSuccess()) { throw std::runtime_error(headOutcome.GetError().GetMessage()); } return headOutcome.GetResultWithOwnership().GetContentLength(); } void AwsS3Bucket::renameObject( const std::string ¤tName, const std::string &newName) { Aws::S3::Model::CopyObjectRequest copyRequest; copyRequest.SetCopySource(this->name + "/" + currentName); copyRequest.SetKey(newName); copyRequest.SetBucket(this->name); Aws::S3::Model::CopyObjectOutcome copyOutcome = getS3Client()->CopyObject(copyRequest); if (!copyOutcome.IsSuccess()) { throw std::runtime_error(copyOutcome.GetError().GetMessage()); } this->removeObject(currentName); } void AwsS3Bucket::writeObject( const std::string &objectName, const std::string &data) { // we don't have to handle multiple write here because the GRPC limit is 4MB // and minimum size of data to perform multipart upload is 5MB Aws::S3::Model::PutObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); std::shared_ptr body = std::shared_ptr( new boost::interprocess::bufferstream((char *)data.data(), data.size())); request.SetBody(body); Aws::S3::Model::PutObjectOutcome outcome = getS3Client()->PutObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } std::string AwsS3Bucket::getObjectData(const std::string &objectName) const { Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); const size_t size = this->getObjectSize(objectName); if (size > GRPC_CHUNK_SIZE_LIMIT) { throw invalid_argument_error(std::string( "The file is too big(" + std::to_string(size) + " bytes, max is " + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + "bytes), please, use getObjectDataChunks")); } Aws::S3::Model::GetObjectOutcome outcome = getS3Client()->GetObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); std::string cpy = result; return result; } void AwsS3Bucket::getObjectDataChunks( const std::string &objectName, const std::function &callback, const size_t chunkSize) const { const size_t fileSize = this->getObjectSize(objectName); if (fileSize == 0) { return; } Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); for (size_t offset = 0; offset < fileSize; offset += chunkSize) { const size_t nextSize = std::min(chunkSize, fileSize - offset); std::string range = "bytes=" + std::to_string(offset) + "-" + std::to_string(offset + nextSize); request.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(request); if (!getOutcome.IsSuccess()) { throw std::runtime_error(getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); result.resize(nextSize); callback(result); } } void AwsS3Bucket::appendToObject( const std::string &objectName, const std::string &data) { const size_t objectSize = this->getObjectSize(objectName); if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { std::string currentData = this->getObjectData(objectName); currentData += data; this->writeObject(objectName, currentData); return; } size_t currentSize = 0; MultiPartUploader uploader( getS3Client(), this->name, objectName + "-multipart"); std::function callback = [&uploader, &data, ¤tSize, objectSize](const std::string &chunk) { currentSize += chunk.size(); if (currentSize < objectSize) { uploader.addPart(chunk); } else if (currentSize == objectSize) { uploader.addPart(std::string(chunk + data)); } else { throw std::runtime_error( "size of chunks exceeds the size of the object"); } }; this->getObjectDataChunks( objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE); uploader.finishUpload(); // this will overwrite the target file this->renameObject(objectName + "-multipart", objectName); const size_t newSize = this->getObjectSize(objectName); if (objectSize + data.size() != newSize) { throw std::runtime_error( "append to object " + objectName + " has been performed but the final sizes don't " "match, the size is now [" + std::to_string(newSize) + "] but should be [" + std::to_string(objectSize + data.size()) + "]"); } } void AwsS3Bucket::clearObject(const std::string &objectName) { this->writeObject(objectName, ""); } void AwsS3Bucket::removeObject(const std::string &objectName) { Aws::S3::Model::DeleteObjectRequest deleteRequest; deleteRequest.SetBucket(this->name); deleteRequest.SetKey(objectName); Aws::S3::Model::DeleteObjectOutcome deleteOutcome = getS3Client()->DeleteObject(deleteRequest); if (!deleteOutcome.IsSuccess()) { throw std::runtime_error(deleteOutcome.GetError().GetMessage()); } } } // namespace network } // namespace comm diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp index 7c0321aa6..f2c35bae4 100644 --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -1,107 +1,107 @@ #include "BlobServiceImpl.h" -#include "AwsTools.h" #include "Constants.h" #include "DatabaseManager.h" #include "MultiPartUploader.h" +#include "S3Tools.h" #include "Tools.h" #include "GetReactor.h" #include "PutReactor.h" #include #include namespace comm { namespace network { BlobServiceImpl::BlobServiceImpl() { Aws::InitAPI({}); if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) { throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available"); } } BlobServiceImpl::~BlobServiceImpl() { Aws::ShutdownAPI({}); } void BlobServiceImpl::verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path) { const std::string computedBlobHash = computeHashForFile(s3Path); if (expectedBlobHash != computedBlobHash) { throw std::runtime_error( "blob hash mismatch, expected: [" + expectedBlobHash + "], computed: [" + computedBlobHash + "]"); } } void BlobServiceImpl::assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue) { if (!lvalue.empty()) { throw std::runtime_error( "multiple assignment for variable " + label + " is not allowed"); } lvalue = rvalue; } grpc::ServerBidiReactor * BlobServiceImpl::Put(grpc::CallbackServerContext *context) { return new reactor::PutReactor(); } grpc::ServerWriteReactor *BlobServiceImpl::Get( grpc::CallbackServerContext *context, const blob::GetRequest *request) { reactor::GetReactor *gr = new reactor::GetReactor(request); gr->start(); return gr; } grpc::ServerUnaryReactor *BlobServiceImpl::Remove( grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error("no item found for holder: " + holder); } // TODO handle cleanup here properly // for now the object's being removed right away const std::string blobHash = reverseIndexItem->getBlobHash(); if (!database::DatabaseManager::getInstance().removeReverseIndexItem( holder)) { throw std::runtime_error( "could not remove an item for holder " + holder + "(probably does not exist)"); } if (database::DatabaseManager::getInstance() .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash()) .size() == 0) { database::S3Path s3Path = findS3Path(*reverseIndexItem); AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); bucket.removeObject(s3Path.getObjectName()); database::DatabaseManager::getInstance().removeBlobItem(blobHash); } } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseEntities/BlobItem.cpp b/services/blob/src/DatabaseEntities/BlobItem.cpp index 8f46cb5c9..a709bb884 100644 --- a/services/blob/src/DatabaseEntities/BlobItem.cpp +++ b/services/blob/src/DatabaseEntities/BlobItem.cpp @@ -1,74 +1,73 @@ #include "BlobItem.h" -#include "AwsTools.h" #include "Constants.h" namespace comm { namespace network { namespace database { const std::string BlobItem::FIELD_BLOB_HASH = "blobHash"; const std::string BlobItem::FIELD_S3_PATH = "s3Path"; const std::string BlobItem::FIELD_CREATED = "created"; std::string BlobItem::tableName = BLOB_TABLE_NAME; BlobItem::BlobItem( const std::string blobHash, const S3Path s3Path, uint64_t created) : blobHash(blobHash), s3Path(s3Path), created(created) { this->validate(); } BlobItem::BlobItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void BlobItem::validate() const { if (!this->blobHash.size()) { throw std::runtime_error("blobHash empty"); } this->s3Path.validate(); } void BlobItem::assignItemFromDatabase(const AttributeValues &itemFromDB) { try { this->blobHash = itemFromDB.at(BlobItem::FIELD_BLOB_HASH).GetS(); this->s3Path = S3Path(itemFromDB.at(BlobItem::FIELD_S3_PATH).GetS()); this->created = std::stoll( std::string(itemFromDB.at(BlobItem::FIELD_CREATED).GetS()).c_str()); } catch (std::logic_error &e) { throw std::runtime_error( "invalid blob item provided, " + std::string(e.what())); } this->validate(); } std::string BlobItem::getTableName() const { return BlobItem::tableName; } PrimaryKey BlobItem::getPrimaryKey() const { return PrimaryKey(BlobItem::FIELD_BLOB_HASH); } PrimaryKeyValue BlobItem::getPrimaryKeyValue() const { return PrimaryKeyValue(this->blobHash); } std::string BlobItem::getBlobHash() const { return this->blobHash; } S3Path BlobItem::getS3Path() const { return this->s3Path; } uint64_t BlobItem::getCreated() const { return this->created; } } // namespace database } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseEntities/ReverseIndexItem.cpp b/services/blob/src/DatabaseEntities/ReverseIndexItem.cpp index 7de0c6d49..4d0080a70 100644 --- a/services/blob/src/DatabaseEntities/ReverseIndexItem.cpp +++ b/services/blob/src/DatabaseEntities/ReverseIndexItem.cpp @@ -1,63 +1,62 @@ #include "ReverseIndexItem.h" -#include "AwsTools.h" #include "Constants.h" namespace comm { namespace network { namespace database { const std::string ReverseIndexItem::FIELD_HOLDER = "holder"; const std::string ReverseIndexItem::FIELD_BLOB_HASH = "blobHash"; std::string ReverseIndexItem::tableName = REVERSE_INDEX_TABLE_NAME; ReverseIndexItem::ReverseIndexItem( const std::string holder, const std::string blobHash) : holder(holder), blobHash(blobHash) { this->validate(); } ReverseIndexItem::ReverseIndexItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void ReverseIndexItem::validate() const { if (!this->holder.size()) { throw std::runtime_error("reverse index empty"); } if (!this->blobHash.size()) { throw std::runtime_error("blobHash empty"); } } void ReverseIndexItem::assignItemFromDatabase( const AttributeValues &itemFromDB) { this->holder = itemFromDB.at(ReverseIndexItem::FIELD_HOLDER).GetS(); this->blobHash = itemFromDB.at(ReverseIndexItem::FIELD_BLOB_HASH).GetS(); this->validate(); } std::string ReverseIndexItem::getTableName() const { return ReverseIndexItem::tableName; } PrimaryKey ReverseIndexItem::getPrimaryKey() const { return PrimaryKey(ReverseIndexItem::FIELD_HOLDER); } PrimaryKeyValue ReverseIndexItem::getPrimaryKeyValue() const { return PrimaryKeyValue(this->holder); } std::string ReverseIndexItem::getHolder() const { return this->holder; } std::string ReverseIndexItem::getBlobHash() const { return this->blobHash; } } // namespace database } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseManager.h b/services/blob/src/DatabaseManager.h index ff527fd8c..22d517a32 100644 --- a/services/blob/src/DatabaseManager.h +++ b/services/blob/src/DatabaseManager.h @@ -1,68 +1,68 @@ #pragma once -#include "AwsTools.h" #include "DatabaseEntitiesTools.h" +#include "DynamoDBTools.h" #include #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace database { // this class should be thread-safe in case any shared resources appear class DatabaseManager { void innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request); template std::shared_ptr innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request); void innerRemoveItem(const Item &item); public: static DatabaseManager &getInstance(); void putBlobItem(const BlobItem &item); std::shared_ptr findBlobItem(const std::string &blobHash); void removeBlobItem(const std::string &blobHash); void putReverseIndexItem(const ReverseIndexItem &item); std::shared_ptr findReverseIndexItemByHolder(const std::string &holder); std::vector> findReverseIndexItemsByHash(const std::string &blobHash); bool removeReverseIndexItem(const std::string &holder); }; template std::shared_ptr DatabaseManager::innerFindItem(Aws::DynamoDB::Model::GetItemRequest &request) { std::shared_ptr item = createItemByType(); request.SetTableName(item->getTableName()); const Aws::DynamoDB::Model::GetItemOutcome &outcome = getDynamoDBClient()->GetItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const AttributeValues &outcomeItem = outcome.GetResult().GetItem(); if (!outcomeItem.size()) { return nullptr; } item->assignItemFromDatabase(outcomeItem); return std::move(item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/backup/src/AwsTools.cpp b/services/blob/src/DynamoDBTools.cpp similarity index 88% rename from services/backup/src/AwsTools.cpp rename to services/blob/src/DynamoDBTools.cpp index 04b553381..83f48c699 100644 --- a/services/backup/src/AwsTools.cpp +++ b/services/blob/src/DynamoDBTools.cpp @@ -1,21 +1,18 @@ -#include "AwsTools.h" +#include "DynamoDBTools.h" #include "Constants.h" -#include "Tools.h" - -#include namespace comm { namespace network { std::unique_ptr getDynamoDBClient() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; if (isDevMode()) { config.endpointOverride = Aws::String("localstack:4566"); config.scheme = Aws::Http::Scheme::HTTP; } return std::make_unique(config); } } // namespace network } // namespace comm diff --git a/services/backup/src/AwsTools.h b/services/blob/src/DynamoDBTools.h similarity index 91% rename from services/backup/src/AwsTools.h rename to services/blob/src/DynamoDBTools.h index ff2ce3d45..f32a84372 100644 --- a/services/backup/src/AwsTools.h +++ b/services/blob/src/DynamoDBTools.h @@ -1,14 +1,16 @@ #pragma once +#include "Constants.h" + #include #include #include namespace comm { namespace network { std::unique_ptr getDynamoDBClient(); } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h index 2370dbeb2..bf15bfcf0 100644 --- a/services/blob/src/Reactors/server/GetReactor.h +++ b/services/blob/src/Reactors/server/GetReactor.h @@ -1,87 +1,88 @@ #pragma once +#include "S3Tools.h" #include "ServerWriteReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include #include #include #include namespace comm { namespace network { namespace reactor { class GetReactor : public ServerWriteReactorBase { size_t offset = 0; size_t fileSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; database::S3Path s3Path; Aws::S3::Model::GetObjectRequest getRequest; public: using ServerWriteReactorBase:: ServerWriteReactorBase; std::unique_ptr writeResponse(blob::GetResponse *response) override { if (this->offset >= this->fileSize) { return std::make_unique(grpc::Status::OK); } const size_t nextSize = std::min(this->chunkSize, this->fileSize - this->offset); std::string range = "bytes=" + std::to_string(this->offset) + "-" + std::to_string(this->offset + nextSize - 1); this->getRequest.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(this->getRequest); if (!getOutcome.IsSuccess()) { return std::make_unique( grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); response->set_datachunk(result); this->offset += nextSize; return nullptr; } void initialize() override { this->s3Path = findS3Path(this->request.holder()); this->fileSize = getBucket(s3Path.getBucketName()).getObjectSize(s3Path.getObjectName()); this->getRequest.SetBucket(this->s3Path.getBucketName()); this->getRequest.SetKey(this->s3Path.getObjectName()); AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName()); if (!bucket.isAvailable()) { throw std::runtime_error( "bucket [" + this->s3Path.getBucketName() + "] not available"); } const size_t fileSize = bucket.getObjectSize(this->s3Path.getObjectName()); if (this->fileSize == 0) { throw std::runtime_error("object empty"); } }; void doneCallback() override{}; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/AwsTools.cpp b/services/blob/src/S3Tools.cpp similarity index 76% rename from services/blob/src/AwsTools.cpp rename to services/blob/src/S3Tools.cpp index 71c6af137..222c6197b 100644 --- a/services/blob/src/AwsTools.cpp +++ b/services/blob/src/S3Tools.cpp @@ -1,55 +1,45 @@ -#include "AwsTools.h" +#include "S3Tools.h" #include "Constants.h" #include "Tools.h" #include #include namespace comm { namespace network { AwsS3Bucket getBucket(const std::string &bucketName) { return AwsS3Bucket(bucketName); } std::vector listBuckets() { Aws::S3::Model::ListBucketsOutcome outcome = getS3Client()->ListBuckets(); std::vector result; if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector buckets = outcome.GetResult().GetBuckets(); for (Aws::S3::Model::Bucket &bucket : buckets) { result.push_back(bucket.GetName()); } return result; } -std::unique_ptr getDynamoDBClient() { - Aws::Client::ClientConfiguration config; - config.region = AWS_REGION; - if (isDevMode()) { - config.endpointOverride = Aws::String("localstack:4566"); - config.scheme = Aws::Http::Scheme::HTTP; - } - return std::make_unique(config); -} - std::unique_ptr getS3Client() { Aws::Client::ClientConfiguration config; config.region = AWS_REGION; if (isDevMode()) { config.endpointOverride = Aws::String("localstack:4566"); config.scheme = Aws::Http::Scheme::HTTP; return std::make_unique( config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); } return std::make_unique(config); } } // namespace network } // namespace comm diff --git a/services/blob/src/AwsTools.h b/services/blob/src/S3Tools.h similarity index 86% rename from services/blob/src/AwsTools.h rename to services/blob/src/S3Tools.h index e7f349b14..e75caf918 100644 --- a/services/blob/src/AwsTools.h +++ b/services/blob/src/S3Tools.h @@ -1,25 +1,23 @@ #pragma once #include "AwsS3Bucket.h" #include "Constants.h" #include #include #include #include #include namespace comm { namespace network { AwsS3Bucket getBucket(const std::string &bucketName); std::vector listBuckets(); -std::unique_ptr getDynamoDBClient(); - std::unique_ptr getS3Client(); } // namespace network } // namespace comm diff --git a/services/blob/src/Tools.cpp b/services/blob/src/Tools.cpp index 1e644188c..f6bb36576 100644 --- a/services/blob/src/Tools.cpp +++ b/services/blob/src/Tools.cpp @@ -1,94 +1,94 @@ #include "Tools.h" -#include "AwsTools.h" #include "Constants.h" #include "DatabaseEntitiesTools.h" #include "DatabaseManager.h" +#include "S3Tools.h" #include #include #include #include #include namespace comm { namespace network { database::S3Path generateS3Path(const std::string &bucketName, const std::string &blobHash) { return database::S3Path(bucketName, blobHash); } std::string computeHashForFile(const database::S3Path &s3Path) { SHA512_CTX ctx; SHA512_Init(&ctx); const std::function callback = [&ctx](const std::string &chunk) { SHA512_Update(&ctx, chunk.data(), chunk.size()); }; getBucket(s3Path.getBucketName()) .getObjectDataChunks( s3Path.getObjectName(), callback, GRPC_CHUNK_SIZE_LIMIT); unsigned char hash[SHA512_DIGEST_LENGTH]; SHA512_Final(hash, &ctx); std::ostringstream hashStream; for (int i = 0; i < SHA512_DIGEST_LENGTH; i++) { hashStream << std::hex << std::setfill('0') << std::setw(2) << std::nouppercase << (int)hash[i]; } return hashStream.str(); } database::S3Path findS3Path(const std::string &holder) { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error( "provided holder: [" + holder + "] has not been found in the database"); } return findS3Path(*reverseIndexItem); } database::S3Path findS3Path(const database::ReverseIndexItem &reverseIndexItem) { std::shared_ptr blobItem = database::DatabaseManager::getInstance().findBlobItem( reverseIndexItem.getBlobHash()); if (blobItem == nullptr) { throw std::runtime_error( "no blob found for blobHash: [" + reverseIndexItem.getBlobHash() + "]"); } database::S3Path result = blobItem->getS3Path(); return result; } uint64_t getCurrentTimestamp() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()) .count(); } std::string decorateTableName(const std::string &baseName) { std::string suffix = ""; if (std::getenv("COMM_TEST_SERVICES") != nullptr && std::string(std::getenv("COMM_TEST_SERVICES")) == "1") { suffix = "-test"; } return baseName + suffix; } bool isDevMode() { if (std::getenv("COMM_SERVICES_DEV_MODE") == nullptr) { return false; } return std::string(std::getenv("COMM_SERVICES_DEV_MODE")) == "1"; } } // namespace network } // namespace comm diff --git a/services/blob/test/MultiPartUploadTest.cpp b/services/blob/test/MultiPartUploadTest.cpp index 64ec9d934..8c271fedc 100644 --- a/services/blob/test/MultiPartUploadTest.cpp +++ b/services/blob/test/MultiPartUploadTest.cpp @@ -1,76 +1,76 @@ #include #include "AwsS3Bucket.h" -#include "AwsTools.h" #include "MultiPartUploader.h" +#include "S3Tools.h" #include "TestTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class MultiPartUploadTest : public testing::Test { protected: std::shared_ptr s3Client; const std::string bucketName = "commapp-test"; std::unique_ptr bucket; virtual void SetUp() { Aws::InitAPI({}); s3Client = std::move(getS3Client()); bucket = std::make_unique(bucketName); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateNByes(const size_t n) { std::string result; result.resize(n); memset((char *)result.data(), 'A', n); return result; } TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1)); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE)); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_THROW(bucket->getObjectData(objectName), invalid_argument_error); EXPECT_EQ( bucket->getObjectSize(objectName), AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3); bucket->removeObject(objectName); } TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_EQ(bucket->getObjectSize(objectName), 3); bucket->removeObject(objectName); } diff --git a/services/blob/test/StorageManagerTest.cpp b/services/blob/test/StorageManagerTest.cpp index b0843d881..e30cf06e2 100644 --- a/services/blob/test/StorageManagerTest.cpp +++ b/services/blob/test/StorageManagerTest.cpp @@ -1,59 +1,59 @@ #include -#include "AwsTools.h" +#include "S3Tools.h" #include "TestTools.h" #include #include #include #include #include using namespace comm::network; class StorageManagerTest : public testing::Test { public: protected: const std::string bucketName = "commapp-test"; const std::string data = "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8" "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6"; virtual void SetUp() { Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; TEST_F(StorageManagerTest, ObjectOperationsTest) { EXPECT_TRUE(getBucket(bucketName).isAvailable()); std::string objectName = createObject(getBucket(bucketName)); getBucket(bucketName).writeObject(objectName, data); EXPECT_EQ(getBucket(bucketName).getObjectSize(objectName), data.size()); EXPECT_TRUE(getBucket(bucketName).getObjectData(objectName) == data); std::string chunkedData; const size_t chunkSize = data.size() / 10; std::function callback = [&chunkedData](const std::string &chunk) { chunkedData += chunk; }; getBucket(bucketName).getObjectDataChunks(objectName, callback, chunkSize); EXPECT_TRUE(data == chunkedData); getBucket(bucketName).renameObject(objectName, objectName + "c"); EXPECT_THROW( getBucket(bucketName).getObjectData(objectName), std::runtime_error); EXPECT_TRUE(getBucket(bucketName).getObjectData(objectName + "c") == data); getBucket(bucketName).renameObject(objectName + "c", objectName); getBucket(bucketName).clearObject(objectName); EXPECT_EQ(getBucket(bucketName).getObjectSize(objectName), 0); getBucket(bucketName).removeObject(objectName); EXPECT_THROW( getBucket(bucketName).getObjectData(objectName), std::runtime_error); }