Skip to content

Commit

Permalink
Add transport call to publish remote state
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed May 16, 2024
1 parent 54f1f3d commit f222634
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public CoordinationState(
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
}

public boolean isRemoteStateEnabled() {
return isRemoteStateEnabled;
}

public long getCurrentTerm() {
return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getCurrentTerm();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
+ clusterState;

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent
clusterChangedEvent, coordinationState.get().isRemoteStateEnabled()
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class PublicationTransportHandler {
private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state";
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";

private final TransportService transportService;
Expand Down Expand Up @@ -123,6 +124,15 @@ public PublicationTransportHandler(
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
);

transportService.registerRequestHandler(
PUBLISH_REMOTE_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
false,
false,
RemotePublishRequest::new,
(request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request))
);

transportService.registerRequestHandler(
COMMIT_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -264,8 +274,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent);
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand Down Expand Up @@ -310,12 +320,14 @@ public class PublicationContext {
private final boolean sendFullVersion;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;

PublicationContext(ClusterChangedEvent clusterChangedEvent) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemoteStateEnabled;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -379,6 +391,9 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
if (sendRemoteState && destination.isRemoteStoreNode()) {
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
}
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
Expand Down Expand Up @@ -424,6 +439,43 @@ public String executor() {
);
}

private void sendRemoteClusterState(DiscoveryNode destination, ClusterState clusterState, ActionListener<PublishWithJoinResponse> listener) {
try {
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID());
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<
PublishWithJoinResponse>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
listener.onFailure(e);
}
}

private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
BytesReference bytes = serializedStates.get(destination.getVersion());
if (bytes == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {

OpenSearchException e = expectThrows(
OpenSearchException.class,
() -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState))
() -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState), false)
);
assertNotNull(e.getCause());
assertThat(e.getCause(), instanceOf(IOException.class));
Expand Down

0 comments on commit f222634

Please sign in to comment.