diff --git a/services/blob/src/BlobServiceImpl.cpp b/services/blob/src/BlobServiceImpl.cpp index 5a72766e3..0f06dbf16 100644 --- a/services/blob/src/BlobServiceImpl.cpp +++ b/services/blob/src/BlobServiceImpl.cpp @@ -1,104 +1,103 @@ #include "BlobServiceImpl.h" #include "Constants.h" #include "DatabaseManager.h" #include "MultiPartUploader.h" #include "S3Tools.h" #include "Tools.h" #include "GetReactor.h" #include "PutReactor.h" #include -#include #include namespace comm { namespace network { BlobServiceImpl::BlobServiceImpl() { Aws::InitAPI({}); if (!getBucket(BLOB_BUCKET_NAME).isAvailable()) { throw std::runtime_error("bucket " + BLOB_BUCKET_NAME + " not available"); } } BlobServiceImpl::~BlobServiceImpl() { Aws::ShutdownAPI({}); } void BlobServiceImpl::verifyBlobHash( const std::string &expectedBlobHash, const database::S3Path &s3Path) { const std::string computedBlobHash = tools::computeHashForFile(s3Path); if (expectedBlobHash != computedBlobHash) { throw std::runtime_error( "blob hash mismatch, expected: [" + expectedBlobHash + "], computed: [" + computedBlobHash + "]"); } } void BlobServiceImpl::assignVariableIfEmpty( const std::string &label, std::string &lvalue, const std::string &rvalue) { if (!lvalue.empty()) { throw std::runtime_error( "multiple assignment for variable " + label + " is not allowed"); } lvalue = rvalue; } grpc::ServerBidiReactor * BlobServiceImpl::Put(grpc::CallbackServerContext *context) { return new reactor::PutReactor(); } grpc::ServerWriteReactor *BlobServiceImpl::Get( grpc::CallbackServerContext *context, const blob::GetRequest *request) { reactor::GetReactor *gr = new reactor::GetReactor(request); gr->start(); return gr; } grpc::ServerUnaryReactor *BlobServiceImpl::Remove( grpc::CallbackServerContext *context, const blob::RemoveRequest *request, google::protobuf::Empty *response) { grpc::Status status = grpc::Status::OK; const std::string holder = request->holder(); try { std::shared_ptr reverseIndexItem = database::DatabaseManager::getInstance().findReverseIndexItemByHolder( holder); if (reverseIndexItem == nullptr) { throw std::runtime_error("no item found for holder: " + holder); } // TODO handle cleanup here properly // for now the object's being removed right away const std::string blobHash = reverseIndexItem->getBlobHash(); database::DatabaseManager::getInstance().removeReverseIndexItem(holder); if (database::DatabaseManager::getInstance() .findReverseIndexItemsByHash(reverseIndexItem->getBlobHash()) .size() == 0) { database::S3Path s3Path = tools::findS3Path(*reverseIndexItem); AwsS3Bucket bucket = getBucket(s3Path.getBucketName()); bucket.removeObject(s3Path.getObjectName()); database::DatabaseManager::getInstance().removeBlobItem(blobHash); } } catch (std::runtime_error &e) { LOG(ERROR) << e.what(); status = grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } auto *reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } } // namespace network } // namespace comm diff --git a/services/blob/src/DatabaseManager.cpp b/services/blob/src/DatabaseManager.cpp index 273cb9724..9766b1a5d 100644 --- a/services/blob/src/DatabaseManager.cpp +++ b/services/blob/src/DatabaseManager.cpp @@ -1,121 +1,120 @@ #include "DatabaseManager.h" #include "GlobalTools.h" #include "Tools.h" #include #include #include -#include #include namespace comm { namespace network { namespace database { DatabaseManager &DatabaseManager::getInstance() { static DatabaseManager instance; return instance; } void DatabaseManager::putBlobItem(const BlobItem &item) { Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(BlobItem::tableName); request.AddItem( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); request.AddItem( BlobItem::FIELD_S3_PATH, Aws::DynamoDB::Model::AttributeValue(item.getS3Path().getFullPath())); request.AddItem( BlobItem::FIELD_CREATED, Aws::DynamoDB::Model::AttributeValue( std::to_string(tools::getCurrentTimestamp()))); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findBlobItem(const std::string &blobHash) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( BlobItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(blobHash)); return std::move(this->innerFindItem(request)); } void DatabaseManager::removeBlobItem(const std::string &blobHash) { std::shared_ptr item = this->findBlobItem(blobHash); if (item == nullptr) { return; } this->innerRemoveItem(*item); } void DatabaseManager::putReverseIndexItem(const ReverseIndexItem &item) { if (this->findReverseIndexItemByHolder(item.getHolder()) != nullptr) { throw std::runtime_error( "An item for the given holder [" + item.getHolder() + "] already exists"); } Aws::DynamoDB::Model::PutItemRequest request; request.SetTableName(ReverseIndexItem::tableName); request.AddItem( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(item.getHolder())); request.AddItem( ReverseIndexItem::FIELD_BLOB_HASH, Aws::DynamoDB::Model::AttributeValue(item.getBlobHash())); this->innerPutItem(std::make_shared(item), request); } std::shared_ptr DatabaseManager::findReverseIndexItemByHolder(const std::string &holder) { Aws::DynamoDB::Model::GetItemRequest request; request.AddKey( ReverseIndexItem::FIELD_HOLDER, Aws::DynamoDB::Model::AttributeValue(holder)); return std::move(this->innerFindItem(request)); } std::vector> DatabaseManager::findReverseIndexItemsByHash(const std::string &blobHash) { std::vector> result; Aws::DynamoDB::Model::QueryRequest req; req.SetTableName(ReverseIndexItem::tableName); req.SetKeyConditionExpression("blobHash = :valueToMatch"); AttributeValues attributeValues; attributeValues.emplace(":valueToMatch", blobHash); req.SetExpressionAttributeValues(attributeValues); req.SetIndexName("blobHash-index"); const Aws::DynamoDB::Model::QueryOutcome &outcome = getDynamoDBClient()->Query(req); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const Aws::Vector &items = outcome.GetResult().GetItems(); for (auto &item : items) { result.push_back(std::make_shared(item)); } return result; } void DatabaseManager::removeReverseIndexItem(const std::string &holder) { std::shared_ptr item = findReverseIndexItemByHolder(holder); if (item == nullptr) { return; } this->innerRemoveItem(*item); } } // namespace database } // namespace network } // namespace comm diff --git a/services/blob/src/Reactors/server/GetReactor.h b/services/blob/src/Reactors/server/GetReactor.h index 906703b2b..b59229b8b 100644 --- a/services/blob/src/Reactors/server/GetReactor.h +++ b/services/blob/src/Reactors/server/GetReactor.h @@ -1,89 +1,88 @@ #pragma once #include "GlobalConstants.h" #include "S3Tools.h" #include "ServerWriteReactorBase.h" #include "../_generated/blob.grpc.pb.h" #include "../_generated/blob.pb.h" #include -#include #include #include namespace comm { namespace network { namespace reactor { class GetReactor : public ServerWriteReactorBase { size_t offset = 0; size_t fileSize = 0; const size_t chunkSize = GRPC_CHUNK_SIZE_LIMIT - GRPC_METADATA_SIZE_PER_MESSAGE; database::S3Path s3Path; Aws::S3::Model::GetObjectRequest getRequest; public: using ServerWriteReactorBase:: ServerWriteReactorBase; std::unique_ptr writeResponse(blob::GetResponse *response) override { if (this->offset >= this->fileSize) { return std::make_unique(grpc::Status::OK); } const size_t nextSize = std::min(this->chunkSize, this->fileSize - this->offset); std::string range = "bytes=" + std::to_string(this->offset) + "-" + std::to_string(this->offset + nextSize - 1); this->getRequest.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = getS3Client()->GetObject(this->getRequest); if (!getOutcome.IsSuccess()) { return std::make_unique( grpc::StatusCode::INTERNAL, getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::stringstream buffer; buffer << retrievedFile.rdbuf(); std::string result(buffer.str()); response->set_datachunk(result); this->offset += nextSize; return nullptr; } void initialize() override { this->s3Path = tools::findS3Path(this->request.holder()); this->fileSize = getBucket(s3Path.getBucketName()).getObjectSize(s3Path.getObjectName()); this->getRequest.SetBucket(this->s3Path.getBucketName()); this->getRequest.SetKey(this->s3Path.getObjectName()); AwsS3Bucket bucket = getBucket(this->s3Path.getBucketName()); if (!bucket.isAvailable()) { throw std::runtime_error( "bucket [" + this->s3Path.getBucketName() + "] not available"); } const size_t fileSize = bucket.getObjectSize(this->s3Path.getObjectName()); if (this->fileSize == 0) { throw std::runtime_error("object empty"); } }; void doneCallback() override{}; }; } // namespace reactor } // namespace network } // namespace comm diff --git a/services/blob/src/server.cpp b/services/blob/src/server.cpp index b9a271c2f..979af4e92 100644 --- a/services/blob/src/server.cpp +++ b/services/blob/src/server.cpp @@ -1,37 +1,36 @@ #include "BlobServiceImpl.h" #include "GlobalTools.h" #include #include -#include #include #include namespace comm { namespace network { void RunServer() { std::string server_address = "0.0.0.0:50051"; BlobServiceImpl blobService; grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&blobService); std::unique_ptr server(builder.BuildAndStart()); LOG(INFO) << "Server listening"; server->Wait(); } } // namespace network } // namespace comm int main(int argc, char **argv) { comm::network::tools::InitLogging("blob"); comm::network::RunServer(); return 0; } diff --git a/services/blob/test/DatabaseManagerTest.cpp b/services/blob/test/DatabaseManagerTest.cpp index 6865578f0..ee70d3d61 100644 --- a/services/blob/test/DatabaseManagerTest.cpp +++ b/services/blob/test/DatabaseManagerTest.cpp @@ -1,68 +1,66 @@ #include #include "DatabaseManager.h" #include "S3Path.h" -#include - #include #include #include #include #include using namespace comm::network::database; class DatabaseManagerTest : public testing::Test { protected: virtual void SetUp() { Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; std::string generateName() { std::chrono::milliseconds ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()); return std::to_string(ms.count()); } TEST_F(DatabaseManagerTest, TestOperationsOnBlobItems) { const BlobItem item(generateName(), S3Path(generateName(), generateName())); DatabaseManager::getInstance().putBlobItem(item); std::shared_ptr foundItem = DatabaseManager::getInstance().findBlobItem(item.getBlobHash()); EXPECT_NE(foundItem->getCreated(), 0); EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash()); const BlobItem item2(generateName(), S3Path(generateName(), generateName())); DatabaseManager::getInstance().putBlobItem(item2); DatabaseManager::getInstance().removeBlobItem(item.getBlobHash()); DatabaseManager::getInstance().removeBlobItem(item2.getBlobHash()); EXPECT_EQ( DatabaseManager::getInstance().findBlobItem(item.getBlobHash()), nullptr); EXPECT_EQ( DatabaseManager::getInstance().findBlobItem(item2.getBlobHash()), nullptr); } TEST_F(DatabaseManagerTest, TestOperationsOnReverseIndexItems) { const ReverseIndexItem item(generateName(), generateName()); DatabaseManager::getInstance().putReverseIndexItem(item); std::vector> foundItems = DatabaseManager::getInstance().findReverseIndexItemsByHash( item.getBlobHash()); EXPECT_EQ(foundItems.size(), 1); std::shared_ptr foundItem = foundItems.at(0); EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash()); foundItem = std::dynamic_pointer_cast( DatabaseManager::getInstance().findReverseIndexItemByHolder( item.getHolder())); EXPECT_EQ(item.getBlobHash(), foundItem->getBlobHash()); DatabaseManager::getInstance().removeReverseIndexItem(foundItem->getHolder()); } diff --git a/services/blob/test/StorageManagerTest.cpp b/services/blob/test/StorageManagerTest.cpp index 35665aff9..a00b51835 100644 --- a/services/blob/test/StorageManagerTest.cpp +++ b/services/blob/test/StorageManagerTest.cpp @@ -1,63 +1,62 @@ #include #include "Constants.h" #include "S3Tools.h" #include "TestTools.h" #include #include -#include #include #include using namespace comm::network; class StorageManagerTest : public testing::Test { public: protected: const std::string data = "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8" "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6"; virtual void SetUp() { Aws::InitAPI({}); } virtual void TearDown() { Aws::ShutdownAPI({}); } }; TEST_F(StorageManagerTest, ObjectOperationsTest) { EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).isAvailable()); std::string objectName = createObject(getBucket(BLOB_BUCKET_NAME)); getBucket(BLOB_BUCKET_NAME).writeObject(objectName, data); EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), data.size()); EXPECT_TRUE(getBucket(BLOB_BUCKET_NAME).getObjectData(objectName) == data); std::string chunkedData; const size_t chunkSize = data.size() / 10; std::function callback = [&chunkedData](const std::string &chunk) { chunkedData += chunk; }; getBucket(BLOB_BUCKET_NAME) .getObjectDataChunks(objectName, callback, chunkSize); EXPECT_TRUE(data == chunkedData); getBucket(BLOB_BUCKET_NAME).renameObject(objectName, objectName + "c"); EXPECT_THROW( getBucket(BLOB_BUCKET_NAME).getObjectData(objectName), std::runtime_error); EXPECT_TRUE( getBucket(BLOB_BUCKET_NAME).getObjectData(objectName + "c") == data); getBucket(BLOB_BUCKET_NAME).renameObject(objectName + "c", objectName); getBucket(BLOB_BUCKET_NAME).clearObject(objectName); EXPECT_EQ(getBucket(BLOB_BUCKET_NAME).getObjectSize(objectName), 0); getBucket(BLOB_BUCKET_NAME).removeObject(objectName); EXPECT_THROW( getBucket(BLOB_BUCKET_NAME).getObjectData(objectName), std::runtime_error); }