@@ -167,7 +167,7 @@ public class StreamTaskTest {
167
167
168
168
private final MockSourceNode <Integer , Integer > source1 = new MockSourceNode <>(intDeserializer , intDeserializer );
169
169
private final MockSourceNode <Integer , Integer > source2 = new MockSourceNode <>(intDeserializer , intDeserializer );
170
- private final MockSourceNode <Integer , Integer > source3 = new MockSourceNode <>(intDeserializer , intDeserializer ) {
170
+ private final MockSourceNode <Integer , Integer > source3 = new MockSourceNode <Integer , Integer >(intDeserializer , intDeserializer ) {
171
171
@ Override
172
172
public void process (final Record <Integer , Integer > record ) {
173
173
throw new RuntimeException ("KABOOM!" );
@@ -178,7 +178,7 @@ public void close() {
178
178
throw new RuntimeException ("KABOOM!" );
179
179
}
180
180
};
181
- private final MockSourceNode <Integer , Integer > timeoutSource = new MockSourceNode <>(intDeserializer , intDeserializer ) {
181
+ private final MockSourceNode <Integer , Integer > timeoutSource = new MockSourceNode <Integer , Integer >(intDeserializer , intDeserializer ) {
182
182
@ Override
183
183
public void process (final Record <Integer , Integer > record ) {
184
184
throw new TimeoutException ("Kaboom!" );
@@ -442,7 +442,7 @@ public void shouldAutoOffsetResetIfNoCommittedOffsetFound() {
442
442
task .addPartitionsForOffsetReset (Collections .singleton (partition1 ));
443
443
444
444
final AtomicReference <AssertionError > shouldNotSeek = new AtomicReference <>();
445
- try (final MockConsumer <byte [], byte []> consumer = new MockConsumer <>(OffsetResetStrategy .EARLIEST ) {
445
+ try (final MockConsumer <byte [], byte []> consumer = new MockConsumer <byte [], byte [] >(OffsetResetStrategy .EARLIEST ) {
446
446
@ Override
447
447
public void seek (final TopicPartition partition , final long offset ) {
448
448
final AssertionError error = shouldNotSeek .get ();
@@ -2203,6 +2203,7 @@ public void shouldCheckpointOffsetsOnPostCommit() {
2203
2203
when (stateManager .taskId ()).thenReturn (taskId );
2204
2204
when (stateManager .taskType ()).thenReturn (TaskType .ACTIVE );
2205
2205
final long offset = 543L ;
2206
+ final long consumedOffset = 345L ;
2206
2207
2207
2208
when (recordCollector .offsets ()).thenReturn (singletonMap (changelogPartition , offset ));
2208
2209
when (stateManager .changelogOffsets ())
@@ -2600,7 +2601,6 @@ public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() {
2600
2601
task .resumePollingForPartitionsWithAvailableSpace ();
2601
2602
consumer .poll (Duration .ZERO );
2602
2603
task .addRecords (partition1 , records );
2603
- task .updateNextOffsets (partition1 , new OffsetAndMetadata (offset + 1 , Optional .empty (), "" ));
2604
2604
task .updateLags ();
2605
2605
2606
2606
assertTrue (task .process (offset ));
@@ -2632,7 +2632,6 @@ public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() {
2632
2632
task .resumePollingForPartitionsWithAvailableSpace ();
2633
2633
consumer .poll (Duration .ZERO );
2634
2634
task .addRecords (partition1 , records );
2635
- task .updateNextOffsets (partition1 , new OffsetAndMetadata (offset + 1 , Optional .empty (), "" ));
2636
2635
task .updateLags ();
2637
2636
2638
2637
assertTrue (task .process (offset ));
@@ -2661,8 +2660,6 @@ public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() {
2661
2660
task .resumePollingForPartitionsWithAvailableSpace ();
2662
2661
consumer .poll (Duration .ZERO );
2663
2662
task .addRecords (partition1 , records );
2664
- task .updateNextOffsets (partition1 , new OffsetAndMetadata (offset + 1 , Optional .empty (), "" ));
2665
-
2666
2663
task .updateLags ();
2667
2664
2668
2665
assertTrue (task .process (offset ));
0 commit comments