Powers of Ten – Part II

“‘Curiouser and curiouser!’ cried Alice (she was so much surprised, that for the moment she quite forgot how to speak good English); ‘now I’m opening out like the largest telescope that ever was!’”
    — Lewis Carroll, Alice’s Adventures in Wonderland

It is sometimes surprising to see just how much data is available. Much like Alice and her sudden increase in height in Lewis Carroll’s famous story, the upward growth of data can happen quite quickly, and the opportunity to produce a multi-billion edge graph presents itself just as fast. Luckily, Titan is capable of scaling to accommodate such size, and with the right strategies for loading this data, development efforts can more rapidly shift to the rewards of massive scale graph analytics.

This article is the second installment in the two-part Powers of Ten series that discusses bulk loading data into Titan at varying scales. For the purposes of this series, the “scale” is determined by the number of edges to be loaded. As it so happens, the strategies for bulk loading tend to change as the scale increases over powers of ten, which creates a memorable way to categorize different strategies. Part I of this series looked at strategies for loading millions and tens of millions of edges and focused on using Gremlin to do so. This part covers hundreds of millions and billions of edges, with Faunus as the loading tool.

Note: With Titan 0.5.0, Faunus will be pulled into the Titan project under the name Titan/Hadoop.

100 Million

As a reminder from this article’s predecessor, loading tens of millions of edges was best handled with BatchGraph. BatchGraph might also remain useful into the low hundreds of millions of edges, assuming that the load times involved do not hinder development iteration. It is at this point, however, that the decision to use Faunus for loading becomes a good one.

Faunus is a graph analytics engine based on Hadoop. In addition to its role as an analytic tool, Faunus also provides ETL-related functions for managing large scale graphs. By taking advantage of the parallel nature of Hadoop, the loading time for hundreds of millions of edges can be decreased considerably compared to a single-threaded loading approach with BatchGraph.

The DocGraph data set “shows how healthcare providers team to provide care”. DocGraph was introduced in the previous installment to the Powers of Ten series, where the smallest version of the data set was utilized. As a quick reminder of this data set’s contents, recall that vertices in this network represent healthcare providers and edges represent shared interactions between two providers. This section will utilize the “365-day Window”, which consists of approximately 1 million vertices and 154 million edges.

DocGraph Schema

Graphs in the low hundreds of millions of edges, like DocGraph, can often be loaded using a single Hadoop node running in pseudo-distributed mode. In this way, it is possible to gain the advantage of parallelism while keeping the configuration complexity and resource requirements as low as possible. In developing this example, a single m2.xlarge EC2 instance was used to host Hadoop and Cassandra in a single-machine cluster. The example assumes that the following prerequisites are in place:

Once the prerequisites have been established, download the DocGraph data set and unpackage it to $FAUNUS_HOME/:

$ curl -L -O http://downloads.cms.gov/foia/physician-referrals-2012-2013-days365.zip
$ unzip physician-referrals-2012-2013-days365.zip

One of the patterns established in the previous Powers of Ten post was the need to always create the Titan type definitions first. This step is most directly accomplished by connecting to Cassandra with the Titan Gremlin REPL (i.e. $TITAN_HOME/bin/gremlin.sh), which will automatically establish the Titan keyspace. Place the following code in a file called $TITAN_HOME/schema.groovy:

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
g.makeKey("npi").dataType(String.class).single().unique().indexed(Vertex.class).make()
sharedTxCount = g.makeKey("sharedTxCount").dataType(Integer.class).make()
patientTotal = g.makeKey("patientTotal").dataType(Integer.class).make()
sameDayTotal = g.makeKey("sameDayTotal").dataType(Integer.class).make()
g.makeLabel("shares").signature(sharedTxCount, patientTotal, sameDayTotal).make()
g.commit()

This file can be executed in the REPL as: gremlin> \. schema.groovy

The DocGraph data is formatted as a CSV file, which means that in order to read this data the Faunus input format must be capable of processing that structure. Faunus provides a number of out-of-the-box formats to work with and the one to use in this case is the ScriptInputFormat. This format allows specification of an arbitrary Gremlin script to write a FaunusVertex, where the FaunusVertex is the object understood by the various output formats that Faunus supports.

The diagram below visualizes the process, where the script supplied to the ScriptInputFormat will execute against each line of the CSV file in a parallel fashion, allowing it to parse the line into a resulting FaunusVertex and related edges, forming an adjacency list. That adjacency list can then be written to Cassandra with the TitanCassandraOutputFormat.

Faunus ScriptFormat

The following script contains the code to parse the data from the CSV file and will be referred to as $FAUNUS_HOME/NPIScriptInput.groovy:

import com.thinkaurelius.faunus.FaunusVertex
import com.tinkerpop.blueprints.Direction

// characters that can appear in an NPI identifier; used by encodeId to turn each id into a long
ID_CHARACTERS = ['0'..'9','D'].flatten()
NUM_CHARACTERS = ID_CHARACTERS.size()
 
def long encodeId(String id) {
  id.inject(0L, { acc, c ->
    acc * NUM_CHARACTERS + ID_CHARACTERS.indexOf(c)
  })
}
 
def boolean read(FaunusVertex vertex, String line) {
 
    def (id1,
         id2,
         sharedTxCount,
         patientTotal,
         sameDayTotal) = line.split(',')*.trim()
 
    vertex.reuse(encodeId(id1))
    vertex.setProperty("npi", id1)
 
    def edge = vertex.addEdge(Direction.OUT, "shares", encodeId(id2))
    edge.setProperty("sharedTxCount", sharedTxCount as Integer)
    edge.setProperty("patientTotal", patientTotal as Integer)
    edge.setProperty("sameDayTotal", sameDayTotal as Integer)
 
    return true
}

The most important aspect of the code above is the definition of the read function, which is passed a FaunusVertex and a single line from the CSV file. This function processes the CSV line by splitting it on the comma separator, setting the npi property on the supplied FaunusVertex, and creating the edge represented by that line. Once the script is created to deal with the input file, attention should be turned to the Faunus properties file (named $FAUNUS_HOME/faunus.properties):

# input graph parameters
faunus.graph.input.format=com.thinkaurelius.faunus.formats.script.ScriptInputFormat
faunus.input.location=docgraph/Physician-Referrals-2012-2013-DAYS365.txt
faunus.graph.input.script.file=docgraph/NPIScriptInput.groovy
faunus.graph.input.edge-copy.direction=OUT
 
# output data (graph or statistic) parameters
faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
faunus.graph.output.titan.storage.backend=cassandra
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=9160
faunus.graph.output.titan.storage.keyspace=titan
faunus.graph.output.titan.storage.batch-loading=true
faunus.graph.output.titan.infer-schema=false

mapred.task.timeout=5400000
mapred.max.split.size=5242880
mapred.reduce.tasks=2
mapred.map.child.java.opts=-Xmx8G
mapred.reduce.child.java.opts=-Xmx8G
mapred.job.reuse.jvm.num.tasks=-1
 
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=output
faunus.output.location.overwrite=true

The above properties file defines the settings Faunus will use to execute the loading process. Lines two through five specify the input format and properties related to where the source data is located. Note that the file locations specified are locations in Hadoop’s distributed file system, HDFS, and not the local file system. Lines eight through fourteen focus on the output format, which is a TitanGraph. These settings are mostly standard Titan configurations, prefixed with faunus.graph.output.titan.. As with previous bulk loading examples in Part I of this series, storage.batch-loading is set to true.

It is now possible to execute the load through the Faunus Gremlin REPL, which can be started with $FAUNUS_HOME/bin/gremlin.sh. The first thing to do is to make sure that the data and script files are available to Faunus in HDFS. Faunus has built-in helpers for interacting with that distributed file system, allowing for file moves, directory creation, and other such functions.

gremlin> hdfs.mkdir("docgraph")
==>null
gremlin> hdfs.copyFromLocal('Physician-Referrals-2012-2013-DAYS365.txt','docgraph/Physician-Referrals-2012-2013-DAYS365.txt')
==>null
gremlin> hdfs.copyFromLocal("NPIScriptInput.groovy","docgraph/NPIScriptInput.groovy")
==>null

Now that HDFS has those files available, execute the Faunus job that will load the data as shown below:

gremlin> g = FaunusFactory.open("faunus.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()       
13:55:05 INFO mapreduce.FaunusCompiler: Generating job chain: g._()
13:55:05 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.2-job.jar
13:55:05 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
13:55:05 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
17:55:25 INFO input.FileInputFormat: Total input paths to process : 2
17:55:25 INFO mapred.JobClient: Running job: job_201405141319_0004
17:55:26 INFO mapred.JobClient:  map 0% reduce 0%
17:56:23 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
18:54:05 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
18:54:05 INFO mapred.JobClient:     EDGE_PROPERTIES_WRITTEN=463706751
18:54:05 INFO mapred.JobClient:     EDGES_WRITTEN=154568917
18:54:05 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=624
...
18:54:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=77376

At line one, the FaunusGraph instance is created using the faunus.properties file to configure it. Line three executes the job given that configuration. The output from the job follows, culminating in EDGES_WRITTEN=154568917, which is the number expected from this dataset.
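A quick spot check from the Titan Gremlin REPL is a reasonable way to confirm that the load behaved as expected. The following is only a sketch: the NPI value is a made-up placeholder, so substitute an id that actually appears in the source file.

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
// index lookup on the npi key defined in schema.groovy; the value below is a placeholder
v = g.V("npi", "0000000000").next()
// number of outgoing "shares" edges for that provider
v.outE("shares").count()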

The decision to utilize Faunus for loading at this scale will generally be balanced against the time of loading and the complexity involved in handling parallelism in a custom way. In other words, BatchGraph and custom parallel loaders might yet be good strategies if time isn’t a big factor or if parallelism can be easily managed without Hadoop. Of course, using Faunus from the beginning allows the same load to scale up easily, as converting from a single-machine pseudo-cluster to a high-powered, multi-node cluster isn’t difficult to do and requires no code changes.

1 Billion

In terms of loading mechanics, the approach to loading billions of edges is not so different from the previous section. The strategy is still Faunus-based; however, a single-machine pseudo-cluster is likely under-powered for a job of this magnitude. A higher degree of parallelism is required for it to execute in a reasonable time frame. It is also likely that loading billions of edges will require some trial-and-error “knob-turning” with respect to Hadoop and the target backend store (e.g. Cassandra).

The Friendster social network dataset represents a graph with 117 million vertices and 2.5 billion edges. The graph is stored as an adjacency list, where each line of the file contains a vertex id followed by a colon and a comma-delimited list of the ids of its adjacent vertices. Like the previous example with DocGraph, the use of ScriptInputFormat provides the most convenient way to process this file.

In this case, a four node Hadoop cluster was created using m2.4xlarge EC2 instances. Each instance was configured with eight mappers and six reducers, yielding a total of thirty-two mappers and twenty-four reducers in the cluster. Compared to the single machine pseudo-cluster used in the last section, where there were just two mappers and two reducers, this fully distributed cluster has a much higher degree of parallelism. Like the previous section, Hadoop and Cassandra were co-located, where Cassandra was running on each of the four nodes.

As the primary difference between loading data at this scale and the previous one is the use of a fully distributed Hadoop cluster as compared to a pseudo-cluster, this section will dispense with much of the explanation related to execution of the load and specific descriptions of the configurations and scripts involved. The script for processing each line of data in the Friendster dataset looks like this:

import com.thinkaurelius.faunus.FaunusVertex
import static com.tinkerpop.blueprints.Direction.OUT
 
def boolean read(final FaunusVertex v, final String line) {
    def parts = line.split(':')
    v.reuse(Long.valueOf(parts[0]))
    if (parts.size() > 1) {
        parts[1].split(',').each({
            v.addEdge(OUT, 'friend', Long.valueOf(it))
        })
    }
    return true
}

The properties file is not really any different from the previous example, except that the “input format” section now points to the Friendster-related files in HDFS. Finally, as with every loading strategy discussed so far, ensure that the Titan schema is established prior to loading.
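Since the Friendster script above only creates friend edges keyed by long vertex ids, that schema is small. The following is a minimal sketch, assuming a conf/titan-cassandra.properties adjusted to point at the target cluster and that schema inference remains disabled as in the DocGraph example.

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
// the only edge label the Friendster script creates
g.makeLabel("friend").make()
g.commit()

With the schema in place, the Faunus job can be executed as follows: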

gremlin> hdfs.copyFromLocal("/tmp/FriendsterInput.groovy","FriendsterInput.groovy")
==>null
gremlin> g = FaunusFactory.open("bin/friendster.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()                                              
18:28:46 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.4-job.jar
18:28:46 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
18:28:46 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
18:28:47 INFO input.FileInputFormat: Total input paths to process : 125
18:28:47 INFO mapred.JobClient: Running job: job_201405111636_0005
18:28:48 INFO mapred.JobClient:  map 0% reduce 0%
18:29:39 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
02:06:57 INFO mapred.JobClient:   File Input Format Counters 
02:06:57 INFO mapred.JobClient:     Bytes Read=79174658355
02:06:57 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
02:06:57 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=15094
02:06:57 INFO mapred.JobClient:     EDGES_WRITTEN=2586147869
02:06:57 INFO mapred.JobClient:   FileSystemCounters
02:06:57 INFO mapred.JobClient:     HDFS_BYTES_READ=79189272471
02:06:57 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1754590920
...
02:06:57 INFO mapred.JobClient:     Bytes Written=0

The billion edge data load did not introduce any new techniques, but it did show that the approach used at the hundred million edge scale extends in a straightforward manner to the billion edge scale without any major changes to the mechanics of loading. Moreover, scaling up Faunus data loads can really just be thought of as introducing more Hadoop nodes to the cluster.

Conclusion

Over the course of this two-part series, a number of strategies have been presented for loading data at different scales. Some patterns, like creating the Titan schema before loading and enabling storage.batch-loading, carry through from the smallest graph to the largest and can be thought of as “common strategies”. While there are similarities to be identified, there are also vast differences, ranging from single-threaded loads that take a few seconds to massively parallel loads that can take hours or days. Note that the driver for these variations is the data itself and that, aside from the “common strategies”, the loading approaches presented can only be thought of as guidelines which must be adapted to the data and the domain.

The complexity of real-world schemas will undoubtedly be greater than the examples presented in this series. The loading approach may actually consist of several separate load operations, with strategies gathered from each of the sections presented. By understanding all of these loading patterns as a whole, it is possible to tailor the process to the data available, thus enabling the graph exploration adventure.

Acknowledgments

Dr. Vadas Gintautas originally foresaw the need to better document bulk loading strategies and that such strategies seemed to divide themselves nicely in powers of ten.

Authors

Stephen Mallette and Daniel Kuppitz

Powers of Ten – Part I

“‘No, no! The adventures first,’ said the Gryphon in an impatient tone: ‘explanations take such a dreadful time.’”
    — Lewis Carroll, Alice’s Adventures in Wonderland

It is often quite simple to envision the benefits of using Titan. Developing complex graph analytics over a multi-billion edge distributed graph represents the adventures that await. Like the Gryphon from Lewis Carroll’s tale, the desire to immediately dive into the adventures can be quite strong. Unfortunately and quite obviously, the benefits of Titan cannot be realized until there is some data present within it. Consider the explanations that follow; they are the strategies by which data is bulk loaded into Titan, enabling the adventures to ensue.

There are a number of different variables that might influence the approach to loading data into a graph, but the attribute that provides the best guidance in making a decision is size. For purposes of this article, “size” refers to the estimated number of edges to be loaded into the graph. The strategy used for loading data tends to change in powers of ten, where the strategy for loading 1 million edges is different than the approach for 10 million edges.

Given this neat and memorable way to categorize batch loading strategies, this two-part article outlines each strategy starting with the smallest at 1 million edges or less and continuing in powers of ten up to 1 billion and more. This first part will focus on 1 million and 10 million edges, which generally involves common Gremlin operations. The second part will focus on 100 million and 1 billion edges, which generally involves the use of Faunus.

1 Million

In the range of millions of edges or less, there really isn’t a particular loading strategy to follow, as the graph will likely fit in memory and the load time will be reasonably fast. Mistakes at this scale are not as costly because problems are typically straightforward to diagnose and the graph can be reloaded from scratch without much cost in terms of time.

As explained in a previous blog post entitled Polyglot Persistence and Query with Gremlin, the Gremlin REPL is a flexible environment for working with any kind of data. Obviously, it provides access to graph databases like Titan, but within the same REPL session, it is possible to also connect to a relational database, reach out to a web service, read a file, etc. Given this capability, writing a Gremlin script that can be executed through the REPL is likely the most lightweight and direct manner to get data into a graph.

The Wikipedia Vote Network is a data set that “contains all the Wikipedia voting data from the inception of Wikipedia until January 2008. Vertices in the network represent Wikipedia users and a directed edge from vertex i to vertex j represents that user i voted on user j.” Within its basic tab-delimited data structure, it contains 7,115 vertices and 103,689 edges, making it a good size for this demonstration.

All of the examples in this post assume that the latest version of Titan is downloaded and unpackaged (the titan-all packaging is required for the examples). From Titan’s root directory, download and unpackage the Wikipedia Vote Network data set:

$ curl -L -O http://snap.stanford.edu/data/wiki-Vote.txt.gz
$ gunzip wiki-Vote.txt.gz

Decompressing the archive will create the wiki-Vote.txt file in the root of the Titan directory. The following Gremlin script demonstrates how to load that file into Titan (backed by BerkeleyDB):

g = TitanFactory.open('/tmp/1m')
g.makeKey('userId').dataType(String.class).indexed(Vertex.class).unique().make()
g.makeLabel('votesFor').make()
g.commit()

getOrCreate = { id ->
  def p = g.V('userId', id)
  p.hasNext() ? p.next() : g.addVertex([userId:id])
}
 
new File('wiki-Vote.txt').eachLine {
  if (!it.startsWith("#")){
    (fromVertex, toVertex) = it.split('\t').collect(getOrCreate)
    fromVertex.addEdge('votesFor', toVertex)
  }
}
 
g.commit()

The key portions of the data loading script to pay attention to are as follows:

  • g.makeKey(‘userId’)… – Create types in Titan first. In this case, the schema only consists of a userId that will be on each user vertex. Always commit at the end of type creation and before loading data to the graph instance.
  • getOrCreate = { id ->... – A helper function that takes a vertex identifier (i.e. userId) as an argument and does an index look-up to determine if the vertex already exists. If it does exist the vertex is returned, but if it does not exist it is created. The concept of getOrCreate is a common one and having an efficient helper function to perform this task will almost always be necessary.
  • new File('wiki-Vote.txt').eachLine { – Read the source data file line by line executing the supplied closure for each one.
  • if (!it.startsWith("#")){ – The file contains comments which are identified by a # at the start of the line. These lines should be ignored.
  • (fromVertex, toVertex) = it.split('\t').collect(getOrCreate) – Each line consists of a pair of tab delimited values. This code splits the line of text on the tab to create a list containing two userId values. The getOrCreate function is applied over those values with collect and the resulting list is then destructured into two variables containing vertices in the graph that either already existed or were otherwise newly created: fromVertex and toVertex.
  • fromVertex.addEdge('votesFor', toVertex) – Constructs the edge between the two vertices.
  • g.commit() – It is worth noting that this load is performed within the context of a single transaction. At the higher end of the 1 million edge range, it will be necessary to perform intermediate commits during that process (a sketch of this approach follows the list).

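As the final bullet notes, intermediate commits become necessary toward the top of this range. A minimal sketch of the same loading loop with batched commits follows, reusing the getOrCreate helper defined above; the 50,000-edge interval is an arbitrary assumption that should be tuned to the heap size and storage backend in use.

counter = 0L
new File('wiki-Vote.txt').eachLine {
  if (!it.startsWith("#")) {
    (fromVertex, toVertex) = it.split('\t').collect(getOrCreate)
    fromVertex.addEdge('votesFor', toVertex)
    // commit an intermediate transaction every 50,000 edges
    if (++counter % 50000L == 0L) g.commit()
  }
}
// commit whatever remains in the final partial batch
g.commit()
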
To execute the original script (the single-transaction version), copy it into a file called load-1m.groovy at the root of the Titan install directory. Please note that the script will generate the Titan database on the file system at /tmp/1m. Start Gremlin with bin/gremlin.sh. When the REPL has initialized, execute the script as follows:

$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> \. load-1m.groovy
==>titangraph[local:/tmp/1m]
==>userId
...
==>null
gremlin> g.V.count()
==>7115
gremlin> g.E.count()
==>103689

The Wikipedia Vote Network has a simple schema. Even at the scale of 1 million edges, the complexity of a batch loading script can only rise from this case. The loading script in this section provides a good skeleton on which more complicated loads can be fleshed out.

10 Million

The approach to loading tens of millions of edges isn’t so different from the previous section. A Gremlin script is still the most direct approach to loading; however, there are some differences to consider. The most important of these differences is the use of BatchGraph, which handles intermediate commits of transactions at specified intervals and maintains a vertex cache for fast retrieval. Please refer to the BatchGraph documentation for important information on the limitations of its usage.

The DocGraph data set “shows how healthcare providers team to provide care”. Vertices in this network represent healthcare providers, where they are identified by an NPI number. Edges represent shared interactions between two providers with three properties that further qualify that interaction. The data is partitioned into several sizes based on time windows. This section will utilize the “30-day Window”, which consists of approximately 1 million vertices and 73 million edges.

DocGraph Schema

From Titan’s root directory download and unpackage the DocGraph data set:

$ curl -L -O http://downloads.cms.gov/foia/physician-referrals-2012-2013-days30.zip
$ unzip physician-referrals-2012-2013-days30.zip && rm physician-referrals-2012-2013-days30.zip
$ head -n3 Physician-Referrals-2012-2013-DAYS30.txt

$ sort Physician-Referrals-2012-2013-DAYS30.txt > Physician-Referrals-2012-2013-DAYS30-sorted.txt

Unzipping the archive will create the Physician-Referrals-2012-2013-DAYS30.txt file in the root of the Titan directory. Unlike the case in the previous section, the data is sorted (via the sort command above) by the NPI number of what will be the out vertex for each edge. Pre-sorting the data will help improve the performance of BatchGraph, as writes to and flushes of the cache are reduced. The following Gremlin script demonstrates how to load that file into Titan (backed by BerkeleyDB):

conf = new BaseConfiguration() {{
  setProperty("storage.backend", "berkeleyje")
  setProperty("storage.directory", "/tmp/10m")
  setProperty("storage.batch-loading", true)
}}
 
g = TitanFactory.open(conf)
g.makeKey("npi").dataType(String.class).single().unique().indexed(Vertex.class).make()
sharedTransactionCount = g.makeKey("sharedTxCount").dataType(Integer.class).make()
patientTotal = g.makeKey("patientTotal").dataType(Integer.class).make()
sameDayTotal = g.makeKey("sameDayTotal").dataType(Integer.class).make()
g.makeLabel("shares").signature(sharedTransactionCount, patientTotal, sameDayTotal).make()
g.commit()
 
bg = new BatchGraph(g, VertexIDType.STRING, 10000)
bg.setVertexIdKey("npi")
 
c = 0L
new File("Physician-Referrals-2012-2013-DAYS30-sorted.txt").eachLine({ final String line ->
 
    def (id1,
         id2,
         sharedTransactionCount,
         patientTotal,
         sameDayTotal) = line.split(',')*.trim()
 
    def v1 = bg.getVertex(id1) ?: bg.addVertex(id1)
    def v2 = bg.getVertex(id2) ?: bg.addVertex(id2)
    def edge = bg.addEdge(null, v1, v2, "shares")
    edge.setProperty("sharedTxCount", sharedTransactionCount as Integer)
    edge.setProperty("patientTotal", patientTotal as Integer)
    edge.setProperty("sameDayTotal", sameDayTotal as Integer)
 
    if (++c%100000L == 0L) println "Processed ${c} edges"
 
})
 
bg.commit()

The anatomy of this script is as follows (it can be executed in the Gremlin REPL with the instructions supplied in the previous section):

  • setProperty("storage.batch-loading", true) – Enabling “batch loading” for Titan will help improve performance by disabling consistency checks and locking. Read more about this option and other settings that can affect batch loading in the Titan documentation.
  • g.makeKey("npi")... – As in the previous example at the 1 million edge scale, types should be created and committed first.
  • bg = new BatchGraph(g, VertexIDType.STRING, 10000) – Wrap the TitanGraph instance in a BatchGraph, define the data type of the identifier which in this case for the NPI number is a STRING, and set the transaction size to 10000. With this setting, BatchGraph will automatically commit transactions on every 10,000 mutations to the graph.
  • bg.setVertexIdKey("npi") – Tells BatchGraph that the vertex identifier will be stored in a vertex property key called npi.
  • ...sameDayTotal) = line.split(',')*.trim() – Each line in the file consists of five comma-delimited values. This line splits the text on the commas to create a list of five values, which is destructured into five variables.
  • def v1 = bg.getVertex(id1) ?: bg.addVertex(id1)BatchGraph helps simplify the getOrCreate function from the previous section. BatchGraph overrides the default functionality of addVertex and getVertex allowing a specification and look-up of a vertex by the NPI number. If the vertex is not found, getVertex will return null and the vertex will have to be added.
  • bg.commit() – With all loading complete, make a final call to commit to finalize any remaining elements in the transaction buffer.

The DocGraph example demonstrated the key strategies for loading tens of millions of edges, which in summary are: pre-process data when possible to ease loading and improve performance, and use BatchGraph to allow focus on the data being loaded as opposed to loading mechanics such as manually batching commits, handling identifiers, and writing getOrCreate methods.

Some other strategies and ideas to consider at this scale include:

  • Do programming and testing of load scripts with a subset of the data to improve development cycle time.
  • Use third-party libraries to be more productive and reduce the amount of code to be written (e.g. groovycsv).
  • Consider methods of parallel loading using gpars, if the data is such that it can be organized to allow for that.
  • If there is an inclination to load data from a non-JVM language, like Python, reconsider this article and write the load script in Gremlin. In this way, the process of loading data can be completed quickly, allowing focus on language-specific tooling (e.g. Bulbs) for Python application development.
  • The tens of millions size falls into the realm of “Boutique Graph Data”, where many applications will fit or at least be where most applications will start. In that sense, this size is perhaps one of the more relevant sizes in the “Powers of Ten.”

Conclusion

This article explored the lower scales of loading data to Titan. At the millions and tens of millions of edges scales, a Gremlin script and the REPL is generally all that is required for batch-loading activities. For those just getting started with TinkerPop and Titan, taking this approach means having to learn the least amount of the stack in order to get started. Being comfortable at this scale of loading is critical to being successful at the hundreds of millions and billions of edges scales to be described in the next part of this article series.

Acknowledgments

Dr. Vadas Gintautas originally foresaw the need to better document bulk loading strategies and that such strategies seemed to divide themselves nicely in powers of ten.

Authors

Stephen Mallette and Daniel Kuppitz

The Social Graph of the Los Alamos National Laboratory

The web is composed of numerous web sites tailored to meet the information, consumption, and social needs of its users. Within many of these sites, references are made to the same platonic “thing” though different facets of the thing are expressed. For example, in the movie industry, there is a movie called John Carter by Disney. While the movie is an abstract concept, it has numerous identities on the web (which are technically referenced by a URI).

Social sites represent the identities of people. The “thing” known as Marko A. Rodriguez has an identity on Twitter, LinkedIn, and WordPress. Each identity is referring to the same platonic concept (noumenon), but in doing so, provides a different interpretation of its essence (phenomenon). The question then becomes — “What is Marko A. Rodriguez?” Each website only presents a particular view of Marko which is ultimately a lossy representation. For Marko is embedded within the larger world of yet more “things” (i.e., his artifacts of creation, his institutions of study, his social collaborations) and their embeddings, ad infinitum. Therefore, no single site with finite storage will be able to fully grasp Marko. To approach a full understanding of the platonic concept of Marko, one must bear witness to all the phenomenological aspects that are directly and indirectly related to Marko. In doing so, one realizes the platonic Marko A. Rodriguez for which no concept can bear.

A Unified Graph of People, Institutions, Artifacts, and Concepts

Aurelius collaborated with the Digital Library Research and Prototyping Group of the Los Alamos National Laboratory (LANL) to develop EgoSystem atop the distributed graph database Titan. The purpose of this system is best described by the introductory paragraph of the April 2014 publication on EgoSystem.

EgoSystem was developed to support outreach to former Los Alamos National Laboratory (LANL) employees. As a publicly funded research organization, there are many reasons why LANL would want to maintain contact with its “alumni.” Scientific research is often collaborative. Former employees know the Lab and its work, and often have colleagues who remain employed at LANL. These relationships fuel intra- and interdisciplinary projects. Government research agencies are also encouraged to partner with the private sector. Productizing LANL output becomes an opportunity for a public-private or commercial entity via a technology transfer process. Some small businesses (and jobs) owe their existence to ideas that were first developed at LANL. Public support for the ongoing research at LANL plays a role in ensuring support for adequate funding levels for that work. Outreach to alumni can encourage them to serve as ambassadors and advocates for LANL and its mission.

From a technical standpoint, EgoSystem discovers and aggregates information on LANL employees (past and present), their created artifacts, institutions, and associated concepts/tags/keywords throughout the web. Moreover, as employees relate to other individuals (e.g. coauthors, followers), their respective identities are aggregated as well. This automated discovery process yields a social graph that presents EgoSystem users with a consolidated view of the multiple facets of an individual as portrayed by the numerous websites they use. Simple questions can be answered: what do they Tweet about?, what are their publications?, what research concepts do they like?, where have they been employed?, who are their coauthors?, etc. In essence, EgoSystem removes the manual process of having to iteratively use a search engine to find Marko A. Rodriguez on LinkedIn, Twitter, ArXiv, etc. and presents the information in an integrated manner along with novel social-based statistics only possible when the information in these disparate data silos is linked and processed as a whole (see A Reflection on the Structure and Process of the Web of Data). The following two sections will describe EgoSystem’s data model (its graph schema) and its data processes (its graph traversals).

The EgoSystem Graph Data

There are two categories of vertices in EgoSystem.

  1. Platonic: Denotes an abstract concept devoid of interpretation.
  2. Identity: Denotes a particular interpretation of a platonic.

Every platonic vertex is of a particular type: a person, institution, artifact, or concept. Next, every platonic has one or more identities as referenced by a URL on the web. The platonic types and the location of their web identities are itemized below. As of EgoSystem 1.0, these are the only sources from which data is aggregated, though extending it to support more services (e.g. Facebook, Quorum, etc.) is feasible given the system’s modular architecture.

The graph diagrammed below demonstrates the four platonic types, their realization as identities in the various online web systems mentioned above, and how the vertices semantically relate to one another. Note that the properties of the vertices and edges are not diagrammed for the sake of diagram clarity, but include properties such as name, url, startTime/endTime, longitude/latitude, logoUrl, and various publication statistics such as h-index and g-index.

EgoSystem Schema

At the top of the diagram there is a yellow colored, platonic person vertex. That person has an email address, MS Academic page, Mendeley page, two institutional affiliations, and a Twitter account. These vertices are known as the identities of the platonic person. The Twitter identity follows another Twitter identity that is associated with yet another person platonic. In this person’s MS Academic identity, an article was authored. That article is not the platonic article, but the identity as represented by MS Academic (as referenced by an MS Academic URL). The platonic article has another identity represented in Mendeley. That is, both MS Academic and Mendeley make reference to the same platonic article. The MS Academic article identity relates to a concept identity (i.e. a tag or keyword). Such concepts are bundled by a platonic representing the abstract notion of that concept irrespective of how it is portrayed by the particular websites. In summary, identities relate to other identities via semantically-typed edges and all identities referring to the same platonic are bundled together under the respective platonic.

The use of a platonic is a way of yielding an n-ary relationship in a binary graph. This relates to concepts in the Semantic Web community around blank nodes, owl:sameAs, and named graphs. The platonic makes it easy to partition the data sets whereby the Twitter subgraph can be analyzed irrespective of the MS Academic authorship subgraph. However, the benefit of having all this data aggregated (by means of the platonics) and stored in the Titan graph database is that it is possible to perform novel traversals/queries that span the once isolated data silos with limited scalability concerns (see Faith in the Algorithm, Part 2: Computational Eudaemonics).
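For example, starting from any single identity vertex, hopping through its platonic reaches every other identity of the same “thing” regardless of which silo it came from. The Gremlin fragment below is only a sketch of that bundling idea; the identity variable is a placeholder for any identity vertex, and only the hasIdentity label is taken from the schema above.

// all other identities bundled under the same platonic as the given identity
identity.inE('hasIdentity').outV.out('hasIdentity').except([identity])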

The EgoSystem Graph Processes

There are two general processes that occur in EgoSystem:

  1. Discovery processes: data aggregation and validation
  2. User processes: graph analysis, ranking, scoring, and presentation

Discovery Processes

The graph schema presented previously is populated using EgoSystem’s “discovery process.” Given some basic information about an individual (their name, place of work, educational institution), EgoSystem makes use of search APIs and its existing graph structure to find matches in the various web systems discussed. Suppose there is a person named Jie Bao and all that is known is that his research institution is Rensselaer Polytechnic Institute (RPI). Unfortunately, there are numerous people with the name Jie Bao and thus, numerous Twitter accounts as well. However, if we know that Jie Bao is at RPI and there is a person named Li Ding at RPI as well (for which EgoSystem has a Twitter account), and one particular Jie Bao Twitter account follows Li Ding’s Twitter account, then there is some certainty that that Jie Bao Twitter identity is the one EgoSystem is looking for. Furthermore, if we know Li Ding’s MS Academic identity, then with great certainty we can assume his Jie Bao coauthor is in fact referring to the same platonic Jie Bao currently being discovered. In this way, external web service API calls along with internal graph searches provide the means by which EgoSystem is able to continuously iterate over its data to further populate and refine the graph data structure. Moreover, this process is non-linear in that sometimes a homepage web-scrape directly provides a person’s Twitter account, which is assumed to be 100% accurate. In short, a collection of such rules, ranked by their accuracy, continually fire as background processes in EgoSystem to help update and refine the social graph.
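To make the Jie Bao rule concrete, the internal graph check described above might be expressed as a short Gremlin traversal. The following is only a sketch: the follows edge label, the uuid property key, and the two bound variables are assumptions made for illustration; only the hasIdentity edges come from the schema described earlier.

// does this candidate Jie Bao Twitter identity follow the Twitter identity of a known
// colleague? candidateTwitterIdentity and liDingUuid are hypothetical bindings
isLikelyMatch = candidateTwitterIdentity.out('follows').inE('hasIdentity').outV.has('uuid', liDingUuid).hasNext()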

User Processes

When a user interacts with EgoSystem, there are more salient processes that are ultimately encoded as Gremlin graph traversals. The simplest graph traversal is a one-step hop from the platonic vertex to its identities. Such walks are used to populate a splash page about that platonic. A screenshot of the Marko A. Rodriguez (uuid:123-456) splash page is presented below.

g.V('uuid','123-456').out('hasIdentity')

Notice the tag cloud associated with Marko in the screenshot below. Note that no person identities have direct references to concepts. The diagrammed tag cloud is derived from the concepts associated with Marko’s MS Academic and SlideShare artifacts. This inference is defined using the following multi-step Gremlin traversal that ultimately generates a frequency distribution. The returned distribution is used to determine the font-size of the tags in the tag cloud.

g.V('uuid','123-456').out('hasIdentity')
   .has('service',T.in,["msacademic","slideshare"]).out
   .out('hasConcept').name.groupCount

EgoSystem User Interface

EgoSystem’s processes get more interesting when doing path analysis in the graph. In the interface above Aric Hagberg is “saved” (top right of the interface). With source and sink vertices, common path problems can be readily solved using EgoSystem.

  • Who do Marko and Aric know in common? (coauthors, Twitter followers, etc.)
  • Is there a path from me to Marko via Aric?
  • Which of Marko and Aric’s shared social connections currently work at LANL?

A collection of other complex traversals provided by EgoSystem are presented below.

  • Which paths exist from Marko to current LANL employees?
  • Does Marko have any collaborators that work at LANL and are currently in or near Washington DC? (uses Titan’s Elasticsearch geo-index/search capabilities)
  • Which of Marko’s coauthors does he not follow on Twitter?
  • Provide a histogram of the concepts of the authored publications of all LANL employees. (uses Faunus for OLAP graph analytics)
  • Which of my collaborators has worked with someone with graph theory experience?

The questions that can be asked of such a graph are bound only by the data that is aggregated, and when numerous sources of variegated data are integrated into a universal social graph, the queries quickly become boundless.

Conclusion

EgoSystem was designed and developed by the Los Alamos National Laboratory and Aurelius to solve problems related to social search over the employees and alumni of the lab. The principles are general and can be applied to any organization wishing to understand the greater social context in which it is embedded. No organization is an island unto itself, and realizing how its employees and artifacts affect and are affected by others provides an organization a bird’s-eye view of its surrounding world.

Acknowledgements

EgoSystem was designed and developed by James Powell (LANL), Harihar Shankar (LANL), Marko A. Rodriguez (Aurelius), and Herbert Van de Sompel (LANL). Joshua Shinavier contributed ideas to the textual content of this blog post. The story behind EgoSystem’s development is a testament to its mission statement of helping LANL to maintain connections with its alumni. Marko A. Rodriguez was a graduate researcher at the Digital Library Research and Prototyping Team in 2004 under the tutelage of Johan Bollen and Herbert Van de Sompel (in association with Alberto Pepe). While in this group, he studied information science from a graph theory perspective. Since then, Marko went on to create Aurelius and through his lasting ties with LANL, was invited to collaborate on the research and development of EgoSystem.

Powell, J., Shankar, H., Rodriguez, M.A., Van de Sompel, H., “EgoSystem: Where are our Alumni?,” Code4Lib, issue 24, ISSN:1940-5758, April 2014.

Author

Marko A. Rodriguez

Boutique Graph Data with Titan

Titan is a distributed graph database capable of supporting graphs on the order of 100 billion edges and sustaining on the order of 1 billion transactions a day (see Educating the Planet with Pearson). Software architectures that leverage such Big Graph Data typically have 100s of application servers traversing a distributed graph represented across a multi-machine cluster. These architectures are not common in that perhaps only 1% of applications written today require that level of software/machine power to function. The other 99% of applications may only require a single machine to store and query their data (with a few extra nodes for high availability). Such boutique graph applications, which typically maintain on the order of 100 million edges, are more elegantly served by Titan 0.4.1+. In Titan 0.4.1, the in-memory caches have been advanced to support faster traversals which makes Titan’s single-machine performance comparable to other single machine-oriented graph databases. Moreover, as the application scales beyond the confines of a single machine, simply adding more nodes to the Titan cluster allows boutique graph applications to seamlessly grow to become Big Graph Data applications (see Single Server to Highly Available Cluster).

Graph Representations On-Disk and In-Memory

When representing a graph within a computer, the design of both its on-disk and in-memory representations affects its read/write performance. It is important that designs are sympathetic to the graph usage patterns of the application (see A Letter Regarding Native Graph Databases). For instance, when on disk, it is best that co-retrieved graph data be co-located; otherwise, costly random disk seeks will be required. Titan leverages an adjacency list representation on disk where a vertex and its incident edges are co-located. Titan also allows for the sorting of a vertex’s incident edges according to application-specific edge-comparators. These vertex-centric indices support fine-grained retrieval of a vertex’s incident edges from disk and (hopefully) from the same page of disk (see A Solution to the Supernode Problem).
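As an illustration of how such sorted incident edges are declared, the following is a minimal sketch against Titan 0.4.x’s type API, using the score and time keys of the Amazon review schema described later in this post; it assumes that sortKey is the 0.4.x mechanism for declaring a vertex-centric sort order on an edge label.

score = g.makeKey('score').dataType(Float.class).make()
time = g.makeKey('time').dataType(Long.class).make()
// sort 'reviewed' edges by time on disk so that time-range filters read a contiguous slice
g.makeLabel('reviewed').sortKey(time).signature(score).make()
g.commit()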

Once the graph data is retrieved from disk, it is placed into memory. Titan 0.4.1+ maintains two levels of in-memory caching: warm and hot. Warm caches store the hotspots of the global adjacency list and hot caches store the locally traversed sub-graph of a transaction. Both caches keep their data elements (vertices and edges) as byte-buffers. For traversals that move from vertex-to-vertex using edge labels and properties as filters, edge object creation can be avoided by leveraging the aforementioned vertex-centric sort orders and classic binary search. With typical natural graphs having multiple orders of magnitude more edges than vertices, this greatly reduces the number of objects created and the amount of memory used. Moreover, this allows memory to be accessed sequentially which increases the chance of a CPU cache hit (see Random vs. Sequential Memory Access).

In sum, the updates in Titan 0.4.1+ address multiple levels of the memory hierarchy in that the disk, the global cache, and the local transactions each have appropriate data structures respective of their ultimate role in graph processing. The benefits of these designs are demonstrated in the sections to follow.

Titan Graph Traversals on a Boutique Graph

In order to test both Titan’s on-disk and in-memory representations at boutique scale, a graph version of Amazon’s web rating data was generated. The benefit of this data set is twofold: 1.) it is a good size (37 million edges) to demonstrate a single machine use case and 2.) it supports edge properties which demonstrate the benefit of Titan’s vertex-centric indices in graph processing. The schema for the graph is represented on the left where there are two types of vertices: users and products. The graph is bipartite in that users link to products by reviewed-edges. The reviewed-edges maintain two properties: score (a float between 1 and 5) and time (a long in Unix time).

The benchmark to follow was executed on two types of Amazon EC2 instances: m2.xlarge (hard disk drive — HDD) and c3.xlarge (solid state drive — SSD). To demonstrate Titan 0.4.1’s relative performance benefits, Titan 0.4.0 and a popular 3rd party graph database were tested as well. The same code was run on all systems by leveraging TinkerPop’s Blueprints graph API. The JVM’s heap settings were -Xms3G -Xmx3G and default options for all systems were used — except for a warm cache being explicitly enabled for Titan 0.4.1:

cache.db-cache=true
cache.db-cache-size=0.5
cache.db-cache-time=0

The benchmark was executed as enumerated below. Note that for cold cache times, the query runtime for the first query was taken. For warm cache times, the transaction is rolled back and the query is executed again. For hot cache times, the query is not rolled back and the query is re-executed. The actual Gremlin traversals are provided in the code block below minus a .count() which was used to ensure all databases returned the same result set. Finally, note that Titan/Cassandra was used for this experiment though the recent cache developments are available to any of Titan’s underlying storage systems.

Gremlin: A Graph Traversal Language

  1. 300 random user ids were gathered from the raw Amazon web review data.
  2. 100 users were queried with the “user preference”-traversal.
  3. 100 users were queried with the “user similarity”-traversal.
  4. 100 users were queried with the “product recommendation”-traversal.
// what products has the user liked in the last year?
//  user preference
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV


// what users have liked the same products that the user liked within the last year?
//  user similarity
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV
  .inE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).outV()
  .except([user])


// what products are liked by the users that like the same products as the user within the last year? 
//   product recommendation
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV.aggregate(x)
  .inE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).outV
  .except([user])[0..<1000]
  .outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV
  .except(x).groupCount(m)

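The cold/warm/hot procedure enumerated above can be pictured with a small helper. The following is only a sketch of the measurement pattern, not the actual benchmark code: the userId lookup key and someUserId value are assumptions about the schema, and ONE_YEAR_AGO is the same Unix-time constant used in the traversals above.

// time the "user preference"-style traversal for a single user
benchmarkUser = { uid ->
  def start = System.currentTimeMillis()
  def n = g.V('userId', uid).outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV.count()
  [count: n, millis: System.currentTimeMillis() - start]
}

// usage, substituting a real user id for the placeholder:
// cold = benchmarkUser(someUserId)   // first run, data comes from disk
// g.rollback()                       // discard the transaction but keep the global warm cache
// warm = benchmarkUser(someUserId)   // second run, served largely from the warm cache
// hot  = benchmarkUser(someUserId)   // third run without a rollback, served from the hot cache
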
The benchmark results are available for exploratory viewing at the following URLs (made possible by HighCharts JS).

The plots have more data points on the left-end of the x-axis. To examine the results of these smaller result sets, the plots can be zoomed in by dragging a square over that region of the chart. Each plot line can be activated (or deactivated) by clicking on the respective line’s name in the legend of the plot.

It is important to stress that the results of any benchmark should be understood in the context in which they were executed. This benchmark demonstrates relative performance

  • on a boutique graph dataset (i.e. a relatively small graph),
  • using default settings for the graph databases,
  • against two different long term storage mediums (HDD and SSD),
  • within the resource constraints of a single machine (m2.xlarge and c3.xlarge),
  • executing 3 particular read-based queries,
  • and with only a single-user/thread querying the database at any one time.

Cold Caches Demonstrate On-Disk Performance

Cold Cache

The diagram above plots the number of vertices returned by the traversal on the x-axis against the time it took for the traversal to complete in milliseconds on the y-axis. Titan’s cold cache performance numbers are due in large part to its sequentialization of the graph by co-locating edges with their vertices on disk (see Scalable Graph Computing: Der Gekrümmte Graph). Furthermore, with vertex-centric indices, edges are not only co-located with their vertices but also sorted such that “a score equal to 5” is quickly fetched using an offset index. When edges are not co-located with their vertices, then the graph is not laid out sequentially. This can lead to poor fetch times as random disk seeks are required (see purple line). The effect of random disk access is made more apparent on the hard disk drive plot (as opposed to the solid state drive plot above). Note that pure sequentialization is difficult to achieve in any multi-user system which allows for random writes. For this reason, various compaction processes occur to combat fragmentation of the data on the disk.

Due to graph compression techniques based on data co-location and logical adjacency, Titan’s disk footprint remains relatively low.

  • Titan/Cassandra — 2.9 gigabytes
  • Popular Graph Database — 15.0 gigabytes

Sequentialization and a compressed data format are useful when executing distributed OLAP queries with Faunus. Rows of data (i.e. a vertex and its incident edges) can be sequentially pulled off the disk allowing for global parallel scans of the graph to execute more efficiently (see Faunus Provides Big Graph Data Analytics). Moreover, it is faster to move 2.9 gigabytes of data than 15.0 gigabytes.

Warm Caches Demonstrate Off-Heap Cache Performance

Warm Cache

Once data has been pulled off of disk, it is put into memory. Titan has always performed well in pulling graph data off of the underlying storage medium (as seen in the previous section with both HDD and SSD). In terms of in-memory performance, Titan 0.4.1 now boasts a warm cache system. The benefit of this addition is made apparent in the plot above when comparing Titan 0.4.0 and Titan 0.4.1. With Titan 0.4.1, heavily accessed areas of the graph are stored in a global cache composed primarily of byte-buffers. If the data is not in the cache (a cache miss), then Cassandra’s caching mechanisms are leveraged (typically a key-cache and an OS-based page cache). It is important to emphasize that a warm cache miss in Titan 0.4.1 yields runtimes equivalent to those seen in Titan 0.4.0 (which for the complex traversals presented, remain sub-second — see the gray line). These sub-second cache miss times are possible because of Titan’s on-disk representation. Graph sequentialization enables effective use of the OS’s page cache, where “going to disk” will typically not yield random seeks.

Hot Caches Demonstrate On-Heap Transaction Performance

Hot Cache

To answer a query, the traversal primitives first check for data in the current transaction’s hot cache, then the global warm cache, and then on a particular page of disk (which again, may be in memory because of the operating system’s page cache). Wherever the data is ultimately found, it is computed on in the local transaction’s hot cache. For traversals that move from vertex-to-vertex by filtering paths using edge properties and labels, Titan does not need to deserialize the edges into objects. Instead, it operates directly on the byte-buffers stored in the warm cache. This greatly reduces the number of objects created (|V| << |E|), the amount of time spent garbage collecting the young generation heap, and stages Titan nicely for speedy, recursive graph algorithms (to be explored in a future blog post).

Conclusion

Aurelius has consistently touted Titan as a Big Graph Data solution for OLTP graph applications that leverage graphs and transactional loads so large that they require more resources than a single machine can provide. However, for organizations looking to use a graph database, 100s of billions of edges is typically not their starting point. Titan 0.4.1+ better supports boutique shops beginning with graphs and wanting to ultimately procure a massive, universal graph of their domain. Moreover, all this graph processing functionality is provided under the commercially friendly Apache 2 free software license. With Titan 0.4.1+, boutique graph applications can freely move from a single machine, to a highly available single node (e.g. a 3 machine cluster with replication factor 3), to a fully distributed graph represented across an arbitrary number of machines.

Acknowledgements

This post was embarked on by Dr. Rodriguez after he wrote an email to the Aurelius team stating that Titan’s in-memory cache should be optimized for boutique graph use cases. Pavel Yaskevich took up the challenge and was able to increase traversal speeds using software hotspot profiling. Dr. Bröcheler then yielded further optimizations by cleverly leveraging the sequential in-memory byte-buffer representations of the graph data. Dr. Julian McAuley provided Aurelius a pre-processed version of the Amazon web review data under the BSD license and Daniel Kuppitz leveraged that data set in order to painstakingly stage and execute the benchmark presented.

Authors

Daniel Kuppitz Matthias Broecheler Marko A. Rodriguez

A Letter Regarding Native Graph Databases

It’s fun to watch marketers create artificial distinctions between products in order to grab consumer attention. One of my favorite examples is Diamond Shreddies. Shreddies, a whole wheat cereal, has a square shape and was always displayed as such. So an ingenious advertiser at Kraft Foods thought to advertise a new and better Diamond Shreddies. It’s a fun twist that got people’s attention, and some consumers even proclaimed that Diamond Shreddies tasted better, though they were obviously eating the same old product.

Such marketing techniques are also used in the technology sector — unfortunately, to the detriment of consumers. Unlike Kraft’s playful approach, there are technology companies that attempt to “educate” engineers on artificial distinctions as if they were real and factual. An example from my domain is the use of the term native graph database. I recently learned that one graph database vendor decided to divide the graph database space into non-native (i.e. square) and native (i.e. diamond) graph databases. Obviously, non-native is boring, or slow, or simply bad and native is exciting, or fast, or simply good.

Problem is: There is no such thing as a native graph database.

On the Concept of Native Computing

Let’s look at the definition of “native” when applied to data as taken directly from Wikipedia’s Native Computing article:

Applied to data, native data formats or communication protocols are those supported by a certain computer hardware or software, with maximal consistency and minimal amount of additional components.

Memory Hierarchy I’m not claiming that Wikipedia is an authority on this subject, but this is a baseline definition we can work with for the purpose of this letter’s argument. From Wikipedia’s definition, it follows that a native graph database is a graph database that represents the graph (i.e. data) maximally consistent with the underlying hardware. Currently, all commercially-available hardware follows the Von Neumann architecture. Under the Von Neumann architecture, the memory subsystems are represented as a sequential memory space. Moreover, in said memory systems, sequential access is significantly faster than random access. Realize this for yourself by writing a very large array into RAM and then comparing sequential vs. random access times. If you are too busy, read Pathologies of Big Data as the author has done the comparison for you on different types of memory systems. If you are regularly working with non-trivial amounts of data, you most definitely should read the Pathologies of Big Data article.
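To see the effect directly, a minimal Groovy sketch along the following lines will do; the array size, the fixed random seed, and the crude timing approach are arbitrary choices made for this illustration, and the measured ratio will vary by machine and JVM.

// Compare sequential vs. random access over a large in-memory array.
// 16M longs (~128MB) is an arbitrary size; shrink it if your heap is smaller.
int size = 1 << 24
long[] data = new long[size]
int[] randomIndex = new int[size]
Random rnd = new Random(42)
for (int i = 0; i < size; i++) { data[i] = i; randomIndex[i] = rnd.nextInt(size) }

long sum = 0
long start = System.nanoTime()
for (int i = 0; i < size; i++) sum += data[i]              // sequential walk
println "sequential: ${(System.nanoTime() - start) / 1e6} ms"

start = System.nanoTime()
for (int i = 0; i < size; i++) sum += data[randomIndex[i]] // random walk
println "random:     ${(System.nanoTime() - start) / 1e6} ms (sum=${sum})"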

Next, the purpose of any database is to retrieve a query result set by navigating the memory hierarchy and sequentializing memory access as much as possible. How the data is laid out in each of these memory systems, i.e. the data format, data structures and caches, explains many if not most of the differences between database systems. As an example, consider columnar databases. These relational databases store tables by columns (not rows), which makes it possible to quickly compute aggregates over columns because data access is sequential. That’s why they outperform their row-oriented counterparts on analytic queries.
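As a toy illustration of that layout difference (not the internals of any particular database), compare summing a column stored as one primitive array against walking row objects scattered across the heap; the class and field names here are made up for the example.

// Row-oriented: each record is a separate object, so an aggregate chases one pointer per row.
class OrderRow { int year; double amount }
def rows = (1..1000000).collect { new OrderRow(year: 2000 + (it % 14), amount: it % 100) }
double rowTotal = 0d
rows.each { rowTotal += it.amount }

// Column-oriented: the 'amount' column is a single primitive array, so the same
// aggregate is one sequential scan over contiguous memory.
double[] amountColumn = new double[rows.size()]
rows.eachWithIndex { r, i -> amountColumn[i] = r.amount }
double columnTotal = 0d
for (int i = 0; i < amountColumn.length; i++) columnTotal += amountColumn[i]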

We conclude that a database system is native if the data formats and structures it uses effectively sequentialize memory access across the memory hierarchy for the targeted type of workload.

Embedding a Graph in a 1-Dimensional Space

Let us now apply the concept of native computing to graph databases. Graph databases need to efficiently execute arbitrary graph traversals. A graph traversal is a restricted walk over the graph, moving from one vertex to its adjacent vertices via a selected set of incident edges. Without making any assumption on the type of traversal to be executed, it follows that a graph database needs to store vertices, their incident edges and their adjacent vertices in close proximity in the memory systems in order to sequentialize memory access (see Scalable Graph Computing: Der Gekrümmte Graph). However, those vertices have other adjacent vertices which makes it impossible to keep everything sequential (save in the most trivial graph topologies).

Consider the small graph on the left. Pick any vertex. Linearly write down that vertex, its edges and its adjacent vertices. With that initial choice made, it becomes increasingly difficult — and ultimately impossible — to add the other vertices, their edges and adjacencies into your linear list without pulling adjacent vertices apart. What you are attempting to do, and what every graph database needs to do, is to topologically embed a graph into a 1-dimensional space. There is a branch of mathematics called topological graph theory which studies such graph embeddings for arbitrary spaces and graphs. Unless the graph has no edges or forms a linear chain, there is no (strict) embedding into a 1-dimensional space. Hence, for all but the simplest graphs there exists no native data representation on typical Von Neumann computers which require sequential memory layout.

We conclude that there is no such thing as a native graph database for current computing systems.

Approximations to Embedding a Graph in a 1-Dimensional Space

Without a perfect mapping, the best that can be achieved is an approximation. Thus, the layout of a graph in sequential memory turns into an optimization problem. The question then becomes: “Can we lay out the graph sequentially so that most vertices have their neighbor close by?” For any given graph layout in memory, we can define the distance between two adjacent vertices as the absolute difference in their (sequential) memory addresses. Then, we define the objective function as the sum of distances of all adjacent vertex pairs. The optimal graph data layout is the one that minimizes this objective function. Unfortunately, this optimization problem is NP-hard, which means that the best we can hope for in practice are approximations (unless P=NP). Even those approximations are pretty expensive to compute. For example, in my research on disk-oriented graph layouts (see Disk-Oriented Graph Management), I propose an approximation method based on hierarchical clustering which took hours to calculate on graphs with 100s of millions of edges and requires that the graph structure remain fixed. A static graph structure is not an option if you need to update the graph in real-time (i.e. the typical use-case for a graph database). There is a vast trove of research on the subject of optimal graph embeddings, which I encourage you to read if you are interested in the fascinating world of hyper-dimensional manifolds, but let me conclude here that this problem is difficult, far from solved, and an area of active research.
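To make the objective function concrete, here is a minimal Groovy sketch of it; the edge list and the candidate layouts are tiny, made-up examples rather than anything from the research cited above.

// Sum of |address(u) - address(v)| over all adjacent vertex pairs for a given layout.
def layoutCost = { List edges, Map position ->
  edges.collect { u, v -> Math.abs(position[u] - position[v]) }.sum()
}

// Example: a 4-vertex path a-b-c-d under two candidate layouts.
def edges = [['a','b'], ['b','c'], ['c','d']]
assert layoutCost(edges, [a:0, b:1, c:2, d:3]) == 3  // optimal for a chain
assert layoutCost(edges, [a:0, b:3, c:1, d:2]) == 6  // a worse embedding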

We conclude that a graph database should be judged on the tradeoffs it makes concerning computational/memory complexity and the quality of its graph embeddings.

Titan’s Approach to Embedding a Graph in a 1-Dimensional Space

Adjacency ListLet me illustrate the tradeoff choices that the graph database Titan has made in embedding a graph into a modern computer. Titan internally represents a graph as an adjacency list. This means it stores each vertex with all of its properties and incident edges in one sequential list. This has the benefit that Titan can access all edges and their properties with one sequential scan. In fact, Titan uses a more sophisticated variant of the adjacency list format where edges are stored in a user-defined sort order. For each vertex, an offset index is built that allows for fast retrieval of subsets of the incident edges that fall within a given sequential range. We call these indices vertex-centric indices and they significantly speed up graph traversals on large graphs which have vertices with many edges — so called supernodes (see A Solution to the Supernode Problem). The downside of vertex-centric indices is that they require building index structures, keeping edges in sort order, and storing each edge twice. This, in turn, requires more complex data structures and regular compactions of the data files after updates to the graph. Once off disk, Titan maintains the sorted adjacency list format in RAM, using binary search instead of additional index structures to acknowledge the slightly more favorable random access times of RAM. This allows the CPU to effectively prefetch data on traversals. We see that Titan expends additional storage space and computation to achieve a more sequential representation of the graph for quicker data retrieval.
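For illustration, the sort order behind such a vertex-centric index can be declared through Titan’s type-definition API; the sketch below is written against the 0.4.x KeyMaker/LabelMaker methods, and the 'stars'/'liked' names and the userId variable are placeholders invented for this example.

// Hypothetical schema: keep a user's 'liked' edges sorted by their 'stars' property
// so that rating-constrained traversals become sequential range scans.
stars = g.makeKey('stars').dataType(Integer.class).make()
liked = g.makeLabel('liked').sortKey(stars).make()

// A traversal that can exploit the vertex-centric index: only the 5-star likes.
g.v(userId).outE('liked').has('stars', 5).inV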

Titan: Distributed Graph Database Once vertices are co-located with their incident edges, the next problem is, how should vertices be co-located with their adjacent vertices? Vertices are laid out sequentially in either random or guided order. With random order there is no data locality between adjacent vertices, but it is trivial to compute and leads to balanced partitions in distributed environments. Alternatively, Titan provides a “guided order” mode which allows users to plug in a heuristic function that tells Titan which partition block a particular vertex should be assigned to or which other vertex it should be close to. While Titan handles all of the logistics, the “intelligence” of the allocation is in the heuristic function. We believe that for some graphs it is pretty easy to come up with a good heuristic function given enough domain knowledge. For instance, the Facebook graph can be effectively partitioned using user geography as a heuristic (see Balanced Label Propagation for Partitioning Massive Graphs). This is not surprising since users who live close by are more likely to be friends on Facebook. Heuristic partitioning works well on some graphs, but requires more computation on update and is prone to imbalances in distributed deployments if not used carefully.

To conclude, the next time somebody tries to sell you a “native” graph database, be sure to reference this letter and ask them how they lay out their graph throughout all the sequential memory layers, and decide for yourself the optimality of their embedding (both on disk and in memory).

Conclusion

The intention of this letter was to point out the challenges that all graph databases are facing, vindicate Titan from arbitrary categorization by vendor marketing, and to ensure that the public is properly educated on the topic at hand. All arguments presented are based on generally available computer science knowledge. Please comment on Hacker News if you find the argument flawed, lacking a proper citation, or wish to discuss the thesis at greater length. Moreover, if you are interested in this area of technology, please join Aurelius’ public mailing list and participate in helping us develop the next generation of graph technologies.

Acknowledgements

Dr. Marko A. Rodriguez provided editorial review of this letter in order to make the presented argument more concise, supported with images, and easier to consume for the general reader. Pavel Yaskevich provided astute points regarding the interplay between CPU caches and RAM.

Author

Matthias Broecheler

Developing a Domain Specific Language in Gremlin

Developing a Gremlin-based DSL Domain Specific Languages (DSLs) provide a way for programmers to increase the expressivity of their code within a specific domain. They enable developers to gain greater productivity by hiding low-level programming constructs in favor of higher-level ones that better link to the elements of the domain itself. DSLs also have the benefit of making it possible for non-programmers to “speak” in the same language as their programming counterparts, thus lowering technical communication barriers.

Pearson’s OpenClass Education-based DSL

An earlier Aurelius blog post entitled “Educating the Planet with Pearson” spoke of the OpenClass platform and Titan’s role in Pearson’s goal of “providing an education to anyone, anywhere on the planet”. It described the educational domain space and provided a high-level explanation of some of the conceptual entity and relationship types in the graph. For example, the graph modeled students enrolling in courses, people discussing content, content referencing concepts and other entities relating to each other in different ways. When thinking in graph terminology, these “conceptual entity and relationship types” are expressed as vertices (e.g. dots, nodes) and edges (e.g. lines, relationships), so in essence, the domain model embeds conceptual meaning into graph elements.

Domain Over GraphAt Pearson, the OpenClass domain model is extended into a programmatic construct, a DSL based on Gremlin, which abstracts away the language of the graph. Engineers and analysts can then ask questions of the graph in their educational domain language, as opposed to translating those familiar terms into the language of vertices and edges. The OpenClass DSL defines the graph schema, extends the Gremlin graph traversal language into the language of education, provides standalone functions that operate over these extensions, and exposes algorithms that are developed from those extensions and functions. Together these components form a coarse-grained API which helps bring general accessibility to complex graph traversals.

Extending Gremlin into a DSL

Gremlin is a Groovy-based DSL for traversing property graphs and has applicability in the areas of graph query, analysis, and manipulation. It provides an intuitive way to not only think in terms of a graph, but to program in terms of one as well. One of the interesting properties of Gremlin is that it allows programmers to extend upon the language for even greater programmatic expressiveness, not just within graphs, but within the domain in which the graph itself resides.

Person Wrote Post ModelAn important aspect of Pearson’s graph is the notion of “people discussing content.” For the purposes of the upcoming examples, think of “content” as an online discussion forum, with instructor-assigned topics where students share discourse and open debate. A person writes a post which may be in reference to a different post that was written earlier.

Traversing Discussions in Graph Terminology

Traversing the GraphGiven the “people discussing content” graph structure, a Gremlin novice could immediately begin to navigate the graph. Asking the graph for a list of all vertices with a property key of type and a value of post yields the list of posts in the graph. The Gremlin for such a request is below:

g.V.has('type','post')

The traversal becomes a bit more involved when there is a need to walk the depth of the tree of posts:

g.V.has('type','post').out('child').loop(1){it.loops<25}{true}

To analyze and compare threads of posts within the tree, the path of each thread must be examined, such that each thread is flattened into a Map, where the key is the userName of the user who wrote the first post in the thread and the value is a unique list of vertices for the threads that user started:

m=[:]
g.V.has('type','post').out('child').loop(1){it.loops<25}{true}
   .path.groupBy(m){it[0].userName}{it}{it.collectMany{it}.unique()}.iterate()

Evaluating m after execution of the traversal would yield post vertices arranged as follows:

gremlin> m
==>marko=[v[184476380], v[282106584], v[184550536], v[189966816]]
==>josh=[v[173318448], v[188571048]]
==>daniel=[v[186130596], v[308964172]]
...
==>stephen=[v[176281532], v[182440524], v[188572948], v[282049412]]

It is not important to understand the mechanics of the Gremlin above. Its intent is to demonstrate a bit of confusion, in the sense that even a Gremlin expert might have to take a moment to deconstruct what this code is doing. Consider for a moment just how much someone unfamiliar with graphs would have to learn in order to get a set of flattened post threads into this format. The value of a DSL suddenly becomes apparent.

Traversing Discussions in the Domain Terminology

Traversing the DomainDeveloping a DSL can begin with a standalone Groovy script that can be referenced when starting a Gremlin REPL or initialized into Rexster or Titan Server through the init-script configuration element of rexster.xml. In the case of OpenClass, the DSL has evolved well past the early development needs that a “script” satisfies and is now engineered as a Maven-based project deployed as a standalone JAR file.

It is a good practice to use the DSL to centralize the property name strings that make up the graph’s schema. Avoiding the use of hard-coded strings eases future refactoring efforts and makes it straightforward to identify property usage within the DSL itself.

class S {
  public static final String EDGE_CHILD = "child"
  public static final String PROPERTY_POST_ID = "postId"
  public static final String PROPERTY_TYPE = "type"
  public static final String TYPE_POST = "post"
}

Examining the Gremlin traversals from the previous section, it can be seen that there is some commonality to them in that they all start with similar types of statements, each building on the next to add additional layers of processing. With Gremlin’s User Defined Steps, it is possible to build composable base steps that extend the language of the graph or operate at a level of abstraction higher than the graph itself.

First, define a class that will be responsible for containing the step definitions and for initializing them into Gremlin:

class Steps {
  def load() {
    // this method will call methods that will initialize each step definition.
    // from the Gremlin REPL or other code base that utilizes the steps, simply
    // call new Steps().load() to make the steps available.  
  }
}

With the Steps class in place, a first step definition can be added to encapsulate post filtering:

class Steps {
  def load() {
    defineStepPost()
  }

  private def defineStepPost() {
    Gremlin.defineStep('post', [Vertex, Pipe], { _().has(S.PROPERTY_TYPE, S.TYPE_POST) })
  }
}

Including this step simplifies the three Gremlin statements written in the previous section to:

g.V.post

g.V.post.out('child').loop(1){it.loops<25}{true}

m=[:]
g.V.post.out('child').loop(1){it.loops<25}{true}
   .path.groupBy(m){it[0].userName}{it}{it.collectMany{it}.unique()}.iterate()

Gremlin GraduateThe post step replaces usage of has(S.PROPERTY_TYPE, S.TYPE_POST). That change doesn’t make the code much more readable, but it is a start. Continuing with the example, two additional steps are included, one to traverse the tree of post vertices and one to flatten each thread (or discussion path):

class Steps {
  public static final int CONTROL_MAX_DEPTH = 25
  def load() {    
    defineStepPost()
    defineStepPostTree()
    defineStepFlattenThread()
  }

  private def defineStepPost() {
    Gremlin.defineStep('post', [Vertex, Pipe], { _().has(S.PROPERTY_TYPE, S.TYPE_POST) })
  }

  private def defineStepPostTree() {
    Gremlin.defineStep('postTree', [Vertex, Pipe], { depth = CONTROL_MAX_DEPTH ->
            _().post.out(S.EDGE_CHILD).loop(1){it.loops<depth}{true}
    })
  }

  private def defineStepFlattenThread() {
    // the addition of .transform{it[0]}.dedup to the end of this Gremlin statement
    // makes flattenThread a pure side-effect in that it converts the output back to
    // the original vertices passed in.
    Gremlin.defineStep('flattenThread', [Vertex, Pipe], { m, depth = CONTROL_MAX_DEPTH, keyOn = null ->
            _().postTree(depth).path.groupBy(m){keyOn == null ? it[0] : keyOn(it[0])}{it}
            {it.collectMany{it}.unique()}.transform{it[0]}.dedup
    })
  }
}

The addition of these steps simplifies the traversals and expands their flexibility:

g.V.post

// traverses to the default depth of 25
g.V.postTree

// traverse to the assigned depth of 256
g.V.postTree(256) 

m=[:];g.V.flattenThread(m).iterate()

// traverse to depth 256
m=[:];g.V.flattenThread(m, 256).iterate() 

// traverse to depth 256, key the Map on the postId of the root vertex instead of the vertex itself 
m=[:];g.V.flattenThread(m, 256, {it.getProperty(S.PROPERTY_POST_ID)}).iterate()

The steps have also been defined in such a way that the DSL gains the interesting capability to parameterize the behavior of the traversal. Parameterization of steps introduces flexibility to the DSL, allowing consumers of the functions to tune the internals of the traversal for performance, filtering, transformations, etc. Note how the last example of flattenThread provides a closure for the final argument, making it possible to introduce dynamic behavior to traversals. Instead of always keying the map on userName, that behavior is now determined by the user of the DSL.

DSL Development Patterns

The list below represents recommended patterns to follow when building DSLs with Gremlin:

  • Centralize property names, edge labels and other string values as global variables. Don’t embed string literals into the DSL.
  • Include a schema class with some sort of initialization function that takes a Blueprints Graph instance as an argument and configures the indices of the graph (a minimal sketch follows this list). A schema class is especially important when using Titan and its Type Definitions.
  • Standalone Groovy scripts are just a starting point for a DSL. Those scripts will quickly grow in complexity and become unmanageable. Treat the DSL as its own project. Use a dependency management and build system like Maven or Gradle to produce compiled code that can be referenced in other projects, pulled into the Gremlin REPL with Grape or copied to the path of Rexster or Titan Server and configured for use with imports and static-imports settings. Note that direct support of Grape in the REPL will be replaced in Gremlin 2.5.0 with the Gremlin.use() function which wraps Grape and performs a similar function.
  • Given deployment of the DSL to Rexster or Titan Server, client-side code no longer needs to pass long, complex Gremlin scripts as strings for remote execution via REST or RexPro. Client applications can just call parameterized functions from the DSL exposed on the server.
  • Write tests. Use the aforementioned schema class to “set-up” a Graph instance on each run of the tests to ensure that changes to the schema do not introduce problems and that new additions to the DSL will work properly in the production graph given that configuration. Use TinkerGraph for a lightweight means to test DSL operations.
  • Write in-line documentation for the schema, User Defined Steps, and other functions, but consider avoiding javadoc. Use a tool like Groc, which processes Markdown formatted text and produces documentation that includes the source code.
  • Design DSL components as composable blocks, such that one or more blocks can be used together for even higher-level operations. When possible, think generically and design functions that can be altered at the time they are called through parameterization with settings and closures.
  • The DSL is not just about extending Gremlin with User Defined Steps. Make use of the Groovy language and write standalone functions that operate on the graph, within User Defined Steps, or anywhere it makes sense to encapsulate graph functionality.
  • Use an IDE, like IntelliJ. Since Gremlin is Groovy, IDE features like syntax validation and code complete help make writing Gremlin more productive.
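As a rough sketch of the schema-class pattern mentioned in the list above, the following uses plain Blueprints key indices so that the same initialization runs against TinkerGraph in tests; the class name is made up, it reuses the S constants defined earlier, and Titan-specific type definitions would be layered in where the comment indicates.

import com.tinkerpop.blueprints.Graph
import com.tinkerpop.blueprints.KeyIndexableGraph
import com.tinkerpop.blueprints.Vertex

class GraphSchema {
  // Configures the indices the DSL relies on. Accepts any Blueprints Graph so the
  // same code initializes Titan in production and TinkerGraph in tests.
  static void initialize(Graph g) {
    if (g instanceof KeyIndexableGraph) {
      g.createKeyIndex(S.PROPERTY_TYPE, Vertex.class)
      g.createKeyIndex(S.PROPERTY_POST_ID, Vertex.class)
    }
    // When g is a TitanGraph, Titan type definitions (data types, uniqueness,
    // sort keys for vertex-centric indices) would be declared here as well.
  }
}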

Conclusion

Gremlin is a general purpose graph traversal language, but developers can extend it with the specific rules and nomenclature of their domain. This additional layer over Gremlin can provide a robust toolset for programmers, analysts and others interested in the data that the graph contains.

Pearson’s OpenClass DSL continues to expand allowing realization of the following benefits:

  • All logic related to the graph is centralized in the DSL providing a standard interface for any part of the organization that wishes to access the graph for information.
  • Non-programmers leverage the DSL in their work, as there is less “Gremlin-graph” language and more “education-graph” language.
  • Ad-hoc analysis of the graph tends to be less error prone and more productive, as the higher-order functions of the DSL are tested versions of common and sometimes mundane traversals (e.g. traversing a tree).
  • Interesting and unintended discoveries occur when exploring the graph by mixing and matching functions of the DSL.

The introduction of a DSL over Gremlin will be beneficial to projects of any size, but will quickly become a requirement as the complexity of the conceptual model of the domain increases. Investing in a DSL and making it a core component of a graph engineering strategy should be considered a common pattern for productionizing Gremlin in the TinkerPop and Aurelius technology stacks.

Acknowledgements

Dr. Marko A. Rodriguez read draft versions of this post and provided useful comments.

Authors

Stephen Mallette

Scalable Graph Computing: Der Gekrümmte Graph

Scalable Graph Computing is a multi-part treatment on that for which the series is named.

A graph describes which “things” are adjacent to which other “things.” A logical graph forms a topology separate from the topology of the physical, material world. When computing, the logical structure must be placed in 3D, geometric space. A scalable graph data structure demonstrates a non-random morphism between logical and physical adjacency. This post discusses scalable graph data structures from the following perspective: It is important to co-locate data in space according to the metric of co-retrieval in time. Respecting space/time is the difference between sequential and random memory access. It is the difference between scalable and unscalable (graph) computing.

Co-Location and Co-Retrieval in Computing

If X tends to be retrieved with Y, then X and Y should reside side-by-side. If groceries and then the laundromat are on the to-do list, then to save time, it is best that they are near one another. With respect to the memory hierarchy, X and Y should be in the same cloud region, same data center, the same server rack, the same machine, on the same page of disk, in the same block of memory, together in L* cache, and — to the limit of identity — in the CPU’s registers. It is ludicrously slower to move data across the Internet to a CPU than to move that same data from the CPU’s L1 cache. The difference is ~50 milliseconds versus ~1 nanosecond, where 50 milliseconds is 50,000,000 nanoseconds. In human terms, if a nanosecond were a second, then 50 milliseconds (50 million nanoseconds) would be 1.58 years. If X is on the CPU and Y is in Connecticut, then a relative eternity must unfold while Y travels its way to X’s side for comparison/computation (a wait state). Fortunately, real-world computer systems move data in “chunks” and scalable systems use this phenomenon to their advantage.

Memory Hierarchy

Tommy MacWilliam in Computer Science E-1

All the bits in a word are necessary to define the word: ?001 could be 0001 (1) or 1001 (9). When one bit of a word is accessed, all bits must be accessed, else the meaning of the word is unclear — ?at could be cat or hat. Local machine buses (bit co-retrieval systems) move a word’s bits together. Bus-inspired co-retrieval exists everywhere. When data is fetched from disk, pages are accessed (i.e. sequential blocks of data). If the page of data with X on it is requested from the OS by the executing program, then Y should have also been on that page, else another trip to disk must be made (seek time is ~5ms). When a human is interested in a webpage, the entire document is downloaded, as it is assumed that the sentence after the first will most likely be required next. Similar assumptions are made for streaming audio and video. Data moves towards the CPU from one memory structure to the next in proportion to the size of that memory structure. With any luck (aka a cache hit), the next piece of data required by the CPU will be nearby — not in main memory, on disk, nor in Windham, Connecticut.

Localized Computing and Random Access Structures

Physics Computer The function f(x,y) brings two pieces of data together to yield more data. In layman’s physics, atoms are data and physical laws are the functions, where the “CPU” is everywhere. An interesting aspect of man-driven computing is that logical adjacency (a graph) does not imply spatial adjacency. Software-based computing, as we know it, is about stitching together “random” areas of memory (space) so that two logically adjacent data can be pulled together for comparison at a spatially localized CPU. Random access computing is both a curse and a blessing. The curse is that physics will not do our job for us — the created graph, while embedded in space, requires a machine that is not physics to evolve it (i.e. a CPU — however, see molecular computing). The blessing is that new “things” (new definitions of adjacency) emerge out of 3D reality. In the emergent referential structure, a “thing” can be a “thing” without its parts being 3D-near each other. Unfortunately, this level of abstraction comes at the cost of time. For speed, the best the clever engineer can do is try their damnedest to make 3D reality and their data structure isomorphic in the literal sense — the same structure. However, as we believe it, NP!=P and P!=O(1).

[P]rogramming is basically planning and detailing the enormous traffic of words through the Von Neumann bottleneck, and much of that traffic concerns not significant data itself, but where to find it.
    — John Backus (1977)

CPU Computer Three-dimensional space, the things within it, and the physics that evolves it are not expressive enough for the likes of mind — for man has eaten from the tree of knowledge. No longer within the grace of a 1-to-1 correspondence, neurons go about constructing new maps of space/time that unite disparate areas via the construction of concepts in the hopes of realization (i.e. “full comprehension of the situation”). When the brain runs out of storage and compute resources, it goes about its way linking concepts in a computer using data structures (graphs). When a single computer is no longer sufficient, clusters of computers are leveraged. As larger graphs and more complex problems are met, graph algorithms must become more advanced (O(n^(1/c))) and/or the graph data structures must be represented more efficiently according to the constraints of modern hardware (see the Von Neumann bottleneck, which is also a constraint of mind). In the end, both CPUs and brains are not efficient architectures, as the data and the process are not one and the same and the data must ultimately be moved to a single point in space (e.g. a CPU’s ALU, which implements the function f(x,y), and the world must go through the senses to the brain).

“Neurons which fire together, wire together.”
    — Donald Hebb on Hebbian Theory

Co-Locating the Structures of a Graph

All The World on a Head of a Pin The structures of a property graph are key/value properties, vertices, edges, incidences, adjacencies, cliques, clusters, super clusters, and graph filaments. Unfortunately, the entire world cannot exist on the head of a pin (the CPU’s registers). While a graph is ultimately bits/bytes/words, the aforementioned larger structures hint at the probability of co-retrieval and allow the clever engineer to use that information to place the structures in space.

When a vertex is accessed, its long identifier and key/value properties are typically accessed as well. If a vertex’s incident liked-edges are accessed according to their stars-property, then it is important to have those edges sequentially ordered by 5 star, then 4 star, then 3, and so on, with an offset index to where each group can be found (see vertex-centric indices). One of the major benefits of the graph data structure is that adjacency is made explicit and is not up to arbitrary joins. When a vertex is accessed, its adjacent neighbors are known and those adjacents are more likely to be accessed next than a random distant neighbor. Thus, the vertices in a clique should reside side-by-side. Given the size of a clique (potentially 10s of vertices), they should be on the same page of disk. Loose cliques form clusters — e.g., students at a university are more likely to interact with each other than with students at different universities. Partitioning the vertices across physical machine servers according to their university affiliation may respect the principle of co-retrieval/co-location (see Educating the Planet with Pearson). Clusters form super-clusters — universities in the same region of the world are more likely to interact. However, at this scale, other dimensions of the graph (i.e. paths between vertices) could be asserting their effects on co-retrieval. The larger the scale, the harder it is to predict which bit will follow next. Below is a summary of the components of a graph and their co-location in the memory hierarchy according to the perspective of a single vertex (where “perspective” is a process at the CPU).

structure       location        description
id              register        A long value uniquely identifying a vertex/edge
property        L1 cache        A string key and a primitive value
incidents       L2/L3 cache     A vertex's incoming and outgoing edges
adjacents       Block of RAM    The vertices incoming and outgoing to a vertex
clique          Page on Disk    A group of collectively adjacent vertices
cluster         Server          A group of vertices with high intra-connectivity and low inter-connectivity
super cluster   Server Rack     A group of clusters with high intra-connectivity and low inter-connectivity
filament        Data Center     A massive lattice-like structure connecting super clusters     

A Galactic Simulation via KIPAC

The larger the structure, the slower it evolves, as large structures are epiphenomena of their more atomic pieces. While the atoms of computing (bits/words) are the structures processed by the CPU, these mutations at the small scale take time to effect changes at the large scale — nanoseconds, seconds, days, years. The evolution of local neighborhoods in the graph percolates structural changes throughout the system. Thus, optimizing the layout of a universal graph across all data centers in the world is not only NP-hard (given the possible number of permutations), but it is also a constantly evolving problem. The principle of divide-and-conquer is used today — optimize for key/values, incident edges, clusters and, with any “luck” (a cache hit), the next piece of data is near a CPU.

Conclusion

The concept of computing is very general and can be implemented in a multitude of ways. In the everyday real-world, the Von Neumann machine architecture is the standard. These machines are composed of a CPU, memory, and a disk. They are networked together using high-speed interconnects within the same server rack. Server racks are stored in data centers. Data centers exist in regions across the globe. As graph structures become larger, so do the computing resources required to represent and process them. However, any one micro-structure of the universal graph is ultimately operated on by a particular CPU in the cluster. The collective computation of all CPUs yields the evolution of man’s (re)definition of reality contorted in space.

Acknowledgments

Dr. Matthias Bröcheler is an advocate of algorithm and data structure designs that are sympathetic to hardware — tailored to the memory hierarchy. Discussions on such matters fostered the writing of this post.

Authors

Marko A. Rodriguez

Loopy Lattices Redux

Loopy Lattices Redux A graph can be realized as a tessellated surface. Each vertex (or point) on the surface has a set of neighbors. Two vertices are neighbors if there is an edge that connects them. For a discrete traverser (e.g. a marble), movement in a space requires a decision at each vertex. The traverser must decide whether to go left, up, east, right, west, or down — i.e., it must choose one and only one edge emanating from its current vertex. The edge chosen leads to yet another vertex. At which point, another choice must be made ad infinitum.

k-Regular Graphs: k-Ary String Generators

Binary State Machine A directed graph is k-regular if each vertex has k adjacent vertices. Starting from any vertex, there are k^n possible n-length paths in a k-regular graph. 2-Regular Graph For a 2-regular graph (k=2), the number of unique paths is equivalent to the number of objects that can be represented by a binary string. For instance, there are 4,294,967,296 length 32 paths in a 2-regular graph (2^32). This relationship is apparent if every vertex’s incident edges are labeled with either a 0 or 1. If the traversers print the edge label at each step in an n-step walk, then all possible n-length binary strings will be generated.

A graph with 2-degree vertices is rare in nature. Binary Tree A graph with 10+ degree vertices is more common, though such k-regularity is uncommon. A 10-regular graph has 100,000,000,000,000,000,000,000,000,000,000 unique 32-length paths (10^32). Tiny Binary Machine Combinatorial explosions are readily apparent when computing on natural graphs. Such truths affect the way in which technology/algorithms should be implemented when solving real-world graph problems.

Lattice Graphs: Binary Strings with an Equal Number of 0s and 1s

Lattice Graph A lattice is a planar graph with a rectangular tessellation. The “inside” of a finite lattice is regular in that each vertex connects to the same number of vertices, but the “outside” is not regular in that there are no neighbors beyond the border vertices. A 20x20 directed lattice has 441 vertices and 840 edges. As shown analytically in the original Loopy Lattices post, there are 137,846,528,820 (137.85 billion) unique paths from the top-left vertex to the bottom-right vertex. A discrete traverser must take 40 steps to make the journey. Of those 40 steps, an equal number of 1s and 0s will be “printed.” Thus, the problem of determining how many unique paths there are in a directed 20x20 lattice is a question of how many unique 40-length binary strings exist such that there is an equal number of 1s and 0s. This constraint yields a number that is much smaller than 2^40. In the previous post, Gremlin (in its depth-first, enumerative form) could not calculate the answer due to the explosion of possibilities. Therefore, to answer the question, the closed form solution below was provided. The solution says “2n choose n” or, in particular, “40 choose 20.” The 20 chosen 1 slots in the 40-length binary string force the remaining 20 positions to be 0.
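For completeness, the closed form is the central binomial coefficient, which a few lines of Groovy can verify with BigInteger arithmetic; this snippet is a sketch added here for illustration and requires no graph at all.

// "2n choose n" = (2n)! / (n! * n!); for the 20x20 lattice, n = 20.
def factorial = { int k -> (1..k).inject(BigInteger.ONE) { acc, i -> acc.multiply(BigInteger.valueOf(i)) } }
int n = 20
def paths = factorial(2 * n).divide(factorial(n).multiply(factorial(n)))
assert paths == 137846528820G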

Titan vs. Faunus: The Enumerative/Counting Distinction

A graph database such as Titan can be used to store a 20x20 lattice. While an 840-edge graph is extremely small for a “database,” it is necessary for the experiment to follow. The Gremlin/Groovy code to create a 20x20 lattice in Titan is provided below.

def generateLattice(n) {
  g = TitanFactory.open('bin/cassandra.local')
   
  // total number of vertices
  max = Math.pow((n+1),2)
   
  // generate the vertices
  vertices = [];
  (1..max).each { vertices.add(g.addVertex()) }
     
  // generate the edges
  vertices.eachWithIndex { v, i ->
    right = i + 1
    if (((right % (n + 1)) > 0) && (right <= max)) {
      v.addEdge('link', vertices[right])
    }
 
    down = i + n + 1
    if (down < max) {
      v.addEdge('link', vertices[down])
    }
  }
  g.commit();
  return g
}

Traversing a Lattice with Titan

Titan: Distributed Graph Database With Titan/Gremlin, it is possible to count the number of 1-, 2-, 3-, …, 40-length paths in the 20x20 lattice. A traversal of length 40 is a full traversal of the lattice from the top-left to the bottom-right. The traversal code and the runtimes are provided below. An exponentially growing runtime is realized: 10.9 minutes (28 steps), 2.3 hours (29 steps), and 9.8 hours (30 steps). The calculation was halted on the 31st step. The number of 40-length paths could not be reasonably calculated with Titan/Gremlin. By exhaustively enumerating paths, and with the number of paths growing exponentially as the path length increases, a calculation of this form is doomed for large values of n.

/usr/local/titan-all-0.3.1$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = generateLattice(20)
==>titangraph[cassandrathrift:127.0.0.1]
gremlin> g.V.count()
==>441
gremlin> g.E.count()
==>840
gremlin> (1..40).each{ l ->
  t = System.currentTimeMillis();
  c = g.v(4).out.loop(1){it.loops <= l}.count()
  t = System.currentTimeMillis() - t;
  println(l + ":" + c + ":" + t)
}

path length path count traversal time (ms) path length path count traversal time (ms)
1 2 9 2 4 2
3 8 1 4 16 2
5 32 3 6 64 5
7 128 10 8 256 17
9 512 36 10 1024 62
11 2048 74 12 4096 96
13 8192 49 14 16384 61
15 32768 88 16 65536 163
17 131072 325 18 262144 646
19 524288 1296 20 1048576 2573
21 2097150 5172 22 4194258 10306
23 8388054 20659 24 16772566 41555
25 33523880 82993 26 66941500 166828
27 133422540 331954 28 265069020 654070
29 523921830 8288566 30 1027813650 35512124

Traversing a Lattice with Faunus

Faunus: Graph Analytics Engine Faunus is a graph analytics engine that leverages Hadoop and a breadth-first implementation of Gremlin. With Faunus, paths can be enumerated in a similar fashion to Titan/Gremlin, but if only counts are required (destinations vs. paths), then it is more efficient to propagate and sum counters. Pascal's Triangle Instead of explicitly storing each and every Gremlin at every vertex, Faunus stores the number of Gremlins at each vertex. This is the difference between representing a list of length m and a long with value m. A consequence of this model is that it is possible to efficiently compute the number of 40-length paths in a 20x20 lattice with Faunus. This counter propagation mechanism is analogous to the mechanical technique for computing binomial coefficients as proposed by Blaise Pascal via Pascal’s triangle (in the Western world). It is important to note the breadth-first requirement of this computation.
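The counter-propagation idea is simple enough to sketch outside of Faunus: rather than materializing every traverser, each vertex accumulates how many traversers could reach it, exactly as in Pascal’s triangle. The following standalone Groovy sketch (not Faunus code) computes the 20x20 lattice count this way.

// counts[i][j] holds the number of distinct directed paths from the top-left corner
// to vertex (i,j); each cell is the sum of the counters of its upper and left neighbors.
def latticePathCount = { int n ->
  def counts = (0..n).collect { (0..n).collect { BigInteger.ZERO } }
  counts[0][0] = BigInteger.ONE
  (0..n).each { i ->
    (0..n).each { j ->
      if (i > 0) counts[i][j] = counts[i][j].add(counts[i - 1][j])
      if (j > 0) counts[i][j] = counts[i][j].add(counts[i][j - 1])
    }
  }
  counts[n][n]
}
assert latticePathCount(20) == 137846528820G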

Enumeration vs. Counting

g = FaunusFactory.open('bin/titan-cassandra-input.properties')
t = System.currentTimeMillis()
// As of Faunus 0.3.1, the loop() construct is not supported
g.v(4).out.out.out.out.count() // 4-steps
t = System.currentTimeMillis() - t

The number of paths of length n in a 20x20 lattice is plotted below in both y-linear (left) and y-logarithmic (right) form. In short, the number of paths grows exponentially as the path length increases. What is interesting to note in the following table of Faunus counts and runtimes is that Faunus is able to compute the total number of unique 40-length paths in the lattice in 10.53 minutes — 137,846,528,820.

20x20 Lattice Path Count

path length path count traversal time (ms) path length path count traversal time (ms)
1 2 32781 2 4 48071
3 8 63537 4 16 78575
5 32 94078 6 64 109246
7 128 124574 8 256 139850
9 512 156223 10 1024 171549
11 2048 186049 12 4096 201111
13 8192 218417 14 16384 232642
15 32768 248019 16 65536 263355
17 131072 278685 18 262144 296912
19 524288 308225 20 1048576 324440
21 2097150 340823 22 4194258 355349
23 8388054 371546 24 16772566 385755
25 33523880 402189 26 66941500 416868
27 133422540 433917 28 265069020 448150
29 523921830 462522 30 1027813650 478595
31 1995537270 493224 32 3821729910 508492
33 7191874140 524791 34 13237415400 539216
35 23690879520 556108 36 40885872720 568512
37 67156001220 586697 38 102501265020 601196
39 137846528820 617152

Titan’s runtime grows exponentially, in proportion to the number of paths computed. On the other hand, Faunus’ computation time grows linearly while computing an exponentially growing path count. At step 28, Faunus executes the path count faster than Titan. This does not mean that Titan is inherently less efficient at such computations; it is simply a function of the depth-first, enumerative nature of Gremlin/Titan vs. the breadth-first, counter nature of Gremlin/Faunus. Implementing Faunus’ Gremlin engine over Titan would enable Titan to compute such path counts effectively. However, the purpose of Faunus is to serve exactly that role — the global batch processor for Titan.

Titan and Faunus Lattice Traversal Times

Conclusion

20x20 Lattice A graph is a data structure composed of vertices and edges. The natural interpretation of a computation on a graph is a traversal — i.e., a directed walk over the vertices by means of chosen incident edges. An exhaustive exploration of all paths within a graph is typically not feasible because the number of paths grows exponentially as a function of the path length and the graph’s branching factor. As demonstrated with Titan and Faunus, the goal of the traversal and the choice of the traversal engine ultimately determine the feasibility of the computation. Once again, the loopy lattice exposes a simple truth in the theory and practice of graphs.

Acknowledgments

Dr. Vadas Gintautas read draft versions of this post and provided useful comments. Finally, this post was inspired by the work previously done by Bobby Norton, Marko A. Rodriguez, and Vadas Gintautas entitled Loopy Lattices.

Authors

Marko A. Rodriguez

Educating the Planet with Pearson

Titan: Distributed Graph Database Pearson is striving to accomplish the ambitious goal of providing an education to anyone, anywhere on the planet. New data processing technologies and theories in education are moving much of the learning experience into the digital space — into massive open online courses (MOOCs). Two years ago Pearson contacted Aurelius about applying graph theory and network science to this burgeoning space. A prototype proved promising in that it added novel, automated intelligence to the online education experience. However, at the time, there was no scalable, open-source graph database technology on the market. It was then that Titan was forged in order to meet the requirement of representing all universities, students, their resources, courses, etc. within a single, unified graph. Moreover, beyond representation, the graph needed to be able to support sub-second, complex graph traversals (i.e. queries) while sustaining at least 1 billion transactions a day. Pearson asked Aurelius a simple question: “Can Titan be used to educate the planet?” This post is Aurelius’ answer.

Data Loading Benchmark

Pearson Education Pearson provides free online education through its OpenClass platform. OpenClass is currently in beta with adoption by ~7000 institutions. To meet the expected growth beyond beta, it is necessary to build the platform on a scalable database system. OpenClass Moreover, it is important to build the platform on a database system that can support advanced algorithms beyond simple get/put-semantics. The latter was demonstrated via the initial prototype. To the former, a simulation of a worldwide education environment was created in Titan to alleviate Pearson’s scalability concerns.

Number of students 3.47 billion
Number of teachers 183 million
Number of courses 788 million
Number of universities 1.2 million
Number of concepts 9 thousand
Number of educational artifacts ~1.5 billion
Total number of vertices 6.24 billion
Total number of edges 121 billion

The simulated world was a graph containing 6.24 billion vertices and 121 billion edges. The edges represent students enrolled in courses, people discussing content, content referencing concepts, teachers teaching courses, material contained in activity streams, universities offering courses, and so forth. Various techniques were leveraged to ensure that the generated data was consistent with a real-world instance. For example, people names were generated from sampling the cross product of the first and last names in the US Census Bureau dataset. Gaussian distributions were applied to determine how many courses a student should be enrolled in (mean of 8) and how many courses a teacher should teach (mean of 4). The course names and descriptions were drawn from the raw MIT OpenCourseWare data dumps. Course names were appended with tokens such as “101,” “1B,” “Advanced,” etc. in order to increase the diversity of the offerings. Student comments in discussions were sampled snippets of text from the electronic books provided by Project Gutenberg. University names were generated from publicly available city name and location datasets in the CommonHubData project. Finally, concepts were linked to materials using OpenCalais. The final raw education dataset was 10 terabytes in size.
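As a flavor of the distribution-driven generation described above (and not the actual generator used for the benchmark), sampling a student’s course count from a Gaussian centered on 8 might look like the following; the standard deviation of 2 and the clamping rule are assumptions made for this sketch.

// Illustrative only: draw the number of courses a simulated student enrolls in
// from a normal distribution with mean 8, clamping the result to at least 1.
def rnd = new Random()
def sampleCourseCount = { double mean = 8d, double stdDev = 2d ->
  Math.max(1, Math.round(mean + stdDev * rnd.nextGaussian()) as int)
}
def enrollments = (1..10).collect { sampleCourseCount() }
println enrollments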

A 121 billion edge graph is too large to fit within the confines of a single machine. Fortunately, Titan/Cassandra is a distributed graph database able to represent a graph across a multi-machine cluster. The Amazon EC2 cluster utilized for the simulation was composed of 16 hi1.4xlarge machines. The specification of the machines is itemized below.

  • 60.5 GiB of memory
  • 35 EC2 Compute Units (16 virtual cores and 64-bit)
  • 2 SSD-based volumes each with 1024 GB of instance storage
  • I/O Performance: Very High (10 Gigabit Ethernet)

Educating the Planet - Disk Writes The 10 terabyte, 121 billion edge graph was loaded into the cluster in 1.48 days at a rate of approximately 1.2 million edges a second with 0 failed transactions. These numbers were possible due to new developments in Titan 0.3.0 whereby graph partitioning is achieved using a domain-based byte order partitioner. With respect to education, the dataset maintains a structure where most edges are intra-university rather than inter-university (i.e. students and teachers typically interact with others at their own university, and even more so within their own courses). As such, domain partitioning is possible where vertices within the university (i.e. graph community) are more likely to be co-located on the same physical machine.

Once in Titan, the raw 10 terabyte dataset was reduced to 5 terabytes due to data-agnostic Snappy compression and the use of Titan-specific graph compression techniques (e.g. “id deltas,” type definitions, and Kryo serialization). After the data was loaded, it was immediately backed up to Amazon S3. This process took 1.4 hours using Titan’s parallel backup strategy. The nodetool statistics for the Titan/Cassandra cluster are provided below.

Address         Rack    Status   State    Load        Token
10.244.196.10   rack1   Up       Normal   329.44 GB   Token(bytes[c000000000000000])
10.244.194.142  rack1   Up       Normal   348.62 GB   Token(bytes[3000000000000000])
10.244.196.111  rack1   Up       Normal   330.86 GB   Token(bytes[b000000000000000])
10.244.195.155  rack1   Up       Normal   333.57 GB   Token(bytes[a000000000000000])
10.244.195.243  rack1   Up       Normal   330.91 GB   Token(bytes[9000000000000000])
10.244.195.209  rack1   Up       Normal   326.57 GB   Token(bytes[f000000000000000])
10.244.195.93   rack1   Up       Normal   355.26 GB   Token(bytes[4000000000000000])
10.244.195.179  rack1   Up       Normal   325.73 GB   Token(bytes[e000000000000000])
10.244.195.57   rack1   Up       Normal   351.47 GB   Token(bytes[1000000000000000])
10.244.196.27   rack1   Up       Normal   332.87 GB   Token(bytes[d000000000000000])
10.244.196.93   rack1   Up       Normal   351.81 GB   Token(bytes[2000000000000000])
10.244.195.6    rack1   Up       Normal   331.56 GB   Token(bytes[8000000000000000])
10.244.195.7    rack1   Up       Normal   327.55 GB   Token(bytes[0000000000000000])
10.244.196.84   rack1   Up       Normal   345.2 GB    Token(bytes[5000000000000000])
10.244.195.8    rack1   Up       Normal   351.26 GB   Token(bytes[6000000000000000])
10.244.194.178  rack1   Up       Normal   338.07 GB   Token(bytes[7000000000000000])

Transactional Benchmark

Titan Cluster and Application Servers The purpose of the second half of the experiment was to subject the 121 billion edge education graph to numerous concurrent transactions. These transactions simulate users interacting with the graph — solving educational problems, adding more content, discussing ideas with one another, etc. To put the 16 hi1.4xlarge cluster under heavy load, 80 m1.medium machines were spawned. These 80 machines simulate the application servers querying the graph and providing users the front-end experience. Each machine maintained 30 threads in a “while(true)-loop,” randomly selecting 1 of the 16 transactional templates below and executing it. Thus, 2,400 concurrent threads communicated with the 16 hi1.4xlarge Titan cluster. A review of the Gremlin queries and their mean runtimes with standard deviations is presented in the table below. Note that many of the transactions are “complex” in that they embody a series of actions taken by a user and thus, in a real-world setting, these would typically be broken up into smaller behavioral units (i.e. multiple individual transactions).

name             # of tx    avg (ms)   std dev   description
scommentread     25550909   211.07     45.56     student reads the most recent comments for their courses
reccourse        5149469    467.37     178.20    students gets recommended courses to take
scommentshare    12825909   394.15     57.98     student reads comments in courses and shares a comment
scontent         20567687   279.32     81.83     student retrieves all content for a single course in their course list
saddfollow       12826912   193.72     22.77     student follows another student
scourses         7720965    233.38     79.44     student retrieves a list of all their courses with description
classmates       12849769   96.962     22.27     student retrieves a list of all their classmates
sprofile         7689669    53.740     22.61     student retrieves their profile
recfriend2       5178945    155.75     44.60     student is recommended people to follow (version 1)
scourseactivity  10371133   565.76     189.80    student retrieves the top 10 most recent activities in their courses
scommentadd      5182558    281.90     44.326    student reads course comments and comments at some depth in the discussion tree
recfriend1       5189435    241.33     256.48    student is recommended people to follow (version 2)
sreshare         12850473   284.07     68.20     student reads their stream and shares an item with followers
ssharelink       5140363    261.58     35.75     student shares a link with followers
sdiscussadd      2604696    246.35     34.64     student browses courses and then adds a new discussion topic to a course
sstream          76301001   224.93     84.48     student reads their personal stream     
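A stripped-down sketch of one such load-driver machine is shown below; transactionTemplates() is a stand-in for code that would return the sixteen Gremlin transactions in the table above, and the thread count mirrors the 30 threads per m1.medium described earlier.

// One simulated application server: 30 threads, each looping forever and executing
// a randomly chosen transaction template against the Titan cluster.
def templates = transactionTemplates()   // assumed helper returning 16 closures
def rnd = new Random()
30.times {
  Thread.start {
    while (true) {
      def tx = templates[rnd.nextInt(templates.size())]
      tx.call()                          // one complex transaction (e.g. sstream, scommentread)
    }
  }
}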

The transactional benchmark ran for 6.25 hours and executed 228 million complex transactions at a rate of 10,267 tx/sec. Given the consistent behavior over those 6.25 hours, it is inferred that Titan can serve ~887 million transactions a day. Considering the complex nature of the transactions, a real-world production system of this form should be able to sustain greater than 1 billion transactions a day.

Conclusion

Titan: Distributed Graph Database A first attempt at this benchmark was done at the turn of the year 2012. That benchmark was for a 260 billion edge graph (7 billion students) constructed using the same graph generation techniques described in this post. The graph data was loaded, but the transactional benchmark was never executed because the requisite cluster architecture could not be fulfilled by Amazon EC2 (all 32 high I/O machines in the same placement group). Thus, the smaller-scale benchmark presented here was what was possible given EC2’s resources at the time.

While this benchmark demonstrates that Titan can handle the data scale and transactional load of “99%” of applications developed today, Aurelius is not sufficiently satisfied with the presented transactional results. By reducing the transactional load to ~5,000 tx/sec, the runtimes of the queries dropped to the 100 millisecond range. The desired result was to achieve ~100ms query times at 10k tx/sec and 250ms at 25k tx/sec. Fortunately, much was learned about Titan/Cassandra during this exploration. Advances in both inter-cluster connection management and machine-aware traversal forwarding are currently in development in order to reach the desired scaling goals.

A scalable graph database is only half the story — the story presented here. Once scaling has been accomplished, the next problem is yielding knowledge from data. For the last 2 years, Aurelius has been working with Pearson to develop novel algorithms for the education space that move beyond the typical activities seen in current web-systems: profiles, streams, recommendations, etc. Online education is ripe for automating much of the intelligence currently contributed by students and teachers. With a collective graphical model, Aurelius’ algorithms move beyond creating a dynamic interface to providing computational human augmentation. These techniques generalize and should prove fruitful to other domains.

Acknowledgements

This report was made possible by funding provided by Pearson Education. Over the 5 months during which this benchmark was developed, the following individuals provided support: Pavel Yaskevich (Cassandra tuning and Titan development) and Stephen Mallette/Blake Eggleston (RexPro development and testing). Various issues in Cassandra were identified at scale, and the Cassandra community readily accepted patches and quickly released newer versions in support of the effort. Note that Titan is an open source, Apache 2 licensed graph database. The Titan community has supported the project via patch submissions and testing across use cases and cluster configurations. Finally, Steve Hill of Pearson has done much to ensure Titan’s success, pushing Aurelius to answer the question: “Can Titan educate the planet?”

Authors

Matthias Broecheler, Dan LaRocque, Marko A. Rodriguez

Titan Server: From a Single Server to a Highly Available Cluster

Titan Growth Titan is a distributed graph database capable of storing graphs on the order of hundreds of billions of edges while, at the same time, supporting billions of real-time graph traversals a day. For most graph applications, the high-end performance aspects of Titan will never be reached. This does not mean that Titan is unsuitable for graph applications at the smaller scale — in the billions of edges and below. The purpose of this post is to introduce Titan from the perspective of a team of engineers developing a new graph-based application. These engineers will initially develop and test their codebase using a single Titan Server. When the application matures and is ready for production use, a highly-available setup is deployed. Finally, as the application becomes more popular and the data size and transactional load increases, a fully distributed cluster is leveraged. Growing a Titan database from a single server to a cluster is simply a matter of configuration. In this way, Titan gracefully scales to accommodate the changing requirements of a graph application.

Titan Single Server

Titan Single Machine Augustus and Tiberius are two software engineers who have designed an application that represents the Gods and people of Rome within a graph of familial relationships — a genealogy application. The intention is that Roman scholars will use the application to better understand the social fabric of their great Empire. While the application is intended for a single user, the two engineers decide to leverage Titan as the backend graph database. For one, Titan is completely free for any use (Apache 2 licensed), and for two, it supports both single-server and distributed deployments. The latter is important to them because the Greek Oracle of Delphi foretold that a genealogy graph would one day be used online by everyone throughout the Roman Empire.

$ wget http://s3.thinkaurelius.com/downloads/titan/titan-cassandra-0.3.0.zip
$ unzip titan-cassandra-0.3.0.zip
$ cd titan-cassandra-0.3.0
$ sudo bin/titan.sh config/titan-server-rexster.xml config/titan-server-cassandra.properties
13/03/27 12:40:32 INFO service.CassandraDaemon: JVM vendor/version: Java HotSpot(TM) 64-Bit Server VM/1.7.0_12-ea
13/03/27 12:40:32 INFO service.CassandraDaemon: Heap size: 40566784/477233152
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Loading settings from file:/Users/marko/software/aurelius/titan/config/cassandra.yaml
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Global memtable threshold is enabled at 151MB
13/03/27 12:40:32 INFO service.CacheService: Initializing key cache with capacity of 2 MBs.
13/03/27 12:40:35 INFO server.RexProRexsterServer: RexPro serving on port: [8184]
13/03/27 12:40:35 INFO server.HttpRexsterServer: Rexster Server running on: [http://localhost:8182]
13/03/27 12:40:35 INFO server.ShutdownManager: Bound shutdown socket to /127.0.0.1:8183. Starting listener thread for shutdown requests.
...

Users without wget can use curl -O or download from the Titan download page.

The above sequence of 4 shell commands downloads and starts up a Titan Server on localhost. Titan Server embeds both Cassandra and (a lightweight version of) Rexster within the same JVM. Titan Server exposes the following language-agnostic endpoints for developers to communicate with the graph (a quick check of the REST endpoint follows the list):

  1. A RESTful endpoint available at http://localhost:8182/graphs.
  2. A RexPro binary protocol endpoint available on port 8184.
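
For example, once the server is up, the REST endpoint can be exercised directly from the Gremlin console with a line of Groovy. This is a minimal sketch that assumes a local server on the default port shown above.

// list the graphs hosted by the local Titan Server over REST (assumes default port 8182)
println new URL('http://localhost:8182/graphs').text

The RexPro endpoint on port 8184 is a binary protocol and requires a RexPro-aware client rather than plain HTTP.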

Titan HTTP/RexPro

Titan Server is configured via two primary files: titan-server-rexster.xml (shown below) and cassandra.yaml (discussed in the next section). These files are located in the config/ directory of the titan-cassandra-x.y.z distribution.

<rexster>
    <http>
        <server-port>8182</server-port>
        <server-host>0.0.0.0</server-host>
        <base-uri>http://localhost</base-uri>
        <character-set>UTF-8</character-set>
        <enable-jmx>false</enable-jmx>
        <max-post-size>2097152</max-post-size>
        <max-header-size>8192</max-header-size>
        <upload-timeout-millis>30000</upload-timeout-millis>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </http>
    <rexpro>
        <server-port>8184</server-port>
        <server-host>0.0.0.0</server-host>
        <session-max-idle>1790000</session-max-idle>
        <session-check-interval>3000000</session-check-interval>
        <connection-max-idle>180000</connection-max-idle>
        <connection-check-interval>3000000</connection-check-interval>
        <enable-jmx>false</enable-jmx>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </rexpro>
    <security>
        <authentication>
            <type>none</type>
        </authentication>
    </security>
    <shutdown-port>8183</shutdown-port>
    <shutdown-host>127.0.0.1</shutdown-host>
</rexster>

NOTE: Along with the above endpoints, Titan Server also exposes a JVM native serialization interface that can be used by JVM languages. This interface, for example, is the means by which Faunus/Hadoop interacts with Titan Server for global graph analytics. For more information on this endpoint, see Using Cassandra.
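
As a sketch of that JVM-native route, a Gremlin console running on the same network can open the graph directly against the server’s embedded Cassandra. The snippet below assumes Titan 0.3.x configuration property names and a Titan Server reachable at 127.0.0.1.

// open the graph hosted by Titan Server by connecting to its embedded Cassandra
conf = new org.apache.commons.configuration.BaseConfiguration()
conf.setProperty('storage.backend', 'cassandra')   // Cassandra storage backend
conf.setProperty('storage.hostname', '127.0.0.1')  // address of a Titan Server machine
g = com.thinkaurelius.titan.core.TitanFactory.open(conf)
g.V.count()   // simple smoke test against the graph
g.shutdown()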

Titan Highly Available

The genealogy application shows promise as a single-user system for studying the genetic history of the Roman people and Gods. Due to the positive response, Augustus and Tiberius decide that a multi-user online genealogy service would be a successful product. Typical traversals over the genealogy graph are shown below.

// how many siblings did jupiter have?
g.V('name','jupiter').out('brother','sister').count() 
// who is caesar's grandmother?
g.V('name','caesar').out('mother').out('mother').name 
// who are marcus' children's in-laws?
g.V('name','marcus').in('father').has('gender','M').out('married').out('father','mother').name 

As it currently stands, the genealogy data set contains approximately 1 billion edges and can therefore be stored and processed on a single machine. As a single-user application, a single Titan Server suffices. However, with multiple users, it is important that the system is robust and can serve numerous concurrent requests. If the application is backed by only a single server and that server goes down, the application is unusable. To ensure 1.) no single point of failure and 2.) support for more transactions per second, Augustus and Tiberius deploy 3 machines, each with a Titan Server installed.

Titan Highly Available The team updates the default config/cassandra.yaml file of each Titan Server by changing the localhost property value to the IP address of the machine and adding a seed IP address for discoverability (see Multinode Cluster). Next, they start each Titan Server one after the other (titan.sh). To ensure that the servers have properly clustered together, they use Cassandra’s nodetool.

apache-cassandra-1.2.3$ bin/nodetool ring

Datacenter: datacenter1
==========
Replicas: 1

Address         Rack        Status State   Load            Owns                Token
                                                                               57715295010532946864463892271081778854
10.223.14.57    rack1       Up     Normal  93.06 KB        49.28%              141555886663081320436455748965948652071
10.174.123.131  rack1       Up     Normal  59.73 KB        33.44%              28311611028231080169766921879398209884
10.196.0.207    rack1       Up     Normal  9.43 KB         17.28%              57715295010532946864463892271081778854

Finally, on one of the servers, the cassandra-cli tool is used to update the replication factor of the titan keyspace.

apache-cassandra-1.2.3$ bin/cassandra-cli -h 10.174.123.131

[default@unknown] update keyspace titan with strategy_options = {replication_factor:3};
a3b7e1a3-4a88-3769-8c5e-90cda4fec0e1
[default@unknown] show schema titan;
create keyspace titan
  with placement_strategy = 'SimpleStrategy'
  and strategy_options = {replication_factor : 3}
  and durable_writes = true;

Roman Server Room With a replication factor of 3, each of the 3 Titan Servers is the primary host of approximately one-third of the vertices in the graph while, at the same time, maintaining a replica of the primary data of the other two servers. In this way, a highly-available, master-master setup is achieved. With this model, there is no single point of failure. If one of the database machines goes down, the other two are able to serve the primary data of the dead machine. If two of the machines go down, the remaining machine can still serve data, albeit not with the same throughput possible when all three machines are available. With full master-master replication, the graph is duplicated and each server can support both reads and writes to the graph.

Titan Clustered

Roman Forum The following summer, the prophecy of the Oracle of Delphi comes true. An announcement is made in the Roman forum about the utility of the online genealogy application. Immediately, the plebeians of Rome join the site. They feverishly add their family histories and traverse the graph to learn more about their genetic past. This spike in usage puts an excessive amount of strain on the servers. With so many concurrent users, the CPU and disk I/O of the three server machines are pegged while processing requests.

Titan Clustered To remedy the situation, 6 more Titan Server machines are added to the cluster for a total of 9 machines. The token ring is rebalanced to ensure that each server maintains a relatively equal amount of the graph. A perfectly even partition of the 2^127 token space into 9 parts is below (see token ring calculator); a short script for generating such tokens follows the list.

0
18904575940052135809661593108510408704
37809151880104271619323186217020817408
56713727820156407428984779325531226112
75618303760208543238646372434041634816
94522879700260688493040931281842470912
113427455640312814857969558651062452224
132332031580364960112364117498863288320
151236607520417086477292744868083269632
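
The same even spacing can be computed with exact integer arithmetic. The Gremlin console (plain Groovy) sketch below assumes Cassandra’s RandomPartitioner token range of [0, 2^127); the values listed above differ slightly in their trailing digits, most likely because the token ring calculator rounds with floating-point arithmetic.

// generate 9 evenly spaced tokens over the RandomPartitioner range [0, 2^127)
range = BigInteger.valueOf(2).pow(127)
nodes = 9
(0..<nodes).each { i -> println range.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(nodes)) }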

Each machine has its token updated using the following nodetool commands. By repartitioning the token ring, the 3 original servers transfer a portion of their data to the newly on-boarded servers in order to distribute the data load as specified by their location in the token space (each vertex hashes to a particular token).

apache-cassandra-1.2.3$ bin/nodetool -h 10.223.14.57 move 0
apache-cassandra-1.2.3$ bin/nodetool -h 10.174.123.131 move 18904575940052135809661593108510408704
apache-cassandra-1.2.3$ bin/nodetool -h 10.196.0.207 move 37809151880104271619323186217020817408
...
..
.

Token Ring Partition

With the replication factor still set to 3, each server does not maintain a full replica of the graph. Instead, each server only replicates a third of the full graph (3/9). At this point, no single server has a full picture of the graph. However, because there are more servers, more transactions can be served and more data can be stored. Augustus and Tiberius have successfully grown their single-user graph application to a distributed system that stores and processes a massive genealogy graph represented across a cluster of Titan Server machines.

Conclusion

Titan Head Titan was developed from the outset to support OLTP distributed graph storage and processing. While it is important that a graph database can scale indefinitely, less than 1% of applications written today will ever leverage near-trillion edge graphs. The other 99% of applications will store and process million and billion edge graphs. Titan is able to meet the requirements of both segments of the graph application space. Furthermore, Titan scales gracefully as developers move from a single-server prototype, to a highly-available production system, and ultimately, to a fully distributed cluster sustaining the size and workload requirements seen by the top 1% of applications.

Acknowledgements

Stephen Mallette and Blake Eggleston are the developers of Rexster’s RexPro. Their efforts were a driving force behind the development of Titan Server.

Authors


Marko A. Rodriguez, Matthias Broecheler
