fix: enforce strong consistency in channel_store #4740
src/server/channel_store.cc
Outdated
@@ -152,7 +153,7 @@ unsigned ChannelStore::SendMessages(std::string_view channel, facade::ArgRange m
 it++;
 }
 };
- shard_set->pool()->DispatchBrief(std::move(cb));
+ shard_set->pool()->AwaitBrief(std::move(cb));
This function is called inside DbSlice::DeleteExpiredStep, which we assume does not preempt. This change breaks that assumption.
🤔 Then we can:
1. Use `DispatchBrief` only when called by `DeleteExpiredStep`
2. Ditch strong consistency when sending messages via publish and add a sleep to the test
I'd rather opt for (1).
Other than that, good catch!
Wdyt? @adiholden
I think we should add a sleep to the test case. From what I understand, publish does not guarantee that the message was sent to all subscribers, only that it was queued to be sent.
Also, when sending the message we use conn->SendPubMessageAsync, which I think also does not guarantee that the message has already been sent by the time AwaitBrief returns here.
Sounds good
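The difference this thread turns on, queuing a callback versus waiting for it to run, can be sketched with a toy worker thread. `Worker`, `Dispatch`, `Await`, and `AwaitedCount` are hypothetical names that only loosely mirror `DispatchBrief` and `AwaitBrief`; this is not the proactor pool's implementation, just an illustration of the two semantics.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-threaded worker; NOT the real proactor API.
class Worker {
 public:
  Worker() : thread_([this] { Run(); }) {}
  ~Worker() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    thread_.join();  // Run() drains the queue before it exits
  }

  // Fire-and-forget: returns once the task is queued, not once it has run.
  void Dispatch(std::function<void()> task) {
    assert(task);  // an empty std::function would throw when invoked
    {
      std::lock_guard<std::mutex> lk(mu_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Blocks the caller until the task has run on the worker thread.
  void Await(std::function<void()> task) {
    auto done = std::make_shared<std::promise<void>>();
    std::future<void> fut = done->get_future();
    Dispatch([task = std::move(task), done] {
      task();
      done->set_value();
    });
    fut.wait();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // stop requested and nothing left to run
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool stop_ = false;
  std::thread thread_;  // declared last so the other members exist before Run() starts
};

// Returns 1: Await() comes back only after the callback has run.
int AwaitedCount() {
  Worker w;
  int delivered = 0;
  w.Await([&] { delivered++; });
  return delivered;
}
```

Even in the awaited case, all the caller learns is that the callback ran on the worker. If the callback itself only enqueues an asynchronous send, as noted above for `SendPubMessageAsync`, the message may still be in flight when `Await` returns.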
@@ -124,8 +124,9 @@ unsigned ChannelStore::SendMessages(std::string_view channel, facade::ArgRange m

 // Make sure none of the threads publish buffer limits is reached. We don't reserve memory ahead
 // and don't prevent the buffer from possibly filling, but the approach is good enough for
- // limiting fast producers. Most importantly, we can use DispatchBrief below as we block here
I actually don't understand the comment that was removed. What does "we block here" mean?
I removed it because I replaced DispatchBrief with AwaitBrief
But ignore that; what is important is to get the semantics right. I will adjust the comment.
src/server/channel_store.cc
Outdated
- ChannelStore::control_block.most_recent.load(memory_order_relaxed));
+ // Do not use memory_order_relaxed, we need to fetch the latest value of
+ // the control block
+ ChannelStore::control_block.most_recent.load());
Please explicitly specify the memory order you want to use.
I am pretty sure we used |
@@ -2970,6 +2969,9 @@ async def test_cluster_sharded_pub_sub(df_factory: DflyInstanceFactory):
 consumer.ssubscribe("kostas")

 await c_nodes[0].execute_command("SPUBLISH kostas hello")
+ # We need to sleep cause we use DispatchBrief internally. Otherwise we can't really gurantee
You cannot guarantee it in any case. How do you know that the message arrived at the client? Maybe they are in Australia?
You don't. My point was that after the client gets a reply to a published message, we know Dragonfly processed and sent all the messages (even if there were no subscribers). I agree it's overkill; `sleep` is more than enough here.
What I mean by "strongly consistent" is that after I get an |
Channel store uses read-copy-update to distribute changes to the channel store to all proactors. The problem is that we use `memory_order_relaxed` to load the new pointer to the channel store for each proactor, which *does not guarantee* that we fetch the latest value of the channel store. Hence, the fix is to use sequential consistency to force fetching the latest value of the channel store. Should fix #4724

Fixes #4659 and #4724
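As a rough illustration of the read-copy-update pattern described here, with the memory order spelled out explicitly as the review asks, consider the sketch below. `Snapshot`, `ControlBlock`, `AddSubscriber`, `Load`, and `CountAfterJoin` are hypothetical names, not the actual channel store code. The sketch assumes a single writer and pairs a release store with an acquire load, which is one common choice for publishing a pointer; the PR itself uses the default, sequentially consistent `load()`.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for an immutable channel-store snapshot.
struct Snapshot {
  std::map<std::string, int> subscribers;  // channel -> subscriber count
};

// Hypothetical control block: one writer publishes snapshots, readers load them.
class ControlBlock {
 public:
  ControlBlock() : current_(new Snapshot{}) {}
  ~ControlBlock() {
    delete current_.load(std::memory_order_relaxed);
    for (Snapshot* s : retired_) delete s;
  }

  // Copy the current snapshot, update the copy, then publish the new pointer.
  // Assumption: only one thread ever calls this.
  void AddSubscriber(const std::string& channel) {
    Snapshot* old = current_.load(std::memory_order_relaxed);
    assert(old != nullptr);
    Snapshot* fresh = new Snapshot(*old);
    fresh->subscribers[channel]++;
    // Release: everything written to *fresh is visible to a thread that
    // acquire-loads this pointer value.
    current_.store(fresh, std::memory_order_release);
    // Simplification: old snapshots are only freed in the destructor. A real
    // RCU scheme must wait until no reader can still hold the old pointer.
    retired_.push_back(old);
  }

  // The memory order is named explicitly rather than left to the default.
  const Snapshot* Load() const {
    return current_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<Snapshot*> current_;
  std::vector<Snapshot*> retired_;  // touched only by the writer and destructor
};

// Publishes from a second thread, then reads the count after joining it.
int CountAfterJoin() {
  ControlBlock cb;
  std::thread writer([&] { cb.AddSubscriber("kostas"); });
  writer.join();
  return cb.Load()->subscribers.at("kostas");
}
```

Here `CountAfterJoin()` returns 1, because joining the writer thread already orders its store before the subsequent load. Without such external synchronization, a reader may observe either the old or the new snapshot, whichever memory order is chosen; the ordering argument governs what else becomes visible along with the pointer.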