Commit d7c8f86

Incremental Co-operative Rebalancing Support for HDFS Connector (confluentinc#625)
1 parent dae32f2

File tree: 3 files changed, +46 -11 lines

  src/main/java/io/confluent/connect/hdfs/DataWriter.java
  src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
  src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java

src/main/java/io/confluent/connect/hdfs/DataWriter.java
7 additions, 10 deletions

@@ -465,15 +465,12 @@ public void open(Collection<TopicPartition> partitions) {
   }

   public void close() {
-    // Close any writers we have. We may get assigned the same partitions and end up duplicating
-    // some effort since we'll have to reprocess those messages. It may be possible to hold on to
-    // the TopicPartitionWriter and continue to use the temp file, but this can get significantly
-    // more complex due to potential failures and network partitions. For example, we may get
-    // this close, then miss a few generations of group membership, during which
-    // data may have continued to be processed and we'd have to restart from the recovery stage,
-    // make sure we apply the WAL, and only reuse the temp file if the starting offset is still
-    // valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
-    for (TopicPartitionWriter writer : topicPartitionWriters.values()) {
+    close(new HashSet<>(topicPartitionWriters.keySet()));
+  }
+
+  public void close(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition: partitions) {
+      TopicPartitionWriter writer = topicPartitionWriters.get(partition);
       try {
         if (writer != null) {
           // In some failure modes, the writer might not have been created for all assignments
@@ -482,8 +479,8 @@ public void close() {
       } catch (ConnectException e) {
         log.warn("Unable to close writer for topic partition {}: ", writer.topicPartition(), e);
       }
+      topicPartitionWriters.remove(partition);
     }
-    topicPartitionWriters.clear();
   }

   public void stop() {
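The core of the change: DataWriter can now tear down the writers for a subset of partitions instead of unconditionally clearing all of them. Below is a minimal, self-contained sketch of that pattern; Writer and PartialCloseSketch are hypothetical stand-ins for TopicPartitionWriter and DataWriter (assuming only kafka-clients on the classpath for TopicPartition), not the connector's actual classes:

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class PartialCloseSketch {

  // Hypothetical stand-in for TopicPartitionWriter.
  interface Writer {
    void close();
  }

  private final Map<TopicPartition, Writer> topicPartitionWriters = new HashMap<>();

  // Full close delegates to the partial close, mirroring the diff.
  public void close() {
    close(new HashSet<>(topicPartitionWriters.keySet()));
  }

  // Close only the writers for the given (e.g. revoked) partitions and drop
  // them from the map; writers for partitions still owned are left untouched.
  public void close(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      Writer writer = topicPartitionWriters.get(partition);
      if (writer != null) {
        writer.close();
      }
      topicPartitionWriters.remove(partition);
    }
  }

  public static void main(String[] args) {
    PartialCloseSketch sketch = new PartialCloseSketch();
    TopicPartition tp0 = new TopicPartition("t", 0);
    TopicPartition tp1 = new TopicPartition("t", 1);
    sketch.topicPartitionWriters.put(tp0, () -> System.out.println("closed " + tp0));
    sketch.topicPartitionWriters.put(tp1, () -> System.out.println("closed " + tp1));

    sketch.close(Collections.singleton(tp1)); // partial close: only t-1 goes away
    sketch.close();                           // full close: closes the remaining t-0
  }
}

Note the defensive HashSet copy in the no-argument close(): delegating with the map's live keySet() view would throw ConcurrentModificationException once close(Collection) starts removing entries while that view is being iterated.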

src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
1 addition, 1 deletion

@@ -166,7 +166,7 @@ public void open(Collection<TopicPartition> partitions) {
   public void close(Collection<TopicPartition> partitions) {
     log.debug("Closing HDFS Sink Task {}", connectorNameAndTaskId);
     if (hdfsWriter != null) {
-      hdfsWriter.close();
+      hdfsWriter.close(partitions);
     }
   }
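This one-line delegation is what makes the task safe under incremental cooperative rebalancing: the Connect runtime may call SinkTask.close() with only the revoked partitions, whereas the old hdfsWriter.close() discarded every writer, including those for partitions the task still owned. A rough sketch of the difference in call sequences, using a toy task (ToySinkTask and RebalanceSequenceDemo are illustrative stand-ins, not the connector's code):

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class RebalanceSequenceDemo {

  static class ToySinkTask {
    final Set<TopicPartition> owned = new HashSet<>();

    void open(Collection<TopicPartition> partitions) {
      owned.addAll(partitions);
    }

    // Eager rebalancing passes the full assignment here; incremental
    // cooperative rebalancing passes only the revoked subset.
    void close(Collection<TopicPartition> partitions) {
      owned.removeAll(partitions);
    }
  }

  public static void main(String[] args) {
    TopicPartition tp0 = new TopicPartition("t", 0);
    TopicPartition tp1 = new TopicPartition("t", 1);
    TopicPartition tp2 = new TopicPartition("t", 2);

    ToySinkTask task = new ToySinkTask();
    task.open(Arrays.asList(tp0, tp1, tp2));

    // Cooperative: only t-2 moves to another worker; t-0 and t-1 stay open,
    // so their writers (temp files, WAL state) survive the rebalance.
    task.close(Collections.singleton(tp2));
    System.out.println("still owned: " + task.owned); // t-0 and t-1, in some order
  }
}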

src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
38 additions, 0 deletions

@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -341,6 +342,43 @@ final int record = 12;
     }
   }

+  @Test
+  public void testPartialRevocation() throws Exception {
+    setUp();
+
+    Collection<TopicPartition> initialAssignment = new ArrayList<>();
+    initialAssignment.add(TOPIC_PARTITION);
+    initialAssignment.add(TOPIC_PARTITION2);
+    initialAssignment.add(TOPIC_PARTITION3);
+
+    Collection<TopicPartition> revokedPartitions = new ArrayList<>();
+    revokedPartitions.add(TOPIC_PARTITION3);
+
+    String key = "key";
+    Schema schema = createSchema();
+    Struct record = createRecord(schema);
+    Collection<SinkRecord> sinkRecords = Collections.singleton(
+        new SinkRecord(TOPIC_PARTITION.topic(), TOPIC_PARTITION.partition(),
+            Schema.STRING_SCHEMA, key, schema, record, 0));
+
+    HdfsSinkTask task = new HdfsSinkTask();
+    task.initialize(context);
+    task.start(properties);
+
+    // Given 3 owned partitions
+    task.open(initialAssignment);
+
+    // When 1 partition revoked (partial revocation)
+    task.close(revokedPartitions);
+
+    try {
+      // Should continue processing messages from the 2 left partitions (should succeed)
+      task.put(sinkRecords);
+    } finally {
+      task.stop();
+    }
+  }
+
   private void createCommittedFiles() throws IOException {
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
     String file1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 0,
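A deployment footnote, not part of this commit: whether close() actually receives a partial set depends on the rebalance protocol the Connect workers negotiate, controlled by the standard connect.protocol worker setting (the version notes below are approximate and given as an assumption, not sourced from this diff):

# connect-distributed.properties (worker config)
# eager      -> classic stop-the-world rebalancing: close() gets the full assignment
# compatible -> can negotiate incremental cooperative rebalancing (default in AK 2.3)
# sessioned  -> cooperative plus session keys (default since AK 2.4)
connect.protocol=compatible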
