From 979aa3903756b37797d35e15516a74f3d94e9943 Mon Sep 17 00:00:00 2001
From: gauravkumar37
Date: Fri, 25 Sep 2015 00:15:17 +0530
Subject: [PATCH 1/2] Added Parquet output writer

---
 camus-etl-kafka/pom.xml                        |  5 ++
 .../common/ParquetRecordWriterProvider.java    | 71 +++++++++++++++++++
 2 files changed, 76 insertions(+)
 create mode 100644 camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java

diff --git a/camus-etl-kafka/pom.xml b/camus-etl-kafka/pom.xml
index eec17c27e..a34b803d0 100644
--- a/camus-etl-kafka/pom.xml
+++ b/camus-etl-kafka/pom.xml
@@ -28,6 +28,11 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.5.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
new file mode 100644
index 000000000..38c16c732
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
@@ -0,0 +1,71 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+import com.linkedin.camus.etl.RecordWriterProvider;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
+
+import parquet.avro.AvroParquetWriter;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ *
+ *
+ */
+public class ParquetRecordWriterProvider implements RecordWriterProvider {
+  public final static String EXT = ".parquet";
+
+  public ParquetRecordWriterProvider(TaskAttemptContext context) {
+  }
+
+  @Override
+  public String getFilenameExtension() {
+    return EXT;
+  }
+
+  @Override
+  public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,
+      CamusWrapper data, FileOutputCommitter committer) throws IOException, InterruptedException {
+
+    CompressionCodecName compressionCodecName = CompressionCodecName.UNCOMPRESSED; // a null codec would NPE in ParquetWriter
+    int blockSize = 256 * 1024 * 1024; // Parquet row group size
+    int pageSize = 64 * 1024; // Parquet page size
+
+    if (FileOutputFormat.getCompressOutput(context)) {
+      if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
+        compressionCodecName = CompressionCodecName.SNAPPY;
+      } else {
+        compressionCodecName = CompressionCodecName.GZIP;
+      }
+    }
+
+    Path path = committer.getWorkPath();
+    path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
+    Schema avroSchema = ((GenericRecord) data.getRecord()).getSchema(); // file schema comes from the first record
+    final ParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema,
+        compressionCodecName, blockSize, pageSize);
+
+    return new RecordWriter<IEtlKey, CamusWrapper>() {
+      @Override
+      public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
+        parquetWriter.write((GenericRecord) data.getRecord());
+      }
+
+      @Override
+      public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+        parquetWriter.close();
+      }
+    };
+  }
+}

From 93314065bdf9dba43ef6c7c52688a45754d0ddaa Mon Sep 17 00:00:00 2001
From: gauravkumar37
Date: Fri, 25 Sep 2015 00:18:56 +0530
Subject: [PATCH 2/2] Added Javadoc

---
 .../camus/etl/kafka/common/ParquetRecordWriterProvider.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
index 38c16c732..4a852a03e 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ParquetRecordWriterProvider.java
@@ -20,7 +20,9 @@
 import parquet.hadoop.metadata.CompressionCodecName;
 
 /**
- *
+ * Provides a RecordWriter that uses AvroParquetWriter to write
+ * Parquet records to HDFS. Compression settings are controlled via
+ * ETL_OUTPUT_CODEC; Snappy and Gzip compression codecs are supported.
  *
  */
 public class ParquetRecordWriterProvider implements RecordWriterProvider {
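
Usage sketch: to route Camus ETL output through this writer, the job properties
would look roughly like the excerpt below. The property names are assumptions
based on Camus conventions (EtlMultiOutputFormat's etl.record.writer.provider.class
and etl.output.codec keys, plus Hadoop's mapred.output.compress switch) and should
be verified against the Camus and Hadoop versions in use.

    # camus.properties excerpt (illustrative sketch, not part of the patches above)
    # Select the Parquet writer added by PATCH 1/2
    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.ParquetRecordWriterProvider
    # Enable compression: "snappy" selects SNAPPY; any other codec value falls back to GZIP
    mapred.output.compress=true
    etl.output.codec=snappy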