From fdb13162259a45cfa09eab22bd2f0f54a1a4c10e Mon Sep 17 00:00:00 2001 From: Varun Bharadwaj Date: Sun, 16 Feb 2025 10:59:06 -0800 Subject: [PATCH] Refactor ingestion engine to work with segment replication and peer recovery Signed-off-by: Varun Bharadwaj --- .../plugin/kafka/IngestFromKafkaIT.java | 188 +++- ...ava => TestContainerThreadLeakFilter.java} | 7 +- .../index/engine/IngestionEngine.java | 838 ++---------------- .../index/engine/InternalEngine.java | 75 +- .../InternalEngineTranslogManager.java | 35 + .../translog/InternalNoOpTranslogManager.java | 173 ++++ .../translog/InternalTranslogManager.java | 3 +- .../opensearch/index/translog/Translog.java | 2 + .../pollingingest/IngestionEngineFactory.java | 5 + .../index/engine/IngestionEngineTests.java | 23 +- .../index/engine/EngineTestCase.java | 9 + 11 files changed, 485 insertions(+), 873 deletions(-) rename plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/{TestContainerWatchdogThreadLeakFilter.java => TestContainerThreadLeakFilter.java} (76%) create mode 100644 server/src/main/java/org/opensearch/index/translog/InternalEngineTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/InternalNoOpTranslogManager.java diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index e7d8e36acb302..49fc5c0012c19 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -20,12 +20,16 @@ import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import java.util.Arrays; import java.util.Collection; @@ -43,19 +47,33 @@ import static org.awaitility.Awaitility.await; /** - * Integration test for Kafka ingestion + * Integration test for Kafka ingestion with segment replication */ -@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class) +@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class IngestFromKafkaIT extends OpenSearchIntegTestCase { static final String topicName = "test"; + static final String indexName = "testindex"; + static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; private KafkaContainer kafka; + private Producer producer; @Override protected Collection> nodePlugins() { return Arrays.asList(KafkaPlugin.class); } + @Before + private void setup() { + setupKafka(); + } + + @After + private void cleanup() { + stopKafka(); + } + /** * test ingestion-kafka-plugin is installed */ @@ -75,31 +93,102 @@ public void testPluginsAreInstalled() { } public void testKafkaIngestion() { - 
try { - setupKafka(); - // create an index with ingestion source from kafka - createIndex( - "test", - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("ingestion_source.type", "kafka") - .put("ingestion_source.pointer.init.reset", "earliest") - .put("ingestion_source.param.topic", "test") - .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) - .build(), - "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" - ); - - RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - refresh("test"); - SearchResponse response = client().prepareSearch("test").setQuery(query).get(); - assertThat(response.getHits().getTotalHits().value(), is(1L)); - }); - } finally { - stopKafka(); - } + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test"); + SearchResponse response = client().prepareSearch("test").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); + } + + public void testSegmentReplicationWithPeerRecovery() throws Exception { + // Step 1: Create primary and replica nodes. Create index with 1 replica and kafka as ingestion source. + + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .build(), + mapping + ); + + ensureYellowAndNoInitializingShards(indexName); + final String nodeB = internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + assertTrue(nodeA.equals(primaryNodeName(indexName))); + assertTrue(nodeB.equals(replicaNodeName(indexName))); + + // Step 2: Produce update messages and validate segment replication + + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + refresh(indexName); + waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB)); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21); + SearchResponse primaryResponse = client(nodeA).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L)); + SearchResponse replicaResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get(); + assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L)); + + // Step 3: Stop current primary node and validate replica promotion. 
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
+        ensureYellowAndNoInitializingShards(indexName);
+        assertTrue(nodeB.equals(primaryNodeName(indexName)));
+
+        // Step 4: Verify the new primary node is able to index documents
+
+        produceData("3", "name3", "30");
+        produceData("4", "name4", "31");
+        refresh(indexName);
+        waitForSearchableDocs(4, Arrays.asList(nodeB));
+
+        SearchResponse newPrimaryResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
+        assertThat(newPrimaryResponse.getHits().getTotalHits().value(), is(3L));
+
+        // Step 5: Add a new node and assign the replica shard. Verify peer recovery works.
+
+        final String nodeC = internalCluster().startDataOnlyNode();
+        client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
+        ensureGreen(indexName);
+        assertTrue(nodeC.equals(replicaNodeName(indexName)));
+
+        waitForSearchableDocs(4, Arrays.asList(nodeC));
+        SearchResponse newReplicaResponse = client(nodeC).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
+        assertThat(newReplicaResponse.getHits().getTotalHits().value(), is(3L));
+
+        // Step 6: Produce new updates and verify segment replication works when the primary and replica shards are not empty.
+        produceData("5", "name5", "40");
+        produceData("6", "name6", "41");
+        refresh(indexName);
+        waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
     }
 
     private void setupKafka() {
@@ -107,29 +196,44 @@ private void setupKafka() {
             // disable topic auto creation
             .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
         kafka.start();
-        prepareKafkaData();
+
+        // setup producer
+        String bootstrapServers = kafka.getBootstrapServers();
+        KafkaUtils.createTopic(topicName, 1, bootstrapServers);
+        Properties props = new Properties();
+        props.put("bootstrap.servers", kafka.getBootstrapServers());
+        producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
     }
 
     private void stopKafka() {
+        if (producer != null) {
+            producer.close();
+        }
+
         if (kafka != null) {
             kafka.stop();
         }
     }
 
-    private void prepareKafkaData() {
-        String boostrapServers = kafka.getBootstrapServers();
-        KafkaUtils.createTopic(topicName, 1, boostrapServers);
-        Properties props = new Properties();
-        props.put("bootstrap.servers", kafka.getBootstrapServers());
-        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
-        producer.send(
-            new ProducerRecord<>(
-                topicName,
-                "null",
-                "{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
-            )
+    private void produceData(String id, String name, String age) {
+        String payload = String.format(
+            "{\"_id\":\"%s\", \"_op_type\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
+            id,
+            name,
+            age
         );
-        producer.close();
+        producer.send(new ProducerRecord<>(topicName, "null", payload));
+    }
+
+    private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
+        assertBusy(() -> {
+            for (String node : nodes) {
+                final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
+                final long hits = response.getHits().getTotalHits().value();
+                if (hits < docCount) {
+                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
+                }
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
 }
diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
similarity index 76%
rename from plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java
rename to plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
index 50b88c6233a46..91e2c83ebfa48 100644
--- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java
+++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
@@ -13,11 +13,12 @@
 /**
  * The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers,
  * for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never
- * stopped. This filter excludes that thread from the thread leak detection logic.
+ * stopped. This filter excludes that thread from the thread leak detection logic. It also excludes the ryuk resource
+ * reaper thread, which is not stopped in time.
  */
-public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter {
+public final class TestContainerThreadLeakFilter implements ThreadFilter {
     @Override
     public boolean reject(Thread t) {
-        return t.getName().startsWith("testcontainers-pull-watchdog-");
+        return t.getName().startsWith("testcontainers-pull-watchdog-") || t.getName().startsWith("testcontainers-ryuk");
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
index 58c6371d51c0a..fecaea02dcb27 100644
--- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
@@ -8,146 +8,54 @@
 package org.opensearch.index.engine;
 
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ReferenceManager;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.util.InfoStream;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.IngestionSource;
-import org.opensearch.common.Booleans;
-import org.opensearch.common.Nullable;
-import org.opensearch.common.SuppressForbidden;
-import org.opensearch.common.concurrent.GatedCloseable;
-import org.opensearch.common.lucene.LoggerInfoStream;
 import org.opensearch.common.lucene.Lucene;
-import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.concurrent.AbstractRunnable;
-import org.opensearch.common.util.concurrent.ReleasableLock;
-import org.opensearch.common.util.io.IOUtils;
-import
org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.IndexSettings; import org.opensearch.index.IngestionConsumerFactory; import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.mapper.DocumentMapperForType; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; -import org.opensearch.index.merge.MergeStats; -import org.opensearch.index.merge.OnGoingMerge; -import org.opensearch.index.seqno.SeqNoStats; -import org.opensearch.index.shard.OpenSearchMergePolicy; -import org.opensearch.index.translog.NoOpTranslogManager; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.InternalEngineTranslogManager; +import org.opensearch.index.translog.InternalNoOpTranslogManager; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogCorruptedException; -import org.opensearch.index.translog.TranslogManager; -import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.listener.CompositeTranslogEventListener; import org.opensearch.indices.pollingingest.DefaultStreamPoller; import org.opensearch.indices.pollingingest.StreamPoller; -import org.opensearch.search.suggest.completion.CompletionStats; -import org.opensearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.UnaryOperator; import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT; /** * IngestionEngine is an engine that ingests data from a stream source. */ -public class IngestionEngine extends Engine { - - private volatile SegmentInfos lastCommittedSegmentInfos; - private final CompletionStatsCache completionStatsCache; - private final IndexWriter indexWriter; - private final OpenSearchReaderManager internalReaderManager; - private final ExternalReaderManager externalReaderManager; - private final Lock flushLock = new ReentrantLock(); - private final ReentrantLock optimizeLock = new ReentrantLock(); - private final OpenSearchConcurrentMergeScheduler mergeScheduler; - private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); - private final TranslogManager translogManager; - private final DocumentMapperForType documentMapperForType; - private final IngestionConsumerFactory ingestionConsumerFactory; - private StreamPoller streamPoller; +public class IngestionEngine extends InternalEngine { - /** - * UUID value that is updated every time the engine is force merged. 
- */ - @Nullable - private volatile String forceMergeUUID; + private StreamPoller streamPoller; + private final IngestionConsumerFactory ingestionConsumerFactory; + private final DocumentMapperForType documentMapperForType; public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) { super(engineConfig); - store.incRef(); - boolean success = false; - try { - this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); - IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata(); - assert indexMetadata != null; - mergeScheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); - indexWriter = createWriter(); - externalReaderManager = createReaderManager(new InternalEngine.RefreshWarmerListener(logger, isClosed, engineConfig)); - internalReaderManager = externalReaderManager.internalReaderManager; - translogManager = new NoOpTranslogManager( - shardId, - readLock, - this::ensureOpen, - new TranslogStats(0, 0, 0, 0, 0), - EMPTY_TRANSLOG_SNAPSHOT - ); - documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get(); - this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); - - success = true; - } catch (IOException | TranslogCorruptedException e) { - throw new EngineCreationFailureException(shardId, "failed to create engine", e); - } finally { - if (!success) { - if (streamPoller != null) { - try { - streamPoller.close(); - } catch (IOException e) { - logger.error("failed to close stream poller", e); - throw new RuntimeException(e); - } - } - if (!isClosed.get()) { - // failure, we need to dec the store reference - store.decRef(); - } - } - } + this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory); + this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get(); + } /** @@ -171,7 +79,7 @@ public void start() { ); logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId()); - Map commitData = commitDataAsMap(); + Map commitData = commitDataAsMap(indexWriter); StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf( ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT) ); @@ -192,23 +100,13 @@ public void start() { } streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState); - streamPoller.start(); - } - private IndexWriter createWriter() throws IOException { - try { - final IndexWriterConfig iwc = getIndexWriterConfig(); - return createWriter(store.directory(), iwc); - } catch (LockObtainFailedException ex) { - logger.warn("could not lock IndexWriter", ex); - throw ex; + // Poller is only started on the primary shard. Replica shards will rely on segment replication. 
+ if (!engineConfig.isReadOnlyReplica()) { + streamPoller.start(); } } - public DocumentMapperForType getDocumentMapperForType() { - return documentMapperForType; - } - protected Set fetchPersistedOffsets(DirectoryReader directoryReader, IngestionShardPointer batchStart) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); @@ -230,195 +128,6 @@ protected Set fetchPersistedOffsets(DirectoryReader direc return result; } - /** - * a copy of ExternalReaderManager from InternalEngine - */ - @SuppressForbidden(reason = "reference counting is required here") - static final class ExternalReaderManager extends ReferenceManager { - private final BiConsumer refreshListener; - private final OpenSearchReaderManager internalReaderManager; - private boolean isWarmedUp; // guarded by refreshLock - - ExternalReaderManager( - OpenSearchReaderManager internalReaderManager, - BiConsumer refreshListener - ) throws IOException { - this.refreshListener = refreshListener; - this.internalReaderManager = internalReaderManager; - this.current = internalReaderManager.acquire(); // steal the reference without warming up - } - - @Override - protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { - // we simply run a blocking refresh on the internal reference manager and then steal it's reader - // it's a save operation since we acquire the reader which incs it's reference but then down the road - // steal it by calling incRef on the "stolen" reader - internalReaderManager.maybeRefreshBlocking(); - final OpenSearchDirectoryReader newReader = internalReaderManager.acquire(); - if (isWarmedUp == false || newReader != referenceToRefresh) { - boolean success = false; - try { - refreshListener.accept(newReader, isWarmedUp ? 
referenceToRefresh : null); - isWarmedUp = true; - success = true; - } finally { - if (success == false) { - internalReaderManager.release(newReader); - } - } - } - // nothing has changed - both ref managers share the same instance so we can use reference equality - if (referenceToRefresh == newReader) { - internalReaderManager.release(newReader); - return null; - } else { - return newReader; // steal the reference - } - } - - @Override - protected boolean tryIncRef(OpenSearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(OpenSearchDirectoryReader reference) { - return reference.getRefCount(); - } - - @Override - protected void decRef(OpenSearchDirectoryReader reference) throws IOException { - reference.decRef(); - } - } - - private ExternalReaderManager createReaderManager(InternalEngine.RefreshWarmerListener externalRefreshListener) throws EngineException { - boolean success = false; - OpenSearchReaderManager internalReaderManager = null; - try { - try { - final OpenSearchDirectoryReader directoryReader = OpenSearchDirectoryReader.wrap( - DirectoryReader.open(indexWriter), - shardId - ); - internalReaderManager = new OpenSearchReaderManager(directoryReader); - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - ExternalReaderManager externalReaderManager = new ExternalReaderManager(internalReaderManager, externalRefreshListener); - success = true; - return externalReaderManager; - } catch (IOException e) { - maybeFailEngine("start", e); - try { - indexWriter.rollback(); - } catch (IOException inner) { // iw is closed below - e.addSuppressed(inner); - } - throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e); - } - } finally { - if (success == false) { // release everything we created on a failure - IOUtils.closeWhileHandlingException(internalReaderManager, indexWriter); - } - } - } - - // pkg-private for testing - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return new IndexWriter(directory, iwc); - } - - private IndexWriterConfig getIndexWriterConfig() { - final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); - iwc.setCommitOnClose(false); // we by default don't commit on close - iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - // with tests.verbose, lucene sets this up: plumb to align with filesystem stream - boolean verbose = false; - try { - verbose = Boolean.parseBoolean(System.getProperty("tests.verbose")); - } catch (Exception ignore) {} - iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); - iwc.setMergeScheduler(mergeScheduler); - // set merge scheduler - MergePolicy mergePolicy = config().getMergePolicy(); - boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString())); - if (shuffleForcedMerge) { - // We wrap the merge policy for all indices even though it is mostly useful for time-based indices - // but there should be no overhead for other type of indices so it's simpler than adding a setting - // to enable it. 
- mergePolicy = new ShuffleForcedMergePolicy(mergePolicy); - } - - if (config().getIndexSettings().isMergeOnFlushEnabled()) { - final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis(); - if (maxFullFlushMergeWaitMillis > 0) { - iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis); - final Optional> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy(); - if (mergeOnFlushPolicy.isPresent()) { - mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy); - } - } - } else { - // Disable merge on refresh - iwc.setMaxFullFlushMergeWaitMillis(0); - } - - iwc.setCheckPendingFlushUpdate(config().getIndexSettings().isCheckPendingFlushEnabled()); - iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); - iwc.setSimilarity(engineConfig.getSimilarity()); - iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - iwc.setCodec(engineConfig.getCodec()); - iwc.setUseCompoundFile(engineConfig.useCompoundFile()); - if (config().getIndexSort() != null) { - iwc.setIndexSort(config().getIndexSort()); - } - if (config().getLeafSorter() != null) { - iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order - } - - return new IndexWriterConfig(new StandardAnalyzer()); - } - - @Override - public TranslogManager translogManager() { - // ingestion engine does not have translog - return translogManager; - } - - @Override - protected SegmentInfos getLastCommittedSegmentInfos() { - return lastCommittedSegmentInfos; - } - - @Override - protected SegmentInfos getLatestSegmentInfos() { - throw new UnsupportedOperationException(); - } - - @Override - public String getHistoryUUID() { - return loadHistoryUUID(lastCommittedSegmentInfos.userData); - } - - @Override - public long getWritingBytes() { - return 0; - } - - @Override - public CompletionStats completionStats(String... 
fieldNamePatterns) { - return completionStatsCache.get(fieldNamePatterns); - } - - @Override - public long getIndexThrottleTimeInMillis() { - return 0; - } - - @Override - public boolean isThrottled() { - return false; - } - @Override public IndexResult index(Index index) throws IOException { assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field(); @@ -459,16 +168,6 @@ public GetResult get(Get get, BiFunction search return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL); } - @Override - protected ReferenceManager getReferenceManager(SearcherScope scope) { - return externalReaderManager; - } - - @Override - public Closeable acquireHistoryRetentionLock() { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public Translog.Snapshot newChangesSnapshot( String source, @@ -477,199 +176,36 @@ public Translog.Snapshot newChangesSnapshot( boolean requiredFullRange, boolean accurateCount ) throws IOException { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException { - return 0; - } - - @Override - public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { - return false; - } - - @Override - public long getMinRetainedSeqNo() { - return 0; - } - - @Override - public long getPersistedLocalCheckpoint() { - return 0; - } - - @Override - public long getProcessedLocalCheckpoint() { - return 0; - } - - @Override - public SeqNoStats getSeqNoStats(long globalCheckpoint) { - return null; - } - - @Override - public long getLastSyncedGlobalCheckpoint() { - return 0; - } - - @Override - public long getIndexBufferRAMBytesUsed() { - return 0; - } - - @Override - public List segments(boolean verbose) { - try (ReleasableLock lock = readLock.acquire()) { - Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose); - - // fill in the merges flag - Set onGoingMerges = mergeScheduler.onGoingMerges(); - for (OnGoingMerge onGoingMerge : onGoingMerges) { - for (SegmentCommitInfo segmentInfoPerCommit : onGoingMerge.getMergedSegments()) { - for (Segment segment : segmentsArr) { - if (segment.getName().equals(segmentInfoPerCommit.info.name)) { - segment.mergeId = onGoingMerge.getId(); - break; - } - } - } - } - return Arrays.asList(segmentsArr); - } - } - - @Override - public void refresh(String source) throws EngineException { - refresh(source, SearcherScope.EXTERNAL, true); - } - - final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - boolean refreshed; - try { - // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. - if (store.tryIncRef()) { - // increment the ref just to ensure nobody closes the store during a refresh - try { - // even though we maintain 2 managers we really do the heavy-lifting only once. - // the second refresh will only do the extra work we have to do for warming caches etc. 
- ReferenceManager referenceManager = getReferenceManager(scope); - // it is intentional that we never refresh both internal / external together - if (block) { - referenceManager.maybeRefreshBlocking(); - refreshed = true; - } else { - refreshed = referenceManager.maybeRefresh(); - } - } finally { - store.decRef(); - } - } else { - refreshed = false; - } - } catch (AlreadyClosedException e) { - failOnTragicEvent(e); - throw e; - } catch (Exception e) { - try { - failEngine("refresh failed source[" + source + "]", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new RefreshFailedEngineException(shardId, e); - } - // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes - // for a long time: - maybePruneDeletes(); - // TODO: use OS merge scheduler - mergeScheduler.refreshConfig(); - return refreshed; - } - - @Override - public boolean maybeRefresh(String source) throws EngineException { - return refresh(source, SearcherScope.EXTERNAL, false); - } - - @Override - public void writeIndexingBuffer() throws EngineException { - refresh("write indexing buffer", SearcherScope.INTERNAL, false); - } - - @Override - public boolean shouldPeriodicallyFlush() { - return false; - } - - @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - ensureOpen(); - if (force && waitIfOngoing == false) { - assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; - throw new IllegalArgumentException( - "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing - ); - } - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); - if (flushLock.tryLock() == false) { - // if we can't get the lock right away we block if needed otherwise barf - if (waitIfOngoing == false) { - return; - } - logger.trace("waiting for in-flight flush to finish"); - flushLock.lock(); - logger.trace("acquired flush lock after blocking"); - } else { - logger.trace("acquired flush lock immediately"); - } - try { - // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, - // - // do we need to consider #3 and #4 as in InternalEngine? - // (3) the newly created commit points to a different translog generation (can free translog), - // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. - boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges(); - if (hasUncommittedChanges || force) { - logger.trace("starting commit for flush;"); - - // TODO: do we need to close the latest commit as done in InternalEngine? - commitIndexWriter(indexWriter); - - logger.trace("finished commit for flush"); - - // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved - logger.debug("new commit on flush, hasUncommittedChanges:{}, force:{}", hasUncommittedChanges, force); - - // we need to refresh in order to clear older version values - refresh("version_table_flush", SearcherScope.INTERNAL, true); - } - } catch (FlushFailedEngineException ex) { - maybeFailEngine("flush", ex); - throw ex; - } catch (IOException e) { - throw new FlushFailedEngineException(shardId, e); - } finally { - flushLock.unlock(); - } - } + return EMPTY_TRANSLOG_SNAPSHOT; } /** - * Commits the specified index writer. 
- * - * @param writer the index writer to commit + * This method is a copy of commitIndexWriter method from {@link InternalEngine} with some additions for ingestion + * source. */ - protected void commitIndexWriter(final IndexWriter writer) throws IOException { + @Override + protected void commitIndexWriter(final IndexWriter writer, final String translogUUID) throws IOException { try { + final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); writer.setLiveCommitData(() -> { /* - * The user data captured the min and max range of the stream poller + * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes + * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want + * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the + * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently + * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the + * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time + * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(2); - + final Map commitData = new HashMap<>(7); + commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + commitData.put(HISTORY_UUID_KEY, historyUUID); + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); + + // ingestion engine needs to record batch start pointer commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString()); final String currentForceMergeUUID = forceMergeUUID; if (currentForceMergeUUID != null) { @@ -678,6 +214,7 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException { logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); + shouldPeriodicallyFlushAfterBigMerge.set(false); writer.commit(); } catch (final Exception ex) { try { @@ -705,268 +242,6 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException { } } - @Override - public MergeStats getMergeStats() { - return mergeScheduler.stats(); - } - - @Override - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - mergeScheduler.refreshConfig(); - // TODO: do we need more? - } - - protected Map commitDataAsMap() { - return commitDataAsMap(indexWriter); - } - - /** - * Gets the commit data from {@link IndexWriter} as a map. 
- */ - protected static Map commitDataAsMap(final IndexWriter indexWriter) { - final Map commitData = new HashMap<>(8); - for (Map.Entry entry : indexWriter.getLiveCommitData()) { - commitData.put(entry.getKey(), entry.getValue()); - } - return commitData; - } - - @Override - public void forceMerge( - boolean flush, - int maxNumSegments, - boolean onlyExpungeDeletes, - boolean upgrade, - boolean upgradeOnlyAncientSegments, - String forceMergeUUID - ) throws EngineException, IOException { - /* - * We do NOT acquire the readlock here since we are waiting on the merges to finish - * that's fine since the IW.rollback should stop all the threads and trigger an IOException - * causing us to fail the forceMerge - * - * The way we implement upgrades is a bit hackish in the sense that we set an instance - * variable and that this setting will thus apply to the next forced merge that will be run. - * This is ok because (1) this is the only place we call forceMerge, (2) we have a single - * thread for optimize, and the 'optimizeLock' guarding this code, and (3) ConcurrentMergeScheduler - * syncs calls to findForcedMerges. - */ - assert indexWriter.getConfig().getMergePolicy() instanceof OpenSearchMergePolicy : "MergePolicy is " - + indexWriter.getConfig().getMergePolicy().getClass().getName(); - OpenSearchMergePolicy mp = (OpenSearchMergePolicy) indexWriter.getConfig().getMergePolicy(); - optimizeLock.lock(); - try { - ensureOpen(); - if (upgrade) { - logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments); - mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments); - } - store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize - try { - if (onlyExpungeDeletes) { - assert upgrade == false; - indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/); - } else if (maxNumSegments <= 0) { - assert upgrade == false; - indexWriter.maybeMerge(); - } else { - indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/); - this.forceMergeUUID = forceMergeUUID; - } - if (flush) { - flush(false, true); - } - if (upgrade) { - logger.info("finished segment upgrade"); - } - } finally { - store.decRef(); - } - } catch (AlreadyClosedException ex) { - /* in this case we first check if the engine is still open. If so this exception is just fine - * and expected. We don't hold any locks while we block on forceMerge otherwise it would block - * closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures - * we are handling a tragic even exception here */ - ensureOpen(ex); - failOnTragicEvent(ex); - throw ex; - } catch (Exception e) { - try { - maybeFailEngine(FORCE_MERGE, e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw e; - } finally { - try { - // reset it just to make sure we reset it in a case of an error - mp.setUpgradeInProgress(false, false); - } finally { - optimizeLock.unlock(); - } - } - } - - @Override - public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { - store.incRef(); - try { - var reader = getReferenceManager(SearcherScope.INTERNAL).acquire(); - return new GatedCloseable<>(reader.getIndexCommit(), () -> { - store.decRef(); - getReferenceManager(SearcherScope.INTERNAL).release(reader); - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public GatedCloseable acquireSafeIndexCommit() throws EngineException { - // TODO: do we need this? 
likely not - return acquireLastIndexCommit(false); - } - - @Override - public SafeCommitInfo getSafeCommitInfo() { - // TODO: do we need this? - return SafeCommitInfo.EMPTY; - } - - @Override - protected void closeNoLock(String reason, CountDownLatch closedLatch) { - if (isClosed.compareAndSet(false, true)) { - assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() - : "Either the write lock must be held or the engine must be currently be failing itself"; - try { - try { - IOUtils.close(externalReaderManager, internalReaderManager); - } catch (Exception e) { - logger.warn("Failed to close ReaderManager", e); - } - - // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed - logger.trace("rollback indexWriter"); - try { - indexWriter.rollback(); - } catch (AlreadyClosedException ex) { - failOnTragicEvent(ex); - throw ex; - } - logger.trace("rollback indexWriter done"); - } catch (Exception e) { - logger.warn("failed to rollback writer on close", e); - } finally { - try { - store.decRef(); - logger.debug("engine closed [{}]", reason); - } finally { - closedLatch.countDown(); - } - } - } - } - - private boolean failOnTragicEvent(AlreadyClosedException ex) { - final boolean engineFailed; - // if we are already closed due to some tragic exception - // we need to fail the engine. it might have already been failed before - // but we are double-checking it's failed and closed - if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { - final Exception tragicException; - if (indexWriter.getTragicException() instanceof Exception) { - tragicException = (Exception) indexWriter.getTragicException(); - } else { - tragicException = new RuntimeException(indexWriter.getTragicException()); - } - failEngine("already closed by tragic event on the index writer", tragicException); - engineFailed = true; - } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? - // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by - // a tragic event or has closed itself. 
if that is not the case we are in a buggy state and raise an assertion error - throw new AssertionError("Unexpected AlreadyClosedException", ex); - } else { - engineFailed = false; - } - return engineFailed; - } - - private final class EngineMergeScheduler extends OpenSearchConcurrentMergeScheduler { - private final AtomicInteger numMergesInFlight = new AtomicInteger(0); - private final AtomicBoolean isThrottling = new AtomicBoolean(); - - EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) { - super(shardId, indexSettings); - } - - @Override - public synchronized void beforeMerge(OnGoingMerge merge) { - int maxNumMerges = mergeScheduler.getMaxMergeCount(); - if (numMergesInFlight.incrementAndGet() > maxNumMerges) { - if (isThrottling.getAndSet(true) == false) { - logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - activateThrottling(); - } - } - } - - @Override - public synchronized void afterMerge(OnGoingMerge merge) { - int maxNumMerges = mergeScheduler.getMaxMergeCount(); - if (numMergesInFlight.decrementAndGet() < maxNumMerges) { - if (isThrottling.getAndSet(false)) { - logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - deactivateThrottling(); - } - } - if (indexWriter.hasPendingMerges() == false - && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { - // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer - // we deadlock on engine#close for instance. - engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (isClosed.get() == false) { - logger.warn("failed to flush after merge has finished"); - } - } - - @Override - protected void doRun() { - // if we have no pending merges and we are supposed to flush once merges have finished to - // free up transient disk usage of the (presumably biggish) segments that were just merged - flush(); - } - }); - } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { - // we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change - // we should execute a flush on the next operation if that's a flush after inactive or indexing a document. - // we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events. - shouldPeriodicallyFlushAfterBigMerge.set(true); - } - } - - @Override - protected void handleMergeException(final Throwable exc) { - engineConfig.getThreadPool().generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.debug("merge failure action rejected", e); - } - - @Override - protected void doRun() throws Exception { - /* - * We do this on another thread rather than the merge thread that we are initially called on so that we have complete - * confidence that the call stack does not contain catch statements that would cause the error that might be thrown - * here from being caught and never reaching the uncaught exception handler. - */ - failEngine(MERGE_FAILED, new MergePolicy.MergeException(exc)); - } - }); - } - } - @Override public void activateThrottling() { // TODO: add this when we have a thread pool for indexing in parallel @@ -977,38 +252,33 @@ public void deactivateThrottling() { // TODO: is this needed? 
} - @Override - public int fillSeqNoGaps(long primaryTerm) throws IOException { - // TODO: is this needed? - return 0; - } - @Override public void maybePruneDeletes() { // no need to prune deletes in ingestion engine } @Override - public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { - // TODO: is this needed? + public void close() throws IOException { + if (streamPoller != null) { + streamPoller.close(); + } + super.close(); } - @Override - public long getMaxSeqNoOfUpdatesOrDeletes() { - // TODO: is this needed? - return 0; + public DocumentMapperForType getDocumentMapperForType() { + return documentMapperForType; } @Override - public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { - // TODO: is this needed? + protected InternalEngineTranslogManager createTranslogManager( + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + CompositeTranslogEventListener translogEventListener + ) throws IOException { + return new InternalNoOpTranslogManager(engineConfig, shardId, readLock, translogUUID); } - @Override - public void close() throws IOException { - if (streamPoller != null) { - streamPoller.close(); - } - super.close(); + protected Map commitDataAsMap() { + return commitDataAsMap(indexWriter); } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index ff790fa1513f1..8fcc1326628cd 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -104,6 +104,7 @@ import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.OpenSearchMergePolicy; +import org.opensearch.index.translog.InternalEngineTranslogManager; import org.opensearch.index.translog.InternalTranslogManager; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; @@ -144,16 +145,28 @@ */ public class InternalEngine extends Engine { + /** + * UUID value that is updated every time the engine is force merged. + */ + @Nullable + protected volatile String forceMergeUUID; + /** * When we last pruned expired tombstones from versionMap.deletes: */ private volatile long lastDeleteVersionPruneTimeMSec; - private final InternalTranslogManager translogManager; - private final OpenSearchConcurrentMergeScheduler mergeScheduler; + protected final InternalEngineTranslogManager translogManager; + protected final IndexWriter indexWriter; + protected final LocalCheckpointTracker localCheckpointTracker; + protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + protected final SoftDeletesPolicy softDeletesPolicy; + protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); - private final IndexWriter indexWriter; + @Nullable + protected final String historyUUID; + private final OpenSearchConcurrentMergeScheduler mergeScheduler; private final ExternalReaderManager externalReaderManager; private final OpenSearchReaderManager internalReaderManager; @@ -168,15 +181,12 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; - private final LocalCheckpointTracker localCheckpointTracker; - private final CombinedDeletionPolicy combinedDeletionPolicy; // How many callers are currently requesting index throttling. 
Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); - private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. @@ -189,14 +199,12 @@ public class InternalEngine extends Engine { private final CounterMetric numDocAppends = new CounterMetric(); private final CounterMetric numDocUpdates = new CounterMetric(); private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField(); - private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final CompletionStatsCache completionStatsCache; private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final KeyedLock noOpKeyedLock = new KeyedLock<>(); - private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); /** * If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted @@ -210,15 +218,6 @@ public class InternalEngine extends Engine { private final int maxDocs; - @Nullable - private final String historyUUID; - - /** - * UUID value that is updated every time the engine is force merged. - */ - @Nullable - private volatile String forceMergeUUID; - public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER); } @@ -249,7 +248,7 @@ public TranslogManager translogManager() { ExternalReaderManager externalReaderManager = null; OpenSearchReaderManager internalReaderManager = null; EngineMergeScheduler scheduler = null; - InternalTranslogManager translogManagerRef = null; + InternalEngineTranslogManager translogManagerRef = null; boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); @@ -280,20 +279,11 @@ public void onFailure(String reason, Exception ex) { } } }; - translogManagerRef = new InternalTranslogManager( - engineConfig.getTranslogConfig(), - engineConfig.getPrimaryTermSupplier(), - engineConfig.getGlobalCheckpointSupplier(), - translogDeletionPolicy, - shardId, - readLock, - this::getLocalCheckpointTracker, - translogUUID, - new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), - this::ensureOpen, - engineConfig.getTranslogFactory(), - engineConfig.getStartedPrimarySupplier() + CompositeTranslogEventListener compositeTranslogEventListener = new CompositeTranslogEventListener( + Arrays.asList(internalTranslogEventListener, translogEventListener), + shardId ); + translogManagerRef = createTranslogManager(translogUUID, translogDeletionPolicy, compositeTranslogEventListener); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy( @@ -362,6 +352,27 @@ public void onFailure(String reason, Exception ex) { 
logger.trace("created new InternalEngine"); } + protected InternalEngineTranslogManager createTranslogManager( + String translogUUID, + TranslogDeletionPolicy translogDeletionPolicy, + CompositeTranslogEventListener translogEventListener + ) throws IOException { + return new InternalTranslogManager( + engineConfig.getTranslogConfig(), + engineConfig.getPrimaryTermSupplier(), + engineConfig.getGlobalCheckpointSupplier(), + translogDeletionPolicy, + shardId, + readLock, + this::getLocalCheckpointTracker, + translogUUID, + translogEventListener, + this::ensureOpen, + engineConfig.getTranslogFactory(), + engineConfig.getStartedPrimarySupplier() + ); + } + private LocalCheckpointTracker createLocalCheckpointTracker( BiFunction localCheckpointTrackerSupplier ) throws IOException { @@ -2773,7 +2784,7 @@ public Closeable acquireHistoryRetentionLock() { /** * Gets the commit data from {@link IndexWriter} as a map. */ - private static Map commitDataAsMap(final IndexWriter indexWriter) { + protected static Map commitDataAsMap(final IndexWriter indexWriter) { final Map commitData = new HashMap<>(8); for (Map.Entry entry : indexWriter.getLiveCommitData()) { commitData.put(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/opensearch/index/translog/InternalEngineTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalEngineTranslogManager.java new file mode 100644 index 0000000000000..2f7c4ed7c56f8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/InternalEngineTranslogManager.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A {@link TranslogManager} interface for interacting with the {@link org.opensearch.index.engine.InternalEngine} + * + * @opensearch.internal + */ +public interface InternalEngineTranslogManager extends TranslogManager, Closeable { + + long getLastSyncedGlobalCheckpoint(); + + long getMaxSeqNo(); + + void trimUnreferencedReaders() throws IOException; + + boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold); + + Exception getTragicExceptionIfClosed(); + + TranslogDeletionPolicy getDeletionPolicy(); + + String getTranslogUUID(); + +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalNoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalNoOpTranslogManager.java new file mode 100644 index 0000000000000..33577af0de5e0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/InternalNoOpTranslogManager.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.util.stream.Stream; + +import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION; +import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT; + +/** + * A no-op {@link TranslogManager} implementation capable of interfacing with the {@link org.opensearch.index.engine.InternalEngine} + * + * @opensearch.internal + */ +public class InternalNoOpTranslogManager implements InternalEngineTranslogManager { + private final TranslogDeletionPolicy translogDeletionPolicy; + private final TranslogManager noOpTranslogManager; + private final String translogUUID; + + public InternalNoOpTranslogManager(EngineConfig engineConfig, ShardId shardId, ReleasableLock readLock, String translogUUID) + throws IOException { + this.translogUUID = translogUUID; + this.translogDeletionPolicy = new DefaultTranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + ); + + this.noOpTranslogManager = new NoOpTranslogManager( + shardId, + readLock, + () -> {}, + new TranslogStats(0, 0, 0, 0, 0), + EMPTY_TRANSLOG_SNAPSHOT + ); + } + + @Override + public void rollTranslogGeneration() throws TranslogException, IOException {} + + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + return 0; + } + + @Override + public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + return null; + } + + @Override + public boolean isTranslogSyncNeeded() { + return false; + } + + @Override + public boolean ensureTranslogSynced(Stream locations) throws IOException { + return true; + } + + @Override + public void syncTranslog() throws IOException {} + + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException {} + + @Override + public boolean shouldRollTranslogGeneration() { + return false; + } + + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} + + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + return 0; + } + + @Override + public void onDelete() {} + + @Override + public Releasable drainSync() { + return () -> {}; + } + + @Override + public long getMaxSeqNo() { + return SequenceNumbers.NO_OPS_PERFORMED; + } + + @Override + public TranslogStats getTranslogStats() { + return new TranslogStats(); + } + + @Override + public Translog.Location getTranslogLastWriteLocation() { + return EMPTY_TRANSLOG_LOCATION; + } + + @Override + public void ensureCanFlush() {} + + @Override + public void setMinSeqNoToKeep(long seqNo) {} + + @Override + public Translog.TranslogGeneration getTranslogGeneration() { + return new Translog.TranslogGeneration(translogUUID, 0); + } + + @Override + public Translog.Operation readOperation(Translog.Location location) throws IOException { + return null; + } + + @Override + public Translog.Location 
add(Translog.Operation operation) throws IOException { + return new Translog.Location(0, 0, 0); + } + + @Override + public void skipTranslogRecovery() {} + + @Override + public long getLastSyncedGlobalCheckpoint() { + return 0; + } + + @Override + public void trimUnreferencedReaders() throws IOException {} + + @Override + public String getTranslogUUID() { + return translogUUID; + } + + @Override + public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + return false; + } + + @Override + public Exception getTragicExceptionIfClosed() { + return null; + } + + @Override + public TranslogDeletionPolicy getDeletionPolicy() { + return translogDeletionPolicy; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index e2210217672ef..f61ac6ad48f07 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -21,7 +21,6 @@ import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.index.translog.transfer.TranslogUploadFailedException; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -36,7 +35,7 @@ * * @opensearch.internal */ -public class InternalTranslogManager implements TranslogManager, Closeable { +public class InternalTranslogManager implements InternalEngineTranslogManager { private final ReleasableLock readLock; private final LifecycleAware engineLifeCycleAware; diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index ffda06d8d8292..b1e88624c9906 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -899,6 +899,8 @@ public TranslogDeletionPolicy getDeletionPolicy() { return deletionPolicy; } + public static final Translog.Location EMPTY_TRANSLOG_LOCATION = new Translog.Location(0, 0, 0); + /** * Location in the translot * diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java index e124adb90365b..16688feddf53c 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.index.engine.NRTReplicationEngine; import java.util.Objects; @@ -29,6 +30,10 @@ public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory) @Override public Engine newReadWriteEngine(EngineConfig config) { + if (config.isReadOnlyReplica()) { + return new NRTReplicationEngine(config); + } + IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory); ingestionEngine.start(); return ingestionEngine; diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index 
19718384bd926..2d00bbcba0c8c 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -36,8 +36,9 @@ import java.util.concurrent.atomic.AtomicLong; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; public class IngestionEngineTests extends EngineTestCase { @@ -46,6 +47,7 @@ public class IngestionEngineTests extends EngineTestCase { private IngestionEngine ingestionEngine; // the messages of the stream to ingest from private List messages; + private EngineConfig engineConfig; @Override @Before @@ -86,6 +88,7 @@ public void tearDown() throws Exception { ingestionEngineStore.close(); } super.tearDown(); + engineConfig = null; } public void testCreateEngine() throws IOException { @@ -95,7 +98,7 @@ public void testCreateEngine() throws IOException { ingestionEngine.flush(false, true); Map commitData = ingestionEngine.commitDataAsMap(); // verify the commit data - Assert.assertEquals(1, commitData.size()); + Assert.assertEquals(7, commitData.size()); Assert.assertEquals("2", commitData.get(StreamPoller.BATCH_START)); // verify the stored offsets @@ -120,21 +123,19 @@ public void testRecovery() throws IOException { publishData("{\"_id\":\"3\",\"_source\":{\"name\":\"john\", \"age\": 30}}"); publishData("{\"_id\":\"4\",\"_source\":{\"name\":\"jane\", \"age\": 25}}"); ingestionEngine.close(); - ingestionEngine = buildIngestionEngine(new AtomicLong(2), ingestionEngineStore, indexSettings); + ingestionEngine = buildIngestionEngine(new AtomicLong(0), ingestionEngineStore, indexSettings); waitForResults(ingestionEngine, 4); } public void testCreationFailure() throws IOException { - // Simulate an error scenario - Store mockStore = mock(Store.class); - doThrow(new IOException("Simulated IOException")).when(mockStore).readLastCommittedSegmentsInfo(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); + Store mockStore = spy(store); + doThrow(new IOException("Simulated IOException")).when(mockStore).trimUnsafeCommits(any()); + EngineConfig engineConfig = config( indexSettings, - store, + mockStore, createTempDir(), NoMergePolicy.INSTANCE, null, @@ -156,7 +157,9 @@ public void testCreationFailure() throws IOException { private IngestionEngine buildIngestionEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) throws IOException { FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); - EngineConfig engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + if (engineConfig == null) { + engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + } // overwrite the config with ingestion engine settings String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; MapperService mapperService = createMapperService(mapping); diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 
d6cd5cfb81dc4..d59f4a1b161f3 100644
--- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
@@ -112,6 +112,7 @@
 import org.opensearch.index.seqno.RetentionLeases;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.Store;
+import org.opensearch.index.translog.InternalNoOpTranslogManager;
 import org.opensearch.index.translog.InternalTranslogManager;
 import org.opensearch.index.translog.LocalTranslog;
 import org.opensearch.index.translog.Translog;
@@ -1607,6 +1608,14 @@ public static Translog getTranslog(Engine engine) {
             : "only InternalEngines or NRTReplicationEngines have translogs, got: " + engine.getClass();
         engine.ensureOpen();
         TranslogManager translogManager = engine.translogManager();
+
+        // InternalNoOpTranslogManager does not use a real translog and therefore has nothing to close.
+        // The only place this method is called in ingestion tests is to validate that the translog is closed,
+        // so throw AlreadyClosedException here.
+        if (translogManager instanceof InternalNoOpTranslogManager) {
+            throw new AlreadyClosedException("InternalNoOpTranslogManager uses no-op translog");
+        }
+
         assert translogManager instanceof InternalTranslogManager : "only InternalTranslogManager have translogs, got: "
             + engine.getClass();
         InternalTranslogManager internalTranslogManager = (InternalTranslogManager) translogManager;