Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests and resolve issue running SparkGraphComputer on HBase #81

Merged
merged 1 commit into from
Feb 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
<scope>test</scope>
<optional>true</optional>
</dependency>
<!-- Include janusgraph-hbase-core to resolve the Guava Stopwatch error in HBase tests.
This dependency can be removed once HBase updates its Guava version. -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>janusgraph-hbase-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
Expand All @@ -50,20 +58,24 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase098.version}</version>
<version>${hbase100.version}</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
Expand All @@ -46,7 +48,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);

private final TableInputFormat tableInputFormat = new TableInputFormat();
private TableRecordReader tableReader;
private RecordReader<ImmutableBytesWritable, Result> tableReader;
private byte[] inputCFBytes;
private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;

Expand All @@ -57,8 +59,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOExceptio

@Override
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
tableReader =
(TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
janusgraphRecordReader =
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
return janusgraphRecordReader;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void setConf(final Configuration config) {
this.tableInputFormat.setConf(config);
}

public TableRecordReader getTableReader() {
public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
return tableReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package org.janusgraph.hadoop.formats.hbase;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand All @@ -31,11 +32,11 @@

public class HBaseBinaryRecordReader extends RecordReader<StaticBuffer, Iterable<Entry>> {

private TableRecordReader reader;
private RecordReader<ImmutableBytesWritable, Result> reader;

private final byte[] edgestoreFamilyBytes;

public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
this.reader = reader;
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
}
Expand Down Expand Up @@ -66,7 +67,7 @@ public void close() throws IOException {
}

@Override
public float getProgress() {
public float getProgress() throws IOException, InterruptedException {
return this.reader.getProgress();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.hadoop;

import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
import org.janusgraph.example.GraphOfTheGodsFactory;
import org.janusgraph.graphdb.JanusGraphBaseTest;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public abstract class AbstractInputFormatIT extends JanusGraphBaseTest {

/**
 * Loads the Graph of the Gods sample data and checks that the vertex count read through
 * {@link SparkGraphComputer} on the graph returned by {@link #getGraph()} equals the count
 * read directly from the test graph (12 vertices in both cases).
 */
@Test
public void testReadGraphOfTheGods() throws Exception {
    GraphOfTheGodsFactory.load(graph, null, true);
    final long directCount = (long) graph.traversal().V().count().next();
    assertEquals(12L, directCount);

    // Re-read the same data through the input format under test.
    final Graph inputGraph = getGraph();
    final GraphTraversalSource sparkSource =
            inputGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
    final long sparkCount = (long) sparkSource.V().count().next();
    assertEquals(12L, sparkCount);
}

// Writes a single vertex carrying 2^16 values of the LIST-cardinality property "p", then
// verifies that both a direct traversal and a SparkGraphComputer traversal over the graph
// returned by getGraph() see one vertex with all 2^16 values.
@Test
public void testReadWideVertexWithManyProperties() throws Exception {
// 65536 property values on one vertex.
int numProps = 1 << 16;

long numV = 1;
// LIST cardinality lets the same key hold many values on one vertex.
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
mgmt.commit();
finishSchema();

for (int j = 0; j < numV; j++) {
Vertex v = graph.addVertex();
for (int i = 0; i < numProps; i++) {
v.property("p", i);
}
}
graph.tx().commit();

// Sanity-check the data through the regular (non-computer) traversal first.
assertEquals(numV, (long) graph.traversal().V().count().next());
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
// Only one property key was written, so the first map value is the list of "p" values.
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
// Repeat the same checks through the input format using SparkGraphComputer.
Graph g = getGraph();
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(numV, (long) t.V().count().next());
propertiesOnVertex = t.V().valueMap().next();
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
// Only the number of values is compared here, not their contents.
assertEquals(numProps, valuesOnP.size());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it enough to only check the number of properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the version that was in CassandraInputFormatIT and was not modified in this PR. I think changing it would be out of scope here but maybe create an issue for it?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please create an issue.

}

/**
 * Adds a self-loop edge to the "sky" vertex of the Graph of the Gods and verifies that a
 * {@link SparkGraphComputer} traversal over the graph returned by {@link #getGraph()} yields
 * exactly two distinct edge ids for {@code bothE()} on that vertex.
 */
@Test
public void testReadSelfEdge() throws Exception {
    GraphOfTheGodsFactory.load(graph, null, true);
    final long loadedVertices = (long) graph.traversal().V().count().next();
    assertEquals(12L, loadedVertices);

    // Put a "lives" self-loop on sky: semantically meaningless, but it requires no schema changes
    final JanusGraphVertex skyVertex =
            (JanusGraphVertex) graph.query().has("name", "sky").vertices().iterator().next();
    assertNotNull(skyVertex);
    assertEquals("sky", skyVertex.value("name"));
    assertSkyEdgeCounts(skyVertex, 1L, 0L, 1L);
    skyVertex.addEdge("lives", skyVertex, "reason", "testReadSelfEdge");
    // The self-loop is counted once as IN and once as OUT, hence BOTH grows by two.
    assertSkyEdgeCounts(skyVertex, 2L, 1L, 3L);
    graph.tx().commit();

    // Now fetch the edges of sky through the input format
    final Graph inputGraph = getGraph();
    final GraphTraversalSource sparkSource =
            inputGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
    final Iterator<Object> edgeIds = sparkSource.V().has("name", "sky").bothE().id();
    assertNotNull(edgeIds);
    assertTrue(edgeIds.hasNext());
    final Set<Object> distinctEdgeIds = Sets.newHashSet(edgeIds);
    assertEquals(2, distinctEdgeIds.size());
}

// Asserts the IN, OUT and BOTH edge counts of the given vertex, in that order.
private static void assertSkyEdgeCounts(final JanusGraphVertex vertex,
                                        final long expectedIn,
                                        final long expectedOut,
                                        final long expectedBoth) {
    assertEquals(expectedIn, vertex.query().direction(Direction.IN).edgeCount());
    assertEquals(expectedOut, vertex.query().direction(Direction.OUT).edgeCount());
    assertEquals(expectedBoth, vertex.query().direction(Direction.BOTH).edgeCount());
}

/**
 * Supplies the graph that the tests in this class traverse with {@link SparkGraphComputer}.
 *
 * @return the graph to read through the input format under test
 * @throws IOException if the implementation fails on an I/O operation
 * @throws ConfigurationException if the implementation cannot load its configuration
 */
protected abstract Graph getGraph() throws IOException, ConfigurationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,102 +14,28 @@

package org.janusgraph.hadoop;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.janusgraph.CassandraStorageSetup;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
import org.janusgraph.example.GraphOfTheGodsFactory;
import org.janusgraph.graphdb.JanusGraphBaseTest;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class CassandraInputFormatIT extends AbstractInputFormatIT {

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class CassandraInputFormatIT extends JanusGraphBaseTest {


@Test
public void testReadGraphOfTheGods() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(12L, (long) t.V().count().next());
}

@Test
public void testReadWideVertexWithManyProperties() {
int numProps = 1 << 16;

long numV = 1;
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
mgmt.commit();
finishSchema();

for (int j = 0; j < numV; j++) {
Vertex v = graph.addVertex();
for (int i = 0; i < numProps; i++) {
v.property("p", i);
}
}
graph.tx().commit();

assertEquals(numV, (long) graph.traversal().V().count().next());
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
assertEquals(numV, (long) t.V().count().next());
propertiesOnVertex = t.V().valueMap().next();
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
assertEquals(numProps, valuesOnP.size());
}

@Test
public void testReadSelfEdge() {
GraphOfTheGodsFactory.load(graph, null, true);
assertEquals(12L, (long) graph.traversal().V().count().next());

// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
assertNotNull(sky);
assertEquals("sky", sky.value("name"));
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
graph.tx().commit();

// Read the new edge using the inputformat
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
assertNotNull(edgeIdIter);
assertTrue(edgeIdIter.hasNext());
Set<Object> edges = Sets.newHashSet(edgeIdIter);
assertEquals(2, edges.size());
/**
 * Opens the graph described by {@code target/test-classes/cassandra-read.properties}, with
 * {@code gremlin.hadoop.outputLocation} redirected to a freshly created temporary directory
 * underneath the configured output location.
 *
 * @return the graph opened from the adjusted configuration
 * @throws ConfigurationException if the properties file cannot be loaded or does not define
 *         {@code gremlin.hadoop.outputLocation}
 * @throws IOException if the base output directory or the temporary directory cannot be created
 */
protected Graph getGraph() throws ConfigurationException, IOException {
    final String outputLocationKey = "gremlin.hadoop.outputLocation";
    final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/cassandra-read.properties");

    final Object configuredOutDir = config.getProperty(outputLocationKey);
    if (configuredOutDir == null) {
        // Fail with a message naming the key instead of a bare NullPointerException from Paths.get.
        throw new ConfigurationException(outputLocationKey + " is not set in target/test-classes/cassandra-read.properties");
    }

    final Path baseOutDir = Paths.get((String) configuredOutDir);
    // Unlike File.mkdirs(), this reports a failure to create the directory as an IOException
    // and is a no-op when the directory already exists.
    Files.createDirectories(baseOutDir);

    // Point the output location at a new unique directory under the base directory.
    final String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
    config.setProperty(outputLocationKey, outDir);
    return GraphFactory.open(config);
}

@Override
Expand Down
Loading