diff --git a/services/tunnelbroker/test/AmqpManagerTest.cpp b/services/tunnelbroker/test/AmqpManagerTest.cpp --- a/services/tunnelbroker/test/AmqpManagerTest.cpp +++ b/services/tunnelbroker/test/AmqpManagerTest.cpp @@ -80,3 +80,36 @@ << receivedMessage.payload; AmqpManager::getInstance().ack(receivedMessage.deliveryTag); } + +TEST_F(AmqpManagerTest, MultipleThreadsMessagesSendingStressTest) { + const size_t MESSAGES_NUMBER = 10; + const size_t THREADS_NUMBER = 100; + const std::string toDeviceID = + "mobile:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); + + std::vector sendingThreads; + for (size_t i = 0; i < THREADS_NUMBER; ++i) { + sendingThreads.push_back(std::thread([toDeviceID, MESSAGES_NUMBER]() { + for (size_t i = 0; i < MESSAGES_NUMBER; ++i) { + const std::string messageID = tools::generateUUID(); + const std::string fromDeviceID = + "web:" + tools::generateRandomString(DEVICEID_CHAR_LENGTH); + const std::string payload = tools::generateRandomString(512); + const database::MessageItem messageItem{ + messageID, fromDeviceID, toDeviceID, payload, ""}; + EXPECT_TRUE(AmqpManager::getInstance().send(&messageItem)); + } + })); + } + + std::vector receivedMessage; + for (size_t i = 0; i < MESSAGES_NUMBER * THREADS_NUMBER; ++i) { + receivedMessage.push_back(DeliveryBroker::getInstance().pop(toDeviceID)); + AmqpManager::getInstance().ack(receivedMessage.back().deliveryTag); + } + for (std::thread &thread : sendingThreads) { + thread.join(); + } + EXPECT_TRUE(DeliveryBroker::getInstance().isEmpty(toDeviceID)); + EXPECT_EQ(receivedMessage.size(), MESSAGES_NUMBER * THREADS_NUMBER); +}