chore: implement sharded pub/sub #4518
src/server/main_service.cc
Outdated
// Sharded pub sub
// Command form: SPUBLISH shardchannel message
if (cid->name() == "SPUBLISH") {
We need to bypass DetermineKeys only for this specific case.
Signed-off-by: kostas <kostas@dragonflydb.io>
.github/workflows/ci.yml
Outdated
FLAGS_fiber_safety_margin=4096 FLAGS_list_experimental_v2=true timeout 20m ctest -V -L DFLY
FLAGS_fiber_safety_margin=4096 FLAGS_list_experimental_v2=true timeout 20m ctest -V -L DFLY -E allocation_tracker
# Run allocation tracker test separately. It generates a TON of logs
The allocation tracker test generates an enormous amount of log output. I silenced it by excluding the test from the run that sets the alsologtostderr flag.
bool is_sharded_pub_sub_ = false;
bool is_p_pub_sub_ = false;
I think we need an enum here.
Can you explain how an enum would help here? Any combination of these booleans can be true or false depending on the command, so I'm not sure an enum fits.
IMO these should be a C-style bitfield, but CommandRegistry is more or less fixed in size, so the memory footprint of this is insignificant anyway.
As I understand it, is_p_pub_sub_ and is_sharded_pub_sub_ can't both be set at the same time, and the same may be true of is_pub_sub_. So we could have enum PubSubT {NONE, P_PUB_SUB, SHARDED_PUB_SUB, PUB_SUB}.
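To make the enum suggestion concrete, here is a minimal sketch of what a single mutually exclusive classification could look like. `PubSubKind` and `ClassifyPubSub` are hypothetical names, not part of the PR, and the grouping of the plain and pattern commands is an assumption; only the sharded command list comes from the diff under review.

```cpp
#include <string_view>

// Hypothetical enum following the reviewer's suggestion; not the PR's actual code.
enum class PubSubKind { kNone, kPubSub, kPatternPubSub, kShardedPubSub };

// Classifies a command by its upper-case name. Each command lands in exactly
// one bucket, which is the property the enum is meant to encode.
constexpr PubSubKind ClassifyPubSub(std::string_view name) {
  // Assumed grouping for plain pub/sub commands.
  if (name == "PUBLISH" || name == "SUBSCRIBE" || name == "UNSUBSCRIBE")
    return PubSubKind::kPubSub;
  // Assumed grouping for pattern pub/sub commands.
  if (name == "PSUBSCRIBE" || name == "PUNSUBSCRIBE")
    return PubSubKind::kPatternPubSub;
  // Sharded command list as it appears in the diff.
  if (name == "SPUBLISH" || name == "SSUBSCRIBE" || name == "SUNSUBSCRIBE")
    return PubSubKind::kShardedPubSub;
  return PubSubKind::kNone;
}

// Compile-time check that the classification behaves as described.
static_assert(ClassifyPubSub("SPUBLISH") == PubSubKind::kShardedPubSub);
static_assert(ClassifyPubSub("GET") == PubSubKind::kNone);
```

If the flags really can combine, as the earlier reply suggests, a bitfield remains the better fit; the enum only pays off when the states are exclusive.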
is_p_pub_sub_ = true;
} else if (name_ == "SPUBLISH" || name_ == "SSUBSCRIBE" || name_ == "SUNSUBSCRIBE") {
is_sharded_pub_sub_ = true;
As I remember, commands can be renamed. Does that apply to this case?
Good question! Not really, it does not affect us.
CommandRegistry has a hash map for commands and a hash map for renamed commands. When we do a lookup, we do so via Service::FindCmd, which checks both of those hash maps.
Furthermore, when we register a command in the registry via the >> operator, we handle renamed commands. Since CommandId is constructed before we emplace it in the registry, this part should have no issues with renamed commands.
However, this one might have an issue: https://github.com/dragonflydb/dragonfly/pull/4518/files#r1932116546
I will fix it :)
fixed
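The rename discussion in this thread can be illustrated with a toy registry. This is a simplified sketch under stated assumptions, not Dragonfly's CommandRegistry: `MiniCommand`, `MiniRegistry`, and their methods are hypothetical, and the real lookup rules may differ. It shows only the point made in the reply: flags are derived from the original name at construction time, and lookup consults both the rename map and the command map.

```cpp
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical miniature command record; not Dragonfly's CommandId.
struct MiniCommand {
  std::string original_name;        // name the command was constructed with
  bool is_sharded_pub_sub = false;  // derived from the original name
};

// Hypothetical miniature registry with one map for commands and one for renames.
class MiniRegistry {
 public:
  // Records that the command `from` is also exposed to clients as `to`.
  void Rename(const std::string& from, const std::string& to) { renamed_[to] = from; }

  // The flag is computed from the original name, before any rename is consulted.
  void Register(const std::string& name) {
    const bool sharded =
        name == "SPUBLISH" || name == "SSUBSCRIBE" || name == "SUNSUBSCRIBE";
    commands_.emplace(name, MiniCommand{name, sharded});
  }

  // Lookup checks the rename map first, then the command map.
  const MiniCommand* Find(const std::string& name) const {
    auto alias = renamed_.find(name);
    const std::string& key = (alias == renamed_.end()) ? name : alias->second;
    auto it = commands_.find(key);
    return it == commands_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_map<std::string, std::string> renamed_;   // alias -> original
  std::unordered_map<std::string, MiniCommand> commands_;  // original -> command
};
```

With this layout, a caller that renames SPUBLISH to some alias still gets a command whose sharded flag is set. Any code that compares the name typed by the client against a literal such as "SPUBLISH" would not be covered, which is the kind of spot the linked follow-up comment points at.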
src/server/main_service.cc
Outdated
OpResult<KeyIndex> DetermineClusterKeys(const CommandId* cid, CmdArgList args) {
  if (!cid->IsShardedPSub()) {
    return DetermineKeys(cid, args);
  }

  // Sharded pub sub
  // Command form: SPUBLISH shardchannel message
  if (cid->name() == "SPUBLISH") {
Can you move this code inside DetermineKeys? The name isn't really correct.
The thing is, we need to bypass the DetermineKeys behaviour only for this particular case, so we can't really move it there.
I agree the naming is not great. Any ideas for a better name? Otherwise I can think of something 🤷
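As a rough illustration of why the sharded commands need their own path, here is a sketch of picking out the channel arguments that act as keys for slot routing. `ChannelRange` and `ShardChannelRange` are hypothetical and do not mirror Dragonfly's KeyIndex or OpResult types. The argument layouts follow the Redis command forms: SPUBLISH takes one shard channel followed by a message, while SSUBSCRIBE and SUNSUBSCRIBE take only shard channels.

```cpp
#include <cstddef>
#include <optional>
#include <string_view>

// Hypothetical stand-in for a key range over the argument list: [start, end).
struct ChannelRange {
  std::size_t start;
  std::size_t end;
};

// `num_args` counts the arguments after the command name.
// Returns the range of arguments that are shard channels, or nullopt when the
// command is not a sharded pub/sub command or its argument count is invalid.
std::optional<ChannelRange> ShardChannelRange(std::string_view cmd, std::size_t num_args) {
  if (cmd == "SPUBLISH") {
    // Command form: SPUBLISH shardchannel message. Only the first argument is a channel.
    if (num_args != 2) return std::nullopt;
    return ChannelRange{0, 1};
  }
  if (cmd == "SSUBSCRIBE" || cmd == "SUNSUBSCRIBE") {
    // Every argument is a shard channel.
    return ChannelRange{0, num_args};
  }
  // Not sharded pub/sub: a caller would fall back to the regular key lookup.
  return std::nullopt;
}
```

A caller would hash the channels in the returned range to a slot the same way it hashes ordinary keys, and use the regular DetermineKeys path whenever the result is empty.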
.github/workflows/ci.yml
Outdated
FLAGS_fiber_safety_margin=4096 FLAGS_list_experimental_v2=true timeout 20m ctest -V -L DFLY -E allocation_tracker_test
# Run allocation tracker test separately. It generates a TON of logs
FLAGS_fiber_safety_margin=4096 FLAGS_force_epoll=true GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 timeout 5m ./allocation_tracker_test
Why do you need GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1?
+1 we don't!
Resolves #3001 by adding the basic functionality for sharded pub/sub for Dragonfly cluster.
What's left for sharded pub/sub is described in #4517, but the gist is that we need to evict subscribed connections after a slot migration. This will be added in a follow-up PR.