diff --git a/janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml b/janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml
index 596692a387f..cf4f3d5998f 100644
--- a/janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml
+++ b/janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml
@@ -40,6 +40,14 @@
             <scope>test</scope>
             <optional>true</optional>
         </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>janusgraph-hbase-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.mrunit</groupId>
             <artifactId>mrunit</artifactId>
@@ -50,7 +58,7 @@
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-server</artifactId>
-            <version>${hbase098.version}</version>
+            <version>${hbase100.version}</version>
             <optional>true</optional>
             <scope>test</scope>
             <exclusions>
@@ -58,12 +66,16 @@
                     <groupId>org.mortbay.jetty</groupId>
                     <artifactId>servlet-api-2.5</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
-            <version>${hbase098.version}</version>
+            <version>${hbase100.version}</version>
             <optional>true</optional>
             <scope>test</scope>
         </dependency>
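
Both HBase test dependencies above move from the 0.98 line to the 1.0 line; the hbase098.version and hbase100.version properties are assumed to be managed in the parent POM, which is why no concrete version number appears in this module. The new test-scoped janusgraph-hbase-core dependency gives this module the HBase storage backend that the new input-format tests exercise. The added com.lmax:disruptor exclusion on hbase-server presumably keeps HBase 1.0 from dragging its own disruptor version onto the test classpath alongside whatever version the rest of the build manages.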
diff --git a/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java
index dd15629bf99..efe1e016c37 100644
--- a/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java
+++ b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java
@@ -24,8 +24,10 @@
import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
@@ -46,7 +48,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);
private final TableInputFormat tableInputFormat = new TableInputFormat();
-    private TableRecordReader tableReader;
+    private RecordReader<ImmutableBytesWritable, Result> tableReader;
     private byte[] inputCFBytes;
     private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
@@ -57,8 +59,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOException
@Override
     public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-        tableReader =
-                (TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
+        tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
janusgraphRecordReader =
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
return janusgraphRecordReader;
@@ -104,7 +105,7 @@ public void setConf(final Configuration config) {
this.tableInputFormat.setConf(config);
}
-    public TableRecordReader getTableReader() {
+    public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
return tableReader;
}
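
The retyping above is the core of this file's change: under HBase 1.0, TableInputFormat.createRecordReader is only guaranteed to return a RecordReader<ImmutableBytesWritable, Result>, not a TableRecordReader (the 1.x implementation can hand back a wrapping reader), so the old cast could fail at runtime and the field, the createRecordReader body, and the getter are all retyped against the interface. A minimal consumption sketch, with hypothetical names (ReaderSketch, countRows) that are not part of this PR:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    final class ReaderSketch {
        // Counts rows in one split; only the RecordReader contract is used,
        // so it works whether or not the reader is a TableRecordReader.
        static long countRows(TableInputFormat fmt, InputSplit split, TaskAttemptContext ctx)
                throws IOException, InterruptedException {
            RecordReader<ImmutableBytesWritable, Result> rr = fmt.createRecordReader(split, ctx);
            rr.initialize(split, ctx);
            long rows = 0;
            while (rr.nextKeyValue()) {
                rows++; // rr.getCurrentValue() would yield the row's cells as a Result
            }
            rr.close();
            return rows;
        }
    }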
diff --git a/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java
index 68f6ff44e8d..82bd0120c32 100644
--- a/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java
+++ b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java
@@ -15,11 +15,12 @@
package org.janusgraph.hadoop.formats.hbase;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,11 +32,11 @@
 public class HBaseBinaryRecordReader extends RecordReader<StaticBuffer, Iterable<Entry>> {
-    private TableRecordReader reader;
+    private RecordReader<ImmutableBytesWritable, Result> reader;
     private final byte[] edgestoreFamilyBytes;
-    public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
+    public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
this.reader = reader;
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
}
@@ -66,7 +67,7 @@ public void close() throws IOException {
}
@Override
- public float getProgress() {
+ public float getProgress() throws IOException, InterruptedException {
return this.reader.getProgress();
}
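
Two changes in this file: the wrapped reader becomes the interface type, and getProgress now declares throws IOException, InterruptedException, because RecordReader.getProgress() declares those checked exceptions and the delegating call can no longer rely on the non-throwing TableRecordReader override. For context, a hedged sketch of the row-to-entries conversion this reader performs in getCurrentValue (the helper below is hypothetical, not code from this PR):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Result;
    import org.janusgraph.diskstorage.Entry;
    import org.janusgraph.diskstorage.util.StaticArrayBuffer;
    import org.janusgraph.diskstorage.util.StaticArrayEntry;

    final class RowConversionSketch {
        // Maps one HBase row's edgestore column family onto JanusGraph entries.
        static List<Entry> toEntries(Result row, byte[] edgestoreFamilyBytes) {
            List<Entry> entries = new ArrayList<>();
            for (Map.Entry<byte[], byte[]> col : row.getFamilyMap(edgestoreFamilyBytes).entrySet()) {
                entries.add(StaticArrayEntry.of(
                        StaticArrayBuffer.of(col.getKey()),     // column qualifier
                        StaticArrayBuffer.of(col.getValue()))); // column value
            }
            return entries;
        }
    }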
diff --git a/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/AbstractInputFormatIT.java b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/AbstractInputFormatIT.java
new file mode 100644
index 00000000000..17bb97c765c
--- /dev/null
+++ b/janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/AbstractInputFormatIT.java
@@ -0,0 +1,116 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.hadoop;
+
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.JanusGraphVertex;
+import org.janusgraph.diskstorage.configuration.WriteConfiguration;
+import org.janusgraph.example.GraphOfTheGodsFactory;
+import org.janusgraph.graphdb.JanusGraphBaseTest;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public abstract class AbstractInputFormatIT extends JanusGraphBaseTest {
+
+
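+    // Loads the Graph of the Gods sample data, then checks that an OLAP traversal
+    // through the input format (via SparkGraphComputer) sees the same 12 vertices
+    // as a direct OLTP read.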
+ @Test
+ public void testReadGraphOfTheGods() throws Exception {
+ GraphOfTheGodsFactory.load(graph, null, true);
+ assertEquals(12L, (long) graph.traversal().V().count().next());
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ assertEquals(12L, (long) t.V().count().next());
+ }
+
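+    // Writes a single vertex with 2^16 values on a LIST-cardinality property and
+    // verifies that both a direct traversal and the SparkGraphComputer traversal
+    // read the whole wide row back.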
+ @Test
+ public void testReadWideVertexWithManyProperties() throws Exception {
+ int numProps = 1 << 16;
+
+ long numV = 1;
+ mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
+ mgmt.commit();
+ finishSchema();
+
+ for (int j = 0; j < numV; j++) {
+ Vertex v = graph.addVertex();
+ for (int i = 0; i < numProps; i++) {
+ v.property("p", i);
+ }
+ }
+ graph.tx().commit();
+
+ assertEquals(numV, (long) graph.traversal().V().count().next());
+        Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
+        List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
+ assertEquals(numProps, valuesOnP.size());
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ assertEquals(numV, (long) t.V().count().next());
+ propertiesOnVertex = t.V().valueMap().next();
+ valuesOnP = (List)propertiesOnVertex.values().iterator().next();
+ assertEquals(numProps, valuesOnP.size());
+ }
+
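+    // Self-loops are easy to double-count or drop when iterating a row's edges;
+    // this adds a "lives" loop on the sky vertex and checks the edge counts both
+    // before and after reading the graph back through the input format.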
+ @Test
+ public void testReadSelfEdge() throws Exception {
+ GraphOfTheGodsFactory.load(graph, null, true);
+ assertEquals(12L, (long) graph.traversal().V().count().next());
+
+ // Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
+ JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
+ assertNotNull(sky);
+ assertEquals("sky", sky.value("name"));
+ assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
+ assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
+ assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
+ sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
+ assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
+ assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
+ assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
+ graph.tx().commit();
+
+ // Read the new edge using the inputformat
+ Graph g = getGraph();
+ GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
+ Iterator