Commit 74dda67

storage: Fix hadoop compatibility issues. #TASK-7320
1 parent 3349ee6 commit 74dda67

File tree: 9 files changed (+42, −7 lines)

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-api/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompatApi.java

Lines changed: 2 additions & 0 deletions
@@ -36,4 +36,6 @@ public static HBaseCompatApi getInstance() {
     public abstract List<ServerName> getServerList(Admin admin) throws IOException;
 
     public abstract byte[][] getTableStartKeys(Admin admin, Table table) throws IOException;
+
+    public abstract boolean isSnappyAvailable();
 }
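The new abstract method gives callers a single, version-agnostic way to ask whether Snappy output compression is usable, instead of probing Hadoop internals directly. A minimal usage sketch (the CodecChooser class and its DeflateCodec fallback are illustrative, not part of this commit):

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.opencb.opencga.storage.hadoop.HBaseCompatApi;

public class CodecChooser {
    // Resolve the codec through the compat layer so the decision matches
    // the Hadoop version the active compat module was built against.
    public static Class<? extends CompressionCodec> outputCodec() {
        return HBaseCompatApi.getInstance().isSnappyAvailable()
                ? SnappyCodec.class
                : DeflateCodec.class;
    }
}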

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.0/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java

Lines changed: 6 additions & 0 deletions
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;
 
@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.2/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java

Lines changed: 6 additions & 0 deletions
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;
 
@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }
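Both the hbase2.0 and hbase2.2 flavours (this hunk and the identical one above) build against Hadoop 2.x, where SnappyCodec still requires the native libhadoop/libsnappy pair, so they keep the runtime probe. A standalone sketch of that probe, assuming a Hadoop 2.x classpath (class name is illustrative):

import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeSnappyProbe {
    public static void main(String[] args) {
        // Both checks pass only when the native libraries are loadable.
        System.out.println("libhadoop loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        System.out.println("snappy usable:    " + SnappyCodec.isNativeCodeLoaded());
    }
}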

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
 
     <properties>
         <hbase.version>2.4.17</hbase.version>
-        <hadoop.version>2.10.0</hadoop.version>
+        <hadoop.version>3.3.4</hadoop.version>
         <phoenix.version>5.1.3</phoenix.version>
         <phoenix-thirdparty.version>1.1.0</phoenix-thirdparty.version>
     </properties>

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java

Lines changed: 6 additions & 0 deletions
@@ -38,4 +38,10 @@ public List<ServerName> getServerList(Admin admin) throws IOException {
     public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         return table.getRegionLocator().getStartKeys();
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        // [HADOOP-17125] - Using snappy-java in SnappyCodec - 3.3.1, 3.4.0
+        return true;
+    }
 }
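With this module now building against Hadoop 3.3.4 (see the pom change above), the [HADOOP-17125] note applies: from 3.3.1, SnappyCodec is backed by the pure-Java snappy-java library, so no native probe is needed and the method can return true unconditionally. A smoke-test sketch under that assumption (class name is illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SnappySmokeTest {
    public static void main(String[] args) throws IOException {
        // No libhadoop required here: the codec delegates to snappy-java.
        SnappyCodec codec = ReflectionUtils.newInstance(SnappyCodec.class, new Configuration());
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (OutputStream out = codec.createOutputStream(buf)) {
            out.write("no native code required".getBytes());
        }
        System.out.println("compressed to " + buf.size() + " bytes");
    }
}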

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/pom.xml

Lines changed: 3 additions & 0 deletions
@@ -448,6 +448,9 @@
                         <ignoredUsedUndeclaredDependency>
                             org.apache.tephra:tephra-core
                         </ignoredUsedUndeclaredDependency>
+                        <ignoredUsedUndeclaredDependency>
+                            org.apache.tephra:tephra-core-shaded
+                        </ignoredUsedUndeclaredDependency>
                         <ignoredUsedUndeclaredDependency>
                             com.lmax:disruptor
                         </ignoredUsedUndeclaredDependency>

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java

Lines changed: 2 additions & 2 deletions
@@ -9,7 +9,6 @@
99
import org.apache.hadoop.io.compress.CompressionCodec;
1010
import org.apache.hadoop.io.compress.DeflateCodec;
1111
import org.apache.hadoop.io.compress.GzipCodec;
12-
import org.apache.hadoop.io.compress.SnappyCodec;
1312
import org.apache.hadoop.mapred.JobContext;
1413
import org.apache.hadoop.mapreduce.*;
1514
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -24,6 +23,7 @@
2423
import org.opencb.biodata.models.variant.avro.GeneCancerAssociation;
2524
import org.opencb.biodata.models.variant.avro.VariantAvro;
2625
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
26+
import org.opencb.opencga.storage.hadoop.HBaseCompat;
2727
import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver;
2828
import org.opencb.opencga.storage.hadoop.variant.mr.VariantFileOutputFormat;
2929
import org.opencb.opencga.storage.hadoop.variant.mr.VariantLocusKey;
@@ -144,7 +144,7 @@ protected void setupJob(Job job) throws IOException {
144144
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
145145
outputFormatClass = LazyOutputFormat.class;
146146
}
147-
if (SnappyCodec.isNativeCodeLoaded()) {
147+
if (HBaseCompat.getInstance().isSnappyAvailable()) {
148148
FileOutputFormat.setCompressOutput(job, true);
149149
// FIXME: SnappyCodec might not be available in client side
150150
// FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
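This driver and StreamVariantDriver below make the same substitution: the direct SnappyCodec.isNativeCodeLoaded() call becomes a compat-layer check, so the Hadoop 2 builds keep probing native code while the Hadoop 3 build always enables compression. A condensed, self-contained sketch of the guarded block (class and method names are illustrative):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.opencb.opencga.storage.hadoop.HBaseCompat;

public final class OutputCompressionSetup {
    private OutputCompressionSetup() {
    }

    static void configure(Job job) {
        if (HBaseCompat.getInstance().isSnappyAvailable()) {
            // Only the compress flag is toggled; the codec class stays unset
            // because SnappyCodec may be missing client-side (see the FIXME above).
            FileOutputFormat.setCompressOutput(job, true);
        }
    }
}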

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -17,6 +16,7 @@
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
+import org.opencb.opencga.storage.hadoop.HBaseCompat;
 import org.opencb.opencga.storage.hadoop.utils.ValueOnlyTextOutputFormat;
 import org.opencb.opencga.storage.hadoop.variant.io.VariantDriver;
 import org.slf4j.Logger;
@@ -164,7 +164,7 @@ protected void setupJob(Job job) throws IOException {
         outputFormatClass = LazyOutputFormat.class;
 
         job.setOutputFormatClass(ValueOnlyTextOutputFormat.class);
-        if (SnappyCodec.isNativeCodeLoaded()) {
+        if (HBaseCompat.getInstance().isSnappyAvailable()) {
             FileOutputFormat.setCompressOutput(job, true);
             // FIXME: SnappyCodec might not be available in client side
             // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/VariantFileOutputFormat.java

Lines changed: 14 additions & 2 deletions
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -54,6 +53,15 @@
  */
 public class VariantFileOutputFormat extends FileOutputFormat<Variant, NullWritable> {
 
+    private static Class<?> abfsOutputStreamClass;
+
+    static {
+        try {
+            abfsOutputStreamClass = Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream");
+        } catch (ClassNotFoundException e) {
+            abfsOutputStreamClass = null;
+        }
+    }
 
     public static final String VARIANT_OUTPUT_FORMAT = "variant.output_format";
 
@@ -74,7 +82,7 @@ public RecordWriter<Variant, NullWritable> getRecordWriter(TaskAttemptContext jo
         FileSystem fs = file.getFileSystem(conf);
         FSDataOutputStream fsOs = fs.create(file, false);
         OutputStream out;
-        if (fsOs.getWrappedStream() instanceof AbfsOutputStream) {
+        if (isAbfsOutputStream(fsOs)) {
             // Disable flush on ABFS. See HADOOP-16548
             out = new FilterOutputStream(fsOs) {
                 @Override
@@ -92,6 +100,10 @@ public void flush() throws IOException {
         return new VariantRecordWriter(configureWriter(job, countingOut), countingOut);
     }
 
+    private static boolean isAbfsOutputStream(FSDataOutputStream fsOs) {
+        return abfsOutputStreamClass != null && abfsOutputStreamClass.isInstance(fsOs.getWrappedStream());
+    }
+
     private DataWriter<Variant> configureWriter(final TaskAttemptContext job, OutputStream fileOut) throws IOException {
         // job.getCounter(VcfDataWriter.class.getName(), "failed").increment(0); // init
         final Configuration conf = job.getConfiguration();
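Two things happen in this file. First, the hard import of AbfsOutputStream is replaced by a lazy Class.forName lookup, so the check degrades gracefully (abfsOutputStreamClass stays null) instead of failing with NoClassDefFoundError when the hadoop-azure jar is absent from the classpath. Second, when the wrapped stream is ABFS, flush() is suppressed. The hunk truncates the anonymous FilterOutputStream; a hedged reconstruction of its intent, inferred from the HADOOP-16548 reference rather than copied from the commit:

import java.io.FilterOutputStream;
import java.io.OutputStream;

public class AbfsFlushGuard {
    // Wrap the raw stream so flush() becomes a no-op; buffered data still
    // reaches the service on writes and on close().
    public static OutputStream withoutFlush(OutputStream raw) {
        return new FilterOutputStream(raw) {
            @Override
            public void flush() {
                // Intentionally empty: per HADOOP-16548, flush/hflush on ABFS
                // is costly, and this bulk-export path does not need it.
            }
        };
    }
}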
