Skip to content

Commit b99be96

Browse files
authored
KAFKA-18206: EmbeddedKafkaCluster must set features (#18189)
related to KAFKA-18206, set features in EmbeddedKafkaCluster in both streams and connect module, note that this PR also fix potential transaction with empty records in sendPrivileged method as transaction version 2 doesn't allow this kind of scenario. Reviewers: Justine Olshan <[email protected]>
1 parent 102de21 commit b99be96

File tree

3 files changed

+14
-4
lines changed

3 files changed

+14
-4
lines changed

connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.kafka.common.acl.AclPermissionType;
3131
import org.apache.kafka.common.config.ConfigDef;
3232
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
33-
import org.apache.kafka.common.errors.ProducerFencedException;
33+
import org.apache.kafka.common.errors.InvalidProducerEpochException;
3434
import org.apache.kafka.common.resource.PatternType;
3535
import org.apache.kafka.common.resource.ResourcePattern;
3636
import org.apache.kafka.common.resource.ResourceType;
@@ -1225,7 +1225,7 @@ private KafkaProducer<byte[], byte[]> transactionalProducer(String clientId, Str
12251225
private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> producer, String topic) {
12261226
producer.beginTransaction();
12271227
assertThrows(
1228-
ProducerFencedException.class,
1228+
InvalidProducerEpochException.class,
12291229
() -> {
12301230
producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96}));
12311231
producer.commitTransaction();

core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,8 @@ class KRaftClusterTest {
696696
new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), OpType.SET))))))
697697
validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
698698
("log.roll.ms", "1234567"),
699-
("max.connections.per.ip", "60"))), exhaustive = true)
699+
("max.connections.per.ip", "60"),
700+
("min.insync.replicas", "1"))), exhaustive = true)
700701

701702
admin.createTopics(util.Arrays.asList(
702703
new NewTopic("foo", 2, 3.toShort),

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.metadata.properties.MetaProperties;
2626
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
2727
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
28+
import org.apache.kafka.server.common.Feature;
2829
import org.apache.kafka.server.common.MetadataVersion;
2930

3031
import java.io.File;
@@ -55,7 +56,15 @@ public static class Builder {
5556
private BootstrapMetadata bootstrapMetadata;
5657

5758
public Builder() {
58-
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
59+
this(BootstrapMetadata.fromVersions(
60+
MetadataVersion.latestTesting(),
61+
Feature.PRODUCTION_FEATURES.stream()
62+
.collect(Collectors.toMap(
63+
Feature::featureName,
64+
feature -> feature.defaultLevel(MetadataVersion.latestTesting()),
65+
(existing, replacement) -> existing,
66+
TreeMap::new)),
67+
"testkit"));
5968
}
6069

6170
public Builder(BootstrapMetadata bootstrapMetadata) {

0 commit comments

Comments
 (0)