Add remote store tests for kafka ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
varunbharadwaj committed Feb 21, 2025
1 parent fdb1316 commit b758603
Showing 3 changed files with 230 additions and 160 deletions.
@@ -8,72 +8,32 @@

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static org.hamcrest.Matchers.is;
import static org.awaitility.Awaitility.await;

/**
* Integration test for Kafka ingestion with segment replication
*/
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
static final String topicName = "test";
static final String indexName = "testindex";
static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}";

private KafkaContainer kafka;
private Producer<String, String> producer;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class);
}

@Before
private void setup() {
setupKafka();
}

@After
private void cleanup() {
stopKafka();
}

public class IngestFromKafkaIT extends KafkaIngestionBaseIT {
/**
* test ingestion-kafka-plugin is installed
*/
@@ -117,123 +77,4 @@ public void testKafkaIngestion() {
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
}

public void testSegmentReplicationWithPeerRecovery() throws Exception {
// Step 1: Create primary and replica nodes. Create index with 1 replica and kafka as ingestion source.

internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

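// Create the index with kafka as the ingestion source; "earliest" starts ingestion from the beginning of the topic, so previously produced messages are also consumed.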
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
mapping
);

ensureYellowAndNoInitializingShards(indexName);
final String nodeB = internalCluster().startDataOnlyNode();
ensureGreen(indexName);
assertEquals(nodeA, primaryNodeName(indexName));
assertEquals(nodeB, replicaNodeName(indexName));

// Step 2: Produce update messages and validate segment replication

produceData("1", "name1", "24");
produceData("2", "name2", "20");
refresh(indexName);
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

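// Two documents are indexed, but only doc 1 (age 24) matches age >= 21; primary and replica should agree.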
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
SearchResponse primaryResponse = client(nodeA).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L));
SearchResponse replicaResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L));

// Step 3: Stop current primary node and validate replica promotion.

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertEquals(nodeB, primaryNodeName(indexName));

// Step 4: Verify new primary node is able to index documents

produceData("3", "name3", "30");
produceData("4", "name4", "31");
refresh(indexName);
waitForSearchableDocs(4, Arrays.asList(nodeB));

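// Docs with ages 24, 30 and 31 now match the age >= 21 query, hence 3 hits below.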
SearchResponse newPrimaryResponse = client(nodeB).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(newPrimaryResponse.getHits().getTotalHits().value(), is(3L));

// Step 5: Add a new node and assign the replica shard. Verify peer recovery works.

final String nodeC = internalCluster().startDataOnlyNode();
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
ensureGreen(indexName);
assertEquals(nodeC, replicaNodeName(indexName));

waitForSearchableDocs(4, Arrays.asList(nodeC));
SearchResponse newReplicaResponse = client(nodeC).prepareSearch(indexName).setQuery(query).setPreference("_only_local").get();
assertThat(newReplicaResponse.getHits().getTotalHits().value(), is(3L));

// Step 6: Produce new updates and verify segment replication works when primary and replica index are not empty.
produceData("5", "name5", "40");
produceData("6", "name6", "41");
refresh(indexName);
waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();

// auto creation is disabled above, so create the topic explicitly before setting up the producer
String bootstrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, bootstrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
}

private void stopKafka() {
if (producer != null) {
producer.close();
}

if (kafka != null) {
kafka.stop();
}
}

private void produceData(String id, String name, String age) {
// payload carries the document id, op type and source, mirroring a bulk-style index operation
String payload = String.format(
"{\"_id\":\"%s\", \"_op_type\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
id,
name,
age
);
// single-partition topic, so the record key (here the literal string "null") does not affect routing
producer.send(new ProducerRecord<>(topicName, "null", payload));
}

private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value();
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}
}
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Base test class for Kafka ingestion tests. Starts a single-broker Kafka
* container via Testcontainers and provides helpers to produce documents
* and wait for them to become searchable on specific nodes.
*/
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {
static final String topicName = "test";
static final String indexName = "testindex";
static final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}";

protected KafkaContainer kafka;
protected Producer<String, String> producer;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(KafkaPlugin.class);
}

@Before
private void setup() {
setupKafka();
}

@After
private void cleanup() {
stopKafka();
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
kafka.start();

// auto creation is disabled above, so create the topic explicitly before setting up the producer
String bootstrapServers = kafka.getBootstrapServers();
KafkaUtils.createTopic(topicName, 1, bootstrapServers);
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
}

private void stopKafka() {
if (producer != null) {
producer.close();
}

if (kafka != null) {
kafka.stop();
}
}

protected void produceData(String id, String name, String age) {
// payload carries the document id, op type and source, mirroring a bulk-style index operation
String payload = String.format(
"{\"_id\":\"%s\", \"_op_type\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
id,
name,
age
);
// single-partition topic, so the record key (here the literal string "null") does not affect routing
producer.send(new ProducerRecord<>(topicName, "null", payload));
}

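// Polls each node with a local-only search until at least docCount hits are visible, failing after one minute.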
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value();
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}
}
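
For orientation, a minimal sketch of how a subclass can build on these helpers; the class name and test body are hypothetical and not part of this commit:

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;

import java.util.Arrays;

public class ExampleKafkaIngestionIT extends KafkaIngestionBaseIT {
public void testSingleDocBecomesSearchable() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String node = internalCluster().startDataOnlyNode();

// Hypothetical single-shard index wired to the topic created by the base class.
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
mapping
);
ensureGreen(indexName);

produceData("1", "name1", "25");
refresh(indexName);
waitForSearchableDocs(1, Arrays.asList(node));
}
}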