Skip to content

Commit 8b2c9ad

Browse files
Merge branch 'trunk' into KAFKA-18634
2 parents 81648ef + a1d5dc0 commit 8b2c9ad

File tree

8 files changed

+112
-171
lines changed

8 files changed

+112
-171
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ protected synchronized boolean retainTopic(String topic, boolean isInternal, lon
9494
if (isInternal && !includeInternalTopics)
9595
return false;
9696

97-
return subscription.matchesSubscribedPattern(topic);
97+
return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
9898
}
9999
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,20 @@ public synchronized boolean hasAutoAssignedPartitions() {
490490
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
491491
}
492492

493+
public synchronized boolean isAssignedFromRe2j(String topic) {
494+
if (!hasRe2JPatternSubscription()) {
495+
return false;
496+
}
497+
498+
for (TopicPartition topicPartition : assignment.partitionSet()) {
499+
if (topicPartition.topic().equals(topic)) {
500+
return true;
501+
}
502+
}
503+
504+
return false;
505+
}
506+
493507
public synchronized void position(TopicPartition tp, FetchPosition position) {
494508
assignedState(tp).position(position);
495509
}

core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,33 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
215215
awaitAssignment(consumer, assignment)
216216
}
217217

218+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
219+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
220+
def testRe2JPatternSubscriptionFetch(quorum: String, groupProtocol: String): Unit = {
221+
val topic1 = "topic1" // matches subscribed pattern
222+
createTopic(topic1, 2, brokerCount)
223+
224+
val consumer = createConsumer()
225+
assertEquals(0, consumer.assignment().size)
226+
227+
val pattern = new SubscriptionPattern("topic.*")
228+
consumer.subscribe(pattern)
229+
230+
val assignment = Set(
231+
new TopicPartition(topic, 0),
232+
new TopicPartition(topic, 1),
233+
new TopicPartition(topic1, 0),
234+
new TopicPartition(topic1, 1))
235+
awaitAssignment(consumer, assignment)
236+
237+
val producer = createProducer()
238+
val totalRecords = 10L
239+
val startingTimestamp = System.currentTimeMillis()
240+
val tp = new TopicPartition(topic1, 0)
241+
sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
242+
consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp, tp = tp)
243+
}
244+
218245
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
219246
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
220247
def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3624,6 +3624,7 @@ class UnifiedLogTest {
36243624
assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, LogStartOffsetIncrementReason.ClientRecordDeletion))
36253625
}
36263626

3627+
@Test
36273628
def testBackgroundDeletionWithIOException(): Unit = {
36283629
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
36293630
val log = createLog(logDir, logConfig)
@@ -3683,6 +3684,7 @@ class UnifiedLogTest {
36833684
assertEquals(None, log.maybeUpdateHighWatermark(101L))
36843685
}
36853686

3687+
@Test
36863688
def testEnableRemoteLogStorageOnCompactedTopics(): Unit = {
36873689
var logConfig = LogTestUtils.createLogConfig()
36883690
var log = createLog(logDir, logConfig)
@@ -3936,6 +3938,7 @@ class UnifiedLogTest {
39363938
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
39373939
}
39383940

3941+
@Test
39393942
def testNextTransactionVerificationGuardNotCleared(): Unit = {
39403943
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
39413944

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ SmokeTestDriver.VerificationResult result() {
104104
// We set 2 timeout condition to fail the test before passing the verification:
105105
// (1) 10 min timeout, (2) 30 tries of polling without getting any data
106106
@ParameterizedTest
107-
@CsvSource({"false, false", "true, false", "true, true"})
107+
@CsvSource({"false, false", "true, false"})
108108
public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
109109
Exit.setExitProcedure((statusCode, message) -> {
110110
throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message);

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,11 @@ public static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
12931293
public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__";
12941294

12951295
public static boolean processingThreadsEnabled(final Map<String, Object> configs) {
1296+
// note: we did disable testing "processing threads"` in SmokeTestDriverIntegrationTest due to
1297+
// high failure rate, and the feature being incomplete with no active work
1298+
//
1299+
// we should re-enable testing this feature in SmokeTestDriverIntegrationTest
1300+
// once it is complete (or maybe even earlier when we resumg working on it
12961301
return InternalConfig.getBoolean(configs, InternalConfig.PROCESSING_THREADS_ENABLED, false);
12971302
}
12981303

0 commit comments

Comments
 (0)