From f6a243119d33cdd69c1701666897ce749ff4f29d Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 16 Feb 2024 17:15:10 +0530 Subject: [PATCH] Fix tests Signed-off-by: Shivansh Arora --- .../opensearch/cluster/metadata/Metadata.java | 6 +- .../cluster/metadata/TemplatesMetadata.java | 11 +- .../remote/ClusterMetadataManifest.java | 76 +++-- .../remote/RemoteClusterStateService.java | 190 +++++++----- .../RemoteClusterStateServiceTests.java | 287 ++++++++++++++---- 5 files changed, 406 insertions(+), 164 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 638267fd1bf32..7753778507c8c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -1052,7 +1052,11 @@ private static class MetadataDiff implements Diff { persistentSettings = after.persistentSettings; hashesOfConsistentSettings = after.hashesOfConsistentSettings.diff(before.hashesOfConsistentSettings); indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer()); - templates = DiffableUtils.diff(before.templates.getTemplates(), after.templates.getTemplates(), DiffableUtils.getStringKeySerializer()); + templates = DiffableUtils.diff( + before.templates.getTemplates(), + after.templates.getTemplates(), + DiffableUtils.getStringKeySerializer() + ); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java index ff4c7f62fd9c5..179d5aa6fbc52 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java @@ -9,9 +9,6 @@ package org.opensearch.cluster.metadata; import org.opensearch.cluster.AbstractDiffable; -import org.opensearch.cluster.coordination.CoordinationMetadata; -import org.opensearch.common.annotation.PublicApi; -import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -66,7 +63,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - TemplatesMetadata that = (TemplatesMetadata) o; + TemplatesMetadata that = (TemplatesMetadata) o; return Objects.equals(templates, that.templates); } @@ -111,11 +108,11 @@ public TemplatesMetadata build() { } public static void toXContent(TemplatesMetadata templates, XContentBuilder builder, Params params) throws IOException { -// builder.startObject("templates-metadata"); - for(IndexTemplateMetadata cursor : templates.getTemplates().values()) { + // builder.startObject("templates-metadata"); + for (IndexTemplateMetadata cursor : templates.getTemplates().values()) { IndexTemplateMetadata.Builder.toXContentWithTypes(cursor, builder, params); } -// builder.endObject(); + // builder.endObject(); } public static TemplatesMetadata fromXContent(XContentParser parser) throws IOException { diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index a841ca3451795..a3a84924e4563 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -211,10 +211,26 @@ private static void declareParser(ConstructingObjectParser getCustomMetadataMap() { } public boolean hasMetadataAttributesFiles() { - return uploadedCoordinationMetadata != null || uploadedSettingsMetadata != null || uploadedTemplatesMetadata != null || !uploadedCustomMetadataMap.isEmpty(); + return uploadedCoordinationMetadata != null + || uploadedSettingsMetadata != null + || uploadedTemplatesMetadata != null + || !uploadedCustomMetadataMap.isEmpty(); } public ClusterMetadataManifest( @@ -370,7 +389,9 @@ public ClusterMetadataManifest( this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; this.uploadedSettingsMetadata = uploadedSettingsMetadata; this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; - this.uploadedCustomMetadataMap = Collections.unmodifiableMap(uploadedCustomMetadataMap != null ? uploadedCustomMetadataMap : new HashMap<>()); + this.uploadedCustomMetadataMap = Collections.unmodifiableMap( + uploadedCustomMetadataMap != null ? uploadedCustomMetadataMap : new HashMap<>() + ); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -389,7 +410,9 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.uploadedCoordinationMetadata = new UploadedMetadataAttribute(in); this.uploadedSettingsMetadata = new UploadedMetadataAttribute(in); this.uploadedTemplatesMetadata = new UploadedMetadataAttribute(in); - this.uploadedCustomMetadataMap = Collections.unmodifiableMap(in.readMap(StreamInput::readString, UploadedMetadataAttribute::new)); + this.uploadedCustomMetadataMap = Collections.unmodifiableMap( + in.readMap(StreamInput::readString, UploadedMetadataAttribute::new) + ); this.globalMetadataFileName = null; } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.codecVersion = in.readInt(); @@ -436,17 +459,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); if (onOrAfterCodecVersion(CODEC_V2)) { builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); - builder.startObject(UPLOADED_COORDINATOR_METADATA.getPreferredName()); - getCoordinationMetadata().toXContent(builder, params); - builder.endObject(); - builder.startObject(UPLOADED_SETTINGS_METADATA.getPreferredName()); - getSettingsMetadata().toXContent(builder, params); - builder.endObject(); - builder.startObject(UPLOADED_TEMPLATES_METADATA.getPreferredName()); - getTemplatesMetadata().toXContent(builder, params); - builder.endObject(); + if (getCoordinationMetadata() != null) { + builder.startObject(UPLOADED_COORDINATOR_METADATA.getPreferredName()); + getCoordinationMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getSettingsMetadata() != null) { + builder.startObject(UPLOADED_SETTINGS_METADATA.getPreferredName()); + getSettingsMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getTemplatesMetadata() != null) { + builder.startObject(UPLOADED_TEMPLATES_METADATA.getPreferredName()); + getTemplatesMetadata().toXContent(builder, params); + builder.endObject(); + } builder.startObject(UPLOADED_CUSTOM_METADATA.getPreferredName()); - for (UploadedMetadataAttribute attribute: getCustomMetadataMap().values()) { + for (UploadedMetadataAttribute attribute : getCustomMetadataMap().values()) { attribute.toXContent(builder, params); } builder.endObject(); @@ -705,6 +734,7 @@ public ClusterMetadataManifest build() { public static interface UploadedMetadata { String getComponent(); + String getUploadedFilename(); } @@ -888,10 +918,14 @@ public static UploadedMetadataAttribute fromXContent(XContentParser parser) thro @Override public String toString() { - return "UploadedMetadataAttribute{" + - "attributeName='" + attributeName + '\'' + - ", uploadedFilename='" + uploadedFilename + '\'' + - '}'; + return "UploadedMetadataAttribute{" + + "attributeName='" + + attributeName + + '\'' + + ", uploadedFilename='" + + uploadedFilename + + '\'' + + '}'; } } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 56cc72a07cbda..746446ce7d54b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -32,7 +32,6 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; -import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -43,7 +42,6 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.threadpool.ThreadPool; -import reactor.util.annotation.NonNull; import java.io.Closeable; import java.io.IOException; @@ -68,6 +66,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import reactor.util.annotation.NonNull; + import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; @@ -333,13 +333,16 @@ public ClusterMetadataManifest writeIncrementalMetadata( ) == false; final boolean updateCoordinationMetadata = Metadata.isCoordinationMetadataEqual( - previousClusterState.metadata(), clusterState.metadata() + previousClusterState.metadata(), + clusterState.metadata() ) == false; final boolean updateSettingsMetadata = Metadata.isSettingsMetadataEqual( - previousClusterState.metadata(), clusterState.metadata() + previousClusterState.metadata(), + clusterState.metadata() ) == false; final boolean updateTemplatesMetadata = Metadata.isTemplatesMetadataEqual( - previousClusterState.metadata(), clusterState.metadata() + previousClusterState.metadata(), + clusterState.metadata() ) == false; final Map customsToUpload = getUpdatedCustoms(clusterState, previousClusterState); @@ -378,14 +381,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, // If file is empty and codec is 1 then write global metadata. if (firstUpload) { - uploadedMetadataResults = writeMetadataInParallel( - clusterState, - toUpload, - clusterState.metadata().customs(), - true, - true, - true - ); + uploadedMetadataResults = writeMetadataInParallel(clusterState, toUpload, clusterState.metadata().customs(), true, true, true); } else { uploadedMetadataResults = writeMetadataInParallel( clusterState, @@ -408,10 +404,18 @@ public ClusterMetadataManifest writeIncrementalMetadata( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), - firstUpload || updateCoordinationMetadata? uploadedMetadataResults.uploadedCoordinationMetadata : previousManifest.getCoordinationMetadata(), - firstUpload || updateSettingsMetadata ? uploadedMetadataResults.uploadedSettingsMetadata : previousManifest.getSettingsMetadata(), - firstUpload || updateTemplatesMetadata ? uploadedMetadataResults.uploadedTemplatesMetadata : previousManifest.getTemplatesMetadata(), - firstUpload || !customsToUpload.isEmpty() ? uploadedMetadataResults.uploadedCustomMetadataMap : previousManifest.getCustomMetadataMap(), + firstUpload || updateCoordinationMetadata + ? uploadedMetadataResults.uploadedCoordinationMetadata + : previousManifest.getCoordinationMetadata(), + firstUpload || updateSettingsMetadata + ? uploadedMetadataResults.uploadedSettingsMetadata + : previousManifest.getSettingsMetadata(), + firstUpload || updateTemplatesMetadata + ? uploadedMetadataResults.uploadedTemplatesMetadata + : previousManifest.getTemplatesMetadata(), + firstUpload || !customsToUpload.isEmpty() + ? uploadedMetadataResults.uploadedCustomMetadataMap + : previousManifest.getCustomMetadataMap(), false ); deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); @@ -424,28 +428,28 @@ public ClusterMetadataManifest writeIncrementalMetadata( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, coordination metadata updated : [{}], " + "settings metadata updated : [{}], templates metadata updated : [{}], custom metadata updated : [{}]", - durationMillis, - slowWriteLoggingThreshold, - numIndicesUpdated, - numIndicesUnchanged, - updateCoordinationMetadata, - updateSettingsMetadata, - updateTemplatesMetadata, - customsToUpload.size() + durationMillis, + slowWriteLoggingThreshold, + numIndicesUpdated, + numIndicesUnchanged, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata, + customsToUpload.size() ); } else { logger.info( "writing cluster state for version [{}] took [{}ms]; " + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, coordination metadata updated : [{}], " + "settings metadata updated : [{}], templates metadata updated : [{}], custom metadata updated : [{}]", - manifest.getStateVersion(), - durationMillis, - numIndicesUpdated, - numIndicesUnchanged, - updateCoordinationMetadata, - updateSettingsMetadata, - updateTemplatesMetadata, - customsToUpload.size() + manifest.getStateVersion(), + durationMillis, + numIndicesUpdated, + numIndicesUnchanged, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata, + customsToUpload.size() ); } return manifest; @@ -459,17 +463,16 @@ private UploadedMetadataResults writeMetadataInParallel( boolean uploadSettingsMetadata, boolean uploadTemplateMetadata ) throws IOException { - int totalUploadTasks = indexToUpload.size() + customToUpload.size() + - (uploadCoordinationMetadata ? 1 : 0) + - (uploadSettingsMetadata ? 1 : 0) + - (uploadTemplateMetadata ? 1 : 0); + int totalUploadTasks = indexToUpload.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata + ? 1 + : 0) + (uploadTemplateMetadata ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalUploadTasks); List> uploadTasks = new ArrayList<>(totalUploadTasks); Map results = new HashMap<>(totalUploadTasks); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); - LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap( - (ClusterMetadataManifest.UploadedMetadata uploadedMetadata) -> { + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap((ClusterMetadataManifest.UploadedMetadata uploadedMetadata) -> { logger.trace(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); results.put(uploadedMetadata.getComponent(), uploadedMetadata); }, ex -> { @@ -479,32 +482,57 @@ private UploadedMetadataResults writeMetadataInParallel( ); exceptionList.add(ex); }), - latch); + latch + ); if (uploadSettingsMetadata) { - uploadTasks.add(getAsyncMetadataWriteAction(clusterState, SETTING_METADATA, SETTINGS_METADATA_FORMAT, clusterState.metadata().persistentSettings(), listener)); + uploadTasks.add( + getAsyncMetadataWriteAction( + clusterState, + SETTING_METADATA, + SETTINGS_METADATA_FORMAT, + clusterState.metadata().persistentSettings(), + listener + ) + ); } if (uploadCoordinationMetadata) { - uploadTasks.add(getAsyncMetadataWriteAction(clusterState, COORDINATION_METADATA, COORDINATION_METADATA_FORMAT, clusterState.metadata().coordinationMetadata(), listener)); + uploadTasks.add( + getAsyncMetadataWriteAction( + clusterState, + COORDINATION_METADATA, + COORDINATION_METADATA_FORMAT, + clusterState.metadata().coordinationMetadata(), + listener + ) + ); } if (uploadTemplateMetadata) { - uploadTasks.add(getAsyncMetadataWriteAction(clusterState, TEMPLATES_METADATA, TEMPLATES_METADATA_FORMAT, clusterState.metadata().templatesMetadata(), listener)); + uploadTasks.add( + getAsyncMetadataWriteAction( + clusterState, + TEMPLATES_METADATA, + TEMPLATES_METADATA_FORMAT, + clusterState.metadata().templatesMetadata(), + listener + ) + ); } - customToUpload.forEach((key, value) -> uploadTasks.add( - getAsyncMetadataWriteAction( + customToUpload.forEach( + (key, value) -> uploadTasks.add( + getAsyncMetadataWriteAction( clusterState, String.join(DELIMITER, CUSTOM_METADATA, key), CUSTOM_METADATA_FORMAT, value, listener + ) ) - )); - indexToUpload.forEach(indexMetadata -> { - uploadTasks.add(getIndexMetadataAsyncAction(clusterState, indexMetadata, listener)); - }); + ); + indexToUpload.forEach(indexMetadata -> { uploadTasks.add(getIndexMetadataAsyncAction(clusterState, indexMetadata, listener)); }); // start async upload of all required metadata files - for(CheckedRunnable uploadTask : uploadTasks) { + for (CheckedRunnable uploadTask : uploadTasks) { uploadTask.run(); } @@ -529,10 +557,14 @@ private UploadedMetadataResults writeMetadataInParallel( String.format( Locale.ROOT, "Exception during transfer of following metadata to Remote - %s, %s, %s", - indexToUpload.stream().map(IndexMetadata::getIndexName).collect(Collectors.joining(",")), + indexToUpload.stream().map(IndexMetadata::getIndexName).collect(Collectors.joining(",")), customToUpload.keySet().stream().collect(Collectors.joining(", ")), - String.join(", ", (uploadSettingsMetadata ? "settings" : ""), (uploadCoordinationMetadata ? - "coordination" : ""), (uploadTemplateMetadata ? "templates" : "")) + String.join( + ", ", + (uploadSettingsMetadata ? "settings" : ""), + (uploadCoordinationMetadata ? "coordination" : ""), + (uploadTemplateMetadata ? "templates" : "") + ) ) ); exceptionList.forEach(exception::addSuppressed); @@ -616,12 +648,7 @@ private CheckedRunnable getAsyncMetadataWriteAction( ); final String componentMetadataFilename = metadataAttributeFileName(component, clusterState.metadata().version()); ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse( - new UploadedMetadataAttribute( - component, - componentMetadataFilename - ) - ), + resp -> latchedActionListener.onResponse(new UploadedMetadataAttribute(component, componentMetadataFilename)), ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, ex)) ); return () -> componentMetadataBlobStore.writeAsyncWithUrgentPriority( @@ -853,7 +880,7 @@ private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploa } private Map getUpdatedCustoms(ClusterState currentState, ClusterState previousState) { - if(Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { + if (Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { return new HashMap<>(); } Map updatedCustom = new HashMap<>(); @@ -897,7 +924,7 @@ static String getManifestFileName(long term, long version, boolean committed, in (committed ? "C" : "P"), // C for committed and P for published RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(codecVersion) // Keep the codec version at last place only, during read we reads last place to - // determine codec version. + // determine codec version. ); } @@ -1031,22 +1058,29 @@ private Metadata getGlobalMetadata(String clusterName, String clusterUUID, Clust blobStoreRepository.getNamedXContentRegistry() ); } else if (clusterMetadataManifest.hasMetadataAttributesFiles()) { - CoordinationMetadata coordinationMetadata = getCoordinationMetadata(clusterName, clusterUUID, - clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); - Settings settingsMetadata = getSettingsMetadata(clusterName, clusterUUID, - clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); - TemplatesMetadata templatesMetadata = getTemplatesMetadata(clusterName, clusterUUID, - clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + CoordinationMetadata coordinationMetadata = getCoordinationMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename() + ); + Settings settingsMetadata = getSettingsMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getSettingsMetadata().getUploadedFilename() + ); + TemplatesMetadata templatesMetadata = getTemplatesMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename() + ); Metadata.Builder builder = new Metadata.Builder(); builder.coordinationMetadata(coordinationMetadata); builder.persistentSettings(settingsMetadata); builder.templates(templatesMetadata); - clusterMetadataManifest.getCustomMetadataMap().forEach( - (key, value) -> { - String custom = key.split(DELIMITER)[1]; - builder.putCustom(custom, getCustomsMetadata(clusterName, clusterUUID, value.getUploadedFilename(), custom)); - } - ); + clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> { + String custom = key.split(DELIMITER)[1]; + builder.putCustom(custom, getCustomsMetadata(clusterName, clusterUUID, value.getUploadedFilename(), custom)); + }); return builder.build(); } else { return Metadata.EMPTY_METADATA; @@ -1122,11 +1156,16 @@ private TemplatesMetadata getTemplatesMetadata(String clusterName, String cluste } } - private Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, @NonNull String customMetadataFileName, String custom) { + private Metadata.Custom getCustomsMetadata( + String clusterName, + String clusterUUID, + @NonNull String customMetadataFileName, + String custom + ) { try { // Fetch Custom metadata String[] splitPath = customMetadataFileName.split("/"); - ChecksumBlobStoreFormat customChecksumBlobStoreFormat = new ChecksumBlobStoreFormat<>( + ChecksumBlobStoreFormat customChecksumBlobStoreFormat = new ChecksumBlobStoreFormat<>( "custom", METADATA_NAME_FORMAT, (parser -> Metadata.Custom.fromXContent(parser, custom)) @@ -1630,6 +1669,7 @@ public UploadedMetadataResults( this.uploadedSettingsMetadata = uploadedSettingsMetadata; this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; } + public UploadedMetadataResults() { this.uploadedIndexMetadata = new ArrayList<>(); this.uploadedCustomMetadataMap = new HashMap<>(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 65477051cdb30..94a2da4b88d7d 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -38,6 +38,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.FilterRepository; @@ -71,12 +72,16 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_CURRENT_CODEC_VERSION; @@ -84,11 +89,14 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; +import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; @@ -235,6 +243,15 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { doAnswer((i) -> { actionListenerArgumentCaptor.getValue().onResponse(null); return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; }).doAnswer((i) -> { actionListenerArgumentCaptor.getValue().onResponse(null); capturedWriteContext.set(writeContextArgumentCaptor.getValue()); @@ -263,15 +280,19 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); - assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifest.getGlobalMetadataFileName(), nullValue()); + assertThat(manifest.getCoordinationMetadata(), notNullValue()); + assertThat(manifest.getSettingsMetadata(), notNullValue()); + assertThat(manifest.getTemplatesMetadata(), notNullValue()); + assertThat(manifest.getCustomMetadataMap().size(), not(0)); assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3); - assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3); + assertEquals(6, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(6, writeContextArgumentCaptor.getAllValues().size()); byte[] writtenBytes = capturedWriteContext.get() .getStreamProvider(Integer.MAX_VALUE) @@ -351,7 +372,7 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); } catch (Exception e) { assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException); - assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete")); + assertTrue(e.getMessage().contains("Timed out waiting for transfer of metadata to complete")); } } @@ -436,13 +457,28 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { } /* - * Here we will verify the migration of manifest file from codec V0 and V1. + * Here we will verify the migration of manifest file from codec V0. * * Initially codec version is 0 and global metadata is also null, we will perform index metadata update. - * In final manifest codec version should be 1 and - * global metadata should be updated, even if it was not changed in this cluster state update + * In final manifest codec version should be 2 and have metadata files updated, + * even if it was not changed in this cluster state update */ - public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException { + public void testMigrationFromCodecV0ManifestToCodecV2Manifest() throws IOException { + verifyCodecMigrationManifest(ClusterMetadataManifest.CODEC_V0); + } + + /* + * Here we will verify the migration of manifest file from codec V1. + * + * Initially codec version is 1 and a global metadata file is there, we will perform index metadata update. + * In final manifest codec version should be 2 and have metadata files updated, + * even if it was not changed in this cluster state update + */ + public void testMigrationFromCodecV1ManifestToCodecV2Manifest() throws IOException { + verifyCodecMigrationManifest(ClusterMetadataManifest.CODEC_V1); + } + + private void verifyCodecMigrationManifest(int previousCodec) throws IOException { mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) @@ -462,7 +498,7 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti // previous manifest with codec 0 and null global metadata final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() - .codecVersion(ClusterMetadataManifest.CODEC_V0) + .codecVersion(previousCodec) .globalMetadataFileName(null) .indices(Collections.emptyList()) .build(); @@ -475,9 +511,9 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti ); // global metadata is updated - assertThat(manifestAfterUpdate.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); // Manifest file with codec version with 1 is updated. - assertThat(manifestAfterUpdate.getCodecVersion(), is(ClusterMetadataManifest.CODEC_V1)); + assertThat(manifestAfterUpdate.getCodecVersion(), is(MANIFEST_CURRENT_CODEC_VERSION)); } public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { @@ -489,7 +525,7 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { .build(); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() - .codecVersion(2) + .codecVersion(1) .globalMetadataFileName("global-metadata-file") .indices(Collections.emptyList()) .build(); @@ -502,8 +538,8 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .codecVersion(2) .indices(Collections.emptyList()) - .globalMetadataFileName("mock-filename") .clusterTerm(1L) .stateVersion(1L) .stateUUID("state-uuid") @@ -511,11 +547,17 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { .previousClusterUUID("prev-cluster-uuid") .build(); - assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); - assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); - assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); - assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); - assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertNull(manifest.getGlobalMetadataFileName()); + assertNotNull(manifest.getCoordinationMetadata()); + assertNotNull(manifest.getSettingsMetadata()); + assertNotNull(manifest.getTemplatesMetadata()); + assertNotEquals(0, manifest.getCustomMetadataMap().size()); + + assertEquals(expectedManifest.getClusterTerm(), manifest.getClusterTerm()); + assertEquals(expectedManifest.getStateVersion(), manifest.getStateVersion()); + assertEquals(expectedManifest.getClusterUUID(), manifest.getClusterUUID()); + assertEquals(expectedManifest.getStateUUID(), manifest.getStateUUID()); + assertEquals(expectedManifest.getCodecVersion(), manifest.getCodecVersion()); } /* @@ -530,7 +572,10 @@ public void testGlobalMetadataOnlyUpdated() throws IOException { .build(); final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() .codecVersion(2) - .globalMetadataFileName("global-metadata-file") + .coordinationMetadata(new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination-metadata-file")) + .settingMetadata(new UploadedMetadataAttribute(SETTING_METADATA, "setting-metadata-file")) + .templatesMetadata(new UploadedMetadataAttribute(TEMPLATES_METADATA, "templates-metadata-file")) + .customMetadataMap(new HashMap<>()) .indices(Collections.emptyList()) .build(); remoteClusterStateService.start(); @@ -560,26 +605,34 @@ public void testGlobalMetadataOnlyUpdated() throws IOException { // Verify that index metadata information is same in manifest files assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(manifestAfterGlobalMetadataUpdate.getIndices().size())); - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexName(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexName()) - ); - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexUUID(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexUUID()) - ); - - // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata update - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getUploadedFilename(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getUploadedFilename()) - ); + IntStream.range(0, manifestAfterGlobalMetadataUpdate.getIndices().size()).forEach(i -> { + assertEquals( + manifestAfterIndexMetadataUpdate.getIndices().get(i).getIndexName(), + manifestAfterGlobalMetadataUpdate.getIndices().get(i).getIndexName() + ); + assertEquals( + manifestAfterIndexMetadataUpdate.getIndices().get(i).getIndexUUID(), + manifestAfterGlobalMetadataUpdate.getIndices().get(i).getIndexUUID() + ); + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata update + assertEquals( + manifestAfterIndexMetadataUpdate.getIndices().get(i).getUploadedFilename(), + manifestAfterGlobalMetadataUpdate.getIndices().get(i).getUploadedFilename() + ); + }); - // global metadata file would have changed + // setting metadata file would have changed assertFalse( - manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName() + manifestAfterIndexMetadataUpdate.getSettingsMetadata() + .getUploadedFilename() .equalsIgnoreCase(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) ); + assertEquals( + manifestAfterIndexMetadataUpdate.getCoordinationMetadata().getUploadedFilename(), + manifestAfterGlobalMetadataUpdate.getCoordinationMetadata().getUploadedFilename() + ); + assertEquals(manifestAfterIndexMetadataUpdate.getTemplatesMetadata(), manifestAfterGlobalMetadataUpdate.getTemplatesMetadata()); + assertEquals(manifestAfterIndexMetadataUpdate.getCustomMetadataMap(), manifestAfterGlobalMetadataUpdate.getCustomMetadataMap()); } /* @@ -795,7 +848,10 @@ public void testReadGlobalMetadata() throws IOException { .stateUUID("state-uuid") .clusterUUID("cluster-uuid") .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) - .globalMetadataFileName("global-metadata-file") + .coordinationMetadata(new UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file")) + .settingMetadata(new UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file")) + .templatesMetadata(new UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file")) + .put(IndexGraveyard.TYPE, new UploadedMetadataAttribute(CUSTOM_METADATA + DELIMITER + IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file")) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") @@ -830,7 +886,7 @@ public void testReadGlobalMetadataIOException() throws IOException { .stateVersion(1L) .stateUUID("state-uuid") .clusterUUID("cluster-uuid") - .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .codecVersion(ClusterMetadataManifest.CODEC_V1) .globalMetadataFileName(globalIndexMetadataName) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) @@ -1124,18 +1180,24 @@ public void testFileNames() { assertThat(splittedIndexMetadataFileName[1], is(RemoteStoreUtils.invertLong(indexMetadata.getVersion()))); assertThat(splittedIndexMetadataFileName[3], is(String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION))); + verifyManifestFileNameWithCodec(MANIFEST_CURRENT_CODEC_VERSION); + verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V1); + verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V0); + } + + private void verifyManifestFileNameWithCodec(int codecVersion) { int term = randomIntBetween(5, 10); int version = randomIntBetween(5, 10); - String manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, true); + String manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, true, codecVersion); assertThat(manifestFileName.split(DELIMITER).length, is(6)); String[] splittedName = manifestFileName.split(DELIMITER); assertThat(splittedName[0], is(MANIFEST_FILE_PREFIX)); assertThat(splittedName[1], is(RemoteStoreUtils.invertLong(term))); assertThat(splittedName[2], is(RemoteStoreUtils.invertLong(version))); assertThat(splittedName[3], is("C")); - assertThat(splittedName[5], is(String.valueOf(MANIFEST_CURRENT_CODEC_VERSION))); + assertThat(splittedName[5], is(String.valueOf(codecVersion))); - manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, false); + manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, false, codecVersion); splittedName = manifestFileName.split(DELIMITER); assertThat(splittedName[3], is("P")); } @@ -1251,12 +1313,16 @@ private void mockObjectsForGettingPreviousClusterUUID( new UploadedIndexMetadata("index1", "index-uuid1", "key1"), new UploadedIndexMetadata("index2", "index-uuid2", "key2") ); + Map customMetadataMap = new HashMap<>(); final ClusterMetadataManifest clusterManifest1 = generateClusterMetadataManifest( "cluster-uuid1", clusterUUIDsPointers.get("cluster-uuid1"), randomAlphaOfLength(10), uploadedIndexMetadataList1, - "test-metadata1", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid1", true) ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build(); @@ -1275,7 +1341,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap1 = Map.of("index-uuid1", indexMetadata1, "index-uuid2", indexMetadata2); mockBlobContainerForGlobalMetadata(blobContainer1, clusterManifest1, metadata1); - mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V2); List uploadedIndexMetadataList2 = List.of( new UploadedIndexMetadata("index1", "index-uuid1", "key1"), @@ -1286,7 +1352,10 @@ private void mockObjectsForGettingPreviousClusterUUID( clusterUUIDsPointers.get("cluster-uuid2"), randomAlphaOfLength(10), uploadedIndexMetadataList2, - "test-metadata2", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid2", true) ); IndexMetadata indexMetadata3 = IndexMetadata.builder("index1") @@ -1304,7 +1373,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap2 = Map.of("index-uuid1", indexMetadata3, "index-uuid2", indexMetadata4); mockBlobContainerForGlobalMetadata(blobContainer2, clusterManifest2, metadata2); - mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V2); // differGlobalMetadata controls which one of IndexMetadata or Metadata object would be different // when comparing cluster-uuid3 and cluster-uuid1 state. @@ -1326,17 +1395,19 @@ private void mockObjectsForGettingPreviousClusterUUID( Metadata metadata3 = Metadata.builder() .persistentSettings(Settings.builder().put(Metadata.SETTING_READ_ONLY_SETTING.getKey(), !differGlobalMetadata).build()) .build(); - final ClusterMetadataManifest clusterManifest3 = generateClusterMetadataManifest( "cluster-uuid3", clusterUUIDsPointers.get("cluster-uuid3"), randomAlphaOfLength(10), uploadedIndexMetadataList3, - "test-metadata3", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid3", true) ); mockBlobContainerForGlobalMetadata(blobContainer3, clusterManifest3, metadata3); - mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V2); ArrayList mockBlobContainerOrderedList = new ArrayList<>( List.of(blobContainer1, blobContainer1, blobContainer3, blobContainer3, blobContainer2, blobContainer2) @@ -1356,7 +1427,7 @@ private void mockObjectsForGettingPreviousClusterUUID( when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); } - private ClusterMetadataManifest generateClusterMetadataManifest( + private ClusterMetadataManifest generateV1ClusterMetadataManifest( String clusterUUID, String previousClusterUUID, String stateUUID, @@ -1380,6 +1451,36 @@ private ClusterMetadataManifest generateClusterMetadataManifest( .build(); } + private ClusterMetadataManifest generateClusterMetadataManifest( + String clusterUUID, + String previousClusterUUID, + String stateUUID, + List uploadedIndexMetadata, + Map customMetadataMap, + UploadedMetadataAttribute coordinationMetadata, + UploadedMetadataAttribute settingsMetadata, + UploadedMetadataAttribute templatesMetadata, + Boolean isUUIDCommitted + ) { + return ClusterMetadataManifest.builder() + .indices(uploadedIndexMetadata) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID(stateUUID) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(previousClusterUUID) + .committed(true) + .clusterUUIDCommitted(isUUIDCommitted) + .coordinationMetadata(coordinationMetadata) + .settingMetadata(settingsMetadata) + .templatesMetadata(templatesMetadata) + .customMetadataMap(customMetadataMap) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .build(); + } + private BlobContainer mockBlobStoreObjects() { return mockBlobStoreObjects(BlobContainer.class); } @@ -1419,7 +1520,7 @@ private void mockBlobContainer( int codecVersion ) throws IOException { String manifestFileName = codecVersion >= ClusterMetadataManifest.CODEC_V1 - ? "manifest__manifestFileName__abcd__abcd__abcd__1" + ? "manifest__manifestFileName__abcd__abcd__abcd__" + codecVersion : "manifestFileName"; BlobMetadata blobMetadata = new PlainBlobMetadata(manifestFileName, 1); when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) @@ -1460,7 +1561,8 @@ private void mockBlobContainerForGlobalMetadata( ClusterMetadataManifest clusterMetadataManifest, Metadata metadata ) throws IOException { - String mockManifestFileName = "manifest__1__2__C__456__1"; + int codecVersion = clusterMetadataManifest.getCodecVersion(); + String mockManifestFileName = "manifest__1__2__C__456__" + codecVersion; BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1); when( blobContainer.listBlobsByPrefixInSortedOrder( @@ -1477,19 +1579,84 @@ private void mockBlobContainerForGlobalMetadata( FORMAT_PARAMS ); when(blobContainer.readBlob(mockManifestFileName)).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); + if (codecVersion >= ClusterMetadataManifest.CODEC_V2) { + String coordinationFileName = getFileNameFromPath(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.COORDINATION_METADATA_FORMAT.blobName(coordinationFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.COORDINATION_METADATA_FORMAT.serialize( + metadata.coordinationMetadata(), + coordinationFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); + + String settingsFileName = getFileNameFromPath(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.SETTINGS_METADATA_FORMAT.blobName(settingsFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.SETTINGS_METADATA_FORMAT.serialize( + metadata.persistentSettings(), + settingsFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); + + String templatesFileName = getFileNameFromPath(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.blobName(templatesFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.serialize( + metadata.templatesMetadata(), + templatesFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); - String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); - when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))).thenAnswer( - (invocationOnMock) -> { - BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize( - metadata, - "global-metadata-file", - blobStoreRepository.getCompressor(), - FORMAT_PARAMS + Map customFileMap = clusterMetadataManifest.getCustomMetadataMap() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> getFileNameFromPath(entry.getValue().getUploadedFilename()))); + + for (Map.Entry entry : customFileMap.entrySet()) { + String custom = entry.getKey(); + String fileName = entry.getValue(); + when(blobContainer.readBlob(RemoteClusterStateService.CUSTOM_METADATA_FORMAT.blobName(fileName))).thenAnswer( + (invocation) -> { + BytesReference bytesReference = RemoteClusterStateService.CUSTOM_METADATA_FORMAT.serialize( + metadata.custom(custom), + fileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } ); - return new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes()); } - ); + } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))) + .thenAnswer((invocationOnMock) -> { + BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize( + metadata, + "global-metadata-file", + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes()); + }); + } + } + + private String getFileNameFromPath(String filePath) { + String[] splitPath = filePath.split("/"); + return splitPath[splitPath.length - 1]; } private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {