Skip to content

Commit

Permalink
Refactor registerRepository method (#108788)
Browse files Browse the repository at this point in the history
This PR is a syntactic change for `registerRepository` in
`RepositoriesService`. I use `SubscribableListener` to display order of
events and reduce boilerplate code around failures delegation
`listener.delegateFailureAndWrap`.

It's a part of larger change for verification logic, which should take
advantage of this "sequential" version of code. #108531
  • Loading branch information
mhl-b committed May 17, 2024
1 parent 8ff8eff commit 38283c8
Showing 1 changed file with 87 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -144,111 +145,111 @@ public RepositoriesService(
/**
* Registers new repository in the cluster
* <p>
* This method can be only called on the master node. It tries to create a new repository on the master
* and if it was successful it adds new repository to cluster metadata.
* This method can be only called on the master node.
* It tries to create a new repository on the master, and if it was successful, it adds a new repository to cluster metadata.
*
* @param request register repository request
* @param listener register repository listener
* @param responseListener register repository listener
*/
public void registerRepository(final PutRepositoryRequest request, final ActionListener<AcknowledgedResponse> listener) {
public void registerRepository(final PutRepositoryRequest request, final ActionListener<AcknowledgedResponse> responseListener) {
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";
validateRepositoryName(request.name());

// Trying to create the new repository on master to make sure it works
try {
validateRepositoryCanBeCreated(request);
} catch (Exception e) {
listener.onFailure(e);
return;
}
// Aggregated result of two asynchronous operations when the cluster acknowledged and state changed
record RegisterRepositoryTaskResult(AcknowledgedResponse ackResponse, boolean changed) {}

final ListenableFuture<AcknowledgedResponse> acknowledgementStep = new ListenableFuture<>();
final ListenableFuture<Boolean> publicationStep = new ListenableFuture<>(); // Boolean==changed.
SubscribableListener

if (request.verify()) {
// Trying to create the new repository on master to make sure it works
.<Void>newForked(validationStep -> ActionListener.completeWith(validationStep, () -> {
validateRepositoryCanBeCreated(request);
return null;
}))

// When publication has completed (and all acks received or timed out) then verify the repository.
// (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
// to wait for the publication to be complete too)
final ListenableFuture<List<DiscoveryNode>> verifyStep = new ListenableFuture<>();
publicationStep.addListener(
listener.delegateFailureAndWrap(
(delegate, changed) -> acknowledgementStep.addListener(
delegate.delegateFailureAndWrap((l, clusterStateUpdateResponse) -> {
if (clusterStateUpdateResponse.isAcknowledged() && changed) {
// The response was acknowledged - all nodes should know about the new repository, let's verify them
verifyRepository(request.name(), verifyStep);
} else {
verifyStep.onResponse(null);
.<RegisterRepositoryTaskResult>andThen((clusterUpdateStep, ignored) -> {
final ListenableFuture<AcknowledgedResponse> acknowledgementStep = new ListenableFuture<>();
final ListenableFuture<Boolean> publicationStep = new ListenableFuture<>(); // Boolean==changed.
submitUnbatchedTask(
"put_repository [" + request.name() + "]",
new RegisterRepositoryTask(this, request, acknowledgementStep) {
@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to create repository [" + request.name() + "]", e);
publicationStep.onFailure(e);
super.onFailure(e);
}

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
// repository is created on both master and data nodes
return discoveryNode.isMasterNode() || discoveryNode.canContainData();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
if (changed) {
if (found) {
logger.info("updated repository [{}]", request.name());
} else {
logger.info("put repository [{}]", request.name());
}
}
})
publicationStep.onResponse(oldState != newState);
}
}
);
publicationStep.addListener(
clusterUpdateStep.delegateFailureAndWrap(
(stateChangeListener, changed) -> acknowledgementStep.addListener(
stateChangeListener.map(acknowledgedResponse -> new RegisterRepositoryTaskResult(acknowledgedResponse, changed))
)
)
)
);
);
})
.<AcknowledgedResponse>andThen((verificationStep, taskResult) -> {
if (request.verify() == false) {
verificationStep.onResponse(taskResult.ackResponse);
} else {
SubscribableListener

// When verification has completed, get the repository data for the first time
final ListenableFuture<RepositoryData> getRepositoryDataStep = new ListenableFuture<>();
verifyStep.addListener(
listener.delegateFailureAndWrap(
(l, ignored) -> threadPool.generic()
.execute(
ActionRunnable.wrap(
getRepositoryDataStep,
ll -> repository(request.name()).getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading, do we need to fork, see #101445?
ll
.<List<DiscoveryNode>>newForked(verifyRepositoryStep -> {
if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) {
verifyRepository(request.name(), verifyRepositoryStep);
} else {
verifyRepositoryStep.onResponse(null);
}
})
// When verification has completed, get the repository data for the first time
.<RepositoryData>andThen(
(getRepositoryDataStep, ignored) -> threadPool.generic()
.execute(
ActionRunnable.wrap(
getRepositoryDataStep,
ll -> repository(request.name()).getRepositoryData(
// TODO contemplate threading, do we need to fork, see #101445?
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ll
)
)
)
)
// When the repository metadata is ready, update the repository UUID stored in the cluster state, if available
.<Void>andThen(
(updateRepoUuidStep, repositoryData) -> updateRepositoryUuidInMetadata(
clusterService,
request.name(),
repositoryData,
updateRepoUuidStep
)
)
)
);

// When the repository metadata is ready, update the repository UUID stored in the cluster state, if available
final ListenableFuture<Void> updateRepoUuidStep = new ListenableFuture<>();
getRepositoryDataStep.addListener(
listener.delegateFailureAndWrap(
(l, repositoryData) -> updateRepositoryUuidInMetadata(
clusterService,
request.name(),
repositoryData,
updateRepoUuidStep
)
)
);

// Finally respond to the outer listener with the response from the original cluster state update
updateRepoUuidStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> acknowledgementStep.addListener(l)));

} else {
acknowledgementStep.addListener(listener);
}

submitUnbatchedTask("put_repository [" + request.name() + "]", new RegisterRepositoryTask(this, request, acknowledgementStep) {
@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to create repository [" + request.name() + "]", e);
publicationStep.onFailure(e);
super.onFailure(e);
}

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
// repository is created on both master and data nodes
return discoveryNode.isMasterNode() || discoveryNode.canContainData();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
if (changed) {
if (found) {
logger.info("updated repository [{}]", request.name());
} else {
logger.info("put repository [{}]", request.name());
}
.andThenApply(uuidUpdated -> taskResult.ackResponse)
.addListener(verificationStep);
}
publicationStep.onResponse(oldState != newState);
}
});
})
.addListener(responseListener);
}

/**
Expand Down

0 comments on commit 38283c8

Please sign in to comment.