Skip to content

Commit

Permalink
Add vector data upload implementation to RemoteIndexBuildStrategy
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Feb 21, 2025
1 parent f9d8b53 commit 3596778
Show file tree
Hide file tree
Showing 10 changed files with 899 additions and 56 deletions.
25 changes: 23 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Booleans;
Expand All @@ -28,6 +27,7 @@
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.os.OsProbe;
import org.opensearch.transport.client.Client;

import java.security.InvalidParameterException;
import java.util.Arrays;
Expand Down Expand Up @@ -96,6 +96,7 @@ public class KNNSettings {
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
public static final String KNN_REMOTE_VECTOR_BUILD_THRESHOLD = "knn.remote_index_build.size_threshold";

/**
* Default setting values
Expand Down Expand Up @@ -126,6 +127,8 @@ public class KNNSettings {
// 10% of the JVM heap
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;
// TODO: Tune this default value based on benchmarking
public static final ByteSizeValue KNN_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE = new ByteSizeValue(50, ByteSizeUnit.MB);

/**
* Settings Definition
Expand Down Expand Up @@ -388,6 +391,15 @@ public class KNNSettings {
*/
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
* Cluster level setting which indicates the size threshold above which remote vector builds will be enabled.
*/
public static final Setting<ByteSizeValue> KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING = Setting.byteSizeSetting(
KNN_REMOTE_VECTOR_BUILD_THRESHOLD,
KNN_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE,
Dynamic,
NodeScope
);
/**
* Dynamic settings
*/
Expand Down Expand Up @@ -550,6 +562,10 @@ private Setting<?> getSetting(String key) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

if (KNN_REMOTE_VECTOR_BUILD_THRESHOLD.equals(key)) {
return KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand Down Expand Up @@ -577,7 +593,8 @@ public List<Setting<?>> getSettings() {
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
KNN_REMOTE_VECTOR_REPO_SETTING,
KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down Expand Up @@ -657,6 +674,10 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
.getAsBoolean(KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED, false);
}

public static ByteSizeValue getKnnRemoteVectorBuildThreshold() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_VECTOR_BUILD_THRESHOLD);
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

import org.apache.lucene.index.FieldInfo;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;

import java.io.IOException;
import java.util.function.Supplier;

import static org.opensearch.knn.common.FieldInfoExtractor.extractKNNEngine;
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;

/**
* Creates the {@link NativeIndexBuildStrategy}
Expand All @@ -38,7 +42,11 @@ public NativeIndexBuildStrategyFactory(Supplier<RepositoriesService> repositorie
* @param fieldInfo
* @return
*/
public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
public NativeIndexBuildStrategy getBuildStrategy(
final FieldInfo fieldInfo,
final int totalLiveDocs,
final KNNVectorValues<?> knnVectorValues
) throws IOException {
final KNNEngine knnEngine = extractKNNEngine(fieldInfo);
boolean isTemplate = fieldInfo.attributes().containsKey(MODEL_ID);
boolean iterative = !isTemplate && KNNEngine.FAISS == knnEngine;
Expand All @@ -47,11 +55,15 @@ public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
? MemOptimizedNativeIndexBuildStrategy.getInstance()
: DefaultIndexBuildStrategy.getInstance();

if (repositoriesServiceSupplier != null
initializeVectorValues(knnVectorValues);
long vectorBlobLength = (long) knnVectorValues.bytesPerVector() * totalLiveDocs;

if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
&& repositoriesServiceSupplier != null
&& indexSettings != null
&& knnEngine.supportsRemoteIndexBuild()
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy);
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings, vectorBlobLength)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
} else {
return strategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class NativeIndexWriter {

private final SegmentWriteState state;
private final FieldInfo fieldInfo;
private final NativeIndexBuildStrategy indexBuilder;
private final NativeIndexBuildStrategyFactory indexBuilderFactory;
@Nullable
private final QuantizationState quantizationState;

Expand Down Expand Up @@ -148,6 +148,11 @@ private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValu
knnVectorValuesSupplier,
totalLiveDocs
);
NativeIndexBuildStrategy indexBuilder = indexBuilderFactory.getBuildStrategy(
fieldInfo,
totalLiveDocs,
knnVectorValuesSupplier.get()
);
indexBuilder.buildAndWriteIndex(nativeIndexParams);
CodecUtil.writeFooter(output);
}
Expand Down Expand Up @@ -316,6 +321,6 @@ private static NativeIndexWriter createWriter(
@Nullable final QuantizationState quantizationState,
NativeIndexBuildStrategyFactory nativeIndexBuildStrategyFactory
) {
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory.getBuildStrategy(fieldInfo), quantizationState);
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory, quantizationState);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.nativeindex.remote;

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;

/**
* {@link InputStream} implementation of doc ids backed by {@link KNNVectorValues} rather than any file. Intended for use by {@link RemoteIndexBuildStrategy}
*/
@Log4j2
public class DocIdInputStream extends InputStream {
private final KNNVectorValues<?> knnVectorValues;
// Doc ids are 4 byte integers, byte read() only returns a single byte, so we will need to track the byte position within a doc id.
// For simplicity, and to maintain the byte ordering, we use a buffer with size of 1 int.
private ByteBuffer currentBuffer;

public DocIdInputStream(KNNVectorValues<?> knnVectorValues) throws IOException {
this.currentBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
this.knnVectorValues = knnVectorValues;
initializeVectorValues(this.knnVectorValues);
reloadBuffer();
}

@Override
public int read() throws IOException {
if (currentBuffer == null) {
return -1;
}

if (!currentBuffer.hasRemaining()) {
advanceAndReloadBuffer();
if (currentBuffer == null) {
return -1;
}
}

return currentBuffer.get() & 0xFF;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (currentBuffer == null) {
return -1;
}

int available = currentBuffer.remaining();
if (available <= 0) {
advanceAndReloadBuffer();
if (currentBuffer == null) {
return -1;
}
available = currentBuffer.remaining();
}

int bytesToRead = Math.min(available, len);
currentBuffer.get(b, off, bytesToRead);
return bytesToRead;
}

/**
* Advances to the next doc, and then refills the buffer with the new doc.
* @throws IOException
*/
private void advanceAndReloadBuffer() throws IOException {
int docId = knnVectorValues.nextDoc();
if (docId != -1 && docId != DocIdSetIterator.NO_MORE_DOCS) {
reloadBuffer();
} else {
// Reset buffer to null to indicate that there are no more docs to be read
currentBuffer = null;
}
}

/**
* Reload {@link currentBuffer} with the current doc id that {@link knnVectorValues} is pointing to
* @throws IOException
*/
private void reloadBuffer() throws IOException {
currentBuffer.clear();
currentBuffer.putInt(knnVectorValues.docId());
currentBuffer.position(0);
}
}
Loading

0 comments on commit 3596778

Please sign in to comment.