diff --git a/native/.clang-format b/.clang-format similarity index 100% rename from native/.clang-format rename to .clang-format diff --git a/services/backup/docker-server/contents/server/dev/AwsS3Bucket.dev.cpp b/services/backup/docker-server/contents/server/dev/AwsS3Bucket.dev.cpp index 121c912c4..fe30a5841 100644 --- a/services/backup/docker-server/contents/server/dev/AwsS3Bucket.dev.cpp +++ b/services/backup/docker-server/contents/server/dev/AwsS3Bucket.dev.cpp @@ -1,105 +1,111 @@ #include "AwsS3Bucket.h" #include "DevTools.h" #include "Tools.h" #include #include #include namespace comm { namespace network { -AwsS3Bucket::AwsS3Bucket(const std::string name, - std::shared_ptr client) +AwsS3Bucket::AwsS3Bucket( + const std::string name, + std::shared_ptr client) : name(name), client(nullptr) { std::filesystem::create_directories(commFilesystemPath); } std::vector AwsS3Bucket::listObjects() { std::vector result; for (const auto &entry : std::filesystem::directory_iterator(commFilesystemPath)) { result.push_back(entry.path()); } return result; } bool AwsS3Bucket::isAvailable() const { return std::filesystem::exists(commFilesystemPath); } const size_t AwsS3Bucket::getObjectSize(const std::string &objectName) { return std::filesystem::file_size(createCommPath(objectName)); } -void AwsS3Bucket::renameObject(const std::string ¤tName, - const std::string &newName) { +void AwsS3Bucket::renameObject( + const std::string ¤tName, + const std::string &newName) { std::filesystem::rename(createCommPath(currentName), createCommPath(newName)); } -void AwsS3Bucket::writeObject(const std::string &objectName, - const std::string data) { +void AwsS3Bucket::writeObject( + const std::string &objectName, + const std::string data) { if (std::filesystem::exists(createCommPath(objectName))) { this->clearObject(createCommPath(objectName)); } std::ofstream ofs(createCommPath(objectName)); ofs << data; } std::string AwsS3Bucket::getObjectData(const std::string &objectName) { - std::ifstream ifs(createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); + std::ifstream ifs( + createCommPath(objectName), + std::ios::in | std::ios::binary | std::ios::ate); std::ifstream::pos_type fileSize = ifs.tellg(); ifs.seekg(0, std::ios::beg); if (fileSize > GRPC_CHUNK_SIZE_LIMIT) { - throw invalid_argument_error( - std::string("The file is too big(" + std::to_string(fileSize) + - " bytes, max is " + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + - "bytes), please, use getObjectDataChunks")); + throw invalid_argument_error(std::string( + "The file is too big(" + std::to_string(fileSize) + " bytes, max is " + + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + + "bytes), please, use getObjectDataChunks")); } std::string bytes; bytes.resize(fileSize); ifs.read((char *)bytes.data(), fileSize); return bytes; } void AwsS3Bucket::getObjectDataChunks( const std::string &objectName, const std::function &callback, const size_t chunkSize) { - std::ifstream ifs(createCommPath(objectName), - std::ios::in | std::ios::binary | std::ios::ate); + std::ifstream ifs( + createCommPath(objectName), + std::ios::in | std::ios::binary | std::ios::ate); std::ifstream::pos_type fileSize = ifs.tellg(); size_t filePos = 0; while (filePos < fileSize) { ifs.seekg(filePos, std::ios::beg); std::string bytes; bytes.resize(chunkSize); ifs.read((char *)bytes.data(), chunkSize); filePos += bytes.size(); callback(bytes); } } -void AwsS3Bucket::appendToObject(const std::string &objectName, - const std::string data) { +void AwsS3Bucket::appendToObject( + const std::string &objectName, + const std::string data) { std::ofstream ofs; ofs.open(createCommPath(objectName), std::ios_base::app); ofs << data; } void AwsS3Bucket::clearObject(const std::string &objectName) { std::filesystem::resize_file(createCommPath(objectName), 0); } void AwsS3Bucket::deleteObject(const std::string &objectName) { std::filesystem::remove(createCommPath(objectName)); } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/dev/MultiPartUploader.dev.cpp b/services/backup/docker-server/contents/server/dev/MultiPartUploader.dev.cpp index 36b474abf..aec70b018 100644 --- a/services/backup/docker-server/contents/server/dev/MultiPartUploader.dev.cpp +++ b/services/backup/docker-server/contents/server/dev/MultiPartUploader.dev.cpp @@ -1,40 +1,42 @@ #include "MultiPartUploader.h" #include "AwsS3Bucket.h" #include "DevTools.h" #include "Tools.h" #include namespace comm { namespace network { std::unique_ptr bucket; -MultiPartUploader::MultiPartUploader(std::shared_ptr client, - const std::string bucketName, - const std::string objectName) +MultiPartUploader::MultiPartUploader( + std::shared_ptr client, + const std::string bucketName, + const std::string objectName) : client(nullptr), bucketName(bucketName), objectName(objectName) { bucket->writeObject(createCommPath(this->objectName), ""); } void MultiPartUploader::addPart(const std::string &part) { AwsS3Bucket(bucketName, nullptr) .appendToObject(createCommPath(this->objectName + "mpu"), part); this->partsSizes.push_back(part.size()); ++this->partCounter; } void MultiPartUploader::finishUpload() { AwsS3Bucket bucket(bucketName, nullptr); for (size_t i = 0; i < this->partsSizes.size() - 1; ++i) { if (this->partsSizes.at(i) < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { bucket.deleteObject(createCommPath(this->objectName + "mpu")); throw std::runtime_error("too small part detected"); } } - bucket.renameObject(createCommPath(this->objectName + "mpu"), - createCommPath(this->objectName)); + bucket.renameObject( + createCommPath(this->objectName + "mpu"), + createCommPath(this->objectName)); } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/AwsS3Bucket.cpp b/services/backup/docker-server/contents/server/src/AwsS3Bucket.cpp index b37fd4e22..d85e0a6b6 100644 --- a/services/backup/docker-server/contents/server/src/AwsS3Bucket.cpp +++ b/services/backup/docker-server/contents/server/src/AwsS3Bucket.cpp @@ -1,216 +1,222 @@ #include "AwsS3Bucket.h" #include "MultiPartUploader.h" #include "Tools.h" #include #include #include #include #include #include #include #include #include namespace comm { namespace network { -AwsS3Bucket::AwsS3Bucket(const std::string name, - std::shared_ptr client) - : name(name), client(client) {} +AwsS3Bucket::AwsS3Bucket( + const std::string name, + std::shared_ptr client) + : name(name), client(client) { +} std::vector AwsS3Bucket::listObjects() { Aws::S3::Model::ListObjectsRequest request; request.SetBucket(this->name); std::vector result; Aws::S3::Model::ListObjectsOutcome outcome = this->client->ListObjects(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } Aws::Vector objects = outcome.GetResult().GetContents(); for (Aws::S3::Model::Object &object : objects) { result.push_back(object.GetKey()); } return result; } bool AwsS3Bucket::isAvailable() const { Aws::S3::Model::HeadBucketRequest headRequest; headRequest.SetBucket(this->name); Aws::S3::Model::HeadBucketOutcome outcome = this->client->HeadBucket(headRequest); return outcome.IsSuccess(); } const size_t AwsS3Bucket::getObjectSize(const std::string &objectName) { Aws::S3::Model::HeadObjectRequest headRequest; headRequest.SetBucket(this->name); headRequest.SetKey(objectName); Aws::S3::Model::HeadObjectOutcome headOutcome = this->client->HeadObject(headRequest); if (!headOutcome.IsSuccess()) { throw std::runtime_error(headOutcome.GetError().GetMessage()); } return headOutcome.GetResultWithOwnership().GetContentLength(); } -void AwsS3Bucket::renameObject(const std::string ¤tName, - const std::string &newName) { +void AwsS3Bucket::renameObject( + const std::string ¤tName, + const std::string &newName) { Aws::S3::Model::CopyObjectRequest copyRequest; copyRequest.SetCopySource(this->name + "/" + currentName); copyRequest.SetKey(newName); copyRequest.SetBucket(this->name); Aws::S3::Model::CopyObjectOutcome copyOutcome = this->client->CopyObject(copyRequest); if (!copyOutcome.IsSuccess()) { throw std::runtime_error(copyOutcome.GetError().GetMessage()); } this->deleteObject(currentName); } -void AwsS3Bucket::writeObject(const std::string &objectName, - const std::string data) { +void AwsS3Bucket::writeObject( + const std::string &objectName, + const std::string data) { // we don't have to handle multiple write here because the GRPC limit is 4MB // and minimum size of data to perform multipart upload is 5MB Aws::S3::Model::PutObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); std::shared_ptr body = std::shared_ptr( new boost::interprocess::bufferstream((char *)data.data(), data.size())); request.SetBody(body); Aws::S3::Model::PutObjectOutcome outcome = this->client->PutObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } std::string AwsS3Bucket::getObjectData(const std::string &objectName) { Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); Aws::S3::Model::GetObjectOutcome outcome = this->client->GetObject(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } const size_t size = this->getObjectSize(objectName); if (size > GRPC_CHUNK_SIZE_LIMIT) { - throw invalid_argument_error( - std::string("The file is too big(" + std::to_string(size) + - " bytes, max is " + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + - "bytes), please, use getObjectDataChunks")); + throw invalid_argument_error(std::string( + "The file is too big(" + std::to_string(size) + " bytes, max is " + + std::to_string(GRPC_CHUNK_SIZE_LIMIT) + + "bytes), please, use getObjectDataChunks")); } Aws::IOStream &retrievedFile = outcome.GetResultWithOwnership().GetBody(); std::string result; result.resize(size); retrievedFile.get((char *)result.data(), size + 1); return result; } void AwsS3Bucket::getObjectDataChunks( const std::string &objectName, const std::function &callback, const size_t chunkSize) { const size_t fileSize = this->getObjectSize(objectName); if (fileSize == 0) { return; } Aws::S3::Model::GetObjectRequest request; request.SetBucket(this->name); request.SetKey(objectName); for (size_t offset = 0; offset < fileSize; offset += chunkSize) { const size_t nextSize = std::min(chunkSize, fileSize - offset); std::string range = "bytes=" + std::to_string(offset) + "-" + - std::to_string(offset + nextSize); + std::to_string(offset + nextSize); request.SetRange(range); Aws::S3::Model::GetObjectOutcome getOutcome = this->client->GetObject(request); if (!getOutcome.IsSuccess()) { throw std::runtime_error(getOutcome.GetError().GetMessage()); } Aws::IOStream &retrievedFile = getOutcome.GetResultWithOwnership().GetBody(); std::string result; result.resize(nextSize); retrievedFile.get((char *)result.data(), nextSize + 1); callback(result); } } -void AwsS3Bucket::appendToObject(const std::string &objectName, - const std::string data) { +void AwsS3Bucket::appendToObject( + const std::string &objectName, + const std::string data) { const size_t objectSize = this->getObjectSize(objectName); if (objectSize < AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE) { std::string currentData = this->getObjectData(objectName); currentData += data; this->writeObject(objectName, currentData); return; } size_t currentSize = 0; - MultiPartUploader uploader(this->client, this->name, - objectName + "-multipart"); + MultiPartUploader uploader( + this->client, this->name, objectName + "-multipart"); std::function callback = [&uploader, &data, ¤tSize, objectSize](const std::string &chunk) { currentSize += chunk.size(); if (currentSize < objectSize) { uploader.addPart(chunk); } else if (currentSize == objectSize) { uploader.addPart(std::string(chunk + data)); } else { throw std::runtime_error( "size of chunks exceeds the size of the object"); } }; - this->getObjectDataChunks(objectName, callback, - AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE); + this->getObjectDataChunks( + objectName, callback, AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE); uploader.finishUpload(); // this will overwrite the target file this->renameObject(objectName + "-multipart", objectName); const size_t newSize = this->getObjectSize(objectName); if (objectSize + data.size() != newSize) { - throw std::runtime_error("append to object " + objectName + - " has been performed but the final sizes don't " - "match, the size is now [" + - std::to_string(newSize) + "] but should be [" + - std::to_string(objectSize + data.size()) + "]"); + throw std::runtime_error( + "append to object " + objectName + + " has been performed but the final sizes don't " + "match, the size is now [" + + std::to_string(newSize) + "] but should be [" + + std::to_string(objectSize + data.size()) + "]"); } } void AwsS3Bucket::clearObject(const std::string &objectName) { this->writeObject(objectName, ""); } void AwsS3Bucket::deleteObject(const std::string &objectName) { Aws::S3::Model::DeleteObjectRequest deleteRequest; deleteRequest.SetKey(objectName); deleteRequest.SetBucket(this->name); Aws::S3::Model::DeleteObjectOutcome deleteOutcome = this->client->DeleteObject(deleteRequest); if (!deleteOutcome.IsSuccess()) { throw std::runtime_error(deleteOutcome.GetError().GetMessage()); } } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/AwsS3Bucket.h b/services/backup/docker-server/contents/server/src/AwsS3Bucket.h index 59661a810..3a48e99ec 100644 --- a/services/backup/docker-server/contents/server/src/AwsS3Bucket.h +++ b/services/backup/docker-server/contents/server/src/AwsS3Bucket.h @@ -1,37 +1,38 @@ #pragma once #include #include #include #include #include namespace comm { namespace network { class AwsS3Bucket { const std::string name; std::shared_ptr client; public: - AwsS3Bucket(const std::string name, - std::shared_ptr client); + AwsS3Bucket( + const std::string name, + std::shared_ptr client); std::vector listObjects(); bool isAvailable() const; const size_t getObjectSize(const std::string &objectName); void renameObject(const std::string ¤tName, const std::string &newName); void writeObject(const std::string &objectName, const std::string data); std::string getObjectData(const std::string &objectName); - void - getObjectDataChunks(const std::string &objectName, - const std::function &callback, - const size_t chunkSize); + void getObjectDataChunks( + const std::string &objectName, + const std::function &callback, + const size_t chunkSize); void appendToObject(const std::string &objectName, const std::string data); void clearObject(const std::string &objectName); void deleteObject(const std::string &objectName); }; } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp index 1ef891ef0..ca42bbabc 100644 --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.cpp @@ -1,171 +1,177 @@ #include "BackupServiceImpl.h" #include namespace comm { namespace network { -std::string -BackupServiceImpl::generateObjectName(const std::string &userId, - const OBJECT_TYPE objectType) const { +std::string BackupServiceImpl::generateObjectName( + const std::string &userId, + const OBJECT_TYPE objectType) const { if (objectType == OBJECT_TYPE::ENCRYPTED_BACKUP_KEY) { return userId + "-backup-key"; } if (objectType == OBJECT_TYPE::TRANSACTION_LOGS) { return userId + "-logs"; } if (objectType == OBJECT_TYPE::COMPACTION) { return userId + "-compaction"; } throw std::runtime_error("unhandled operation"); } BackupServiceImpl::BackupServiceImpl() { Aws::InitAPI({}); this->storageManager = std::make_unique(); if (!this->storageManager->getBucket(this->bucketName).isAvailable()) { throw std::runtime_error("bucket " + this->bucketName + " not available"); } } -BackupServiceImpl::~BackupServiceImpl() { Aws::ShutdownAPI({}); } +BackupServiceImpl::~BackupServiceImpl() { + Aws::ShutdownAPI({}); +} -grpc::Status -BackupServiceImpl::ResetKey(grpc::ServerContext *context, - grpc::ServerReader *reader, - google::protobuf::Empty *response) { +grpc::Status BackupServiceImpl::ResetKey( + grpc::ServerContext *context, + grpc::ServerReader *reader, + google::protobuf::Empty *response) { backup::ResetKeyRequest request; std::string id; AwsS3Bucket bucket = this->storageManager->getBucket(this->bucketName); try { while (reader->Read(&request)) { if (!id.size()) { id = request.userid(); } else if (id != request.userid()) { throw std::runtime_error("id mismatch: " + id + "/" + request.userid()); } const std::string newKey = request.newkey(); const std::string compactionChunk = request.compactionchunk(); // the following behavior assumes that the client sends: // 1. key + empty chunk // 2. empty key + chunk // ... // N. empty key + chunk if (newKey.size()) { std::cout << "Backup Service => ResetKey(this log will be removed) " "reading key [" << newKey << "]" << std::endl; bucket.writeObject( this->generateObjectName(id, OBJECT_TYPE::ENCRYPTED_BACKUP_KEY), newKey); bucket.clearObject( this->generateObjectName(id, OBJECT_TYPE::COMPACTION)); } else if (compactionChunk.size()) { std::cout << "Backup Service => ResetKey(this log will be removed) " "reading chunk [" << compactionChunk << "]" << std::endl; bucket.appendToObject( this->generateObjectName(id, OBJECT_TYPE::COMPACTION), compactionChunk); } } bucket.clearObject( this->generateObjectName(id, OBJECT_TYPE::TRANSACTION_LOGS)); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; } -grpc::Status BackupServiceImpl::SendLog(grpc::ServerContext *context, - const backup::SendLogRequest *request, - google::protobuf::Empty *response) { +grpc::Status BackupServiceImpl::SendLog( + grpc::ServerContext *context, + const backup::SendLogRequest *request, + google::protobuf::Empty *response) { const std::string id = request->userid(); const std::string data = request->data(); std::cout << "Backup Service => SendLog, id:[" << id << "] data: [" << data << "](this log will be removed)" << std::endl; try { this->storageManager->getBucket(this->bucketName) .appendToObject( this->generateObjectName(id, OBJECT_TYPE::TRANSACTION_LOGS), data); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; } -grpc::Status -BackupServiceImpl::PullBackupKey(grpc::ServerContext *context, - const backup::PullBackupKeyRequest *request, - backup::PullBackupKeyResponse *response) { +grpc::Status BackupServiceImpl::PullBackupKey( + grpc::ServerContext *context, + const backup::PullBackupKeyRequest *request, + backup::PullBackupKeyResponse *response) { const std::string id = request->userid(); const std::string pakeKey = request->pakekey(); std::cout << "Backup Service => PullBackupKey, id:[" << id << "] pakeKey: [" << pakeKey << "](this log will be removed)" << std::endl; // TODO pake operations - verify user's password with pake's keys try { std::string key = this->storageManager->getBucket(this->bucketName) .getObjectData(this->generateObjectName( id, OBJECT_TYPE::ENCRYPTED_BACKUP_KEY)); response->set_encryptedbackupkey(key); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; } grpc::Status BackupServiceImpl::PullCompaction( - grpc::ServerContext *context, const backup::PullCompactionRequest *request, + grpc::ServerContext *context, + const backup::PullCompactionRequest *request, grpc::ServerWriter *writer) { const std::string id = request->userid(); std::cout << "Backup Service => PullCompaction, id:[" << id << "](this log will be removed)" << std::endl; AwsS3Bucket bucket = this->storageManager->getBucket(this->bucketName); try { backup::PullCompactionResponse response; std::function callback = [&response, &writer](std::string chunk) { response.set_compactionchunk(chunk); if (!writer->Write(response)) { throw std::runtime_error("writer interrupted sending compaction"); } }; bucket.getObjectDataChunks( - this->generateObjectName(id, OBJECT_TYPE::COMPACTION), callback, + this->generateObjectName(id, OBJECT_TYPE::COMPACTION), + callback, GRPC_CHUNK_SIZE_LIMIT); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } try { backup::PullCompactionResponse response; std::function callback = [&response, &writer](std::string chunk) { response.set_logchunk(chunk); if (!writer->Write(response)) { throw std::runtime_error("writer interrupted sending logs"); } }; bucket.getObjectDataChunks( - this->generateObjectName(id, OBJECT_TYPE::TRANSACTION_LOGS), callback, + this->generateObjectName(id, OBJECT_TYPE::TRANSACTION_LOGS), + callback, GRPC_CHUNK_SIZE_LIMIT); } catch (std::runtime_error &e) { std::cout << "error: " << e.what() << std::endl; return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } return grpc::Status::OK; } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/BackupServiceImpl.h b/services/backup/docker-server/contents/server/src/BackupServiceImpl.h index df6cfbeec..d54e1794c 100644 --- a/services/backup/docker-server/contents/server/src/BackupServiceImpl.h +++ b/services/backup/docker-server/contents/server/src/BackupServiceImpl.h @@ -1,49 +1,53 @@ #pragma once #include "AwsStorageManager.h" #include "Tools.h" #include "../_generated/backup.grpc.pb.h" #include "../_generated/backup.pb.h" #include #include #include #include #include #include namespace comm { namespace network { class BackupServiceImpl final : public backup::BackupService::Service { const std::string bucketName = "commapp-backup"; std::unique_ptr storageManager; - std::string generateObjectName(const std::string &userId, - const OBJECT_TYPE objectType) const; + std::string generateObjectName( + const std::string &userId, + const OBJECT_TYPE objectType) const; public: BackupServiceImpl(); virtual ~BackupServiceImpl(); - grpc::Status ResetKey(grpc::ServerContext *context, - grpc::ServerReader *reader, - google::protobuf::Empty *response) override; - grpc::Status SendLog(grpc::ServerContext *context, - const backup::SendLogRequest *request, - google::protobuf::Empty *response) override; - grpc::Status PullBackupKey(grpc::ServerContext *context, - const backup::PullBackupKeyRequest *request, - backup::PullBackupKeyResponse *response) override; + grpc::Status ResetKey( + grpc::ServerContext *context, + grpc::ServerReader *reader, + google::protobuf::Empty *response) override; + grpc::Status SendLog( + grpc::ServerContext *context, + const backup::SendLogRequest *request, + google::protobuf::Empty *response) override; + grpc::Status PullBackupKey( + grpc::ServerContext *context, + const backup::PullBackupKeyRequest *request, + backup::PullBackupKeyResponse *response) override; grpc::Status PullCompaction( grpc::ServerContext *context, const backup::PullCompactionRequest *request, grpc::ServerWriter *writer) override; }; } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/MultiPartUploader.cpp b/services/backup/docker-server/contents/server/src/MultiPartUploader.cpp index 3ce518439..6c5208a2d 100644 --- a/services/backup/docker-server/contents/server/src/MultiPartUploader.cpp +++ b/services/backup/docker-server/contents/server/src/MultiPartUploader.cpp @@ -1,81 +1,82 @@ #include "MultiPartUploader.h" #include "Tools.h" #include #include #include #include #include #include namespace comm { namespace network { -MultiPartUploader::MultiPartUploader(std::shared_ptr client, - const std::string bucketName, - const std::string objectName) +MultiPartUploader::MultiPartUploader( + std::shared_ptr client, + const std::string bucketName, + const std::string objectName) : client(client), bucketName(bucketName), objectName(objectName) { this->completeMultipartUploadRequest.SetBucket(this->bucketName); this->completeMultipartUploadRequest.SetKey(this->objectName); Aws::S3::Model::CreateMultipartUploadRequest createRequest; createRequest.SetBucket(this->bucketName); createRequest.SetKey(this->objectName); createRequest.SetContentType("text/plain"); Aws::S3::Model::CreateMultipartUploadOutcome createOutcome = this->client->CreateMultipartUpload(createRequest); if (!createOutcome.IsSuccess()) { throw std::runtime_error(createOutcome.GetError().GetMessage()); } this->uploadId = createOutcome.GetResult().GetUploadId(); this->completeMultipartUploadRequest.SetUploadId(this->uploadId); } void MultiPartUploader::addPart(const std::string &part) { Aws::S3::Model::UploadPartRequest uploadRequest; uploadRequest.SetBucket(this->bucketName); uploadRequest.SetKey(this->objectName); uploadRequest.SetPartNumber(this->partNumber); uploadRequest.SetUploadId(this->uploadId); std::shared_ptr body = std::shared_ptr( new boost::interprocess::bufferstream((char *)part.data(), part.size())); uploadRequest.SetBody(body); Aws::Utils::ByteBuffer partMd5(Aws::Utils::HashingUtils::CalculateMD5(*body)); uploadRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(partMd5)); uploadRequest.SetContentLength(part.size()); Aws::S3::Model::UploadPartOutcome uploadPartOutcome = this->client->UploadPart(uploadRequest); Aws::S3::Model::CompletedPart completedPart; completedPart.SetPartNumber(this->partNumber); std::string eTag = uploadPartOutcome.GetResult().GetETag(); if (eTag.empty()) { throw std::runtime_error("etag empty"); } completedPart.SetETag(eTag); completedMultipartUpload.AddParts(completedPart); ++this->partNumber; } void MultiPartUploader::finishUpload() { this->completeMultipartUploadRequest.SetMultipartUpload( this->completedMultipartUpload); Aws::S3::Model::CompleteMultipartUploadOutcome completeUploadOutcome = this->client->CompleteMultipartUpload( this->completeMultipartUploadRequest); if (!completeUploadOutcome.IsSuccess()) { throw std::runtime_error(completeUploadOutcome.GetError().GetMessage()); } } } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/MultiPartUploader.h b/services/backup/docker-server/contents/server/src/MultiPartUploader.h index 6a1ba610a..a0bb37cbd 100644 --- a/services/backup/docker-server/contents/server/src/MultiPartUploader.h +++ b/services/backup/docker-server/contents/server/src/MultiPartUploader.h @@ -1,34 +1,36 @@ #pragma once #include #include #include #include #include namespace comm { namespace network { class MultiPartUploader { std::shared_ptr client; const std::string bucketName; const std::string objectName; size_t partCounter = 0; std::vector partsSizes; Aws::S3::Model::CompleteMultipartUploadRequest completeMultipartUploadRequest; Aws::S3::Model::CompletedMultipartUpload completedMultipartUpload; std::string uploadId; size_t partNumber = 1; public: - MultiPartUploader(std::shared_ptr client, - const std::string bucketName, const std::string objectName); + MultiPartUploader( + std::shared_ptr client, + const std::string bucketName, + const std::string objectName); void addPart(const std::string &part); void finishUpload(); }; } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/src/Tools.h b/services/backup/docker-server/contents/server/src/Tools.h index 03b81295c..98badbd4b 100644 --- a/services/backup/docker-server/contents/server/src/Tools.h +++ b/services/backup/docker-server/contents/server/src/Tools.h @@ -1,25 +1,26 @@ #pragma once namespace comm { namespace network { // 4MB limit const size_t GRPC_CHUNK_SIZE_LIMIT = 4 * 1024 * 1024; // 5MB limit const size_t AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE = 5 * 1024 * 1024; enum class OBJECT_TYPE { ENCRYPTED_BACKUP_KEY = 0, TRANSACTION_LOGS = 1, COMPACTION = 2, }; class invalid_argument_error : public std::runtime_error { public: invalid_argument_error(std::string errorMessage) - : std::runtime_error(errorMessage) {} + : std::runtime_error(errorMessage) { + } }; } // namespace network } // namespace comm diff --git a/services/backup/docker-server/contents/server/test/BackupTest.cpp b/services/backup/docker-server/contents/server/test/BackupTest.cpp index 62e6c55db..4d096778b 100644 --- a/services/backup/docker-server/contents/server/test/BackupTest.cpp +++ b/services/backup/docker-server/contents/server/test/BackupTest.cpp @@ -1,18 +1,20 @@ #include class BackupTest : public testing::Test { protected: virtual void SetUp() { //... } virtual void TearDown() { //... } }; -TEST_F(BackupTest, passingTest) { EXPECT_TRUE(true); } +TEST_F(BackupTest, passingTest) { + EXPECT_TRUE(true); +} TEST_F(BackupTest, failingTest) { // EXPECT_TRUE(false); } diff --git a/services/backup/docker-server/contents/server/test/MultiPartUploadTest.cpp b/services/backup/docker-server/contents/server/test/MultiPartUploadTest.cpp index bd7474bbf..8c031558d 100644 --- a/services/backup/docker-server/contents/server/test/MultiPartUploadTest.cpp +++ b/services/backup/docker-server/contents/server/test/MultiPartUploadTest.cpp @@ -1,75 +1,78 @@ #include #include "AwsS3Bucket.h" #include "AwsStorageManager.h" #include "MultiPartUploader.h" #include "TestTools.h" #include "Tools.h" #include #include #include using namespace comm::network; class MultiPartUploadTest : public testing::Test { protected: std::shared_ptr s3Client; const std::string bucketName = "commapp-test"; std::unique_ptr bucket; virtual void SetUp() { Aws::InitAPI({}); Aws::Client::ClientConfiguration config; config.region = "us-east-2"; s3Client = std::make_shared(config); bucket = std::make_unique(bucketName, s3Client); } - virtual void TearDown() { Aws::ShutdownAPI({}); } + virtual void TearDown() { + Aws::ShutdownAPI({}); + } }; std::string generateNByes(const size_t n) { std::string result; result.resize(n); memset((char *)result.data(), 'A', n); return result; } TEST_F(MultiPartUploadTest, ThrowingTooSmallPart) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, ThrowingTooSmallPartOneByte) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE - 1)); mpu.addPart("xxx"); EXPECT_THROW(mpu.finishUpload(), std::runtime_error); } TEST_F(MultiPartUploadTest, SuccessfulWriteMultipleChunks) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart(generateNByes(AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE)); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_THROW(bucket->getObjectData(objectName), invalid_argument_error); - EXPECT_EQ(bucket->getObjectSize(objectName), - AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3); + EXPECT_EQ( + bucket->getObjectSize(objectName), + AWS_MULTIPART_UPLOAD_MINIMUM_CHUNK_SIZE + 3); bucket->deleteObject(objectName); } TEST_F(MultiPartUploadTest, SuccessfulWriteOneChunk) { std::string objectName = createObject(*bucket); MultiPartUploader mpu(s3Client, bucketName, objectName); mpu.addPart("xxx"); mpu.finishUpload(); EXPECT_EQ(bucket->getObjectSize(objectName), 3); bucket->deleteObject(objectName); } diff --git a/services/backup/docker-server/contents/server/test/StorageManagerTest.cpp b/services/backup/docker-server/contents/server/test/StorageManagerTest.cpp index 584955f5f..43d2474b5 100644 --- a/services/backup/docker-server/contents/server/test/StorageManagerTest.cpp +++ b/services/backup/docker-server/contents/server/test/StorageManagerTest.cpp @@ -1,67 +1,73 @@ #include #include "AwsStorageManager.h" #include "TestTools.h" #include #include #include #include #include using namespace comm::network; class StorageManagerTest : public testing::Test { public: protected: std::unique_ptr storageManager; const std::string bucketName = "commapp-test"; - const std::string data = "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8" - "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6"; + const std::string data = + "yiU3VaZlKfTteO10yrWmK1Q5BOvBQrdmj2aBlnoLuhxLfRZK1n8" + "26FRXJAGhPswR1r8yxtwxyLkv3I4J4tlH4brDP10mrB99XpM6"; virtual void SetUp() { Aws::InitAPI({}); if (storageManager == nullptr) { storageManager = std::make_unique(); } } - virtual void TearDown() { Aws::ShutdownAPI({}); } + virtual void TearDown() { + Aws::ShutdownAPI({}); + } }; TEST_F(StorageManagerTest, ObjectOperationsTest) { EXPECT_TRUE(storageManager->getBucket(bucketName).isAvailable()); std::string objectName = createObject(storageManager->getBucket(bucketName)); storageManager->getBucket(bucketName).writeObject(objectName, data); - EXPECT_EQ(storageManager->getBucket(bucketName).getObjectSize(objectName), - data.size()); - EXPECT_TRUE(storageManager->getBucket(bucketName).getObjectData(objectName) == - data); + EXPECT_EQ( + storageManager->getBucket(bucketName).getObjectSize(objectName), + data.size()); + EXPECT_TRUE( + storageManager->getBucket(bucketName).getObjectData(objectName) == data); std::string chunkedData; const size_t chunkSize = data.size() / 10; std::function callback = [&chunkedData](const std::string &chunk) { chunkedData += chunk; }; storageManager->getBucket(bucketName) .getObjectDataChunks(objectName, callback, chunkSize); EXPECT_TRUE(data == chunkedData); storageManager->getBucket(bucketName) .renameObject(objectName, objectName + "c"); - EXPECT_THROW(storageManager->getBucket(bucketName).getObjectData(objectName), - std::runtime_error); + EXPECT_THROW( + storageManager->getBucket(bucketName).getObjectData(objectName), + std::runtime_error); EXPECT_TRUE( storageManager->getBucket(bucketName).getObjectData(objectName + "c") == data); storageManager->getBucket(bucketName) .renameObject(objectName + "c", objectName); storageManager->getBucket(bucketName).clearObject(objectName); EXPECT_EQ(storageManager->getBucket(bucketName).getObjectSize(objectName), 0); storageManager->getBucket(bucketName).deleteObject(objectName); - EXPECT_THROW(storageManager->getBucket(bucketName).getObjectData(objectName), - std::runtime_error); + EXPECT_THROW( + storageManager->getBucket(bucketName).getObjectData(objectName), + std::runtime_error); } diff --git a/services/backup/docker-server/contents/server/test/TestTools.cpp b/services/backup/docker-server/contents/server/test/TestTools.cpp index b207d3f08..fbb0df712 100644 --- a/services/backup/docker-server/contents/server/test/TestTools.cpp +++ b/services/backup/docker-server/contents/server/test/TestTools.cpp @@ -1,26 +1,27 @@ #include "TestTools.h" #include "AwsS3Bucket.h" namespace comm { namespace network { std::string generateObjectName() { std::chrono::milliseconds ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()); return std::to_string(ms.count()); } std::string createObject(AwsS3Bucket bucket) { std::string objectName; std::vector presentObjects; do { objectName = generateObjectName(); presentObjects = bucket.listObjects(); - } while (std::find(presentObjects.begin(), presentObjects.end(), - objectName) != presentObjects.end()); + } while ( + std::find(presentObjects.begin(), presentObjects.end(), objectName) != + presentObjects.end()); return objectName; } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/Tools.h b/services/tunnelbroker/docker-server/contents/server/src/Tools.h index 03caede79..e2bca3183 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/Tools.h +++ b/services/tunnelbroker/docker-server/contents/server/src/Tools.h @@ -1,30 +1,31 @@ #pragma once #include #include namespace comm { namespace network { namespace ping { enum class ClientState { ONLINE, OFFLINE, }; struct ClientData { const std::string id; const std::string deviceToken; folly::MPMCQueue pingRequests = folly::MPMCQueue(10); folly::MPMCQueue pingResponses = folly::MPMCQueue(10); ClientState lastState = ClientState::ONLINE; ClientData(const std::string id, const std::string deviceToken) - : id(id), deviceToken(deviceToken) {} + : id(id), deviceToken(deviceToken) { + } }; } // namespace ping } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.cpp b/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.cpp index 9d1b67924..8b7a665a7 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.cpp +++ b/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.cpp @@ -1,103 +1,104 @@ #include "TunnelBrokerServiceImpl.h" -#include #include +#include namespace comm { namespace network { using namespace std::chrono_literals; grpc::Status TunnelBrokerServiceImpl::CheckIfPrimaryDeviceOnline( - grpc::ServerContext *context, const tunnelbroker::CheckRequest *request, + grpc::ServerContext *context, + const tunnelbroker::CheckRequest *request, tunnelbroker::CheckResponse *response) { const std::string id = request->userid(); const std::string deviceToken = request->devicetoken(); auto iterator = primaries.find(id); if (iterator == primaries.end()) { response->set_checkresponsetype( tunnelbroker::CheckResponseType::PRIMARY_DOESNT_EXIST); } else if (deviceToken == iterator->second->deviceToken) { response->set_checkresponsetype( tunnelbroker::CheckResponseType::CURRENT_IS_PRIMARY); } else { // TODO: the background notif should be sent what cannot be really // simulated here I believe iterator->second->pingRequests.blockingWrite(true); // TODO: timeout currently set for 3s, to be changed const auto wait = std::chrono::seconds(3); folly::stop_watch<> watch; bool isActive; bool responseReceived = iterator->second->pingResponses.tryReadUntil( watch.getCheckpoint() + wait, isActive); if (responseReceived) { iterator->second->lastState = ping::ClientState::ONLINE; response->set_checkresponsetype( tunnelbroker::CheckResponseType::PRIMARY_ONLINE); } else { iterator->second->lastState = ping::ClientState::OFFLINE; response->set_checkresponsetype( tunnelbroker::CheckResponseType::PRIMARY_OFFLINE); } } return grpc::Status::OK; } grpc::Status TunnelBrokerServiceImpl::BecomeNewPrimaryDevice( grpc::ServerContext *context, const tunnelbroker::NewPrimaryRequest *request, tunnelbroker::NewPrimaryResponse *response) { const std::string id = request->userid(); const std::string deviceToken = request->devicetoken(); std::shared_ptr clientData = std::make_shared(id, deviceToken); auto iterator = primaries.find(id); if (iterator == primaries.end()) { primaries.insert_or_assign(id, clientData); response->set_success(true); return grpc::Status::OK; } if (iterator->second->deviceToken == deviceToken) { response->set_success(true); return grpc::Status::OK; } if (iterator->second->lastState == ping::ClientState::ONLINE) { response->set_success(false); } else { primaries.insert_or_assign(id, clientData); response->set_success(true); } return grpc::Status::OK; } -grpc::Status -TunnelBrokerServiceImpl::SendPong(grpc::ServerContext *context, - const tunnelbroker::PongRequest *request, - google::protobuf::Empty *response) { +grpc::Status TunnelBrokerServiceImpl::SendPong( + grpc::ServerContext *context, + const tunnelbroker::PongRequest *request, + google::protobuf::Empty *response) { const std::string id = request->userid(); const std::string deviceToken = request->devicetoken(); auto iterator = primaries.find(id); if (iterator == primaries.end() || iterator->second->deviceToken != deviceToken) { return grpc::Status::OK; } if (!iterator->second->pingRequests.isEmpty()) { bool value; iterator->second->pingRequests.blockingRead(value); iterator->second->pingResponses.write(true); } return grpc::Status::OK; } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.h b/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.h index 1d96fc3dc..1072297b7 100644 --- a/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.h +++ b/services/tunnelbroker/docker-server/contents/server/src/TunnelBrokerServiceImpl.h @@ -1,38 +1,39 @@ #pragma once #include #include #include #include #include "../_generated/tunnelbroker.grpc.pb.h" #include "../_generated/tunnelbroker.pb.h" #include "Tools.h" namespace comm { namespace network { class TunnelBrokerServiceImpl final : public tunnelbroker::TunnelBrokerService::Service { folly::ConcurrentHashMap> primaries; public: - grpc::Status - CheckIfPrimaryDeviceOnline(grpc::ServerContext *context, - const tunnelbroker::CheckRequest *request, - tunnelbroker::CheckResponse *response) override; - grpc::Status - BecomeNewPrimaryDevice(grpc::ServerContext *context, - const tunnelbroker::NewPrimaryRequest *request, - tunnelbroker::NewPrimaryResponse *response) override; - grpc::Status SendPong(grpc::ServerContext *context, - const tunnelbroker::PongRequest *request, - google::protobuf::Empty *response) override; + grpc::Status CheckIfPrimaryDeviceOnline( + grpc::ServerContext *context, + const tunnelbroker::CheckRequest *request, + tunnelbroker::CheckResponse *response) override; + grpc::Status BecomeNewPrimaryDevice( + grpc::ServerContext *context, + const tunnelbroker::NewPrimaryRequest *request, + tunnelbroker::NewPrimaryResponse *response) override; + grpc::Status SendPong( + grpc::ServerContext *context, + const tunnelbroker::PongRequest *request, + google::protobuf::Empty *response) override; }; } // namespace network } // namespace comm