Skip to content

Commit

Permalink
Create new remote state interfaces and add implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed May 22, 2024
1 parent 6d33900 commit 99687b0
Show file tree
Hide file tree
Showing 26 changed files with 900 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1158,15 +1158,22 @@ private void createConfiguration() {
if (nodeName != null) {
baseConfig.put("node.name", nodeName);
}
baseConfig.put("path.repo", confPathRepo.toAbsolutePath().toString());
baseConfig.put("path.repo", "/Users/soosinha/os_data/repos");
baseConfig.put("path.data", confPathData.toAbsolutePath().toString());
baseConfig.put("path.logs", confPathLogs.toAbsolutePath().toString());
baseConfig.put("cluster.remote_store.state.enabled", "true");

baseConfig.put("path.shared_data", workingDir.resolve("sharedData").toString());
baseConfig.put("node.attr.testattr", "test");
if (StringUtils.isNotBlank(zone)) {
baseConfig.put("cluster.routing.allocation.awareness.attributes", "zone");
baseConfig.put("node.attr.zone", zone);
}
baseConfig.put("node.attr.remote_store.state.repository", "my-fs-repository");
baseConfig.put("node.attr.remote_store.segment.repository", "my-fs-repository");
baseConfig.put("node.attr.remote_store.translog.repository", "my-fs-repository");
baseConfig.put("node.attr.remote_store.repository.my-fs-repository.type", "fs");
baseConfig.put("node.attr.remote_store.repository.my-fs-repository.settings.location", "/Users/soosinha/os_data/repos/gradlerepo");
baseConfig.put("node.portsfile", "true");
baseConfig.put("http.port", httpPort);
baseConfig.put("transport.port", transportPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,12 @@ public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID));
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add(INDEX_ROUTING_PATH_TOKEN).add(index.getUUID()));
String[] fileNameTokens = uploadedFilename.split("/");
String blobFileName = fileNameTokens[fileNameTokens.length -1];
return () -> readAsync(
blobContainer,
uploadedFilename,
blobFileName,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure)
);
Expand All @@ -246,14 +248,15 @@ public void readAsync(BlobContainer blobContainer, String name, ExecutorService
try {
listener.onResponse(read(blobContainer, name));
} catch (Exception e) {
logger.error("routing table download failed : ", e);
listener.onFailure(e);
}
});
}

public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) {
try {
new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path));
return new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path));
} catch (IOException e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@

package org.opensearch.gateway.remote;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject <T> {

public abstract BlobContainer getBlobContainer();
public abstract BlobPathParameters getBlobPathParameters();
public abstract String getBlobNameFormat();
public abstract String getBlobName();
public abstract String getFullBlobName();

public String getBlobFileName() {
if (getFullBlobName() == null) {
generateBlobFileName();
}
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];
}
public abstract String generateBlobFileName();
public abstract RemoteObjectStore<T> getBackingStore();
public abstract ChecksumBlobStoreFormat getChecksumBlobStoreFormat();
public abstract UploadedMetadata getUploadedMetadata();

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

public class BlobPathParameters {

public static final String FILE_NAME_DELIMITER = "__";

private List<String> pathTokens;
private String filePrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,8 @@ public String getComponent() {
}

public String getUploadedFilename() {
String[] splitPath = uploadedFilename.split("/");
return splitPath[splitPath.length - 1];
// todo see how this change affects existing users
return uploadedFilename;
}

public String getIndexName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParseException;
Expand Down Expand Up @@ -253,6 +255,11 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
return builder.build();
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

public List<String> findRemovedIndices(Map<String, IndexMetadata> indices, Map<String, IndexMetadata> previousIndices) {
List<String> removedIndices = new ArrayList<>();
for (String index : previousIndices.keySet()) {
Expand Down
Loading

0 comments on commit 99687b0

Please sign in to comment.