
Deserialize publish requests on generic thread-pool #108814

Conversation

@nicktindall (Contributor) commented May 20, 2024

This PR moves the publish_state handler from the CLUSTER_COORDINATION thread pool to the GENERIC one. This means the initial handling of the publish request, including the deserialisation of the cluster state, happens on one of the GENERIC threads instead of the CLUSTER_COORDINATION thread. Once we have deserialised the cluster state and done some validation, we delegate to the CLUSTER_COORDINATION pool to apply the new state.

The consequences of this change include:

  • The generic pool contains multiple threads, whereas the cluster coordination pool contains a single thread, so (in theory) multiple instances of the handler could now execute concurrently where previously these requests were handled serially.
  • Delegating the apply to the cluster coordination thread adds some more asynchrony.

Closes #106352
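
For illustration, here is a minimal sketch of the two-stage dispatch described above, using plain java.util.concurrent executors rather than the real Elasticsearch thread pools; every name in it (PublishRequestDispatchSketch, genericPool, clusterCoordinationExecutor, deserializeClusterState, applyPublishRequest) is a hypothetical stand-in, not code from this PR:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the names below are stand-ins, not the PR's actual code.
class PublishRequestDispatchSketch {

    // Many threads: a rough analogue of the GENERIC pool.
    private final ExecutorService genericPool = Executors.newFixedThreadPool(8);

    // Single thread: a rough analogue of the CLUSTER_COORDINATION pool.
    private final ExecutorService clusterCoordinationExecutor = Executors.newSingleThreadExecutor();

    void handleInboundPublishRequest(byte[] payload) {
        // Stage 1: deserialise the (potentially large) cluster state on a generic thread,
        // keeping the coordination thread free for other work.
        genericPool.execute(() -> {
            Object incomingState = deserializeClusterState(payload);
            // Stage 2: delegate application of the validated state to the single
            // coordination thread, which adds one extra asynchronous hop.
            clusterCoordinationExecutor.execute(() -> applyPublishRequest(incomingState));
        });
    }

    private Object deserializeClusterState(byte[] payload) {
        return new Object(); // placeholder for the real deserialisation
    }

    private void applyPublishRequest(Object state) {
        // placeholder for validation and application of the new cluster state
    }
}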

@nicktindall nicktindall added >bug :Distributed/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. labels May 20, 2024
@elasticsearchmachine (Collaborator):

Hi @nicktindall, I've created a changelog YAML for you.

acceptState(
    incomingState,
    transportChannel,
    (acceptedState) -> lastSeenClusterState.compareAndSet(lastSeen, acceptedState)
);

@nicktindall (Contributor, Author):

onSuccess will end up being called by the CLUSTER_COORDINATION thread only after a successful apply, so I wonder whether we need compare-and-set here. I don't think it's possible for lastSeenClusterState to have changed between this being dispatched and the new state being successfully applied. I would expect either:

  • the state has not changed and onSuccess is called correctly
  • the state has changed and the apply fails due to the version check, so onSuccess is not called
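
For reference, a minimal sketch of the compare-and-set pattern in the quoted snippet, assuming lastSeenClusterState is a java.util.concurrent.atomic.AtomicReference; the ClusterState record and the onSuccess signature here are hypothetical simplifications:

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the pattern under discussion.
class LastSeenStateSketch {

    record ClusterState(long version) {} // stand-in for the real ClusterState

    private final AtomicReference<ClusterState> lastSeenClusterState =
        new AtomicReference<>(new ClusterState(0));

    // Called once an incoming state has been successfully applied.
    void onSuccess(ClusterState lastSeen, ClusterState acceptedState) {
        // Only advance lastSeenClusterState if nothing has replaced the reference we
        // captured before dispatching; otherwise leave the newer value in place.
        lastSeenClusterState.compareAndSet(lastSeen, acceptedState);
    }
}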

@nicktindall (Contributor, Author):

I raise this because we could do away with the onSuccess callback if we were able to safely call lastSeenClusterState.set for both the full and diff payloads.

@DaveCTurner (Contributor):

I think it can change between dispatch and execution, yes - there could be another update in flight when we read the value, which then completes and updates it before we get to run.

I am however not sure whether this matters. I'll think about this a little more.

@nicktindall (Contributor, Author):

This one still concerns me a bit... the task of applying an update is done in three steps:

  1. deserialise payload (on GENERIC)
  2. apply payload if valid (on CLUSTER_COORDINATION)
  3. update lastSeen, send response (on CLUSTER_COORDINATION)

Only (2) happens in the mutex, so you could have the following interleaving (for cluster states a at version 7 and b at version 8):

  1. a-1
  2. b-1
  3. a-2 (succeeds, bump version to 7)
  4. b-2 (succeeds, bump version to 8)
  5. b-3 (set lastSeen to 8)
  6. a-3 (set lastSeen to 7) - this means the next diff would be applied to 7, and a 7/9 hybrid would be applied locally?

The above could only happen if there were multiple threads in the CLUSTER_COORDINATION pool, but you said that is configurable.

Unless I've missed something.

Possible solutions

  • Use compare-and-set also for the non-diff case
    • this feels wrong, I suspect there are times when it is correct to update lastSeen even though it has changed since we took a reference to it
  • Put an additional check to only bump lastSeen when term and version are >= existing term and version?
    • I don't like this because it'd be duplicating business logic
  • Move (3) into the mutex?
    • This feels least bad to me

Do we run tests with CLUSTER_COORDINATION size > 1?
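
For illustration, a minimal sketch of the third option above (performing the lastSeen update inside the same mutex as the apply); applyMutex, the ClusterState record and the version check are hypothetical simplifications, not this PR's code:

// Hypothetical sketch of "move (3) into the mutex".
class ApplyUnderMutexSketch {

    record ClusterState(long version) {}

    private final Object applyMutex = new Object();
    private volatile ClusterState lastSeen = new ClusterState(0);

    boolean applyIfNewer(ClusterState incoming) {
        synchronized (applyMutex) {
            // Step (2): validate and apply under the mutex.
            if (incoming.version() <= lastSeen.version()) {
                return false; // stale publication, rejected by the version check
            }
            // ... apply the state ...
            // Step (3): update lastSeen while still holding the mutex, so the
            // out-of-order interleaving described above cannot occur.
            lastSeen = incoming;
            return true;
        }
    }
}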

@DaveCTurner (Contributor):

The sequence of 6 steps is indeed something that could have happened prior to #83576 and can again happen after this change - we could end up having applied state version 8 but with lastSeen at state version 7. But that's ok: each diff includes a UUID which identifies the base version, which we check here:

if (fromUuid.equals(state.stateUUID) == false) {
    throw new IncompatibleClusterStateVersionException(state.version, state.stateUUID, toVersion, fromUuid);
}

That IncompatibleClusterStateVersionException is caught and handled here:

if (transportException.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
    logger.debug(
        () -> format(
            "resending full cluster state to node %s reason %s",
            destination,
            transportException.getDetailedMessage()
        )
    );
    sendFullClusterState(destination, delegate);
    return;
}

IOW if we receive a diff between versions 8 & 9 but lastSeen is at version 7 then we reject the diff and the master sends us the full state at version 9 for us to apply. Somewhat inefficient for sure but still correct.

This case will be exercised in org.elasticsearch.cluster.coordination.CoordinatorTests in a cluster.runRandomly() call, but with rather low probability I think. We could try adding a test there which specifically checks this case.

@nicktindall (Contributor, Author):

Great! Not a problem we need to solve here then :)

@nicktindall nicktindall marked this pull request as ready for review May 20, 2024 06:13
@elasticsearchmachine (Collaborator):

Pinging @elastic/es-distributed (Team:Distributed)

@elasticsearchmachine elasticsearchmachine added the Team:Distributed Meta label for distributed team label May 20, 2024
@DaveCTurner (Contributor) left a comment:

Some initial comments, mostly stylistic tho.

We can make incomingState final if we factor out the deserialisation and application of the diff
@nicktindall nicktindall removed the request for review from ywangd May 21, 2024 01:03
@DaveCTurner (Contributor) left a comment:

one nit otherwise LGTM

Comment on lines 222 to 227
// 6. nodes deserialize committed cluster state
// 7. nodes apply committed cluster state
// 8. master receives ApplyCommitResponses
// 9. apply committed state on master (last one to apply cluster state)
// 10. complete the publication listener back on the master service thread
public static final int CLUSTER_STATE_UPDATE_NUMBER_OF_DELAYS = 10;
@DaveCTurner (Contributor):

👍 except that this extra step is happening between steps 3 & 4 in the old list. org.elasticsearch.cluster.coordination.ApplyCommitRequest only carries the term and version; it's org.elasticsearch.cluster.coordination.PublishRequest which carries the state that's being published.

@DaveCTurner (Contributor) left a comment:

LGTM (one more tiny suggestion but no need for another review)

nicktindall and others added 2 commits May 21, 2024 17:54
  • …blicationTransportHandler.java (Co-authored-by: David Turner <david.turner@elastic.co>)
  • …blicationTransportHandler.java (Co-authored-by: David Turner <david.turner@elastic.co>)
@nicktindall nicktindall merged commit 1778d40 into elastic:main May 21, 2024
16 checks passed
Labels
  • >bug
  • :Distributed/Cluster Coordination (Cluster formation and cluster state publication, including cluster membership and fault detection)
  • Team:Distributed (Meta label for distributed team)
  • v8.15.0
Development

Successfully merging this pull request may close these issues.

Don't use CLUSTER_COORDINATION to deserialize incoming cluster states
3 participants