diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.cpp b/services/tunnelbroker/src/Amqp/AmqpManager.cpp index 648075c2f..db78d4c18 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.cpp +++ b/services/tunnelbroker/src/Amqp/AmqpManager.cpp @@ -1,144 +1,144 @@ #include "AmqpManager.h" #include "ConfigManager.h" #include "Constants.h" #include "DeliveryBroker.h" #include "Tools.h" #include namespace comm { namespace network { AmqpManager &AmqpManager::getInstance() { static AmqpManager instance; return instance; } void AmqpManager::connectInternal() { const std::string amqpUri = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_URI); const std::string tunnelbrokerID = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_TUNNELBROKER_ID); const std::string fanoutExchangeName = config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE); std::cout << "AMQP: Connecting to " << amqpUri << std::endl; auto *loop = uv_default_loop(); AMQP::LibUvHandler handler(loop); AMQP::TcpConnection connection(&handler, AMQP::Address(amqpUri)); this->amqpChannel = std::make_unique(&connection); this->amqpChannel->onError([this](const char *message) { std::cout << "AMQP: channel error: " << message << ", will try to reconnect" << std::endl; this->amqpReady = false; }); AMQP::Table arguments; arguments["x-message-ttl"] = AMQP_MESSAGE_TTL; arguments["x-expires"] = AMQP_QUEUE_TTL; this->amqpChannel->declareExchange(fanoutExchangeName, AMQP::fanout); this->amqpChannel->declareQueue(tunnelbrokerID, AMQP::durable, arguments) .onSuccess([this, tunnelbrokerID, fanoutExchangeName]( const std::string &name, uint32_t messagecount, uint32_t consumercount) { std::cout << "AMQP: Queue " << name << " created" << std::endl; this->amqpChannel->bindQueue(fanoutExchangeName, tunnelbrokerID, "") .onError([this, tunnelbrokerID, fanoutExchangeName]( const char *message) { std::cout << "AMQP: Failed to bind queue: " << tunnelbrokerID << " to exchange: " << fanoutExchangeName << std::endl; this->amqpReady = false; }); this->amqpReady = true; this->amqpChannel->consume(tunnelbrokerID) .onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { try { AMQP::Table headers = message.headers(); const std::string payload(message.body()); const std::string toDeviceID(headers[AMQP_HEADER_TO_DEVICEID]); const std::string fromDeviceID( headers[AMQP_HEADER_FROM_DEVICEID]); std::cout << "AMQP: Message consumed for deviceID: " << toDeviceID << std::endl; DeliveryBroker::getInstance().push( deliveryTag, toDeviceID, fromDeviceID, payload); } catch (const std::exception &e) { std::cout << "AMQP: Message parsing exception: " << e.what() << std::endl; } }) .onError([](const char *message) { std::cout << "AMQP: Error on message consume: " << message << std::endl; }); }) .onError([](const char *message) { throw std::runtime_error( "AMQP: Queue creation error: " + std::string(message)); }); uv_run(loop, UV_RUN_DEFAULT); }; void AmqpManager::connect() { while (true) { - long long currentTimestamp = tools::getCurrentTimestamp(); + int64_t currentTimestamp = tools::getCurrentTimestamp(); if (this->lastConnectionTimestamp && currentTimestamp - this->lastConnectionTimestamp < AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) { throw std::runtime_error( "AMQP reconnection attempt interval too short, tried to reconnect " "after " + std::to_string(currentTimestamp - this->lastConnectionTimestamp) + "ms, the shortest allowed interval is " + std::to_string(AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL) + "ms"); } this->lastConnectionTimestamp = currentTimestamp; this->connectInternal(); } } bool AmqpManager::send( std::string toDeviceID, std::string fromDeviceID, std::string payload) { if (!this->amqpReady) { std::cout << "AMQP: Message send error: channel not ready" << std::endl; return false; } try { AMQP::Envelope env(payload.c_str(), payload.size()); AMQP::Table headers; headers[AMQP_HEADER_FROM_DEVICEID] = fromDeviceID; headers[AMQP_HEADER_TO_DEVICEID] = toDeviceID; // Set delivery mode to: Durable (2) env.setDeliveryMode(2); env.setHeaders(std::move(headers)); this->amqpChannel->publish( config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_AMQP_FANOUT_EXCHANGE), "", env); } catch (std::runtime_error &e) { std::cout << "AMQP: Error while publishing message: " << e.what() << std::endl; return false; } return true; }; void AmqpManager::ack(uint64_t deliveryTag) { if (!this->amqpReady) { std::cout << "AMQP: Message ACK error: channel not ready" << std::endl; return; } this->amqpChannel->ack(deliveryTag); } } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Amqp/AmqpManager.h b/services/tunnelbroker/src/Amqp/AmqpManager.h index 7743218f3..875a0f73d 100644 --- a/services/tunnelbroker/src/Amqp/AmqpManager.h +++ b/services/tunnelbroker/src/Amqp/AmqpManager.h @@ -1,33 +1,33 @@ #pragma once #include #include #include #include #include namespace comm { namespace network { class AmqpManager { AmqpManager(){}; std::unique_ptr amqpChannel; std::atomic amqpReady; - std::atomic lastConnectionTimestamp; + std::atomic lastConnectionTimestamp; void connectInternal(); public: static AmqpManager &getInstance(); void connect(); bool send(std::string toDeviceID, std::string fromDeviceID, std::string payload); void ack(uint64_t deliveryTag); AmqpManager(AmqpManager const &) = delete; void operator=(AmqpManager const &) = delete; }; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Constants.h b/services/tunnelbroker/src/Constants.h index b11441dd5..f39ce1168 100644 --- a/services/tunnelbroker/src/Constants.h +++ b/services/tunnelbroker/src/Constants.h @@ -1,52 +1,52 @@ #pragma once #include #include #include namespace comm { namespace network { // AWS DynamoDB const std::string DEVICE_SESSIONS_TABLE_NAME = "tunnelbroker-device-session"; const std::string DEVICE_SESSIONS_VERIFICATION_MESSAGES_TABLE_NAME = "tunnelbroker-verification-message"; const std::string DEVICE_PUBLIC_KEY_TABLE_NAME = "tunnelbroker-public-key"; const std::string MESSAGES_TABLE_NAME = "tunnelbroker-message"; // Sessions const size_t SIGNATURE_REQUEST_LENGTH = 64; const size_t SESSION_ID_LENGTH = 64; const size_t SESSION_RECORD_TTL = 30 * 24 * 3600; // 30 days const size_t SESSION_SIGN_RECORD_TTL = 24 * 3600; // 24 hours const std::regex SESSION_ID_FORMAT_REGEX( "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); // gRPC Server const std::string SERVER_LISTEN_ADDRESS = "0.0.0.0:50051"; // AMQP (RabbitMQ) const std::string AMQP_FANOUT_EXCHANGE_NAME = "allBrokers"; // message TTL const size_t AMQP_MESSAGE_TTL = 300 * 1000; // 5 min // queue TTL in case of no consumers (tunnelbroker is down) const size_t AMQP_QUEUE_TTL = 24 * 3600 * 1000; // 24 hours // routing message headers name const std::string AMQP_HEADER_FROM_DEVICEID = "fromDeviceid"; const std::string AMQP_HEADER_TO_DEVICEID = "toDeviceid"; -const long long AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = +const int64_t AMQP_SHORTEST_RECONNECTION_ATTEMPT_INTERVAL = 1000 * 60; // 1 min // DeviceID const size_t DEVICEID_CHAR_LENGTH = 64; const std::regex DEVICEID_FORMAT_REGEX( "^(ks|mobile|web):[a-zA-Z0-9]{" + std::to_string(DEVICEID_CHAR_LENGTH) + "}$"); // Config const std::string CONFIG_FILE_PATH = std::string(std::getenv("HOME")) + "/tunnelbroker/tunnelbroker.ini"; } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DeviceSessionItem.cpp b/services/tunnelbroker/src/Database/DeviceSessionItem.cpp index 412d2fd06..eaa743355 100644 --- a/services/tunnelbroker/src/Database/DeviceSessionItem.cpp +++ b/services/tunnelbroker/src/Database/DeviceSessionItem.cpp @@ -1,124 +1,124 @@ #include "DeviceSessionItem.h" #include "ConfigManager.h" #include "Tools.h" #include namespace comm { namespace network { namespace database { const std::string DeviceSessionItem::FIELD_SESSION_ID = "SessionId"; const std::string DeviceSessionItem::FIELD_DEVICE_ID = "DeviceId"; const std::string DeviceSessionItem::FIELD_PUBKEY = "PubKey"; const std::string DeviceSessionItem::FIELD_NOTIFY_TOKEN = "NotifyToken"; const std::string DeviceSessionItem::FIELD_DEVICE_TYPE = "DeviceType"; const std::string DeviceSessionItem::FIELD_APP_VERSION = "AppVersion"; const std::string DeviceSessionItem::FIELD_DEVICE_OS = "DeviceOS"; const std::string DeviceSessionItem::FIELD_CHECKPOINT_TIME = "CheckpointTime"; const std::string DeviceSessionItem::FIELD_EXPIRE = "Expire"; DeviceSessionItem::DeviceSessionItem( const std::string sessionID, const std::string deviceID, const std::string pubKey, const std::string notifyToken, const std::string deviceType, const std::string appVersion, const std::string deviceOs) : sessionID(sessionID), deviceID(deviceID), pubKey(pubKey), notifyToken(notifyToken), deviceType(deviceType), appVersion(appVersion), deviceOs(deviceOs) { this->validate(); } DeviceSessionItem::DeviceSessionItem(const AttributeValues &itemFromDB) { this->assignItemFromDatabase(itemFromDB); } void DeviceSessionItem::validate() const { if (!tools::validateSessionID(this->sessionID)) { throw std::runtime_error("Error: SessionID format is wrong."); } if (!tools::validateDeviceID(this->deviceID)) { throw std::runtime_error("Error: DeviceID format is wrong."); } tools::checkIfNotEmpty("pubKey", this->pubKey); tools::checkIfNotEmpty("notifyToken", this->notifyToken); tools::checkIfNotEmpty("deviceType", this->deviceType); tools::checkIfNotEmpty("appVersion", this->appVersion); tools::checkIfNotEmpty("deviceOs", this->deviceOs); } void DeviceSessionItem::assignItemFromDatabase( const AttributeValues &itemFromDB) { try { this->sessionID = itemFromDB.at(DeviceSessionItem::FIELD_SESSION_ID).GetS(); this->deviceID = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_ID).GetS(); this->pubKey = itemFromDB.at(DeviceSessionItem::FIELD_PUBKEY).GetS(); this->notifyToken = itemFromDB.at(DeviceSessionItem::FIELD_NOTIFY_TOKEN).GetS(); this->deviceType = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_TYPE).GetS(); this->appVersion = itemFromDB.at(DeviceSessionItem::FIELD_APP_VERSION).GetS(); this->deviceOs = itemFromDB.at(DeviceSessionItem::FIELD_DEVICE_OS).GetS(); this->checkpointTime = std::stoll( std::string( itemFromDB.at(DeviceSessionItem::FIELD_CHECKPOINT_TIME).GetS()) .c_str()); } catch (std::logic_error &e) { throw std::runtime_error( "Invalid device session database value " + std::string(e.what())); } this->validate(); } std::string DeviceSessionItem::getTableName() const { return config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DYNAMODB_SESSIONS_TABLE); } std::string DeviceSessionItem::getPrimaryKey() const { return DeviceSessionItem::FIELD_SESSION_ID; } std::string DeviceSessionItem::getSessionID() const { return this->sessionID; } std::string DeviceSessionItem::getDeviceID() const { return this->deviceID; } std::string DeviceSessionItem::getPubKey() const { return this->pubKey; } std::string DeviceSessionItem::getNotifyToken() const { return this->notifyToken; } std::string DeviceSessionItem::getDeviceType() const { return this->deviceType; } std::string DeviceSessionItem::getAppVersion() const { return this->appVersion; } std::string DeviceSessionItem::getDeviceOs() const { return this->deviceOs; } -long long DeviceSessionItem::getCheckpointTime() const { +int64_t DeviceSessionItem::getCheckpointTime() const { return this->checkpointTime; } } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Database/DeviceSessionItem.h b/services/tunnelbroker/src/Database/DeviceSessionItem.h index ea5979120..eb17277b2 100644 --- a/services/tunnelbroker/src/Database/DeviceSessionItem.h +++ b/services/tunnelbroker/src/Database/DeviceSessionItem.h @@ -1,61 +1,61 @@ #pragma once #include "Item.h" #include namespace comm { namespace network { namespace database { class DeviceSessionItem : public Item { std::string sessionID; std::string deviceID; std::string pubKey; std::string notifyToken; std::string deviceType; std::string appVersion; std::string deviceOs; - long long checkpointTime = 0; + int64_t checkpointTime = 0; void validate() const override; public: static const std::string FIELD_SESSION_ID; static const std::string FIELD_DEVICE_ID; static const std::string FIELD_PUBKEY; static const std::string FIELD_NOTIFY_TOKEN; static const std::string FIELD_DEVICE_TYPE; static const std::string FIELD_APP_VERSION; static const std::string FIELD_DEVICE_OS; static const std::string FIELD_CHECKPOINT_TIME; static const std::string FIELD_EXPIRE; std::string getPrimaryKey() const override; std::string getTableName() const override; std::string getSessionID() const; std::string getDeviceID() const; std::string getPubKey() const; std::string getNotifyToken() const; std::string getDeviceType() const; std::string getAppVersion() const; std::string getDeviceOs() const; - long long getCheckpointTime() const; + int64_t getCheckpointTime() const; DeviceSessionItem() { } DeviceSessionItem( const std::string sessionID, const std::string deviceID, const std::string pubKey, const std::string notifyToken, const std::string deviceType, const std::string appVersion, const std::string deviceOs); DeviceSessionItem(const AttributeValues &itemFromDB); void assignItemFromDatabase(const AttributeValues &itemFromDB) override; }; } // namespace database } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/Tools.cpp b/services/tunnelbroker/src/Tools/Tools.cpp index 2bc665428..1138a76f9 100644 --- a/services/tunnelbroker/src/Tools/Tools.cpp +++ b/services/tunnelbroker/src/Tools/Tools.cpp @@ -1,87 +1,87 @@ #include "Tools.h" #include "ConfigManager.h" #include "Constants.h" #include #include #include #include #include #include #include #include namespace comm { namespace network { namespace tools { std::string generateRandomString(std::size_t length) { const std::string CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; thread_local std::random_device generator; std::uniform_int_distribution<> distribution(0, CHARACTERS.size() - 1); std::string random_string; for (std::size_t i = 0; i < length; ++i) { random_string += CHARACTERS[distribution(generator)]; } return random_string; } -long long getCurrentTimestamp() { +int64_t getCurrentTimestamp() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()) .count(); } bool validateDeviceID(std::string deviceID) { try { static const std::regex deviceIDKeyserverRegexp("^ks:.*"); if (std::regex_match(deviceID, deviceIDKeyserverRegexp)) { return ( deviceID == config::ConfigManager::getInstance().getParameter( config::ConfigManager::OPTION_DEFAULT_KEYSERVER_ID)); } return std::regex_match(deviceID, DEVICEID_FORMAT_REGEX); } catch (const std::exception &e) { std::cout << "Tools: " << "Got an exception at `validateDeviceID`: " << e.what() << std::endl; return false; } } std::string generateUUID() { thread_local boost::uuids::random_generator random_generator; return boost::uuids::to_string(random_generator()); } bool validateSessionID(std::string sessionID) { try { return std::regex_match(sessionID, SESSION_ID_FORMAT_REGEX); } catch (const std::exception &e) { std::cout << "Tools: " << "Got an exception at `validateSessionId`: " << e.what() << std::endl; return false; } } void checkIfNotEmpty(std::string fieldName, std::string stringToCheck) { if (stringToCheck.empty()) { throw std::runtime_error( "Error: Required text field " + fieldName + " is empty."); } } void checkIfNotZero(std::string fieldName, uint64_t numberToCheck) { if (numberToCheck == 0) { throw std::runtime_error( "Error: Required number " + fieldName + " is zero."); } } } // namespace tools } // namespace network } // namespace comm diff --git a/services/tunnelbroker/src/Tools/Tools.h b/services/tunnelbroker/src/Tools/Tools.h index eb73aa486..6de719125 100644 --- a/services/tunnelbroker/src/Tools/Tools.h +++ b/services/tunnelbroker/src/Tools/Tools.h @@ -1,20 +1,20 @@ #pragma once #include #include namespace comm { namespace network { namespace tools { std::string generateRandomString(std::size_t length); -long long getCurrentTimestamp(); +int64_t getCurrentTimestamp(); bool validateDeviceID(std::string deviceID); std::string generateUUID(); bool validateSessionID(std::string sessionID); void checkIfNotEmpty(std::string fieldName, std::string stringToCheck); void checkIfNotZero(std::string fieldName, uint64_t numberToCheck); } // namespace tools } // namespace network } // namespace comm