Page MenuHomePhabricator

[services] Tunnelbroker - Fix delivering messages one by one and remove mutex.
ClosedPublic

Authored by max on Feb 23 2022, 5:36 PM.
Tags
None
Referenced Files
Unknown Object (File)
Thu, Jan 16, 11:09 PM
Unknown Object (File)
Thu, Jan 16, 10:46 PM
Unknown Object (File)
Thu, Jan 16, 10:32 PM
Unknown Object (File)
Thu, Jan 16, 10:29 PM
Unknown Object (File)
Thu, Jan 16, 10:28 PM
Unknown Object (File)
Thu, Jan 16, 10:28 PM
Unknown Object (File)
Thu, Jan 16, 10:05 PM
Unknown Object (File)
Thu, Jan 16, 9:58 PM

Details

Summary

Using messageID to remove a certain message from the delivery queue, instead of using a receiver deviceID to remove all messages for the device.

1. Message structure changed

To achieve this the DeliveryBroker internal message structure needs to be changed from the current:

map[reciever deviceID] → vector<Message information>

to:

map[receiver deviceID] → Queue → Message information.

2. Conditional variables+mutexes became folly:: UnboundedQueue.

Conditional variables and mutex were removed in favor of folly:: UnboundedQueue

3. Remove is became pop as we are using a queue instead of a map.

The remove function changed to pop to remove the first message from the queue.

4. Function names usage refactoring due to the above changes.

Usage of the functions is also refactored in AMQP and gRPC handlers.

Review notes:

These diffs changes are closely interconnected with each other and it's effective for review to use one diff instead of splitting it. This is a smaller chunk of changes that can be buildable. Removing mutexes is closely interconnected with the structural changes that's why it is related to two tasks simultaneously.

Related linear tasks:

Test Plan

Run yarn run-tunnelbroker-service and successfully built the service.
Messages are delivered one by one and removed from the queue.
Passed unit tests on D3645.

Diff Detail

Repository
rCOMM Comm
Branch
message_id_add
Lint
No Lint Coverage
Unit
No Test Coverage

Event Timeline

There are a very large number of changes, so older changes are hidden. Show Older Changes

It sounds like a good idea, but there's one important issue. By using unordered_map we're losing an information about ordering of the messages. I assume, that the delivery broker should work like FIFO, but after this change the ordering becomes random. It might be also possible that we don't need to keep the ordering... but that sounds unlikely.

This revision now requires changes to proceed.Feb 25 2022, 4:14 AM
In D3279#88399, @palys-swm wrote:

It sounds like a good idea, but there's one important issue. By using unordered_map we're losing an information about ordering of the messages. I assume, that the delivery broker should work like FIFO, but after this change the ordering becomes random. It might be also possible that we don't need to keep the ordering... but that sounds unlikely.

Good catch! I've used unorder_map for the performance reason, but it looks like a bigger issue.

It might be also possible that we don't need to keep the ordering... but that sounds unlikely.

Yes, we need to keep the order of messages. Will fix it.

Switching to MPMCQueue and remove mutex and conditional variables.

max retitled this revision from [services] Tunnelbroker - Fix delivering messages one by one to [services] Tunnelbroker - Fix delivering messages one by one and remove mutex..Apr 4 2022, 7:13 AM
In D3279#88399, @palys-swm wrote:

It sounds like a good idea, but there's one important issue. By using unordered_map we're losing an information about ordering of the messages. I assume, that the delivery broker should work like FIFO, but after this change the ordering becomes random. It might be also possible that we don't need to keep the ordering... but that sounds unlikely.

Switching to ConcurrentHashmap<MPMCQueue> in this case we are preserving the order of the messages and can add and pop messages by the pointer.

Fixing build error due to pointers.

Switch to use UnboundedQueue instead of MPMCQueue.

tomek requested changes to this revision.Apr 6 2022, 6:34 AM

Overall, it should be a stack instead of a diff. We could e.g. start with storing message id, then introduce the queue and then remove synchronization. But for now, let's keep it.

services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
18–27 ↗(On Diff #11089)

Is it possible to assign this->messagesMap.find(toDeviceID) to a variable so that we don't need to call find twice?

44–47 ↗(On Diff #11089)

It would be a good idea to explain why do we insert a new queue when we can't find one (it looks like we do that so that we can call blocking pop from TunnelbrokerServiceImpl and why we're not removing the queue when it becomes empty (we're calling erase from TunnelbrokerServiceImpl)

services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
212–214 ↗(On Diff #11089)

Are you sure this is correct? Do we want to erase only when the queue is not empty? Shouldn't we inverse this condition?

This revision now requires changes to proceed.Apr 6 2022, 6:34 AM

Fix erase condition. Fix tools new namespace error.

Following the discussion at ENG-609, a max size check for the delivery broker queue was added.

max marked 3 inline comments as done.
max added inline comments.
services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
18–27 ↗(On Diff #11089)

Is it possible to assign this->messagesMap.find(toDeviceID) to a variable so that we don't need to call find twice?

Thanks for the catch here. I've tried this in the playground, but it seems that holding the iterator here from this->messagesMap.find(toDeviceID) or holding a pointer to the unique pointer to the queue from this->messagesMap.find(toDeviceID)->second is like overengineering here for the second call of the find and seems no any performance benefits from it.

Btw in this case it's thread-safe to hold an iterator.

So, I'm not sure it is worth it.

44–47 ↗(On Diff #11089)

It would be a good idea to explain why do we insert a new queue when we can't find one (it looks like we do that so that we can call blocking pop from TunnelbrokerServiceImpl

You are right here. We are inserting the queue for the blocking wait in case we listen first before the insert happens. That's why we need the queue to exist and insert it.

why we're not removing the queue when it becomes empty (we're calling erase from TunnelbrokerServiceImpl)

The use case for the erase is when we need to remove the whole queue for the certain deviceID without popping each message from the upper layer (grpc handler, database handler).

why we're not removing the queue when it becomes empty

I think the control of erasing here must be at the upper level, which gives us the flexibility in this case for the optimization and not "insert-remove" the queue every time when the queue is empty for a certain device. Like, remove it when the device logs off.

services/tunnelbroker/docker-server/contents/server/src/Service/TunnelbrokerServiceImpl.cpp
212–214 ↗(On Diff #11089)

Are you sure this is correct? Do we want to erase only when the queue is not empty? Shouldn't we inverse this condition?

Fixed, great thanks for this catch!

tomek requested changes to this revision.Apr 19 2022, 2:13 AM
tomek added inline comments.
services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
44–47 ↗(On Diff #11089)

It would be a good idea to explain why do we insert a new queue when we can't find one (it looks like we do that so that we can call blocking pop from TunnelbrokerServiceImpl

You are right here. We are inserting the queue for the blocking wait in case we listen first before the insert happens. That's why we need the queue to exist and insert it.

why we're not removing the queue when it becomes empty (we're calling erase from TunnelbrokerServiceImpl)

The use case for the erase is when we need to remove the whole queue for the certain deviceID without popping each message from the upper layer (grpc handler, database handler).

why we're not removing the queue when it becomes empty

I think the control of erasing here must be at the upper level, which gives us the flexibility in this case for the optimization and not "insert-remove" the queue every time when the queue is empty for a certain device. Like, remove it when the device logs off.

Please explain that in the code comments - this isn't too obvious what happens here and we should save the maintainers time figuring out what's going on.

This revision now requires changes to proceed.Apr 19 2022, 2:13 AM
max marked 3 inline comments as done.

A comment about pop implementation was added.

max added inline comments.
services/tunnelbroker/docker-server/contents/server/src/DeliveryBroker/DeliveryBroker.cpp
44–47 ↗(On Diff #11089)

Please explain that in the code comments - this isn't too obvious what happens here and we should save the maintainers time figuring out what's going on.

The comment about the pop implementation and adding an empty queue makes sense to me. I've added it.

Not sure we need comments about erase because it's a one-line function with the usage implementation in the code.

max marked an inline comment as done.

Droping back to MPMCQueue due to the ENG-609 discussion.

tomek added 1 blocking reviewer(s): karol.

Looks ok! It might be a good idea for @karol-bisztyga to also take a look

I'm confused by the description here.

Using messageID to remove a certain message from the delivery queue, instead of using a receiver deviceID to remove all messages for the device.

We don't use message id for removing (or did I miss something?): erase uses deviceID.

It's good that you got rid of cond vars etc but can't we delete messageID? Why exactly do we need it?

If we really need it, I believe we can move the code that generates it to the inner scope as I suggested in the inline comment.

With all that said, I may be missing a bigger picture here so feel free to explain what exactly is going on. Thanks!

Back to your queue.

services/tunnelbroker/src/Amqp/AmqpManager.h
26–27 ↗(On Diff #11663)

To be honest, it would be more logical

services/tunnelbroker/src/DeliveryBroker/DeliveryBroker.cpp
45 ↗(On Diff #11663)
services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
157 ↗(On Diff #11663)

Do we need this here? Cannot we generate this id in the send's scope?

This revision now requires changes to proceed.Apr 25 2022, 2:28 AM

Fix nit in the comment, send() parameters order was changed.

In D3279#106203, @karol-bisztyga wrote:

I'm confused by the description here.

Using messageID to remove a certain message from the delivery queue, instead of using a receiver deviceID to remove all messages for the device.

We don't use message id for removing (or did I miss something?): erase uses deviceID.

Yes, we don't. MessageID is inside the queue structure and we are popping the message from it without knowing the messageID. We need this messageID later (after it was popped) for removing the certainly delivered message from the database and using it as a unique message identifier.

It's good that you got rid of cond vars etc but can't we delete messageID? Why exactly do we need it?

If we really need it, I believe we can move the code that generates it to the inner scope as I suggested in the inline comment.

Reply to the comment about the scope.
We removed the whole queue by the erase method to clean the queue for the certain deviceID without popping the message on-by-one from the queue and free the memory.

Hope I've added additional context here...

services/tunnelbroker/src/Amqp/AmqpManager.h
26–27 ↗(On Diff #11663)

To be honest, it would be more logical

Let's go with this parameters order. Changed it.

services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
157 ↗(On Diff #11663)

Do we need this here? Cannot we generate this id in the send's scope?

We need this messageID generated at this scope for future message storing in the database. I think moving it inside and then using some getter is over-engineering here because we need this ID for the database storing item and they must be the same.

Thanks for clarifying, please answer that last question so I can fully understand.

services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
157 ↗(On Diff #11663)

for future message storing in the database

So this "message storing" is going to be implemented in this scope here, right?

This revision is now accepted and ready to land.Apr 25 2022, 4:36 AM
In D3279#106247, @karol-bisztyga wrote:

Thanks for clarifying, please answer that last question so I can fully understand.

Thanks for the fast reply ;)

services/tunnelbroker/src/Service/TunnelbrokerServiceImpl.cpp
157 ↗(On Diff #11663)

for future message storing in the database

So this "message storing" is going to be implemented in this scope here, right?

Yes, right. That's why we need this generated messageID at this scope.

max marked an inline comment as done.

Rebase on master.