-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: simplify transaction code #4602
Conversation
if (head->NotifySuspended(owner_->committed_txid(), sid, key)) { | ||
DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key << " committed txid " | ||
<< owner_->committed_txid(); | ||
if (head->NotifySuspended(sid, key)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed owner_->committed_txid()
argument that has not been used there
@@ -584,10 +584,10 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { | |||
// If any of the following flags are present, we are guaranteed to run in this function: | |||
// 1. AWAKED_Q -> Blocking transactions are executed immediately after waking up, they don't | |||
// occupy a place in txq and have highest priority | |||
// 2. SUSPENDED_Q -> Suspended shards are run to clean up and finalize blocking keys | |||
// 2. WAS_SUSPENDED -> Suspended transactions are run to clean up and finalize blocking keys |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to WAS_SUSPENDED
to denote the suspension event happened some time in the past
src/server/transaction.cc
Outdated
@@ -577,7 +577,7 @@ void Transaction::PrepareMultiForScheduleSingleHop(Namespace* ns, ShardId sid, D | |||
} | |||
|
|||
// Runs in the dbslice thread. Returns true if the transaction concluded. | |||
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { | |||
bool Transaction::RunInShard(EngineShard* shard) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removing txq_ooo
and use sd.local_mask
for this (historically OOO was determined by the coordinator but now we do it per shard)
FinishHop(); | ||
|
||
// And then poll execution to continue processing the queued transactions. | ||
shard->PollExecution("unwatchcb", nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
swapping the order
bc->NotifyPending(); | ||
|
||
shard->PollExecution("unlockmulti", nullptr); | ||
shard->FinalizeMulti(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code moved as it into FinalizeMulti
@@ -1432,7 +1435,6 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view | |||
CHECK(it != args.cend()); | |||
|
|||
// Change state to awaked and store index of awakened key | |||
sd.local_mask &= ~SUSPENDED_Q; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now we keep WAS_SUSPENDED forever.
bool was_suspended = sd.local_mask & SUSPENDED_Q; | ||
// was_suspended is true meaning that this transaction was suspended and then | ||
// it was woken up by another transaction in either this thread or a key in another thread. | ||
// if awaked_prerun is true - it means it was woken up by a transaction in this thread, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you give example for a transaction that is woken up by transactions that runs on different thread?
From what I know the suspended transactions are waiting on a key to be added and once this key is added they run, so it meant that the waking transaction added this key i.e run on the same thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blpop that waits on multiple keys can be waken up on one shard and still be "suspended" on another shard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not really suspended, since it will advance, but it's not gonna be awaken which is reserved only for the shard that really notified that tx.
src/server/transaction.cc
Outdated
@@ -609,7 +614,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { | |||
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation | |||
// and successive hops are run by continuation_trans_ in engine shard. | |||
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). | |||
if ((is_concluding || !txq_ooo) && sd.pq_pos != TxQueue::kEnd) { | |||
if ((is_concluding || !is_ooo) && sd.pq_pos != TxQueue::kEnd) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that you changed behaviour here.
Now is_ooo is based on OUT_OF_ORDER
but what if this transaction is the head of tx queue? line 670 in engine_shard.cc
here you will not remove it from head , line 671 set continuation_trans_ to head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can add a check that if we set continuation_trans_ than sd.pq_pos is TxQueue::kEnd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I thought about this but decided it's ok, now I need to see why the dcheck fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, reverted the logic, changed the names of the variables, hopefully it's more clear now
src/server/transaction.cc
Outdated
@@ -618,7 +623,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { | |||
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. | |||
// If it's a final hop we should release the locks. | |||
if (is_concluding) { | |||
bool became_suspended = sd.local_mask & SUSPENDED_Q; | |||
bool became_suspended = sd.local_mask & WAS_SUSPENDED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it will be a little bit more clear to write
became_suspended = !was_suspended && sd.local_mask & WAS_SUSPENDED
and the if statement in line 640 will be if (!became_suspended)
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). | ||
if ((is_concluding || !txq_ooo) && sd.pq_pos != TxQueue::kEnd) { | ||
// Otherwise we can remove ourselves only when we're concluding (so no more hops follow). | ||
if (sd.pq_pos != TxQueue::kEnd && (is_concluding || allow_q_removal)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic looks right.
Minor comment is that I understand that we allow queue removal only if we are concluding or we are the tx is head , therefore I think it will be more clear to pass is_head instead of allow_q_removal, as this param has no meaning when you call run transaction which is not in txq.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's actually not correct. I tried first sd.pq_pos == txq()->Head()
without passing anything. but for multi-hop transactions that are OOO we do not set continous_transaction_
, therefore we may break consistency guarantees if we remove them even if they are at head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont understand than.
If there is a multi hop transaction that is OOO and it was at head position we will call the RunInShard with allow_q_removal = true because it will run from the flow of engine_shard.cc:669 rigtht?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we do not enter there if there is a continuous transaction defined.
but we can also call from if (trans && disarmed) {
check (the last invocation)
No description provided.