Titan Format
- InputFormat: com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraInputFormat
- InputFormat: com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
- OutputFormat: com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
- OutputFormat: com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseOutputFormat
Titan is a distributed graph database developed by Aurelius and provided under the liberal Apache 2 license. Titan is backend agnostic and is currently deployed with support for Apache Cassandra and Apache HBase (see The Benefits of Titan and Storage Backend Overview).
An InputFormat specifies how to turn a data source into a stream of Hadoop <KEY,VALUE> pairs (see blog post). For Faunus, this means turning the source data into a stream of <NullWritable, FaunusVertex> pairs. The following TitanXXXInputFormat classes stream Titan-encoded data contained within Cassandra and HBase into Faunus/Hadoop.
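Conceptually, the InputFormat contract boils down to an iterator that yields one key/value pair per vertex, with the key carrying no information. The following self-contained Java sketch is not the actual Hadoop or Faunus API; the names ToyVertexReader and ToyVertex are hypothetical stand-ins used only to illustrate the shape of the <NullWritable, FaunusVertex> stream:

```java
import java.util.*;

// Hypothetical stand-ins for the <NullWritable, FaunusVertex> stream;
// not the Hadoop/Faunus API.
public class ToyVertexReader {
    static class ToyVertex {
        final long id;
        final List<Long> outEdges;
        ToyVertex(long id, List<Long> outEdges) { this.id = id; this.outEdges = outEdges; }
    }

    // Each "row" of the backing store (e.g. a Cassandra or HBase row)
    // is decoded into exactly one vertex; the key is always null
    // (standing in for NullWritable).
    static List<Map.Entry<Object, ToyVertex>> readSplit(Map<Long, List<Long>> rows) {
        List<Map.Entry<Object, ToyVertex>> stream = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> row : rows.entrySet()) {
            stream.add(new AbstractMap.SimpleEntry<>(null,
                new ToyVertex(row.getKey(), row.getValue())));
        }
        return stream;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> rows = new LinkedHashMap<>();
        rows.put(1L, Arrays.asList(2L, 3L));
        rows.put(2L, Collections.singletonList(3L));
        List<Map.Entry<Object, ToyVertex>> stream = readSplit(rows);
        assert stream.size() == 2;             // one pair per vertex
        assert stream.get(0).getKey() == null; // key carries no information
    }
}
```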
In order to read graph data from Titan/Cassandra, a graph needs to exist. For the sake of example, the Graph of the Gods dataset deployed with Titan can be loaded using Gremlin (see diagram at Getting Started).
gremlin> g = TitanFactory.open('bin/cassandra.local')
==>titangraph[cassandra:127.0.0.1]
gremlin> g.loadGraphML('data/graph-of-the-gods.xml')
==>null
gremlin> g.stopTransaction(SUCCESS)
In Faunus, a bin/titan-cassandra-input.properties file is provided with the following properties, which tell Faunus the location and features of the Titan/Cassandra cluster.
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraInputFormat
faunus.graph.input.titan.storage.backend=cassandra
faunus.graph.input.titan.storage.hostname=localhost
faunus.graph.input.titan.storage.port=9160
faunus.graph.input.titan.storage.keyspace=titan
cassandra.input.partitioner.class=org.apache.cassandra.dht.RandomPartitioner
# cassandra.input.split.size=512
gremlin> g = FaunusFactory.open('bin/titan-cassandra-input.properties')
==>faunusgraph[titancassandrainputformat]
gremlin> g.V.count()
13/01/04 12:53:24 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 12:53:24 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Reduce]
...
==>12
NOTE: When using Titan/Cassandra as a data source, if there are vertices with a large number of edges (i.e. a very wide row in Cassandra), an innocuous exception may occur warning that the Thrift frame size has been exceeded. While cassandra.yaml can be updated and the job properties cassandra.thrift.framed.size_mb/cassandra.thrift.message.max_size_mb added, the easiest way to solve this is typically to add the following property to the FaunusGraph being worked with: cassandra.input.split.size=512 (see bin/titan-cassandra-input.properties). The value 512 is the input split size in kilobytes; it can be adjusted higher or lower to ensure performant, exception-free behavior.
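As a rough illustration of why a smaller split size helps (hypothetical arithmetic, not Cassandra's actual split logic), shrinking the split size breaks a wide row's data into more, smaller input splits, so no single read has to move an oversized Thrift frame:

```java
// Hypothetical sketch: approximate split count for a wide row,
// interpreting the split size in kilobytes as the note above does.
public class SplitSizing {
    // Ceiling division: number of splits needed to cover `rowKb`
    // kilobytes of row data at the given split size (in KB).
    static int splitsFor(int rowKb, int splitSizeKb) {
        return (rowKb + splitSizeKb - 1) / splitSizeKb;
    }

    public static void main(String[] args) {
        // A 4 MB wide row at a 512 KB split size -> 8 splits.
        assert splitsFor(4096, 512) == 8;
        // Halving the split size doubles the split count,
        // i.e. smaller reads per task.
        assert splitsFor(4096, 256) == 16;
    }
}
```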
The Graph of the Gods dataset deployed with Titan can be loaded into Titan/HBase using Gremlin (see diagram at Getting Started).
gremlin> g = TitanFactory.open('bin/hbase.local')
==>titangraph[hbase:127.0.0.1]
gremlin> g.loadGraphML('data/graph-of-the-gods.xml')
==>null
gremlin> g.stopTransaction(SUCCESS)
In Faunus, a bin/titan-hbase-input.properties file is provided with the following properties. This creates a FaunusGraph that is fed from Titan/HBase. Note that for multi-machine environments, faunus.graph.input.titan.storage.hostname should be the cluster-internal IP address of the machine running Zookeeper, even if that machine is in fact localhost.
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
faunus.graph.input.titan.storage.backend=hbase
faunus.graph.input.titan.storage.hostname=localhost
faunus.graph.input.titan.storage.port=2181
faunus.graph.input.titan.storage.tablename=titan
# hbase.mapreduce.scan.cachedrows=1000
gremlin> g = FaunusFactory.open('bin/titan-hbase-input.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g.V.count()
13/01/04 15:40:56 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:40:56 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Reduce]
...
==>12
Please follow the links below for more information on streaming data out of HBase.
Faunus can be used to bulk load data into Titan. Given a stream of <NullWritable, FaunusVertex> pairs, a TitanXXXOutputFormat writes the stream faithfully to Titan. For all the examples that follow, it is assumed that data/graph-of-the-gods.json is in HDFS. Finally, note that it is typically a good idea to have the graph (keyspace/table) already initialized (e.g. g = TitanFactory.open(...)) before bulk writing, as the creation process takes time and a heavy write load during graph creation can yield exceptions.
WARNING: During a map or reduce task, numerous transactions are committed. If a task fails, Hadoop will restart that task from the beginning of its split. Given that transactions have already been committed for part of that data prior to the failure, repeated data is likely to be inserted. Until a proper task split-to-transaction mechanism is developed, any failed task during data ingestion into Titan cannot be trusted to be consistent.
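The hazard described in the warning can be seen with a small simulation (hypothetical names; this is not Faunus code): a task commits in batches, dies mid-split, and is restarted by Hadoop from the beginning of its split, re-committing rows it already wrote.

```java
import java.util.*;

// Simulates a map task that commits every `batch` records and is
// restarted from the beginning of its split after a mid-split failure.
public class RestartDuplicates {
    static List<Integer> store = new ArrayList<>(); // committed records

    // Processes the split, committing each full batch; throws at `failAt`
    // to simulate a task death (failAt < 0 means no failure).
    static void runTask(int[] split, int batch, int failAt) {
        List<Integer> tx = new ArrayList<>();
        for (int i = 0; i < split.length; i++) {
            if (i == failAt) throw new RuntimeException("task died");
            tx.add(split[i]);
            if (tx.size() == batch) { store.addAll(tx); tx.clear(); } // commit
        }
        store.addAll(tx); // final commit
    }

    public static void main(String[] args) {
        int[] split = {1, 2, 3, 4, 5, 6};
        try {
            // First attempt commits {1,2} and {3,4}, then dies.
            runTask(split, 2, 5);
        } catch (RuntimeException ignored) {}
        // Hadoop restarts the task from the split's start; records 1..4
        // are committed a second time, because the restarted task cannot
        // know which transactions already succeeded.
        runTask(split, 2, -1);
        assert store.size() == 10; // 4 duplicates + 6 fresh records
    }
}
```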
faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
faunus.graph.output.titan.storage.backend=cassandra
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=9160
faunus.graph.output.titan.storage.keyspace=titan
faunus.graph.output.titan.storage.batch-loading=true
# faunus.graph.output.titan.ids.block-size=100000
faunus.graph.output.titan.infer-schema=true
faunus.graph.output.blueprints.tx-commit=5000
Here are some notes on the above properties.
- storage.batch-loading: Setting this to true circumvents certain checks in Titan, which speeds up the writing process.
- ids.block-size: When this value is small and the clients are writing lots of data, the clients communicate with Titan repeatedly to get new ids, which can cause exceptions as the id system stalls trying to serve all the clients.
- infer-schema: When a new edge label or property key is provided to Titan, Titan updates its schema metadata. By inferring the schema prior to writing, these exceptions can be circumvented.
- tx-commit: Determines how many vertices/edges are written before committing a transaction. It is important to batch so that not every write is a commit.
gremlin> g = FaunusFactory.open('bin/titan-cassandra-output.properties')
==>faunusgraph[graphsoninputformat]
gremlin> g.V.sideEffect('{it.roman = true}')
13/01/04 15:44:42 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:44:42 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.sideeffect.SideEffectMap.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Reduce]
...
In the above job, the Graph of the Gods GraphSON file is streamed from HDFS and each vertex has a new property added (roman=true
). The output graph is pushed into Titan/Cassandra. Via the Titan/Gremlin console, the graph is viewable.
titan$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(_)-oOOo-----
gremlin> g = TitanFactory.open('bin/cassandra.local')
==>titangraph[cassandrathrift:127.0.0.1]
gremlin> g.v(4).map
==>{name=saturn, type=titan, roman=true}
gremlin>
faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseOutputFormat
faunus.graph.output.titan.storage.backend=hbase
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=2181
faunus.graph.output.titan.storage.tablename=titan
faunus.graph.output.titan.storage.batch-loading=true
# titan.graph.output.ids.block-size=100000
faunus.graph.output.titan.infer-schema=true
faunus.graph.output.blueprints.tx-commit=5000
NOTE: Please see the TitanCassandraOutputFormat section for information on the meaning of these properties.
The properties above are used to construct a FaunusGraph. The Gremlin traversal's resultant graph is then written to Titan/HBase, and the output process is complete.
gremlin> g = FaunusFactory.open('bin/titan-hbase-output.properties')
==>faunusgraph[graphsoninputformat]
gremlin> g.V.sideEffect('{it.roman = true}')
13/01/04 15:48:32 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:48:32 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.sideeffect.SideEffectMap.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Reduce]
...