diff --git a/src/main/java/io/confluent/connect/hdfs/FileSizeAwareRecordWriter.java b/src/main/java/io/confluent/connect/hdfs/FileSizeAwareRecordWriter.java
new file mode 100644
index 000000000..ff71df4ec
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/FileSizeAwareRecordWriter.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs;
+
+import io.confluent.connect.storage.format.RecordWriter;
+
+public interface FileSizeAwareRecordWriter extends RecordWriter {
+  long getFileSize();
+}
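The interface above is all a Format has to add to take part in size-based rotation. As a minimal sketch (not part of this patch; the class name MyFormatRecordWriter is hypothetical, and it assumes the FSDataOutputStream returned by HdfsStorage.create later in this patch), a custom text-like writer could satisfy it by recording the stream position after each write, the same pattern the JSON writer uses below:

// Hypothetical sketch, not part of this patch.
import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

public class MyFormatRecordWriter implements FileSizeAwareRecordWriter {
  private final FSDataOutputStream out;
  private long fileSize;

  public MyFormatRecordWriter(HdfsStorage storage, String filename) {
    this.out = storage.create(filename, true);
  }

  @Override
  public void write(SinkRecord record) {
    try {
      out.write(String.valueOf(record.value()).getBytes(StandardCharsets.UTF_8));
      out.write('\n');
      fileSize = out.getPos();  // bytes handed to the HDFS client so far
    } catch (IOException e) {
      throw new ConnectException(e);
    }
  }

  @Override
  public void commit() {}

  @Override
  public void close() {
    try {
      out.close();
    } catch (IOException e) {
      throw new ConnectException(e);
    }
  }

  @Override
  public long getFileSize() {
    return fileSize;
  }
}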
diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
index 88d2dc5fb..b655d64bf 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
@@ -157,6 +157,12 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
   private static final String KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY = "Kerberos Ticket Renew "
       + "Period (ms)";
 
+  public static final String FLUSH_FILE_SIZE_CONFIG = "flush.file.size";
+  private static final long FLUSH_FILE_SIZE_DEFAULT = 0;
+  private static final String FLUSH_FILE_SIZE_DOC = "Maximum bytes written to a single file "
+      + "before invoking commits for all files in the current partition; 0 disables this trigger.";
+  private static final String FLUSH_FILE_SIZE_DISPLAY = "Flush File Size";
+
   private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
   private static final Pattern INVALID_SUB_PATTERN = Pattern.compile("\\$\\{.*}");
 
@@ -366,6 +372,18 @@ public static ConfigDef newConfigDef() {
         TOPIC_CAPTURE_GROUPS_REGEX_DISPLAY
     );
 
+    configDef.define(
+        FLUSH_FILE_SIZE_CONFIG,
+        Type.LONG,
+        FLUSH_FILE_SIZE_DEFAULT,
+        Importance.MEDIUM,
+        FLUSH_FILE_SIZE_DOC,
+        "Connector",
+        1,
+        Width.LONG,
+        FLUSH_FILE_SIZE_DISPLAY
+    );
+
     return configDef;
   }
 
@@ -374,6 +392,7 @@ public static ConfigDef newConfigDef() {
   private final HiveConfig hiveConfig;
   private final PartitionerConfig partitionerConfig;
   private final Pattern topicRegexCaptureGroup;
+  private final long flushFileSize;
   private final Map<String, ConfigDef.ConfigKey> propertyToConfig = new HashMap<>();
   private final Set<AbstractConfig> allConfigs = new HashSet<>();
   private Configuration hadoopConfig;
@@ -400,6 +419,7 @@ protected HdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props
     addToGlobal(commonConfig);
     addToGlobal(this);
     this.url = extractUrl();
+    this.flushFileSize = Long.parseLong(props.getOrDefault(FLUSH_FILE_SIZE_CONFIG, "0"));
     try {
       String topicRegex = getString(TOPIC_CAPTURE_GROUPS_REGEX_CONFIG);
       this.topicRegexCaptureGroup = topicRegex != null ? Pattern.compile(topicRegex) : null;
@@ -415,6 +435,7 @@ protected HdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props
 
     validateDirsAndRegex();
     validateTimezone();
+    validateFlushSizes();
   }
 
   /**
@@ -435,6 +456,17 @@ private void validateTimezone() {
     }
   }
 
+  private void validateFlushSizes() {
+    if (getInt(FLUSH_SIZE_CONFIG) <= 0 && flushFileSize <= 0) {
+      String message = String.format(
+          "%s and %s",
+          HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG,
+          HdfsSinkConnectorConfig.FLUSH_FILE_SIZE_CONFIG
+      );
+      throw new ConfigException(message, 0, "At least one of these settings must be greater than 0");
+    }
+  }
+
   public static Map<String, String> addDefaults(Map<String, String> props) {
     ConcurrentMap<String, String> propsCopy = new ConcurrentHashMap<>(props);
     propsCopy.putIfAbsent(STORAGE_CLASS_CONFIG, HdfsStorage.class.getName());
@@ -724,6 +756,10 @@ private void validateReplacements(String config) {
     }
   }
 
+  public long getFlushFileSize() {
+    return flushFileSize;
+  }
+
   private static class BooleanParentRecommender implements ConfigDef.Recommender {
 
     protected String parentConfigName;
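For context, this is how the new setting is meant to sit next to the existing record-count trigger; the snippet is not part of the patch and the values are illustrative only. validateFlushSizes() only requires that at least one of flush.size and flush.file.size is positive:

// Hypothetical connector properties (illustrative values, not part of this patch).
// Assumes java.util.Map and java.util.HashMap are imported.
Map<String, String> props = new HashMap<>();
props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
props.put("topics", "test-topic");
props.put("hdfs.url", "hdfs://namenode:8020");
props.put("format.class", "io.confluent.connect.hdfs.avro.AvroFormat");
props.put("flush.size", "1000000");                                // record-count trigger (FLUSH_SIZE_CONFIG)
props.put("flush.file.size", String.valueOf(256L * 1024 * 1024));  // new byte trigger (FLUSH_FILE_SIZE_CONFIG)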
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index fbce0edf6..3e010d92e 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -18,6 +18,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
@@ -84,6 +85,7 @@ public class TopicPartitionWriter {
   private final SinkTaskContext context;
   private int recordCounter;
   private final int flushSize;
+  private final long flushFileSize;
   private final long rotateIntervalMs;
   private Long lastRotate;
   private final long rotateScheduleIntervalMs;
@@ -193,6 +195,7 @@ public TopicPartitionWriter(
     topicsDir = config.getTopicsDirFromTopic(tp.topic());
     flushSize = config.getInt(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG);
+    flushFileSize = config.getFlushFileSize();
     rotateIntervalMs = config.getLong(HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG);
     rotateScheduleIntervalMs = config.getLong(HdfsSinkConnectorConfig
         .ROTATE_SCHEDULE_INTERVAL_MS_CONFIG);
@@ -568,12 +571,28 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
       lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
     }
 
+    Long fileSize = null;
+    if (currentRecord != null && flushFileSize > 0) {
+      io.confluent.connect.storage.format.RecordWriter writer = getWriter(
+          currentRecord,
+          partitioner.encodePartition(currentRecord)
+      );
+      if (!(writer instanceof FileSizeAwareRecordWriter)) {
+        throw new ConfigException("The Format's provided RecordWriterProvider does not support "
+            + "FileSizeAwareRecordWriter and cannot be used with flush.file.size > 0.");
+      }
+      fileSize = ((FileSizeAwareRecordWriter) writer).getFileSize();
+    }
+
     boolean periodicRotation = rotateIntervalMs > 0
         && currentTimestamp != null
         && lastRotate != null
         && currentTimestamp - lastRotate >= rotateIntervalMs;
 
     boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
-    boolean messageSizeRotation = recordCounter >= flushSize;
+    boolean messageSizeRotation = flushSize > 0 && recordCounter >= flushSize;
+    boolean fileSizeRotation = flushFileSize > 0
+        && fileSize != null
+        && fileSize >= flushFileSize;
 
     log.trace(
         "Should apply periodic time-based rotation (rotateIntervalMs: '{}', lastRotate: "
@@ -600,7 +619,14 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
         messageSizeRotation
     );
 
-    return periodicRotation || scheduledRotation || messageSizeRotation;
+    log.trace(
+        "Should apply file size-based rotation (file size {} >= flush file size {})? {}",
+        fileSize,
+        flushFileSize,
+        fileSizeRotation
+    );
+
+    return periodicRotation || scheduledRotation || messageSizeRotation || fileSizeRotation;
   }
 
   /**
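Condensed into one expression, the decision implemented by the hunk above is a disjunction of four independent triggers, and a non-positive setting disables its trigger. This is a paraphrase of the code in the hunk, not additional behavior:

// Paraphrase of shouldRotateAndMaybeUpdateTimers after this change.
boolean shouldRotate =
    (rotateIntervalMs > 0 && currentTimestamp != null && lastRotate != null
        && currentTimestamp - lastRotate >= rotateIntervalMs)                 // periodic
    || (rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate)           // scheduled
    || (flushSize > 0 && recordCounter >= flushSize)                          // record count
    || (flushFileSize > 0 && fileSize != null && fileSize >= flushFileSize);  // bytes in the open file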
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index f64940d31..89bf68dea 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -15,12 +15,12 @@
 
 package io.confluent.connect.hdfs.avro;
 
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
-import io.confluent.connect.storage.format.RecordWriter;
-import java.io.OutputStream;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
@@ -52,9 +52,12 @@ public String getExtension() {
   }
 
   @Override
-  public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
-    return new RecordWriter() {
-      final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
+  public FileSizeAwareRecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
+    return new FileSizeAwareRecordWriter() {
+      private long fileSize;
+      final TransparentDataFileWriter<Object> writer = new TransparentDataFileWriter<>(
+          new DataFileWriter<>(new GenericDatumWriter<>())
+      );
       Schema schema;
 
       @Override
@@ -63,7 +66,7 @@ public void write(SinkRecord record) {
           schema = record.valueSchema();
           try {
             log.info("Opening record writer for: {}", filename);
-            final OutputStream out = storage.create(filename, true);
+            final FSDataOutputStream out = storage.create(filename, true);
             org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
             writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
             writer.create(avroSchema, out);
@@ -81,6 +84,7 @@ public void write(SinkRecord record) {
           } else {
             writer.append(value);
           }
+          fileSize = writer.getInnerFileStream().getPos();
         } catch (IOException e) {
           throw new DataException(e);
         }
@@ -90,6 +94,9 @@ public void write(SinkRecord record) {
       public void close() {
         try {
          writer.close();
+          if (writer.getInnerFileStream() != null) {
+            fileSize = writer.getInnerFileStream().getPos();
+          }
         } catch (IOException e) {
           throw new DataException(e);
         }
@@ -97,6 +104,11 @@ public void close() {
 
       @Override
       public void commit() {}
+
+      @Override
+      public long getFileSize() {
+        return fileSize;
+      }
     };
   }
 }
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/TransparentDataFileWriter.java b/src/main/java/io/confluent/connect/hdfs/avro/TransparentDataFileWriter.java
new file mode 100644
index 000000000..89121c855
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/avro/TransparentDataFileWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A wrapper for `DataFileWriter` which exposes the inner file stream. This is helpful for
+ * monitoring the file size of the underlying file.
+ */
+public class TransparentDataFileWriter<D> implements Closeable, Flushable {
+  private final DataFileWriter<D> dataFileWriter;
+
+  private FSDataOutputStream innerFileStream;
+
+  public TransparentDataFileWriter(DataFileWriter<D> dataFileWriter) {
+    this.dataFileWriter = dataFileWriter;
+  }
+
+  public DataFileWriter<D> create(Schema schema, File file) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  public DataFileWriter<D> create(Schema schema, FSDataOutputStream outs) throws IOException {
+    this.innerFileStream = outs;
+    return dataFileWriter.create(schema, outs);
+  }
+
+  public DataFileWriter<D> create(
+      Schema schema,
+      FSDataOutputStream outs,
+      byte[] sync
+  ) throws IOException {
+    this.innerFileStream = outs;
+    return dataFileWriter.create(schema, outs, sync);
+  }
+
+  public DataFileWriter<D> create(
+      Schema schema,
+      OutputStream outs,
+      byte[] sync
+  ) throws IOException {
+    return dataFileWriter.create(schema, outs, sync);
+  }
+
+  public DataFileWriter<D> setCodec(CodecFactory c) {
+    return dataFileWriter.setCodec(c);
+  }
+
+  public DataFileWriter<D> setSyncInterval(int syncInterval) {
+    return dataFileWriter.setSyncInterval(syncInterval);
+  }
+
+  public void setFlushOnEveryBlock(boolean flushOnEveryBlock) {
+    dataFileWriter.setFlushOnEveryBlock(flushOnEveryBlock);
+  }
+
+  public boolean isFlushOnEveryBlock() {
+    return dataFileWriter.isFlushOnEveryBlock();
+  }
+
+  public DataFileWriter<D> appendTo(File file) throws IOException {
+    return dataFileWriter.appendTo(file);
+  }
+
+  public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
+    return dataFileWriter.appendTo(in, out);
+  }
+
+  public DataFileWriter<D> setMeta(String key, byte[] value) {
+    return dataFileWriter.setMeta(key, value);
+  }
+
+  public DataFileWriter<D> setMeta(String key, String value) {
+    return dataFileWriter.setMeta(key, value);
+  }
+
+  public DataFileWriter<D> setMeta(String key, long value) {
+    return dataFileWriter.setMeta(key, value);
+  }
+
+  public static boolean isReservedMeta(String key) {
+    return DataFileWriter.isReservedMeta(key);
+  }
+
+  public void append(D datum) throws IOException {
+    dataFileWriter.append(datum);
+  }
+
+  public void appendEncoded(ByteBuffer datum) throws IOException {
+    dataFileWriter.appendEncoded(datum);
+  }
+
+  public void appendAllFrom(DataFileStream<D> otherFile, boolean recompress) throws IOException {
+    dataFileWriter.appendAllFrom(otherFile, recompress);
+  }
+
+  public long sync() throws IOException {
+    return dataFileWriter.sync();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    dataFileWriter.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    dataFileWriter.close();
+  }
+
+  public FSDataOutputStream getInnerFileStream() {
+    return innerFileStream;
+  }
+}
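A short usage sketch of the wrapper above, mirroring what AvroRecordWriterProvider does with it. The variables storage, filename, avroSchema, and avroDatum are hypothetical, and the fragment is assumed to live inside a method that declares throws IOException:

// Hypothetical usage of TransparentDataFileWriter (mirrors AvroRecordWriterProvider).
TransparentDataFileWriter<Object> writer =
    new TransparentDataFileWriter<>(new DataFileWriter<>(new GenericDatumWriter<>()));
FSDataOutputStream out = storage.create(filename, true);
writer.setCodec(CodecFactory.fromString("null"));
writer.create(avroSchema, out);                          // the wrapper remembers this stream
writer.append(avroDatum);                                // datum must match avroSchema
long bytesSoFar = writer.getInnerFileStream().getPos();  // may lag until the current Avro block is flushed
writer.close();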
diff --git a/src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java
index 2c1c64d98..9f1e24f54 100644
--- a/src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java
@@ -17,6 +17,8 @@
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
@@ -30,7 +32,6 @@
 import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
-import io.confluent.connect.storage.format.RecordWriter;
 import io.confluent.connect.storage.format.RecordWriterProvider;
 
 /**
@@ -65,12 +66,14 @@ public String getExtension() {
   }
 
   @Override
-  public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
+  public FileSizeAwareRecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
     try {
-      return new RecordWriter() {
-        final OutputStream out = storage.create(filename, true);
+      return new FileSizeAwareRecordWriter() {
+
+        private long fileSize;
+        final FSDataOutputStream out = storage.create(filename, true);
         final JsonGenerator writer = mapper.getFactory()
-            .createGenerator(out)
+            .createGenerator((OutputStream) out)
             .setRootValueSeparator(null);
 
         @Override
@@ -89,6 +92,7 @@ public void write(SinkRecord record) {
             writer.writeObject(value);
             writer.writeRaw(LINE_SEPARATOR);
           }
+          fileSize = out.getPos();
         } catch (IOException e) {
           throw new ConnectException(e);
         }
@@ -101,10 +105,16 @@ public void commit() {}
        public void close() {
          try {
            writer.close();
+            fileSize = out.getPos();
          } catch (IOException e) {
            throw new ConnectException(e);
          }
        }
+
+        @Override
+        public long getFileSize() {
+          return fileSize;
+        }
      };
    } catch (IOException e) {
      throw new ConnectException(e);
diff --git a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
index 53b612d22..eed1d9542 100644
--- a/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/orc/OrcRecordWriterProvider.java
@@ -16,7 +16,7 @@
 package io.confluent.connect.hdfs.orc;
 
 import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
-import io.confluent.connect.storage.format.RecordWriter;
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
 import io.confluent.connect.storage.format.RecordWriterProvider;
 import io.confluent.connect.storage.hive.HiveSchemaConverter;
 import org.apache.hadoop.fs.Path;
@@ -45,10 +45,11 @@ public String getExtension() {
   }
 
   @Override
-  public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
+  public FileSizeAwareRecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
     Path path = new Path(filename);
-    return new RecordWriter() {
+    return new FileSizeAwareRecordWriter() {
+      private long fileSize;
       Writer writer;
       TypeInfo typeInfo;
       Schema schema;
@@ -98,6 +99,7 @@ public void preFooterWrite(OrcFile.WriterContext writerContext) {
              "Top level type must be STRUCT but was " + schema.type().getName()
            );
          }
+          fileSize = writer.getRawDataSize();
        } catch (IOException e) {
          throw new ConnectException("Failed to write record: ", e);
        }
@@ -108,6 +110,7 @@ public void close() {
        try {
          if (writer != null) {
            writer.close();
+            fileSize = writer.getRawDataSize();
          }
        } catch (IOException e) {
          throw new ConnectException("Failed to close ORC writer:", e);
@@ -116,6 +119,11 @@ public void close() {
 
       @Override
       public void commit() { }
+
+      @Override
+      public long getFileSize() {
+        return fileSize;
+      }
     };
   }
 }
diff --git a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
index 9ea00ad2e..b3f36bfd5 100644
--- a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
@@ -15,7 +15,7 @@
 
 package io.confluent.connect.hdfs.parquet;
 
-import io.confluent.connect.storage.format.RecordWriter;
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.connect.data.Schema;
@@ -49,8 +49,9 @@ public String getExtension() {
   }
 
   @Override
-  public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
-    return new RecordWriter() {
+  public FileSizeAwareRecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
+    return new FileSizeAwareRecordWriter() {
+      private long fileSize;
       final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
       final int blockSize = 256 * 1024 * 1024;
       final int pageSize = 64 * 1024;
@@ -97,6 +98,7 @@ public void write(SinkRecord record) {
         Object value = avroData.fromConnectData(record.valueSchema(), record.value());
         try {
           writer.write((GenericRecord) value);
+          fileSize = writer.getDataSize();
         } catch (IOException e) {
           throw new ConnectException(e);
         }
@@ -107,6 +109,8 @@ public void close() {
         if (writer != null) {
           try {
             writer.close();
+            // ParquetWriter's getDataSize() already accounts for both buffered and
+            // flushed bytes, so fileSize does not need to be updated on close.
          } catch (IOException e) {
            throw new ConnectException(e);
          }
@@ -115,6 +119,11 @@ public void close() {
 
       @Override
       public void commit() {}
+
+      @Override
+      public long getFileSize() {
+        return fileSize;
+      }
     };
   }
 }
diff --git a/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java b/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java
index eda365734..15ca6aaaa 100644
--- a/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java
+++ b/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java
@@ -19,6 +19,7 @@
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.kafka.common.TopicPartition;
@@ -91,7 +92,7 @@ public boolean create(String filename) {
     }
   }
 
-  public OutputStream create(String filename, boolean overwrite) {
+  public FSDataOutputStream create(String filename, boolean overwrite) {
     try {
       return fs.create(new Path(filename), overwrite);
     } catch (IOException e) {
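The HdfsStorage change above is what enables the byte counting: the narrowed return type is source-compatible for existing callers, while new callers can ask the stream for its position. A quick sketch, with a hypothetical path and storage variable, assumed to run inside a method that declares throws IOException:

// FSDataOutputStream extends DataOutputStream, which extends OutputStream, so
// code that only needed an OutputStream keeps compiling unchanged.
FSDataOutputStream out = storage.create("/topics/test-topic/partition=0/part-000.tmp", true);
OutputStream plain = out;           // still usable wherever an OutputStream was expected
out.write(new byte[] {1, 2, 3});
long written = out.getPos();        // the new capability the record writers rely on
out.close();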
diff --git a/src/main/java/io/confluent/connect/hdfs/string/StringRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/string/StringRecordWriterProvider.java
index 266fd83b8..c05d864de 100644
--- a/src/main/java/io/confluent/connect/hdfs/string/StringRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/string/StringRecordWriterProvider.java
@@ -15,6 +15,8 @@
 
 package io.confluent.connect.hdfs.string;
 
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -22,13 +24,11 @@
 
 import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 
 import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
 import io.confluent.connect.hdfs.storage.HdfsStorage;
-import io.confluent.connect.storage.format.RecordWriter;
 import io.confluent.connect.storage.format.RecordWriterProvider;
 
 /**
@@ -38,7 +38,7 @@ public class StringRecordWriterProvider implements RecordWriterProvider
+    sinkRecords = new LinkedList<>();
+
+    final List<Integer> partitions = Arrays.asList(0, 1);
+    final int filesPerPartition = 5;
+    for (int partition : partitions) {
+      int offset = 0;
+      // 5 files that are just above the maxFileSize
+      for (int i = 0; i < filesPerPartition; i++) {
+        for (int j = 0; j < StringRecordWriterProvider.WRITER_BUFFER_SIZE / 2; j++) {
+          // 2 bytes per record (a single digit and '\n')
+          sinkRecords.add(new SinkRecord(
+              TOPIC,
+              partition,
+              Schema.STRING_SCHEMA,
+              "key",
+              null,
+              String.valueOf(j % 10),
+              offset++,
+              null,
+              TimestampType.CREATE_TIME
+          ));
+        }
+      }
+    }
+
+    for (SinkRecord record : sinkRecords) {
+      topicPartitionWriter.buffer(record);
+    }
+
+    topicPartitionWriter.write();
+    topicPartitionWriter.close();
+
+    // we expect 5 files in each of the 2 partitions
+    StringDataFileReader stringDataFileReader = new StringDataFileReader();
+    long writtenRecords = 0;
+    for (int partition : partitions) {
+      List<FileStatus> files = storage.list("/test/test-topic/test-topic/partition=" + partition);
+      assertEquals(filesPerPartition, files.size());
+
+      for (FileStatus info : files) {
+        Collection<Object> result =
+            stringDataFileReader.readData(connectorConfig.getHadoopConfiguration(), info.getPath());
+        writtenRecords += result.size();
+      }
+    }
+
+    assertEquals(sinkRecords.size(), writtenRecords);
+  }
+
   @Test
   public void testWriteRecordFieldPartitioner() throws Exception {
     setUp();
diff --git a/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriter.java b/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriter.java
index a69c71cb5..117c44d48 100644
--- a/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriter.java
+++ b/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriter.java
@@ -15,15 +15,14 @@
 
 package io.confluent.connect.hdfs.utils;
 
+import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 
 import java.util.List;
 import java.util.Map;
 
-import io.confluent.connect.hdfs.RecordWriter;
-
-public class MemoryRecordWriter implements io.confluent.connect.storage.format.RecordWriter {
+public class MemoryRecordWriter implements FileSizeAwareRecordWriter {
   private String filename;
   private static final Map<String, List<Object>> data = Data.getData();
   private Failure failure = Failure.noFailure;
@@ -45,7 +44,6 @@ public void write(SinkRecord record) {
       throw new ConnectException("write failed.");
     }
     data.get(filename).add(record);
-
   }
 
   @Override
@@ -62,4 +60,9 @@ public void close() {
   public void setFailure(Failure failure) {
     this.failure = failure;
   }
+
+  @Override
+  public long getFileSize() {
+    return 0;
+  }
 }
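MemoryRecordWriter above shows the minimal way to keep a writer usable once flush.file.size can be positive: implement the interface and return a constant. A hypothetical adapter for a third-party format whose writer cannot measure its output could do the same, passing the instanceof check in TopicPartitionWriter while leaving rotation to flush.size and the time-based triggers. This is a sketch, not part of this patch:

// Hypothetical adapter, not part of this patch.
import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
import org.apache.kafka.connect.sink.SinkRecord;

public class SizeUnawareWriterAdapter implements FileSizeAwareRecordWriter {
  private final io.confluent.connect.storage.format.RecordWriter delegate;

  public SizeUnawareWriterAdapter(io.confluent.connect.storage.format.RecordWriter delegate) {
    this.delegate = delegate;
  }

  @Override
  public void write(SinkRecord record) {
    delegate.write(record);
  }

  @Override
  public void commit() {
    delegate.commit();
  }

  @Override
  public void close() {
    delegate.close();
  }

  @Override
  public long getFileSize() {
    return 0;  // never reaches flushFileSize, so file size-based rotation never fires
  }
}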