
Commit 91fa677

Author: John Roesler (committed)

Revert "KAFKA-10867: Improved task idling (apache#9840)"

This reverts commit 4d28391. Closes apache#10119

Reviewers: Chia-Ping Tsai <[email protected]>

1 parent f4e475c · commit 91fa677
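In short: before this revert, task readiness was decided by the lag-aware PartitionGroup#readyToProcess, which also waited on partitions whose broker-side fetch lag was unknown or positive; after it, StreamTask#isProcessable again decides using only locally buffered records and a wall-clock idle timer. Below is a minimal, self-contained sketch of the restored heuristic; the class name IdleHeuristicSketch and the UNKNOWN constant are illustrative stand-ins, not the actual Kafka Streams code.

    // Illustrative stand-in for the wall-clock idling logic restored in StreamTask#isProcessable.
    public class IdleHeuristicSketch {
        static final long UNKNOWN = -1L; // stand-in for RecordQueue.UNKNOWN

        private final long maxTaskIdleMs;
        private long idleStartTimeMs = UNKNOWN;

        IdleHeuristicSketch(final long maxTaskIdleMs) {
            this.maxTaskIdleMs = maxTaskIdleMs;
        }

        boolean isProcessable(final boolean allPartitionsBuffered, final int numBuffered, final long wallClockTime) {
            if (allPartitionsBuffered) {
                // every input partition has buffered data: process immediately
                idleStartTimeMs = UNKNOWN;
                return true;
            } else if (numBuffered > 0) {
                // some, but not all, partitions have data: start (or continue) the idle timer
                if (idleStartTimeMs == UNKNOWN) {
                    idleStartTimeMs = wallClockTime;
                }
                // once the task has idled for maxTaskIdleMs, enforce (possibly out-of-order) processing
                return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
            } else {
                // no data on any partition: reset the timer and keep waiting
                idleStartTimeMs = UNKNOWN;
                return false;
            }
        }

        public static void main(final String[] args) {
            final IdleHeuristicSketch task = new IdleHeuristicSketch(100L);
            System.out.println(task.isProcessable(false, 1, 0L));   // false: idle timer just started
            System.out.println(task.isProcessable(false, 1, 150L)); // true: idled past max.task.idle.ms
        }
    }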

File tree

15 files changed (+177, -517 lines)


streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (-1)

@@ -144,7 +144,6 @@ public class StreamsConfig extends AbstractConfig {
     private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;

     public static final int DUMMY_THREAD_INDEX = 1;
-    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

     /**
      * Prefix used to provide default topic configs to be applied when creating internal topics.
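Because MAX_TASK_IDLE_MS_DISABLED is deleted here, the -1 sentinel (which made the reverted readyToProcess return immediately with whatever was buffered) is gone, and max.task.idle.ms reverts to meaning the wall-clock grace period before enforced processing. A hedged configuration sketch; the application id, bootstrap servers, and class name are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class MaxTaskIdleConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // After the revert: how long (wall-clock ms) a task with data on only
            // some of its input partitions waits before processing is enforced anyway.
            props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
            final StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG));
        }
    }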

streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java (+1, -2)

@@ -242,8 +242,7 @@ private StreamTask createActiveTask(final TaskId taskId,
             time,
             stateManager,
             recordCollector,
-            context,
-            logContext
+            context
         );

         log.trace("Created task {} with assigned partitions {}", taskId, inputPartitions);

streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java (+5, -143)

@@ -17,21 +17,16 @@
 package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsConfig;
-import org.slf4j.Logger;

 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.Iterator;
+import java.util.HashSet;
 import java.util.function.Function;

 /**
@@ -58,18 +53,14 @@
  */
 public class PartitionGroup {

-    private final Logger logger;
     private final Map<TopicPartition, RecordQueue> partitionQueues;
-    private final Sensor enforcedProcessingSensor;
-    private final long maxTaskIdleMs;
     private final Sensor recordLatenessSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;

     private long streamTime;
     private int totalBuffered;
     private boolean allBuffered;
-    private final Map<TopicPartition, Long> fetchedLags = new HashMap<>();
-    private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
+

     static class RecordInfo {
         RecordQueue queue;
@@ -87,144 +78,15 @@ RecordQueue queue() {
         }
     }

-    PartitionGroup(final LogContext logContext,
-                   final Map<TopicPartition, RecordQueue> partitionQueues,
-                   final Sensor recordLatenessSensor,
-                   final Sensor enforcedProcessingSensor,
-                   final long maxTaskIdleMs) {
-        this.logger = logContext.logger(PartitionGroup.class);
+    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
-        this.enforcedProcessingSensor = enforcedProcessingSensor;
-        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }

-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        final Long lag = metadata.lag();
-        if (lag != null) {
-            logger.trace("added fetched lag {}: {}", partition, lag);
-            fetchedLags.put(partition, lag);
-        }
-    }
-
-    public boolean readyToProcess(final long wallClockTime) {
-        if (logger.isTraceEnabled()) {
-            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-                logger.trace(
-                    "buffered/lag {}: {}/{}",
-                    entry.getKey(),
-                    entry.getValue().size(),
-                    fetchedLags.get(entry.getKey())
-                );
-            }
-        }
-        // Log-level strategy:
-        //  TRACE for messages that don't wait for fetches
-        //  TRACE when we waited for a fetch and decided to wait some more, as configured
-        //  TRACE when we are ready for processing and didn't have to enforce processing
-        //  INFO when we enforce processing, since this has to wait for fetches AND may result in disorder
-
-        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
-            if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
-                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
-                final Set<TopicPartition> emptyPartitions = new HashSet<>();
-                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-                    if (entry.getValue().isEmpty()) {
-                        emptyPartitions.add(entry.getKey());
-                    } else {
-                        bufferedPartitions.add(entry.getKey());
-                    }
-                }
-                logger.trace("Ready for processing because max.task.idle.ms is disabled." +
-                                 "\n\tThere may be out-of-order processing for this task as a result." +
-                                 "\n\tBuffered partitions: {}" +
-                                 "\n\tNon-buffered partitions: {}",
-                             bufferedPartitions,
-                             emptyPartitions);
-            }
-            return true;
-        }
-
-        final Set<TopicPartition> queued = new HashSet<>();
-        Map<TopicPartition, Long> enforced = null;
-
-        for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
-            final TopicPartition partition = entry.getKey();
-            final RecordQueue queue = entry.getValue();
-
-            final Long nullableFetchedLag = fetchedLags.get(partition);
-
-            if (!queue.isEmpty()) {
-                // this partition is ready for processing
-                idlePartitionDeadlines.remove(partition);
-                queued.add(partition);
-            } else if (nullableFetchedLag == null) {
-                // must wait to fetch metadata for the partition
-                idlePartitionDeadlines.remove(partition);
-                logger.trace("Waiting to fetch data for {}", partition);
-                return false;
-            } else if (nullableFetchedLag > 0L) {
-                // must wait to poll the data we know to be on the broker
-                idlePartitionDeadlines.remove(partition);
-                logger.trace(
-                    "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
-                    partition,
-                    nullableFetchedLag
-                );
-                return false;
-            } else {
-                // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up.
-                // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs
-                // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero
-                // lag instead of when we happen to run this method, but realistically it's probably a small difference
-                // and using wall clock time seems more intuitive for users,
-                // since the log message will be as of wallClockTime.
-                idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
-                final long deadline = idlePartitionDeadlines.get(partition);
-                if (wallClockTime < deadline) {
-                    logger.trace(
-                        "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
-                        partition,
-                        wallClockTime,
-                        maxTaskIdleMs,
-                        deadline
-                    );
-                    return false;
-                } else {
-                    // this partition is ready for processing due to the task idling deadline passing
-                    if (enforced == null) {
-                        enforced = new HashMap<>();
-                    }
-                    enforced.put(partition, deadline);
-                }
-            }
-        }
-        if (enforced == null) {
-            logger.trace("All partitions were buffered locally, so this task is ready for processing.");
-            return true;
-        } else if (queued.isEmpty()) {
-            logger.trace("No partitions were buffered locally, so this task is not ready for processing.");
-            return false;
-        } else {
-            enforcedProcessingSensor.record(1.0d, wallClockTime);
-            logger.info("Continuing to process although some partition timestamps were not buffered locally." +
-                            "\n\tThere may be out-of-order processing for this task as a result." +
-                            "\n\tPartitions with local data: {}." +
-                            "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
-                            "\n\tConfigured max.task.idle.ms: {}." +
-                            "\n\tCurrent wall-clock time: {}.",
-                        queued,
-                        enforced,
-                        maxTaskIdleMs,
-                        wallClockTime);
-            return true;
-        }
-    }
-
     // visible for testing
     long partitionTimestamp(final TopicPartition partition) {
         final RecordQueue queue = partitionQueues.get(partition);
@@ -377,7 +239,7 @@ int numBuffered() {
         return totalBuffered;
     }

-    boolean allPartitionsBufferedLocally() {
+    boolean allPartitionsBuffered() {
         return allBuffered;
     }
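Net effect for this file: PartitionGroup drops back to pure buffering bookkeeping. The lag-aware gate (addFetchedMetadata feeding readyToProcess) is removed along with its per-partition idling deadlines, and the readiness decision returns to StreamTask#isProcessable, which looks only at locally buffered records (see the StreamTask diff below).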

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (-6)

@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
@@ -287,11 +286,6 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
         throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
     }

-    @Override
-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        throw new IllegalStateException("Attempted to update metadata for standby task " + id());
-    }
-
     InternalProcessorContext processorContext() {
         return processorContext;
     }
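With this revert, addFetchedMetadata presumably leaves the Task contract as well (the interface change would be among the ten changed files not shown on this page), so StandbyTask no longer needs its throwing stub.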

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (+36, -32)

@@ -19,7 +19,6 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -80,6 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     // leaked into this class, which is to checkpoint after committing if EOS is not enabled.
     private final boolean eosEnabled;

+    private final long maxTaskIdleMs;
     private final int maxBufferedSize;
     private final PartitionGroup partitionGroup;
     private final RecordCollector recordCollector;
@@ -96,11 +96,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor processLatencySensor;
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
+    private final Sensor enforcedProcessingSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
     private final InternalProcessorContext processorContext;
     private final RecordQueueCreator recordQueueCreator;

     private StampedRecord record;
+    private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
@@ -116,8 +118,7 @@ public StreamTask(final TaskId id,
                       final Time time,
                       final ProcessorStateManager stateMgr,
                       final RecordCollector recordCollector,
-                      final InternalProcessorContext processorContext,
-                      final LogContext logContext) {
+                      final InternalProcessorContext processorContext) {
         super(
             id,
             topology,
@@ -141,6 +142,12 @@ public StreamTask(final TaskId id,
         this.streamsMetrics = streamsMetrics;
         closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
         final String taskId = id.toString();
+        if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
+            final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
+            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
+        } else {
+            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
+        }
         processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
@@ -163,30 +170,17 @@ public StreamTask(final TaskId id,

         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
+        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);

         // initialize the consumed and committed offset cache
         consumedOffsets = new HashMap<>();

-        recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
+        recordQueueCreator = new RecordQueueCreator(logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());

         recordInfo = new PartitionGroup.RecordInfo();
-
-        final Sensor enforcedProcessingSensor;
-        if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
-            final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, streamsMetrics);
-            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics, parent);
-        } else {
-            enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
-        }
-        final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        partitionGroup = new PartitionGroup(
-            logContext,
-            createPartitionQueues(),
-            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
-            enforcedProcessingSensor,
-            maxTaskIdleMs
-        );
+        partitionGroup = new PartitionGroup(createPartitionQueues(),
+                                            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics));

         stateMgr.registerGlobalStateStores(topology.globalStateStores());
     }
@@ -242,6 +236,7 @@ public void completeRestoration() {
         initializeMetadata();
         initializeTopology();
         processorContext.initialize();
+        idleStartTimeMs = RecordQueue.UNKNOWN;

         transitionTo(State.RUNNING);

@@ -632,12 +627,7 @@ record = null;

     /**
      * An active task is processable if its buffer contains data for all of its input
-     * source topic partitions, or if it is enforced to be processable.
-     *
-     * Note that this method is _NOT_ idempotent, because the internal bookkeeping
-     * consumes the partition metadata. For example, unit tests may have to invoke
-     * {@link #addFetchedMetadata(TopicPartition, ConsumerRecords.Metadata)} again
-     * invoking this method.
+     * source topic partitions, or if it is enforced to be processable
      */
     public boolean isProcessable(final long wallClockTime) {
         if (state() == State.CLOSED) {
@@ -655,7 +645,26 @@ public boolean isProcessable(final long wallClockTime) {
             return false;
         }

-        return partitionGroup.readyToProcess(wallClockTime);
+        if (partitionGroup.allPartitionsBuffered()) {
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return true;
+        } else if (partitionGroup.numBuffered() > 0) {
+            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
+                idleStartTimeMs = wallClockTime;
+            }
+
+            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
+                enforcedProcessingSensor.record(1.0d, wallClockTime);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // there's no data in any of the topics; we should reset the enforced
+            // processing timer
+            idleStartTimeMs = RecordQueue.UNKNOWN;
+            return false;
+        }
     }

     /**
@@ -922,11 +931,6 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
         }
     }

-    @Override
-    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
-        partitionGroup.addFetchedMetadata(partition, metadata);
-    }
-
     /**
      * Schedules a punctuation for the processor
      *
