diff --git a/services/lib/src/DatabaseManagerBase.cpp b/services/lib/src/DatabaseManagerBase.cpp index c533f262a..1e34d42f6 100644 --- a/services/lib/src/DatabaseManagerBase.cpp +++ b/services/lib/src/DatabaseManagerBase.cpp @@ -1,105 +1,104 @@ #include "DatabaseManagerBase.h" #include "Item.h" #include #include #include #include #include #include #include -#include #include namespace comm { namespace network { namespace database { void DatabaseManagerBase::innerPutItem( std::shared_ptr item, const Aws::DynamoDB::Model::PutItemRequest &request) { const Aws::DynamoDB::Model::PutItemOutcome outcome = getDynamoDBClient()->PutItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } void DatabaseManagerBase::innerRemoveItem(const Item &item) { Aws::DynamoDB::Model::DeleteItemRequest request; request.SetTableName(item.getTableName()); PrimaryKeyDescriptor pk = item.getPrimaryKeyDescriptor(); PrimaryKeyValue primaryKeyValue = item.getPrimaryKeyValue(); request.AddKey( pk.partitionKey, Aws::DynamoDB::Model::AttributeValue(primaryKeyValue.partitionKey)); if (pk.sortKey != nullptr && primaryKeyValue.sortKey != nullptr) { request.AddKey( *pk.sortKey, Aws::DynamoDB::Model::AttributeValue(*primaryKeyValue.sortKey)); } const Aws::DynamoDB::Model::DeleteItemOutcome &outcome = getDynamoDBClient()->DeleteItem(request); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } void DatabaseManagerBase::innerBatchWriteItem( const std::string &tableName, const size_t &chunkSize, const size_t &backoffFirstRetryDelay, const size_t &maxBackoffTime, std::vector &writeRequests) { // Split write requests to chunks by chunkSize size and write // them by batch Aws::DynamoDB::Model::BatchWriteItemOutcome outcome; std::vector writeRequestsChunk; std::vector::iterator chunkPositionStart, chunkPositionEnd; for (size_t i = 0; i < writeRequests.size(); i += chunkSize) { chunkPositionStart = writeRequests.begin() + i; chunkPositionEnd = writeRequests.begin() + std::min(writeRequests.size(), i + chunkSize); writeRequestsChunk = std::vector( chunkPositionStart, chunkPositionEnd); Aws::DynamoDB::Model::BatchWriteItemRequest writeBatchRequest; writeBatchRequest.AddRequestItems(tableName, writeRequestsChunk); outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } size_t delayRetry, delayMs, jitterMs; while (!outcome.GetResult().GetUnprocessedItems().empty()) { if (delayMs == maxBackoffTime) { throw std::runtime_error( "InnerBatchWriteItem error: maximum wait time to put unprocessed " "items to DynamoDB is exceeded."); } jitterMs = std::rand() % 99 + 1; delayRetry++; delayMs = std::min( size_t(backoffFirstRetryDelay * std::pow(2, delayRetry) + jitterMs), maxBackoffTime); LOG(INFO) << "Waiting for a backoff " << delayMs << "ms delay before putting unprocessed items from batch write " "to DynamoDB"; std::this_thread::sleep_for(std::chrono::milliseconds(delayMs)); writeBatchRequest.SetRequestItems( outcome.GetResult().GetUnprocessedItems()); outcome = getDynamoDBClient()->BatchWriteItem(writeBatchRequest); if (!outcome.IsSuccess()) { throw std::runtime_error(outcome.GetError().GetMessage()); } } } } } // namespace database } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h index 998e8bb7a..729b1d546 100644 --- a/services/lib/src/server-base-reactors/ServerBidiReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerBidiReactorBase.h @@ -1,173 +1,172 @@ #pragma once #include "BaseReactor.h" #include #include -#include #include #include namespace comm { namespace network { namespace reactor { struct ServerBidiReactorStatus { grpc::Status status; bool sendLastResponse; ServerBidiReactorStatus( grpc::Status status = grpc::Status::OK, bool sendLastResponse = false) : status(status), sendLastResponse(sendLastResponse) { } }; // This is how this type of reactor works: // - repeat: // - read a request from the client // - write a response to the client // - terminate the connection template class ServerBidiReactorBase : public grpc::ServerBidiReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; Response response; protected: ServerBidiReactorStatus status; bool readingAborted = false; public: ServerBidiReactorBase(); // these methods come from the BaseReactor(go there for more information) void terminate(const grpc::Status &status) override; void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnDone() override; void OnReadDone(bool ok) override; void OnWriteDone(bool ok) override; void terminate(ServerBidiReactorStatus status); ServerBidiReactorStatus getStatus() const; void setStatus(const ServerBidiReactorStatus &status); // - argument request - request that was sent by the client and received by // the server in the current cycle // - argument response - response that will be sent to the client in the // current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr handleRequest(Request request, Response *response) = 0; }; template ServerBidiReactorBase::ServerBidiReactorBase() { this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerBidiReactorBase::terminate( const grpc::Status &status) { this->terminate(ServerBidiReactorStatus(status)); } template void ServerBidiReactorBase::OnDone() { this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template void ServerBidiReactorBase::terminate( ServerBidiReactorStatus status) { this->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->setStatus(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } if (this->getStatus().sendLastResponse) { this->StartWriteAndFinish( &this->response, grpc::WriteOptions(), this->getStatus().status); } else { this->Finish(this->getStatus().status); } this->statusHolder->state = ReactorState::TERMINATED; } template ServerBidiReactorStatus ServerBidiReactorBase::getStatus() const { return this->status; } template void ServerBidiReactorBase::setStatus( const ServerBidiReactorStatus &status) { this->status = status; } template void ServerBidiReactorBase::OnReadDone(bool ok) { if (!ok) { this->readingAborted = true; // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(ServerBidiReactorStatus(grpc::Status::OK)); return; } try { this->response = Response(); std::unique_ptr status = this->handleRequest(this->request, &this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what()))); } } template void ServerBidiReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(ServerBidiReactorStatus( grpc::Status(grpc::StatusCode::ABORTED, "write failed"))); return; } this->StartRead(&this->request); } template std::shared_ptr ServerBidiReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerReadReactorBase.h b/services/lib/src/server-base-reactors/ServerReadReactorBase.h index 98c6c8cf4..3b31cdd27 100644 --- a/services/lib/src/server-base-reactors/ServerReadReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerReadReactorBase.h @@ -1,121 +1,120 @@ #pragma once #include "BaseReactor.h" #include #include #include -#include #include #include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read N requests from the client // - write a final response to the client (may be empty) // - terminate the connection template class ServerReadReactorBase : public grpc::ServerReadReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Request request; protected: Response *response; public: ServerReadReactorBase(Response *response); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 void OnReadDone(bool ok) override; void terminate(const grpc::Status &status) override; void OnDone() override; // - argument request - data read from the client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr readRequest(Request request) = 0; }; template ServerReadReactorBase::ServerReadReactorBase( Response *response) : response(response) { this->statusHolder->state = ReactorState::RUNNING; this->StartRead(&this->request); } template void ServerReadReactorBase::OnReadDone(bool ok) { if (!ok) { // Ending a connection on the other side results in the `ok` flag being set // to false. It makes it impossible to detect a failure based just on the // flag. We should manually check if the data we received is valid this->terminate(grpc::Status::OK); return; } try { std::unique_ptr status = this->readRequest(this->request); if (status != nullptr) { this->terminate(*status); return; } } catch (std::runtime_error &e) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); return; } this->StartRead(&this->request); } template void ServerReadReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->Finish(this->statusHolder->getStatus()); this->statusHolder->state = ReactorState::TERMINATED; } template void ServerReadReactorBase::OnDone() { this->statusHolder->state = ReactorState::DONE; this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerReadReactorBase::getStatusHolder() { return this->statusHolder; } } // namespace reactor } // namespace network } // namespace comm diff --git a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h index b1bc2ab11..9a367bf8f 100644 --- a/services/lib/src/server-base-reactors/ServerWriteReactorBase.h +++ b/services/lib/src/server-base-reactors/ServerWriteReactorBase.h @@ -1,144 +1,143 @@ #pragma once #include "BaseReactor.h" #include #include #include -#include #include #include namespace comm { namespace network { namespace reactor { // This is how this type of reactor works: // - read a request from the client // - write N responses to the client // - terminate the connection template class ServerWriteReactorBase : public grpc::ServerWriteReactor, public BaseReactor { std::shared_ptr statusHolder = std::make_shared(); Response response; bool initialized = false; void nextWrite(); protected: // this is a const ref since it's not meant to be modified const Request &request; public: ServerWriteReactorBase(const Request *request); // this should be called explicitly right after the reactor is created void start(); // these methods come from the BaseReactor(go there for more information) void validate() override{}; void doneCallback() override{}; void terminateCallback() override{}; std::shared_ptr getStatusHolder() override; // these methods come from gRPC // https://github.com/grpc/grpc/blob/v1.39.x/include/grpcpp/impl/codegen/client_callback.h#L237 virtual void initialize(){}; void OnWriteDone(bool ok) override; void terminate(const grpc::Status &status); void OnDone() override; // - argument response - should be filled with data that will be sent to the // client in the current cycle // - returns status - if the connection is about to be // continued, nullptr should be returned. Any other returned value will // terminate the connection with a given status virtual std::unique_ptr writeResponse(Response *response) = 0; }; template void ServerWriteReactorBase::terminate( const grpc::Status &status) { this->statusHolder->setStatus(status); try { this->terminateCallback(); this->validate(); } catch (std::runtime_error &e) { this->statusHolder->setStatus( grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } if (!this->statusHolder->getStatus().ok()) { LOG(ERROR) << this->statusHolder->getStatus().error_message(); } if (this->statusHolder->state != ReactorState::RUNNING) { return; } this->Finish(this->statusHolder->getStatus()); this->statusHolder->state = ReactorState::TERMINATED; } template ServerWriteReactorBase::ServerWriteReactorBase( const Request *request) : request(*request) { // we cannot call this->start() here because it's going to call it on // the base class, not derived leading to the runtime error of calling // a pure virtual function // start has to be exposed as a public function and called explicitly // to initialize writing } template void ServerWriteReactorBase::nextWrite() { try { if (!this->initialized) { this->initialize(); this->initialized = true; } this->response = Response(); std::unique_ptr status = this->writeResponse(&this->response); if (status != nullptr) { this->terminate(*status); return; } this->StartWrite(&this->response); } catch (std::runtime_error &e) { LOG(ERROR) << "error: " << e.what(); this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, e.what())); } } template void ServerWriteReactorBase::start() { this->statusHolder->state = ReactorState::RUNNING; this->nextWrite(); } template void ServerWriteReactorBase::OnDone() { this->doneCallback(); // This looks weird but apparently it is okay to do this. More information: // https://phabricator.ashoat.com/D3246#87890 delete this; } template std::shared_ptr ServerWriteReactorBase::getStatusHolder() { return this->statusHolder; } template void ServerWriteReactorBase::OnWriteDone(bool ok) { if (!ok) { this->terminate(grpc::Status(grpc::StatusCode::INTERNAL, "writing error")); return; } this->nextWrite(); } } // namespace reactor } // namespace network } // namespace comm