Boutique Graph Data with Titan

Titan is a distributed graph database capable of supporting graphs on the order of 100 billion edges and sustaining on the order of 1 billion transactions a day (see Educating the Planet with Pearson). Software architectures that leverage such Big Graph Data typically have 100s of application servers traversing a distributed graph represented across a multi-machine cluster. These architectures are not common in that perhaps only 1% of applications written today require that level of software/machine power to function. The other 99% of applications may only require a single machine to store and query their data (with a few extra nodes for high availability). Such boutique graph applications, which typically maintain on the order of 100 million edges, are more elegantly served by Titan 0.4.1+. In Titan 0.4.1, the in-memory caches have been advanced to support faster traversals, making Titan’s single-machine performance comparable to other single-machine-oriented graph databases. Moreover, as the application scales beyond the confines of a single machine, simply adding more nodes to the Titan cluster allows boutique graph applications to seamlessly grow into Big Graph Data applications (see Single Server to Highly Available Cluster).

Graph Representations On-Disk and In-Memory

When representing a graph within a computer, the design of both its on-disk and in-memory representations affects its read/write performance. It is important that these designs are sympathetic to the graph usage patterns of the application (see A Letter Regarding Native Graph Databases). For instance, on disk, it is best that co-retrieved graph data be co-located, else costly random disk seeks will be required. Titan leverages an adjacency list representation on disk where a vertex and its incident edges are co-located. Titan also allows for the sorting of a vertex’s incident edges according to application-specific edge-comparators. These vertex-centric indices support fine-grained retrieval of a vertex’s incident edges from disk and (hopefully) from the same page of disk (see A Solution to the Supernode Problem).

Once the graph data is retrieved from disk, it is placed into memory. Titan 0.4.1+ maintains two levels of in-memory caching: warm and hot. Warm caches store the hotspots of the global adjacency list and hot caches store the locally traversed sub-graph of a transaction. Both caches keep their data elements (vertices and edges) as byte-buffers. For traversals that move from vertex-to-vertex using edge labels and properties as filters, edge object creation can be avoided by leveraging the aforementioned vertex-centric sort orders and classic binary search. With typical natural graphs having multiple orders of magnitude more edges than vertices, this greatly reduces the number of objects created and the amount of memory used. Moreover, this allows memory to be accessed sequentially which increases the chance of a CPU cache hit (see Random vs. Sequential Memory Access).
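
To make the idea concrete, the sketch below (plain Groovy, not Titan’s internal representation) shows how a vertex’s incident reviewed-edges, kept sorted by their score sort key, can be filtered with a lower-bound binary search rather than by materializing edge objects:

// conceptual sketch: one vertex's incident edges as plain records sorted by score
def edges = [                 // each record: [score, time, adjacentVertexId]
    [3, 1325376000L, 101L],
    [4, 1330560000L, 102L],
    [5, 1335744000L, 103L],
    [5, 1340928000L, 104L]
]

int lo = 0
int hi = edges.size()
while (lo < hi) {             // lower-bound binary search on the score sort key
    int mid = (lo + hi).intdiv(2)
    if (edges[mid][0] < 5) {
        lo = mid + 1
    } else {
        hi = mid
    }
}
def fiveStarNeighbors = edges.subList(lo, edges.size()).findAll { it[0] == 5 }.collect { it[2] }
assert fiveStarNeighbors == [103L, 104L]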

In sum, the updates in Titan 0.4.1+ address multiple levels of the memory hierarchy in that the disk, the global cache, and the local transactions each use data structures appropriate to their role in graph processing. The benefits of these designs are demonstrated in the sections to follow.

Titan Graph Traversals on a Boutique Graph

In order to test both Titan’s on-disk and in-memory representations at boutique scale, a graph version of Amazon’s web rating data was generated. The benefit of this data set is twofold: 1.) it is a good size (37 million edges) to demonstrate a single machine use case and 2.) it supports edge properties, which demonstrate the benefit of Titan’s vertex-centric indices in graph processing. The schema for the graph is represented on the left, where there are two types of vertices: users and products. The graph is bipartite in that users link to products by reviewed-edges. The reviewed-edges maintain two properties: score (a float between 1 and 5) and time (a long in Unix time).
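
For reference, a schema of this shape might be declared with Titan 0.4’s type API roughly as follows. This is a hedged sketch from memory (check the builder method names against the Titan 0.4 documentation); the important piece is the sortKey on the reviewed label, which is what builds the vertex-centric index exercised by the traversals below:

g = TitanFactory.open('/tmp/amazon')   // a local, BerkeleyDB-backed instance for illustration
g.makeKey('type').dataType(String.class).indexed(Vertex.class).make()   // assumed vertex property distinguishing users from products
score = g.makeKey('score').dataType(Float.class).make()
time  = g.makeKey('time').dataType(Long.class).make()
// incident reviewed-edges are kept sorted by score, then time (the vertex-centric index)
g.makeLabel('reviewed').sortKey(score, time).make()
g.commit()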

The benchmark to follow was executed on two types of Amazon EC2 instances: m2.xlarge (hard disk drive — HDD) and c3.xlarge (solid state drive — SSD). To demonstrate Titan 0.4.1’s relative performance benefits, Titan 0.4.0 and a popular 3rd party graph database were tested as well. The same code was run on all systems by leveraging TinkerPop’s Blueprints graph API. The JVM’s heap settings were -Xms3G -Xmx3G and default options for all systems were used — except for a warm cache being explicitly enabled for Titan 0.4.1:

cache.db-cache=true
cache.db-cache-size=0.5
cache.db-cache-time=0

The benchmark was executed as enumerated below. Note that for cold cache times, the runtime of the first execution of the query was taken. For warm cache times, the transaction was rolled back and the query executed again. For hot cache times, the transaction was not rolled back and the query was re-executed. The actual Gremlin traversals are provided in the code block below minus a .count() which was used to ensure all databases returned the same result set. Finally, note that Titan/Cassandra was used for this experiment, though the recent cache developments are available to any of Titan’s underlying storage systems.

  1. 300 random user ids were gathered from the raw Amazon web review data.
  2. 100 users were queried with the “user preference”-traversal.
  3. 100 users were queried with the “user similarity”-traversal.
  4. 100 users were queried with the “product recommendation”-traversal.
// what products has the user liked in the last year?
//  user preference
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV


// what users have liked the same products that the user liked within the last year?
//  user similarity
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV
  .inE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).outV()
  .except([user])


// what products are liked by the users that like the same products as the user within the last year? 
//   product recommendation
user.outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV.aggregate(x)
  .inE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).outV
  .except([user])[0..<1000]
  .outE('reviewed').has('score',5f).has('time',T.gte,ONE_YEAR_AGO).inV
  .except(x).groupCount(m)

The benchmark results are available for exploratory viewing at the following URLs (made possible by HighCharts JS).

The plots have more data points on the left-end of the x-axis. To examine the results of these smaller result sets, the plots can be zoomed in by dragging a square over that region of the chart. Each plot line can be activated (or deactivated) by clicking on the respective line’s name in the legend of the plot.

It is important to stress that the results of any benchmark should be understood in the context in which they were executed. This benchmark demonstrates relative performance

  • on a boutique graph dataset (i.e. a relatively small graph),
  • using default settings for the graph databases,
  • against two different long term storage mediums (HDD and SSD),
  • within the resource constraints of a single machine (m2.xlarge and c3.xlarge),
  • executing 3 particular read-based queries,
  • and with only a single-user/thread querying the database at any one time.

Cold Caches Demonstrate On-Disk Performance

Cold Cache

The diagram above plots the number of vertices returned by the traversal on the x-axis against the time it took for the traversal to complete in milliseconds on the y-axis. Titan’s cold cache performance numbers are due in large part to its sequentialization of the graph by co-locating edges with their vertices on disk (see Scalable Graph Computing: Der Gekrümmte Graph). Furthermore, with vertex-centric indices, edges are not only co-located with their vertices but also sorted such that “a score equal to 5” is quickly fetched using an offset index. When edges are not co-located with their vertices, then the graph is not laid out sequentially. This can lead to poor fetch times as random disk seeks are required (see purple line). The effect of random disk access is made more apparent on the hard disk drive plot (as opposed to the solid state drive plot above). Note that pure sequentialization is difficult to achieve in any multi-user system which allows for random writes. For this reason, various compaction processes occur to combat fragmentation of the data on disk.

Due to graph compression techniques based on data co-location and logical adjacency, Titan’s disk footprint remains relatively low.

  • Titan/Cassandra — 2.9 gigabytes
  • Popular Graph Database — 15.0 gigabytes

Sequentialization and a compressed data format are useful when executing distributed OLAP queries with Faunus. Rows of data (i.e. a vertex and its incident edges) can be sequentially pulled off the disk allowing for global parallel scans of the graph to execute more efficiently (see Faunus Provides Big Graph Data Analytics). Moreover, it is faster to move 2.9 gigabytes of data than 15.0 gigabytes.

Warm Caches Demonstrate Off-Heap Cache Performance

Warm Cache

Once data has been pulled off of disk, it is put into memory. Titan has always performed well in pulling graph data off of the underlying storage medium (as seen in the previous section with both HDD and SSD). In terms of in-memory performance, Titan 0.4.1 now boasts a warm cache system. The benefit of this addition is made apparent in the plot above when comparing Titan 0.4.0 and Titan 0.4.1. With Titan 0.4.1, heavily accessed areas of the graph are stored in a global cache composed primarily of byte-buffers. If the data is not in the cache (a cache miss), then Cassandra’s caching mechanisms are leveraged (typically a key-cache and an OS-based page cache). It is important to emphasize that a warm cache miss in Titan 0.4.1 yields runtimes equivalent to those seen in Titan 0.4.0 (which, for the complex traversals presented, remain sub-second — see the gray line). These sub-second cache miss times are possible because of Titan’s on-disk representation. Graph sequentialization enables effective use of the OS’s page cache, where “going to disk” will typically not yield random seeks.

Hot Caches Demonstrate On-Heap Transaction Performance

Hot Cache

To answer a query, the traversal primitives first check for data in the current transaction’s hot cache, then the global warm cache, and then on a particular page of disk (which again, may be in memory because of the operating system’s page cache). Wherever the data is ultimately found, it is computed on in the local transaction’s hot cache. For traversals that move from vertex-to-vertex by filtering paths using edge properties and labels, Titan does not need to deserialize the edges into objects. Instead, it operates directly on the byte-buffers stored in the warm cache. This greatly reduces the number of objects created (|V| << |E|), the amount of time spent garbage collecting the young generation heap, and stages Titan nicely for speedy, recursive graph algorithms (to be explored in a future blog post).
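
The lookup order can be pictured with the small sketch below; the maps and the retrieveAdjacencyList/backend.readRow names are illustrative stand-ins, not Titan’s actual API:

// conceptual sketch of the tiered lookup described above
def retrieveAdjacencyList = { vertexId, Map hotCache, Map warmCache, backend ->
    def hit = hotCache[vertexId]            // 1. the transaction's hot cache
    if (hit != null) return hit
    def raw = warmCache[vertexId]           // 2. the shared warm cache (byte-buffers)
    if (raw == null) {
        raw = backend.readRow(vertexId)     // 3. the storage backend (possibly served by the OS page cache)
        warmCache[vertexId] = raw
    }
    hotCache[vertexId] = raw                // the transaction now computes on its local copy
    return raw
}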

Conclusion

Aurelius has consistently touted Titan as a Big Graph Data solution for OLTP graph applications that leverage graphs and transactional loads so large that they require more resources than a single machine can provide. However, for organizations looking to use a graph database, 100s of billions of edges is typically not their starting point. Titan 0.4.1+ better supports boutique shops beginning with graphs and wanting to ultimately procure a massive, universal graph of their domain. Moreover, all this graph processing functionality is provided under the commercially friendly Apache 2 free software license. With Titan 0.4.1+, boutique graph applications can freely move from a single machine, to a highly available single node (e.g. a 3 machine cluster with replication factor 3), to a fully distributed graph represented across an arbitrary number of machines.

Acknowledgements

This post was embarked on by Dr. Rodriguez after he wrote an email to the Aurelius team stating that Titan’s in-memory cache should be optimized for boutique graph use cases. Pavel Yaskevich took up the challenge and was able to increase traversal speeds using software hotspot profiling. Dr. Bröcheler then yielded further optimizations by cleverly leveraging the sequential in-memory byte-buffer representations of the graph data. Dr. Julian McAuley provided Aurelius a pre-processed version of the Amazon web review data under the BSD license and Daniel Kuppitz leveraged that data set in order to painstakingly stage and execute the benchmark presented.

Authors

Daniel Kuppitz Matthias Broecheler Marko A. Rodriguez

A Letter Regarding Native Graph Databases

It’s fun to watch marketers create artificial distinctions between products that grab consumer attention. One of my favorite examples is Diamond Shreddies. Shreddies, a whole wheat cereal, has a square shape and was always displayed as such. So an ingenious advertiser at Kraft Foods thought to advertise a new and better Diamond Shreddies. It’s a fun twist that got people’s attention, and some consumers even proclaimed that Diamond Shreddies tasted better, though they obviously ate the same old product.

Such marketing techniques are also used in the technology sector — unfortunately, to the detriment of consumers. Unlike Kraft’s playful approach, there are technology companies that attempt to “educate” engineers on artificial distinctions as if they were real and factual. An example from my domain is the use of the term native graph database. I recently learned that one graph database vendor decided to divide the graph database space into non-native (i.e. square) and native (i.e. diamond) graph databases. Obviously, non-native is boring, or slow, or simply bad, and native is exciting, or fast, or simply good.

Problem is: There is no such thing as a native graph database.

On the Concept of Native Computing

Let’s look at the definition of “native” when applied to data as taken directly from Wikipedia’s Native Computing article:

Applied to data, native data formats or communication protocols are those supported by a certain computer hardware or software, with maximal consistency and minimal amount of additional components.

I’m not claiming that Wikipedia is an authority on this subject, but this is a baseline definition we can work with for the purpose of this letter’s argument. From Wikipedia’s definition, it follows that a native graph database is a graph database that represents the graph (i.e. data) maximally consistent with the underlying hardware. Currently, all commercially available hardware follows the Von Neumann architecture. Under the Von Neumann architecture, the memory subsystems are represented as a sequential memory space. Moreover, in said memory systems, sequential access is significantly faster than random access. Realize this for yourself by writing a very large array into RAM and then comparing sequential vs. random access times. If you are too busy, read Pathologies of Big Data as the author has done the comparison for you on different types of memory systems. If you are regularly working with non-trivial amounts of data, you most definitely should read the Pathologies of Big Data article.
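
If you would rather see it than read about it, a rough Groovy version of that experiment follows; the absolute numbers depend on the JVM, heap, and hardware, but the gap between the two loops is the point:

int n = 20 * 1000 * 1000
long[] data = new long[n]

// build a shuffled index order (Fisher-Yates) to drive the random-access loop
int[] randomOrder = new int[n]
for (int i = 0; i < n; i++) { randomOrder[i] = i }
Random rnd = new Random(42)
for (int i = n - 1; i > 0; i--) {
    int j = rnd.nextInt(i + 1)
    int tmp = randomOrder[i]; randomOrder[i] = randomOrder[j]; randomOrder[j] = tmp
}

long sum = 0
long t = System.currentTimeMillis()
for (int i = 0; i < n; i++) { sum += data[i] }              // sequential access
println "sequential: ${System.currentTimeMillis() - t} ms"

t = System.currentTimeMillis()
for (int i = 0; i < n; i++) { sum += data[randomOrder[i]] } // random access
println "random:     ${System.currentTimeMillis() - t} ms"
println sum   // keep the loops from being optimized away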

Next, the purpose of any database is to retrieve a query result set by navigating the memory hierarchy and sequentializing memory access as much as possible. How the data is laid out in each of these memory systems, i.e. the data format, data structures and caches, explains many if not most of the differences between database systems. As an example, consider columnar databases. These relational databases store tables by columns (not rows) which makes it possible to quickly compute aggregates over columns because data access is sequential. That’s why they outperform their row-oriented counterparts on analytic queries.

We conclude that a database system is native if the data formats and structures it uses effectively sequentialize memory access across the memory hierarchy for the targeted type of workload.

Embedding a Graph in a 1-Dimensional Space

Let us now apply the concept of native computing to graph databases. Graph databases need to efficiently execute arbitrary graph traversals. A graph traversal is a restricted walk over the graph, moving from one vertex to its adjacent vertices via a selected set of incident edges. Without making any assumption on the type of traversal to be executed, it follows that a graph database needs to store vertices, their incident edges and their adjacent vertices in close proximity in the memory systems in order to sequentialize memory access (see Scalable Graph Computing: Der Gekrümmte Graph). However, those vertices have other adjacent vertices which makes it impossible to keep everything sequential (save in the most trivial graph topologies).

Consider the small graph on the left. Pick any vertex. Linearly write down that vertex, its edges and its adjacent vertices. With that initial choice made, it becomes increasingly difficult — and ultimately impossible — to add the other vertices, their edges and adjacencies into your linear list without pulling adjacent vertices apart. What you are attempting to do, and what every graph database needs to do, is to topologically embed a graph into a 1-dimensional space. There is a branch of mathematics called topological graph theory which studies such graph embeddings for arbitrary spaces and graphs. Unless the graph has no edges or forms a linear chain, there is no (strict) embedding into a 1-dimensional space. Hence, for all but the simplest graphs there exists no native data representation on typical Von Neumann computers which require sequential memory layout.

We conclude that there is no such thing as a native graph database for current computing systems.

Approximations to Embedding a Graph in a 1-Dimensional Space

Without a perfect mapping, the best that can be achieved is an approximation. Thus, the layout of a graph in sequential memory turns into an optimization problem. The question then becomes: “Can we lay out the graph sequentially so that most vertices have their neighbor close by?” For any given graph layout in memory, we can define the distance between two adjacent vertices as the absolute difference in their (sequential) memory addresses. Then, we define the objective function as the sum of distances of all adjacent vertex pairs. The optimal graph data layout is the one that minimizes this objective function. Unfortunately, this optimization problem is NP-hard, which means that the best we can hope for in practice are approximations (unless P=NP). Even those approximations are pretty expensive to compute. For example, in my research on disk-oriented graph layouts (see Disk-Oriented Graph Management), I propose an approximation method based on hierarchical clustering which took hours to calculate on graphs with 100s of millions of edges and requires that the graph structure remain fixed. A static graph structure is not an option if you need to update the graph in real-time (i.e. the typical use-case for a graph database). There is a vast trove of research on the subject of optimal graph embeddings, which I encourage you to read if you are interested in the fascinating world of hyper-dimensional manifolds, but let me conclude here that this problem is difficult, far from solved, and an area of active research.
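
Written out, this is the minimum linear arrangement problem: the cost of a layout \(\pi\) (a bijection from vertices to sequential memory positions) is

\[ \mathrm{cost}(\pi) = \sum_{(u,v) \in E} \bigl| \pi(u) - \pi(v) \bigr|, \qquad \pi : V \to \{1, \dots, |V|\} \]

and the optimal layout is the \(\pi\) that minimizes this sum; it is this minimization that is NP-hard.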

We conclude that a graph database should be judged on the tradeoffs it makes concerning computational/memory complexity and the quality of its graph embeddings.

Titan’s Approach to Embedding a Graph in a 1-Dimensional Space

Let me illustrate the tradeoff choices that the graph database Titan has made in embedding a graph into a modern computer. Titan internally represents a graph as an adjacency list. This means it stores each vertex with all of its properties and incident edges in one sequential list. This has the benefit that Titan can access all edges and their properties with one sequential scan. In fact, Titan uses a more sophisticated variant of the adjacency list format where edges are stored in a user-defined sort order. For each vertex, an offset index is built that allows for fast retrieval of subsets of the incident edges that fall within a given sequential range. We call these indices vertex-centric indices and they significantly speed up graph traversals on large graphs which have vertices with many edges — so-called supernodes (see A Solution to the Supernode Problem). The downside of vertex-centric indices is that they require building index structures, keeping edges in sort order, and storing each edge twice. This, in turn, requires more complex data structures and regular compactions of the data files after updates to the graph. Once off disk, Titan maintains the sorted adjacency list format in RAM, using binary search instead of index structures to acknowledge the slightly more favorable random access times of RAM. This allows the CPU to effectively prefetch data on traversals. We see that Titan expends additional storage space and computation to achieve a more sequential representation of the graph for quicker data retrieval.

Once vertices are co-located with their incident edges, the next problem is: how should vertices be co-located with their adjacent vertices? Vertices are laid out sequentially in random or guided order. With random order there is no data locality between adjacent vertices, but it is trivial to compute and leads to balanced partitions in distributed environments. Alternatively, Titan provides a “guided order” mode which allows users to plug in a heuristic function that tells Titan which partition block a particular vertex should be assigned to or which other vertex it should be close to. While Titan handles all of the logistics, the “intelligence” of the allocation is in the heuristic function. We believe that for some graphs it is pretty easy to come up with a good heuristic function given enough domain knowledge. For instance, the Facebook graph can be effectively partitioned using user geography as a heuristic (see Balanced Label Propagation for Partitioning Massive Graphs). This is not surprising since users who live close by are more likely to be friends on Facebook. Heuristic partitioning works well on some graphs, but requires more computation on update and is prone to imbalances in distributed deployments if not used carefully.
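
As a sketch of what such a heuristic function can look like (the class and method below are hypothetical, not Titan’s placement interface), a vertex’s university attribute can be hashed to a partition block so that adjacent vertices tend to land together:

// hypothetical "guided order" heuristic: co-locate vertices by university
class UniversityPlacementHeuristic {
    int numPartitions

    int partitionFor(Map vertexProperties) {
        def university = vertexProperties['university']
        // vertices without the attribute fall back to random, balanced placement
        if (university == null) return new Random().nextInt(numPartitions)
        return (university.hashCode() & Integer.MAX_VALUE) % numPartitions
    }
}

def heuristic = new UniversityPlacementHeuristic(numPartitions: 32)
assert heuristic.partitionFor([university: 'MIT']) == heuristic.partitionFor([university: 'MIT'])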

To conclude, the next time somebody tries to sell you a “native” graph database, be sure to reference this letter, ask them how they lay out their graph throughout all the sequential memory layers, and decide for yourself the optimality of their embedding (both on disk and in memory).

Conclusion

The intention of this letter was to point out the challenges that all graph databases are facing, to vindicate Titan from arbitrary categorization by vendor marketing, and to ensure that the public is properly educated on the topic at hand. All arguments presented are based on generally available computer science knowledge. Please comment on Hacker News if you find the argument flawed, lacking a proper citation, or wish to discuss the thesis at greater length. Moreover, if you are interested in this area of technology, please join Aurelius’ public mailing list and participate in helping us develop the next generation of graph technologies.

Acknowledgements

Dr. Marko A. Rodriguez provided editorial review of this letter in order to make the presented argument more concise, supported with images, and easier to consume for the general reader. Pavel Yaskevich provided astute points regarding the interplay between CPU caches and RAM.

Author

Matthias Broecheler

Developing a Domain Specific Language in Gremlin

Domain Specific Languages (DSLs) provide a way for programmers to increase the expressivity of their code within a specific domain. They enable the developer to gain greater productivity by hiding low-level programming constructs in favor of higher level ones that better link to the elements of the domain itself. DSLs also have the benefit of making it possible for non-programmers to “speak” in the same language as their programming counterparts, thus lowering technical communication barriers.

Pearson’s OpenClass Education-based DSL

An earlier Aurelius blog post entitled “Educating the Planet with Pearson” spoke of the OpenClass platform and Titan’s role in Pearson’s goal of “providing an education to anyone, anywhere on the planet”. It described the educational domain space and provided a high-level explanation of some of the conceptual entity and relationship types in the graph. For example, the graph modeled students enrolling in courses, people discussing content, content referencing concepts and other entities relating to each other in different ways. When thinking in graph terminology, these “conceptual entity and relationship types” are expressed as vertices (e.g. dots, nodes) and edges (e.g. lines, relationships), so in essence, the domain model embeds conceptual meaning into graph elements.

At Pearson, the OpenClass domain model is extended into a programmatic construct, a DSL based on Gremlin, which abstracts away the language of the graph. Engineers and analysts can then ask questions of the graph in their educational domain language, as opposed to translating those familiar terms into the language of vertices and edges. The OpenClass DSL defines the graph schema, extends the Gremlin graph traversal language into the language of education, provides standalone functions that operate over these extensions, and exposes algorithms that are developed from those extensions and functions. Together these components form a coarse-grained API which helps bring general accessibility to complex graph traversals.

Extending Gremlin into a DSL

Gremlin is a Groovy-based DSL for traversing property graphs and has applicability in the areas of graph query, analysis, and manipulation. It provides for an intuitive way to not only think in terms of a graph, but to program in terms of one as well. One of the interesting properties of Gremlin is that it allows programmers to extend upon the language for even greater programmatic expressiveness, not just within graphs, but within the domain that the graph itself resides.

An important aspect of Pearson’s graph is the notion of “people discussing content.” For the purpose of the upcoming examples, think of “content” as an online discussion forum, with instructor-assigned topics for students to share discourse and open debate. A person writes a post which may be in reference to a different post that was written earlier.

Traversing Discussions in Graph Terminology

Given the “people discussing content” graph structure, a Gremlin novice could immediately begin to navigate the graph. Asking the graph for a list of all vertices with a property key of type and a value of post yields the list of posts in the graph. The Gremlin for such a request is below:

g.V.has('type','post')

The traversal becomes a bit more involved when there is a need to walk the depth of the tree of posts:

g.V.has('type','post').out('child').loop(1){it.loops<25}{true}

To analyze and compare threads of posts within the tree, the path of each thread needs to be analyzed, such that each thread is flattened into a Map, where the key is the userName of the user who wrote the first post in the thread and the value is a unique list of vertices for the threads the user started:

m=[:]
g.V.has('type','post').out('child').loop(1){it.loops<25}{true}
   .path.groupBy(m){it[0].userName}{it}{it.collectMany{it}.unique()}.iterate()

Evaluating m after execution of the traversal would yield post vertices arranged as follows:

gremlin> m
==>marko=[v[184476380], v[282106584], v[184550536], v[189966816]]
==>josh=[v[173318448], v[188571048]]
==>daniel=[v[186130596], v[308964172]]
...
==>stephen=[v[176281532], v[182440524], v[188572948], v[282049412]]

It is not important to understand the mechanics of the Gremlin above. Its intent is to demonstrate a bit of confusion, in the sense that even a Gremlin expert might have to take a moment to deconstruct what this code is doing. Consider for a moment just how much someone unfamiliar with graphs would have to learn in order to get a set of flattened post threads into this format. The value of a DSL suddenly becomes apparent.

Traversing Discussions in the Domain Terminology

Developing a DSL can begin with a standalone Groovy script that can be referenced when starting a Gremlin REPL or initialized into Rexster or Titan Server through the init-script configuration element of rexster.xml. In the case of OpenClass, the DSL has evolved well past the early development needs that a “script” satisfies and is now engineered as a Maven-based project deployed as a standalone JAR file.

It is a good practice to use the DSL to centralize the property name strings that make up the graph’s schema. Avoiding the use of hard-coded strings eases future refactoring efforts and makes it straightforward to identify property usage within the DSL itself.

class S {
  public static final String EDGE_CHILD = "child"
  public static final String PROPERTY_POST_ID = "postId"
  public static final String PROPERTY_TYPE = "type"
  public static final String TYPE_POST = "post"
}

Examining the Gremlin traversals from the previous section, it can be seen that there is some commonality to them in that they all start with similar types of statements, each building on the next to add additional layers of processing. With Gremlin’s User Defined Steps, it is possible to build composable base steps that extend the language of the graph or operate at a level of abstraction higher than the graph itself.

First, define a class that will be responsible for containing the step definitions and for initializing them into Gremlin:

class Steps {
  def load() {
    // this method will call methods that will initialize each step definition.
    // from the Gremlin REPL or other code base that utilizes the steps, simply
    // call new Steps().load() to make the steps available.  
  }
}

With the Steps class in place, a first step definition can be added to encapsulate post filtering:

class Steps {
  def load() {
    defineStepPost()
  }

  private def defineStepPost() {
    Gremlin.defineStep('post', [Vertex, Pipe], { _().has(S.PROPERTY_TYPE, S.TYPE_POST) })
  }
}

Including this step simplifies the three Gremlin statements written in the previous section to:

g.V.post

g.V.post.out('child').loop(1){it.loops<25}{true}

m=[:]
g.V.post.out('child').loop(1){it.loops<25}{true}
   .path.groupBy(m){it[0].userName}{it}{it.collectMany{it}.unique()}.iterate()

The post step replaces usage of has(S.PROPERTY_TYPE, S.TYPE_POST). That change doesn’t make the code much more readable, but it is a start. Continuing with the example, two additional steps are included, one to traverse the tree of post vertices and one to flatten each thread (or discussion path):

class Steps {
  public static final int CONTROL_MAX_DEPTH = 25
  def load() {    
    defineStepPost()
    defineStepPostTree()
    defineStepFlattenThread()
  }

  private def defineStepPost() {
    Gremlin.defineStep('post', [Vertex, Pipe], { _().has(S.PROPERTY_TYPE, S.TYPE_POST) })
  }

  private def defineStepPostTree() {
    Gremlin.defineStep('postTree', [Vertex, Pipe], { depth = CONTROL_MAX_DEPTH ->
            _().post.out(S.EDGE_CHILD).loop(1){it.loops<depth}{true}
    })
  }

  private def defineStepFlattenThread() {
    // the addition of .transform{it[0]}.dedup to the end of this Gremlin statement
    // makes flattenThread a pure side-effect in that it converts the output back to
    // the original vertices passed in.
    Gremlin.defineStep('flattenThread', [Vertex, Pipe], { m, depth = CONTROL_MAX_DEPTH, keyOn = null ->
            _().postTree(depth).path.groupBy(m){keyOn == null ? it[0] : keyOn(it[0])}{it}
            {it.collectMany{it}.unique()}.transform{it[0]}.dedup
    })
  }
}

The addition of these steps simplifies the traversals and expands their flexibility:

g.V.post

// traverses to the default depth of 25
g.V.postTree

// traverse to the assigned depth of 256
g.V.postTree(256) 

m=[:];g.V.flattenThread(m).iterate()

// traverse to depth 256
m=[:];g.V.flattenThread(m, 256).iterate() 

// traverse to depth 256, key the Map on the postId of the root vertex instead of the vertex itself 
m=[:];g.V.flattenThread(m, 256, {it.getProperty(S.PROPERTY_POST_ID)}).iterate()

The steps have also been defined in such a way that the DSL gains the interesting capability to parameterize the behavior of the traversal. Parameterization of steps introduces flexibility to the DSL, allowing the consumers of the functions to tune the internals of the traversal for performance, filtering, transformations, etc. Note how the last example of flattenThread provides a closure for the final argument, making it possible to introduce dynamic behavior to traversals. Instead of always keying the map on userName, that behavior is now determined by the user of the DSL.

DSL Development Patterns

The list below represents recommended patterns to follow when building DSLs with Gremlin:

  • Centralize property names, edge labels and other string values as global variables. Don’t embed string literals into the DSL.
  • Include a schema class with some sort of initialization function that takes a Blueprints Graph instance as an argument and configures the indices of the graph (a minimal sketch of this pattern follows this list). A schema class is especially important when using Titan and its Type Definitions.
  • Standalone Groovy scripts are just a starting point for a DSL. Those scripts will quickly grow in complexity and become unmanageable. Treat the DSL as its own project. Use a dependency management and build system like Maven or Gradle to produce compiled code that can be referenced in other projects, pulled into the Gremlin REPL with Grape or copied to the path of Rexster or Titan Server and configured for use with imports and static-imports settings. Note that direct support of Grape in the REPL will be replaced in Gremlin 2.5.0 with the Gremlin.use() function which wraps Grape and performs a similar function.
  • Given deployment of the DSL to Rexster or Titan Server, client-side code no longer needs to pass long, complex Gremlin scripts as strings for remote execution via REST or RexPro. Client applications can just call parameterized functions from the DSL exposed on the server.
  • Write tests. Use the aforementioned schema class to “set-up” a Graph instance on each run of the tests to ensure that changes to the schema do not introduce problems and that new additions to the DSL will work properly in the production graph given that configuration. Use TinkerGraph for a lightweight means to test DSL operations.
  • Write in-line documentation for the schema, User Defined Steps, and other functions, but consider avoiding javadoc. Use a tool like Groc, which processes Markdown formatted text and produces documentation that includes the source code.
  • Design DSL components as composable blocks, such that one or more blocks can be used together for even higher-level operations. When possible, think generically and design functions that can be altered at the time they are called through parameterization with settings and closures.
  • The DSL is not just about extending Gremlin with User Defined Steps. Make use of the Groovy language and write standalone functions that operate on the graph, within User Defined Steps, or anywhere it makes sense to encapsulate graph functionality.
  • Use an IDE, like IntelliJ. Since Gremlin is Groovy, IDE features like syntax validation and code complete help make writing Gremlin more productive.
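
As a minimal sketch of the schema-class pattern from the list above (assuming the S class shown earlier; Titan-specific type definitions would be layered in where noted):

import com.tinkerpop.blueprints.KeyIndexableGraph
import com.tinkerpop.blueprints.Vertex
import com.tinkerpop.blueprints.impls.tg.TinkerGraph

class GraphSchema {
    static void initialize(KeyIndexableGraph graph) {
        // key indices the DSL's traversals rely on; Titan-specific type
        // definitions (makeKey/makeLabel) would be added here when the
        // supplied graph is a TitanGraph
        graph.createKeyIndex(S.PROPERTY_TYPE, Vertex.class)
        graph.createKeyIndex(S.PROPERTY_POST_ID, Vertex.class)
    }
}

// in a test: def g = new TinkerGraph(); GraphSchema.initialize(g)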

Conclusion

Gremlin is a general purpose graph traversal language, but developers can extend it with the specific rules and nomenclature of their domain. This additional layer to Gremlin can provide for a robust toolset for programmers, analysts and others interested in the data that the graph contains.

Pearson’s OpenClass DSL continues to expand allowing realization of the following benefits:

  • All logic related to the graph is centralized in the DSL providing a standard interface for any part of the organization that wishes to access the graph for information.
  • Non-programmers leverage the DSL in their work, as there is less “Gremlin-graph” language and more “education-graph” language.
  • Ad-hoc analysis of the graph tends to be less error prone and more productive, as the higher-order functions of the DSL are tested versions of common and sometimes mundane traversals (e.g. traversing a tree).
  • Interesting and unintended discoveries occur when exploring the graph by mixing and matching functions of the DSL.

The introduction of a DSL over Gremlin will be beneficial to projects of any size, but will quickly become a requirement as the complexity of the conceptual model of the domain increases. Investing in a DSL and making it a core component of a graph engineering strategy should be considered a common pattern for productionalizing Gremlin in the TinkerPop and Aurelius technology stacks.

Acknowledgements

Dr. Marko A. Rodriguez read draft versions of this post and provided useful comments.

Authors

Stephen Mallette

Scalable Graph Computing: Der Gekrümmte Graph

Scalable Graph Computing is a multi-part treatment on that for which the series is named.

A graph describes which “things” are adjacent to which other “things.” A logical graph forms a topology separate from the topology of the physical, material world. When computing, the logical structure must be placed in 3D, geometric space. A scalable graph data structure demonstrates a non-random morphism between logical and physical adjacency. This post discusses scalable graph data structures from the following perspective: It is important to co-locate data in space according to the metric of co-retrieval in time. Respecting space/time is the difference between sequential and random memory access. It is the difference between scalable and unscalable (graph) computing.

Co-Location and Co-Retrieval in Computing

If X tends to be retrieved with Y, then X and Y should reside side-by-side. If groceries and then the laundromat are on the todo list, then to save time it is best that they are near one another. With respect to the memory hierarchy, X and Y should be in the same cloud region, same data center, the same server rack, the same machine, on the same page of disk, in the same block of memory, together in L* cache, and — to the limit of identity — in the CPU’s registers. It is ludicrously slower to move data across the Internet to a CPU than to move that same data from the CPU’s L1 cache. The difference is ~50 milliseconds versus ~1 nanosecond, where 50 milliseconds is 50,000,000 nanoseconds. In human terms, suppose 1 second is a nanosecond; then 50 million seconds is 1.58 years. If X is on the CPU and Y is in Connecticut, then a relative eternity must unfold while Y travels its way to X‘s side for comparison/computation (a wait state). Fortunately, real-world computer systems move data in “chunks” and scalable systems use this phenomenon to their advantage.

Memory Hierarchy (Tommy MacWilliam, Computer Science E-1)

All the bits in a word are necessary to define the word: ?001 could be 0001 (1) or 1001 (9). When one bit of a word is accessed, all bits must be accessed, else the meaning of the word is unclear — ?at could be cat or hat. Local machine buses (bit co-retrieval systems) move a word’s bits together. Bus-inspired co-retrieval exists everywhere. When data is fetched from disk, pages are accessed (i.e. sequential blocks of data). If the page of data with X on it is requested from the OS by the executing program, then Y should have also been on that page, else another trip to disk must be made (seek time is ~5ms). When a human is interested in a webpage, the entire document is downloaded as it is assumed that the sentence after the first will most likely be required next. Similar assumptions are made for streaming audio and video. Data moves towards the CPU from one memory structure to the next in proportion to the size of that memory structure. With any luck (aka a cache hit), the next piece of data required by the CPU will be nearby — not in memory, on the disk, nor in Windham, Connecticut.

Localized Computing and Random Access Structures

The function f(x,y) brings two pieces of data together to yield more data. In layman’s physics, atoms are data and physical laws are the functions, where the “CPU” is everywhere. An interesting aspect of man-driven computing is that logical adjacency (a graph) does not imply spatial adjacency. Software-based computing, as we know it, is about stitching together “random” areas of memory (space) so that two logically adjacent data can be pulled together for comparison at a spatially localized CPU. Random access computing is both a curse and a blessing. The curse is that physics will not do our job for us — the created graph, while embedded in space, requires a machine that is not physics to evolve it (i.e. a CPU — however see molecular computing). The blessing is that new “things” (new definitions of adjacency) emerge out of 3D reality. In the emergent referential structure, a “thing” can be a “thing” without its parts being 3D-near each other. Unfortunately, this level of abstraction comes at the cost of time. For speed, the best the clever engineer can do is try their damnedest to make 3D reality and their data structure isomorphic in the literal sense — the same structure. However, as we believe it, NP != P and P != O(1).

[P]rogramming is basically planning and detailing the enormous traffic of words through the Von Neumann bottleneck, and much of that traffic concerns not significant data itself, but where to find it.
    — John Backus (1977)

Three-dimensional space, the things within it, and the physics that evolves it are not expressive enough for the likes of mind — for man has eaten from the tree of knowledge. No longer within the grace of a 1-to-1 correspondence, neurons go about constructing new maps of space/time that unite disparate areas via the construction of concepts in the hopes of realization (i.e. “full comprehension of the situation”). When the brain runs out of storage and compute resources, it goes about its way linking concepts in a computer using data structures (graphs). When a single computer is no longer sufficient, clusters of computers are leveraged. As larger graphs and more complex problems are met, graph algorithms must become more advanced (O(n^(1/c))) and/or the graph data structures must be represented more efficiently according to the constraints of modern hardware (see the Von Neumann bottleneck which is also a constraint of mind). In the end, both CPUs and brains are not efficient architectures as the data and the process are not one and the same, and the data must ultimately be moved to a single point in space (e.g. a CPU’s ALU, which implements the function f(x,y), and the world must go through the senses to the brain).

“Neurons which fire together, wire together.”
    — Donald Hebb on Hebbian Theory

Co-Locating the Structures of a Graph

The structures of a property graph are key/value properties, vertices, edges, incidences, adjacencies, cliques, clusters, super clusters, and graph filaments. Unfortunately, the entire world can not exist on the head of a pin (the CPU’s registers). While a graph is ultimately bits/bytes/words, the aforementioned larger structures hint at the probability of co-retrieval and allow the clever engineer to use that information to place the structures in space.

When a vertex is accessed, its long identifier and key/value properties are typically accessed as well. If a vertex’s incident liked-edges are accessed according to their stars-property, then it is important to have those edges sequentially ordered by 5 star, then 4 star, then 3, and so on with an offset index to where each group can be found (see vertex-centric indices). One of the major benefits of the graph data structure is that adjacency is made explicit and is not up to arbitrary joins. When a vertex is accessed, its adjacent neighbors are known and those adjacents are more likely to be accessed next than a random distant neighbor. Thus, the vertices in a clique should reside side-by-side. Given the size of a clique (potentially 10s of vertices), they should be on the same page of disk. Loose cliques form clusters — e.g., students at a university are more likely to interact with each other than with students in different universities. Partitioning the vertices across physical machine servers according to their university affiliation may respect the principle of co-retrieval/co-location (see Educating the Planet with Pearson). Clusters form super-clusters — universities in the same region of the world are more likely to interact. However, at this scale, other dimensions of the graph (i.e. paths between vertices) could be asserting their effects on co-retrieval. The larger the scale, the harder it is to predict which bit will follow next. Below is a summary of the components of a graph and their co-location in the memory hierarchy according to the perspective of a single vertex (where “perspective” is a process at the CPU).

structure       location        description
id              register        A long value uniquely identifying a vertex/edge
property        L1 cache        A string key and a primitive value
incidents       L2/L3 cache     A vertex's incoming and outgoing edges
adjacents       Block of RAM    The vertices incoming and outgoing to a vertex
clique          Page on Disk    A group of collectively adjacent vertices
cluster         Server          A group of vertices with high intra-connectivity and low inter-connectivity
super cluster   Server Rack     A group of clusters with high intra-connectivity and low inter-connectivity
filament        Data Center     A massive lattice-like structure connecting super clusters     

A Galactic Simulation via KIPAC

The larger the structure, the slower it evolves, as large structures are epiphenomena of their more atomic pieces. While the atoms of computing (bits/words) are the structures processed by the CPU, these mutations at the small scale take time to effect changes at the large scale — nanoseconds, seconds, days, years. The evolution of local neighborhoods in the graph percolates structural changes throughout the system. Thus, optimizing the layout of a universal graph across all data centers in the world is not only NP hard (given the possible number of permutations), but it is also a constantly evolving problem. The principle of divide-and-conquer is used today — optimize for key/values, incident edges, and clusters, and with any “luck” (a cache hit), the next piece of data is near a CPU.

Conclusion

The concept of computing is very general and can be implemented in a multitude of ways. In the everyday real-world, the Von Neumann machine architecture is the standard. These machines are composed of a CPU, memory, and a disk. They are networked together using high-speed interconnects within the same server rack. Server racks are stored in data centers. Data centers exist in regions across the globe. As graph structures become larger, so do the computing resources required to represent and process them. However, any one micro-structure of the universal graph is ultimately operated on by a particular CPU in the cluster. The collective computation of all CPUs yields the evolution of man’s (re)definition of reality contorted in space.

Acknowledgments

Dr. Matthias Bröcheler is an advocate of algorithm and data structure designs that are sympathetic to hardware — tailored to the memory hierarchy. Discussions on such matters fostered the writing of this post.

Authors

Marko A. Rodriguez

Loopy Lattices Redux

A graph can be realized as a tessellated surface. Each vertex (or point) on the surface has a set of neighbors. Two vertices are neighbors if there is an edge that connects them. For a discrete traverser (e.g. a marble), movement in a space requires a decision at each vertex. The traverser must decide whether to go left, up, east, right, west, or down — i.e., it must choose one and only one edge emanating from its current vertex. The edge chosen leads to yet another vertex. At which point, another choice must be made ad infinitum.

k-Regular Graphs: k-Ary String Generators

A directed graph is k-regular if each vertex has k adjacent vertices. Starting from any vertex, there are k^n possible n-length paths in a k-regular graph. For a 2-regular graph (k=2), the number of unique paths is equivalent to the number of objects that can be represented by a binary string. For instance, there are 4,294,967,296 length-32 paths in a 2-regular graph (2^32). This relationship is apparent if every vertex’s incident edges are labeled with either a 0 or 1. If a traverser prints the edge label at each step in an n-step walk, then all possible n-length binary strings will be generated.

A graph with 2-degree vertices is rare in nature. A graph with 10+ degree vertices is more common, though such k-regularity is uncommon. A 10-regular graph has 100,000,000,000,000,000,000,000,000,000,000 unique 32-length paths (10^32). Combinatorial explosions are readily apparent when computing on natural graphs. Such truths affect the way in which technology/algorithms should be implemented when solving real-world graph problems.

Lattice Graphs: Binary Strings with an Equal Number of 0s and 1s

A lattice is a planar graph with a rectangular tessellation. The “inside” of a finite lattice is regular in that each vertex connects to the same number of vertices, but the “outside” is not regular in that there are no neighbors beyond the border vertices. A 20x20 directed lattice has 441 vertices and 840 edges. As shown analytically in the original Loopy Lattices post, there are 137,846,528,820 (137.85 billion) unique paths from the top-left vertex to the bottom-right vertex. A discrete traverser must take 40 steps to make the journey. Of those 40 steps, an equal number of 1s and 0s will be “printed.” Thus, the problem of determining how many unique paths there are in a directed 20x20 lattice is a question of how many unique 40-length binary strings exist such that there is an equal number of 1s and 0s. This constraint yields a number that is much smaller than 2^40. In the previous post, Gremlin (in its depth-first, enumerative form) could not calculate the answer due to the explosion of possibilities. Therefore, to answer the question, the closed form solution below was provided. The solution says “2n choose n” or, in particular, “40 choose 20.” The 20 chosen 1 slots in the 40-length binary string force the remaining 20 positions to be 0.
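
Restating the closed-form solution referenced above, the count is the central binomial coefficient:

\[ \binom{2n}{n} = \frac{(2n)!}{(n!)^{2}}, \qquad \binom{40}{20} = 137{,}846{,}528{,}820 \]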

Titan vs. Faunus: The Enumerative/Counting Distinction

A graph database such as Titan can be used to store a 20x20 lattice. While an 840-edge graph is extremely small for a “database,” it is necessary for the experiment to follow. The Gremlin/Groovy code to create a 20x20 lattice in Titan is provided below.

def generateLattice(n) {
  g = TitanFactory.open('bin/cassandra.local')
   
  // total number of vertices
  max = (int) Math.pow(n + 1, 2)
   
  // generate the vertices
  vertices = [];
  (1..max).each { vertices.add(g.addVertex()) }
     
  // generate the edges
  vertices.eachWithIndex { v, i ->
    right = i + 1
    if (((right % (n + 1)) > 0) && (right <= max)) {
      v.addEdge('link', vertices[right])
    }
 
    down = i + n + 1
    if (down < max) {
      v.addEdge('link', vertices[down])
    }
  }
  g.commit();
  return g
}

Traversing a Lattice with Titan

With Titan/Gremlin, it is possible to count the number of 1-, 2-, 3-, …, 40-length paths in the 20x20 lattice. A traversal of length 40 is a full traversal of the lattice from the top-left to the bottom-right. The traversal code and the runtimes are provided below. An exponentially growing runtime is realized: the 28-, 29-, and 30-step traversals took 10.9 minutes, 2.3 hours, and 9.8 hours, respectively. The calculation was halted on the 31st step. The number of 40-length paths could not be reasonably calculated with Titan/Gremlin. By exhaustively enumerating paths and with the number of paths growing exponentially as the path length increases, a calculation of this form is doomed for large values of n.

/usr/local/titan-all-0.3.1$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = generateLattice(20)
==>titangraph[cassandrathrift:127.0.0.1]
gremlin> g.V.count()
==>441
gremlin> g.E.count()
==>840
gremlin> (1..40).each{ l ->
  t = System.currentTimeMillis();
  c = g.v(4).out.loop(1){it.loops <= l}.count()
  t = System.currentTimeMillis() - t;
  println(l + ":" + c + ":" + t)
}

path length   path count      traversal time (ms)
1             2               9
2             4               2
3             8               1
4             16              2
5             32              3
6             64              5
7             128             10
8             256             17
9             512             36
10            1024            62
11            2048            74
12            4096            96
13            8192            49
14            16384           61
15            32768           88
16            65536           163
17            131072          325
18            262144          646
19            524288          1296
20            1048576         2573
21            2097150         5172
22            4194258         10306
23            8388054         20659
24            16772566        41555
25            33523880        82993
26            66941500        166828
27            133422540       331954
28            265069020       654070
29            523921830       8288566
30            1027813650      35512124

Traversing a Lattice with Faunus

Faunus is a graph analytics engine that leverages Hadoop and a breadth-first implementation of Gremlin. With Faunus, paths can be enumerated in a similar fashion to Titan/Gremlin, but if only counts are required (destinations vs. paths), then it is more efficient to propagate and sum counters. Instead of explicitly storing each and every Gremlin at every vertex, Faunus stores the number of Gremlins at each vertex. This is the difference between representing a list of length m and a long with value m. A consequence of this model is that it is possible to efficiently compute the number of 40-length paths in a 20x20 lattice with Faunus. This counter propagation mechanism is analogous to the mechanical technique for computing binomial coefficients as proposed by Blaise Pascal via Pascal’s triangle (in the Western world). It is important to note the breadth-first requirement of this computation.

Enumeration vs. Counting

g = FaunusFactory.open('bin/titan-cassandra-input.properties')
t = System.currentTimeMillis()
// As of Faunus 0.3.1, the loop() construct is not supported
g.v(4).out.out.out.out.count() // 4-steps
t = System.currentTimeMillis() - t
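
The counter-propagation idea itself is independent of Faunus and fits in a few lines of Groovy. The sketch below (not Faunus code) propagates per-vertex counters across the directed lattice, which is Pascal’s triangle in disguise:

// count paths by propagating counters instead of enumerating traversers
def pathCounts(int n, int steps) {
  def counts = [:].withDefault { 0G }
  counts[[0, 0]] = 1G                        // one traverser starts at the top-left corner
  steps.times {
    def next = [:].withDefault { 0G }
    counts.each { pos, c ->
      def (x, y) = pos
      if (x < n) next[[x + 1, y]] += c       // move right
      if (y < n) next[[x, y + 1]] += c       // move down
    }
    counts = next
  }
  return counts
}

// number of 40-step paths from corner to corner of the 20x20 lattice
assert pathCounts(20, 40)[[20, 20]] == 137846528820G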

The number of paths of length n in a 20x20 lattice is plotted below in both y-linear (left) and y-logarithmic (right) form. In short, the number of paths grows exponentially as the path length increases. What is interesting to note in the following table of Faunus counts and runtimes is that Faunus is able to compute the total number of unique 40-length paths in the lattice in 10.53 minutes — 137,846,528,820.

20x20 Lattice Path Count

path length   path count      traversal time (ms)
1             2               32781
2             4               48071
3             8               63537
4             16              78575
5             32              94078
6             64              109246
7             128             124574
8             256             139850
9             512             156223
10            1024            171549
11            2048            186049
12            4096            201111
13            8192            218417
14            16384           232642
15            32768           248019
16            65536           263355
17            131072          278685
18            262144          296912
19            524288          308225
20            1048576         324440
21            2097150         340823
22            4194258         355349
23            8388054         371546
24            16772566        385755
25            33523880        402189
26            66941500        416868
27            133422540       433917
28            265069020       448150
29            523921830       462522
30            1027813650      478595
31            1995537270      493224
32            3821729910      508492
33            7191874140      524791
34            13237415400     539216
35            23690879520     556108
36            40885872720     568512
37            67156001220     586697
38            102501265020    601196
39            137846528820    617152

Titan’s runtime grows exponentially, in proportion to the number of paths computed. On the other hand, Faunus’ computation time grows linearly when computing an exponential path count. At step 28, Faunus executes the path count faster than Titan. This does not mean that Titan is inherently less efficient at such computations; it is simply a function of the depth-first, enumerative nature of Gremlin/Titan vs. the breadth-first, counter nature of Gremlin/Faunus. Implementing Faunus’ Gremlin engine over Titan would enable Titan to compute such path counts effectively. However, the purpose of Faunus is to serve as exactly that — the global batch processor to Titan.

Titan and Faunus Lattice Traversal Times

Conclusion

20x20 Lattice A graph is a data structure composed of vertices and edges. The natural interpretation of a computation on a graph is a traversal — i.e., a directed walk over the vertices by means of chosen incident edges. An exhaustive exploration of all paths within a graph is typically not feasible because the number of paths grows exponentially as a function of the path length and the graph’s branching factor. As demonstrated with Titan and Faunus, the goal of the traversal and the choice of the traversal engine ultimately determine the feasibility of the computation. Once again, the loopy lattice exposes a simple truth in the theory and practice of graphs.

Acknowledgments

Dr. Vadas Gintautas read draft versions of this post and provided useful comments. Finally, this post was inspired by the work previously done by Bobby Norton, Marko A. Rodriguez, and Vadas Gintautas entitled Loopy Lattices.

Authors

Marko A. Rodriguez

Educating the Planet with Pearson

Titan: Distributed Graph Database Pearson is striving to accomplish the ambitious goal of providing an education to anyone, anywhere on the planet. New data processing technologies and theories in education are moving much of the learning experience into the digital space — into massive open online courses (MOOCs). Two years ago Pearson contacted Aurelius about applying graph theory and network science to this burgeoning space. A prototype proved promising in that it added novel, automated intelligence to the online education experience. However, at the time, no scalable, open-source graph database technology existed on the market. It was then that Titan was forged in order to meet the requirement of representing all universities, students, their resources, courses, etc. within a single, unified graph. Moreover, beyond representation, the graph needed to be able to support sub-second, complex graph traversals (i.e. queries) while sustaining at least 1 billion transactions a day. Pearson asked Aurelius a simple question: “Can Titan be used to educate the planet?” This post is Aurelius’ answer.

Data Loading Benchmark

Pearson Education Pearson provides free online education through its OpenClass platform. OpenClass is currently in beta with adoption by ~7000 institutions. To meet the expected growth beyond beta, it is necessary to build the platform on a scalable database system. OpenClass Moreover, it is important to build the platform on a database system that can support advanced algorithms beyond simple get/put semantics. The latter was demonstrated via the initial prototype. To address the former, a simulation of a worldwide education environment was created in Titan to alleviate Pearson’s scalability concerns.

Number of students 3.47 billion
Number of teachers 183 million
Number of courses 788 million
Number of universities 1.2 million
Number of concepts 9 thousand
Number of educational artifacts ~1.5 billion
Total number of vertices 6.24 billion
Total number of edges 121 billion

The simulated world was a graph containing 6.24 billion vertices and 121 billion edges. The edges represent students enrolled in courses, people discussing content, content referencing concepts, teachers teaching courses, material contained in activity streams, universities offering courses, and so forth. Various techniques were leveraged to ensure that the generated data was consistent with a real-world instance. For example, people names were generated from sampling the cross product of the first and last names in the US Census Bureau dataset. Gaussian distributions were applied to determine how many courses a student should be enrolled in (mean of 8) and how many courses a teacher should teach (mean of 4). The course names and descriptions were drawn from the raw MIT OpenCourseWare data dumps. Course names were appended with tokens such as “101,” “1B,” “Advanced,” etc. in order to increase the diversity of the offerings. Student comments in discussions were sampled snippets of text from the electronic books provided by Project Gutenberg. University names were generated from publicly available city name and location datasets in the CommonHubData project. Finally, concepts were linked to materials using OpenCalais. The final raw education dataset was 10 terabytes in size.
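As a rough illustration of the sampling described above, enrollment and teaching loads might be drawn along the following lines (a sketch only; the standard deviations and the clamp at a minimum of 1 are assumptions, not the actual generator):

// Gaussian sampling of course loads -- illustrative Groovy sketch
rand = new Random()
sampleCount = { double mean, double stddev ->
    Math.max(1, Math.round(mean + stddev * rand.nextGaussian()) as int)
}
studentEnrollments = (1..5).collect { sampleCount(8d, 2d) }   // mean of 8 courses per student
teacherLoads       = (1..5).collect { sampleCount(4d, 1d) }   // mean of 4 courses per teacher
println studentEnrollments
println teacherLoads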

A 121 billion edge graph is too large to fit within the confines of a single machine. Fortunately, Titan/Cassandra is a distributed graph database able to represent a graph across a multi-machine cluster. The Amazon EC2 cluster utilized for the simulation was composed of 16 hi1.4xlarge machines. The specification of the machines is itemized below.

  • 60.5 GiB of memory
  • 35 EC2 Compute Units (16 virtual cores and 64-bit)
  • 2 SSD-based volumes each with 1024 GB of instance storage
  • I/O Performance: Very High (10 Gigabit Ethernet)

Educating the Planet - Disk Writes The 10 terabyte, 121 billion edge graph was loaded into the cluster in 1.48 days at a rate of approximately 1.2 million edges a second with 0 failed transactions. These numbers were possible due to new developments in Titan 0.3.0 whereby graph partitioning is achieved using a domain-based byte order partitioner. With respect to education, the dataset maintains a structure where most edges are intra-university rather than inter-university (i.e. students and teachers typically interact with others at their own university, and even more so within their own courses). As such, domain partitioning is possible where vertices within the university (i.e. graph community) are more likely to be co-located on the same physical machine.

Once in Titan, the raw 10 terabyte dataset was transformed to 5 terabytes due to data agnostic Snappy compression and the use of Titan-specific graph compression techniques (e.g. “id deltas,” type definitions, and Kryo serialization). After the data was loaded, it was immediately backed up to Amazon S3. This process took 1.4 hours using Titan’s parallel backup strategy. The nodetool statistics for the Titan/Cassandra cluster are provided below.

Address         Rack    Status   State    Load        Token
10.244.196.10   rack1   Up       Normal   329.44 GB   Token(bytes[c000000000000000])
10.244.194.142  rack1   Up       Normal   348.62 GB   Token(bytes[3000000000000000])
10.244.196.111  rack1   Up       Normal   330.86 GB   Token(bytes[b000000000000000])
10.244.195.155  rack1   Up       Normal   333.57 GB   Token(bytes[a000000000000000])
10.244.195.243  rack1   Up       Normal   330.91 GB   Token(bytes[9000000000000000])
10.244.195.209  rack1   Up       Normal   326.57 GB   Token(bytes[f000000000000000])
10.244.195.93   rack1   Up       Normal   355.26 GB   Token(bytes[4000000000000000])
10.244.195.179  rack1   Up       Normal   325.73 GB   Token(bytes[e000000000000000])
10.244.195.57   rack1   Up       Normal   351.47 GB   Token(bytes[1000000000000000])
10.244.196.27   rack1   Up       Normal   332.87 GB   Token(bytes[d000000000000000])
10.244.196.93   rack1   Up       Normal   351.81 GB   Token(bytes[2000000000000000])
10.244.195.6    rack1   Up       Normal   331.56 GB   Token(bytes[8000000000000000])
10.244.195.7    rack1   Up       Normal   327.55 GB   Token(bytes[0000000000000000])
10.244.196.84   rack1   Up       Normal   345.2 GB    Token(bytes[5000000000000000])
10.244.195.8    rack1   Up       Normal   351.26 GB   Token(bytes[6000000000000000])
10.244.194.178  rack1   Up       Normal   338.07 GB   Token(bytes[7000000000000000])

Transactional Benchmark

Titan Cluster and Application Servers The purpose of the second half of the experiment was to subject the 121 billion edge education graph to numerous concurrent transactions. These transactions simulate users interacting with the graph — solving educational problems, adding more content, discussing ideas with one another, etc. To put the 16 hi1.4xlarge cluster under heavy load, 80 m1.medium machines were spawned. These 80 machines simulate the application servers querying the graph and providing users the front-end experience. Each machine maintained 30 threads in a “while(true)-loop,” randomly selecting 1 of 16 transactional templates below and executing it. Thus, 2,400 concurrent threads communicated with the 16-machine hi1.4xlarge Titan cluster. A review of the Gremlin queries and their mean runtimes with standard deviations is presented in the table below. Note that many of the transactions are “complex” in that they embody a series of actions taken by a user and thus, in a real-world setting, these would typically be broken up into smaller behavioral units (i.e. multiple individual transactions).

name             # of tx    avg (ms)   std dev   description
scommentread     25550909   211.07     45.56     student reads the most recent comments for their courses
reccourse        5149469    467.37     178.20    student gets recommended courses to take
scommentshare    12825909   394.15     57.98     student reads comments in courses and shares a comment
scontent         20567687   279.32     81.83     student retrieves all content for a single course in their course list
saddfollow       12826912   193.72     22.77     student follows another student
scourses         7720965    233.38     79.44     student retrieves a list of all their courses with description
classmates       12849769   96.962     22.27     student retrieves a list of all their classmates
sprofile         7689669    53.740     22.61     student retrieves their profile
recfriend2       5178945    155.75     44.60     student is recommended people to follow (version 1)
scourseactivity  10371133   565.76     189.80    student retrieves the top 10 most recent activities in their courses
scommentadd      5182558    281.90     44.326    student reads course comments and comments at some depth in the discussion tree
recfriend1       5189435    241.33     256.48    student is recommended people to follow (version 2)
sreshare         12850473   284.07     68.20     student reads their stream and shares an item with followers
ssharelink       5140363    261.58     35.75     student shares a link with followers
sdiscussadd      2604696    246.35     34.64     student browses courses and then adds a new discussion topic to a course
sstream          76301001   224.93     84.48     student reads their personal stream     

The transactional benchmark ran for 6.25 hours and executed 228 million complex transactions at a rate of 10,267 tx/sec. Extrapolating from the consistent behavior over those 6.25 hours, Titan can be expected to serve ~887 million transactions a day (10,267 tx/sec × 86,400 seconds). Given the complex nature of the transactions, a real-world production system of this form should be able to sustain greater than 1 billion transactions a day.

Conclusion

Titan: Distributed Graph Database A first attempt at this benchmark was done at the turn of the year 2012. That benchmark was for a 260 billion edge graph (7 billion students) constructed using the same graph generation techniques described in this post. The graph data was loaded, but the transactional benchmark was never executed because the requisite cluster architecture could not be fulfilled by Amazon EC2 (all 32 high I/O machines in the same placement group). Thus, the smaller-scale benchmark presented here was what was possible given EC2’s resources at the time.

While this benchmark demonstrates that Titan can handle the data scale and transactional load of “99%” of applications developed today, Aurelius is not sufficiently satisfied with the presented transactional results. By reducing the transactional load to ~5,000 tx/sec, the runtimes of the queries dropped to the 100 millisecond range. The desired result was to achieve ~100ms query times at 10k tx/sec and 250ms at 25k tx/sec. Fortunately, much was learned about Titan/Cassandra during this exploration. Advances in both inter-cluster connection management and machine-aware traversal forwarding are currently in development in order to reach the desired scaling goals.

A scalable graph database is only half the story — the story presented here. Once scaling has been accomplished, the next problem is yielding knowledge from data. For the last 2 years, Aurelius has been working with Pearson to develop novel algorithms for the education space that move beyond the typical activities seen in current web-systems: profiles, streams, recommendations, etc. Online education is ripe for automating much of the intelligence currently contributed by students and teachers. With a collective graphical model, Aurelius’ algorithms move beyond creating a dynamic interface to providing computational human augmentation. These techniques generalize and should prove fruitful to other domains.

Acknowledgements

This report was made possible due to funding provided by Pearson Education. Over the 5 months that this benchmark was developed, the following individuals provided support: Pavel Yaskevich (Cassandra tuning and Titan development) and Stephen Mallette/Blake Eggleston (RexPro development and testing). Various issues in Cassandra were identified at scale and the Cassandra community readily accepted patches and quickly released newer versions in support of the effort. Note that Titan is an open source, Apache2 licensed graph database. The Titan community has supported the project via patch submissions and testing across use cases and cluster configurations. Finally, Steve Hill of Pearson has done much to ensure Titan’s success — pushing Aurelius to answer the question: “Can Titan educate the planet?”

Authors

Matthias Broecheler Dan LaRocque Marko A. Rodriguez

Titan Server: From a Single Server to a Highly Available Cluster

Titan Growth Titan is a distributed graph database capable of storing graphs on the order of hundreds of billions of edges while, at the same time, supporting billions of real-time graph traversals a day. For most graph applications, the high-end performance aspects of Titan will never be reached. This does not mean that Titan is unsuitable for graph applications at the smaller scale — in the billions of edges and below. The purpose of this post is to introduce Titan from the perspective of a team of engineers developing a new graph-based application. These engineers will initially develop and test their codebase using a single Titan Server. When the application matures and is ready for production use, a highly-available setup is deployed. Finally, as the application becomes more popular and the data size and transactional load increases, a fully distributed cluster is leveraged. Growing a Titan database from a single server to a cluster is simply a matter of configuration. In this way, Titan gracefully scales to accommodate the changing requirements of a graph application.

Titan Single Server

Titan Single Machine Augustus and Tiberius are two software engineers who have designed an application that represents the Gods and people of Rome within a graph of familial relationships — a genealogy application. The intention is that Roman scholars will use the application to better understand the social fabric of their great Empire. While the intention is single-user, the two engineers decide to leverage Titan as the backend graph database. For one, Titan is completely free for any use (Apache 2 licensed) and two, it supports both single server and distributed deployments. The latter is important to them because the Greek Oracle of Delphi foretold that a genealogy graph would one day be used online by everyone throughout the Roman Empire.

$ wget http://s3.thinkaurelius.com/downloads/titan/titan-cassandra-0.3.0.zip
$ unzip titan-cassandra-0.3.0.zip
$ cd titan-cassandra-0.3.0
$ sudo bin/titan.sh config/titan-server-rexster.xml config/titan-server-cassandra.properties
13/03/27 12:40:32 INFO service.CassandraDaemon: JVM vendor/version: Java HotSpot(TM) 64-Bit Server VM/1.7.0_12-ea
13/03/27 12:40:32 INFO service.CassandraDaemon: Heap size: 40566784/477233152
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Loading settings from file:/Users/marko/software/aurelius/titan/config/cassandra.yaml
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Global memtable threshold is enabled at 151MB
13/03/27 12:40:32 INFO service.CacheService: Initializing key cache with capacity of 2 MBs.
13/03/27 12:40:35 INFO server.RexProRexsterServer: RexPro serving on port: [8184]
13/03/27 12:40:35 INFO server.HttpRexsterServer: Rexster Server running on: [http://localhost:8182]
13/03/27 12:40:35 INFO server.ShutdownManager: Bound shutdown socket to /127.0.0.1:8183. Starting listener thread for shutdown requests.
...

Users without wget can use curl -O or download from the Titan download page.

The above sequence of 4 shell commands downloads and starts up a Titan Server on the localhost. Titan Server embeds both Cassandra and (a lightweight version of) Rexster within the same JVM. Titan Server exposes the following language-agnostic endpoints for developers to communicate with the graph:

  1. A RESTful endpoint available at http://localhost:8182/graphs.
  2. A RexPro binary protocol endpoint available on port 8184.
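For example, a quick smoke test against the REST endpoint can be performed with curl (a sketch; the graph name that appears under /graphs depends on how the Rexster component is configured):

# list the graph names this Titan Server exposes over REST
$ curl http://localhost:8182/graphs
# retrieve the vertices of one of those graphs (substitute the configured graph name)
$ curl http://localhost:8182/graphs/<graph-name>/vertices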

Titan HTTP/RexPro

Titan Server is configured via two primary files: titan-server-rexster.xml (shown below) and cassandra.yaml (discussed in the next section). These files are located in the config/ directory of the titan-cassandra-x.y.z distribution.

<rexster>
    <http>
        <server-port>8182</server-port>
        <server-host>0.0.0.0</server-host>
        <base-uri>http://localhost</base-uri>
        <character-set>UTF-8</character-set>
        <enable-jmx>false</enable-jmx>
        <max-post-size>2097152</max-post-size>
        <max-header-size>8192</max-header-size>
        <upload-timeout-millis>30000</upload-timeout-millis>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </http>
    <rexpro>
        <server-port>8184</server-port>
        <server-host>0.0.0.0</server-host>
        <session-max-idle>1790000</session-max-idle>
        <session-check-interval>3000000</session-check-interval>
        <connection-max-idle>180000</connection-max-idle>
        <connection-check-interval>3000000</connection-check-interval>
        <enable-jmx>false</enable-jmx>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </rexpro>
    <security>
        <authentication>
            <type>none</type>
        </authentication>
    </security>
    <shutdown-port>8183</shutdown-port>
    <shutdown-host>127.0.0.1</shutdown-host>
</rexster>

NOTE: Along with the above endpoints, Titan Server also exposes a JVM native serialization interface that can be used by JVM languages. This interface, for example, is the means by which Faunus/Hadoop interacts with Titan Server for global graph analytics. For more information on this endpoint, see Using Cassandra.

Titan Highly Available

The genealogy application was showing promise as a single-user system for studying the genetic history of the Roman people and Gods. Due to the positive response, Augustus and Tiberius decide that a multi-user online genealogy service would be a successful product.

// how many siblings did jupiter have?
g.V('name','jupiter').out('brother','sister').count() 
// who is caesar's grandmother?
g.V('name','caesar').out('mother').out('mother').name 
// who are marcus' children's in-laws?
g.V('name','marcus').in('father').has('gender','M').out('married').out('father','mother').name 

As it currently stands, the genealogy data set is approximately 1 billion edges. Therefore, it can be stored and processed on a single machine. As a single-user application, a single Titan Server suffices. However, with multiple users, it is important that the system is robust and can serve numerous concurrent requests. If the application is backed by only a single server and that server goes down, the application is unusable. To ensure 1.) no single point of failure and 2.) support for more transactions per second, Augustus and Tiberius deploy 3 machines, each with a Titan Server installed.

Titan Highly Available The team updates the default config/cassandra.yaml file of each Titan Server by changing the localhost property value to be the IP address of the machine and adding a seed IP address for discoverability (see Multinode Cluster). Next, they start each Titan Server one after the other (titan.sh). To verify that the servers have properly clustered together, they use Cassandra’s nodetool.
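Concretely, the per-machine edits amount to something like the following cassandra.yaml excerpt (illustrative only; shown for the machine at 10.223.14.57 from the ring output below, and the choice of seed node is an assumption). The nodetool check then follows.

# config/cassandra.yaml (excerpt) -- property names as in the Cassandra 1.2 series used in this post
listen_address: 10.223.14.57         # this machine's IP address instead of localhost
rpc_address: 10.223.14.57
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "10.174.123.131"  # a seed address shared by all three servers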

apache-cassandra-1.2.3$ bin/nodetool ring

Datacenter: datacenter1
==========
Replicas: 1

Address         Rack        Status State   Load            Owns                Token
                                                                               57715295010532946864463892271081778854
10.223.14.57    rack1       Up     Normal  93.06 KB        49.28%              141555886663081320436455748965948652071
10.174.123.131  rack1       Up     Normal  59.73 KB        33.44%              28311611028231080169766921879398209884
10.196.0.207    rack1       Up     Normal  9.43 KB         17.28%              57715295010532946864463892271081778854

Finally, on one of the servers, the cassandra-cli tool is used to update the replication factor of the titan-keyspace.

apache-cassandra-1.2.3$ bin/cassandra-cli -h 10.174.123.131

[default@unknown] update keyspace titan with strategy_options = {replication_factor:3};
a3b7e1a3-4a88-3769-8c5e-90cda4fec0e1
[default@unknown] show schema titan;
create keyspace titan
  with placement_strategy = 'SimpleStrategy'
  and strategy_options = {replication_factor : 3}
  and durable_writes = true;

Roman Server Room With a replication factor of 3, each of the 3 Titan Servers is the primary host of approximately one-third of the vertices in the graph while, at the same time, each maintains a replica of the primary data of the other two servers. In this way, a highly-available, master-master setup is rendered. With this model, there is no single point of failure. If one of the database machines goes down, the other two are able to serve the primary data of the dead machine. If two of the machines go down, the remaining machine can serve data — albeit not with the same throughput possible when all three machines are available. With full master-master replication, the graph is duplicated and each server can support both reads and writes to the graph.

Titan Clustered

Roman Forum The following summer, the prophecy of the Oracle of Delphi comes true. An announcement is made in the Roman forum about the utility of the online genealogy application. Immediately, the plebeians of Rome join the site. They feverishly add their family histories and traverse the graph to learn more about their genetic past. This spike in usage puts an excessive amount of strain on the servers. With so many concurrent users, the three server machines have their CPU and disk I/O peaked trying to process requests.

Titan Clustered To remedy the situation, 6 more Titan Server machines are added to the cluster for a total of 9 machines. The token ring is rebalanced to ensure that each server maintains a relatively equal amount of the graph. A perfect/fair partition of 2^128 into 9 parts is below (see token ring calculator).

0
18904575940052135809661593108510408704
37809151880104271619323186217020817408
56713727820156407428984779325531226112
75618303760208543238646372434041634816
94522879700260688493040931281842470912
113427455640312814857969558651062452224
132332031580364960112364117498863288320
151236607520417086477292744868083269632

Each machine has its token updated using the following nodetool commands. By repartitioning the token ring, the 3 original servers transfer their data to the newly on-boarded servers in order to distribute the data load as specified by their location in the 128-bit token space (each vertex hashes to a particular 128-bit token).

apache-cassandra-1.2.3$ bin/nodetool -h 10.223.14.57 move 0
apache-cassandra-1.2.3$ bin/nodetool -h 10.174.123.131 move 18904575940052135809661593108510408704
apache-cassandra-1.2.3$ bin/nodetool -h 10.196.0.207 move 37809151880104271619323186217020817408
...
..
.

Token Ring Partition

With the replication factor still set to 3, each server does not maintain a full replica of the graph. Instead, each server only replicates a third of the full graph (3/9). At this point, no single server has a full picture of the graph. However, because there are more servers, more transactions can be served and more data can be stored. Augustus and Tiberius have successfully grown their single-user graph application to a distributed system that stores and processes a massive genealogy graph represented across a cluster of Titan Server machines.

Conclusion

Titan Head Titan was developed from the outset to support OLTP distributed graph storage and processing. While it is important that a graph database can scale indefinitely, less than 1% of applications written today will ever leverage near-trillion-edge graphs. The other 99% of applications will store and process million- and billion-edge graphs. Titan is able to meet the requirements of both segments of the graph application space. Furthermore, Titan scales gracefully as developers move from a single server prototype, to a highly-available production system, to, ultimately, a fully distributed cluster sustaining the size and workload requirements seen by 1% of applications.

Acknowledgements

Stephen Mallette and Blake Eggleston are the developers of Rexster’s RexPro. Their efforts were a driving force behind the development of Titan Server.

Authors


Marko A. Rodriguez Matthias Broecheler

Polyglot Persistence and Query with Gremlin

Gremlin Data Lab

Complex data storage architectures are not typically grounded in a single database. In these environments, data is highly disparate, which means that it exists in many forms, is aggregated and duplicated at different levels, and in the worst case, the meaning of the data is not clearly understood. Environments featuring disparate data can present challenges to those seeking to integrate it for purposes of analytics, ETL (Extract-Transform-Load) and other business services. Having easy ways to work with data across these types of environments enables the rapid engineering of data solutions.

Some causes for data disparity arise from the need to store data in different database types, so as to take advantage of the specific benefits that each type exposes. Some examples of different database types include (please see Getting and Putting Data from a Database):

  • Relational Database: A relational database, such as MySQL, Oracle or Microsoft SQL Server, organizes data into tables with rows and columns, using a schema to help govern data integrity.
  • Document Store: A document-oriented database such as MongoDB, CouchDB, or RavenDB, organizes data into the concept of a document, which is typically semi-structured as nested maps and encoded to some format such as JSON.
  • Graph Database: A graph is a data structure that organizes data into the concepts of vertices and edges. Vertices might be thought of as “dots” and edges might be thought of as “lines”, where the lines connect those dots via some relationship. Graphs represent a very natural way to model real-world relationships between different entities. Examples of graph databases are Titan, Neo4j, OrientDB, Dex and InfiniteGraph.

Gremlin is a domain-specific language (DSL) for traversing graphs. It is built using the metaprogramming facilities of Groovy, a dynamic programming language for the Java Virtual Machine (JVM). In the same way that Gremlin builds upon Groovy, Groovy builds upon Java, providing an extended API and programmatic shortcuts that cut down on the verbosity of Java itself.
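As a rough illustration of that layering, the same neighborhood lookup can be written as a Gremlin one-liner or as the more verbose Blueprints-style iteration underneath it (a sketch; the 'name' index and 'knows' label are illustrative, and g is any Blueprints-enabled graph):

// Gremlin (Groovy) one-liner
g.V('name','marko').out('knows').name

// roughly equivalent iteration without the DSL sugar
for (v in g.getVertices('name','marko')) {
    for (u in v.getVertices(Direction.OUT, 'knows')) {
        println u.getProperty('name')
    }
}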

Gremlin Gremlin comes equipped with a terminal, also known as a REPL or CLI, which provides an interface through which the programmer can interactively traverse the graph. Given Gremlin’s role as a DSL for graphs, performing interactions with a graph represents the typical usage of the terminal. However, given that the Gremlin terminal is actually a Groovy terminal, the full power of Groovy is available as well:

  • Access to the full APIs for Java and Groovy
  • Access to external JARs (i.e. 3rd party libraries)
  • Gremlin and Groovy’s syntactic sugar
  • An extensible programming environment via metaprogramming

With these capabilities in hand, Gremlin presents a way to interact with a multi-database environment with great efficiency. The following sections detail two different use cases, where Gremlin acts as an ad-hoc data workbench for rapid development of integrated database solutions centered around a graph.

Polyglot Persistence

Data Lab Terminal

Loading data to a graph from a different data source might take some careful planning. The formation of a load strategy is highly dependent on the size of the data, its source format, the complexity of the graph schema and other environmental factors. In cases where the complexity of the load is low, such as scenarios where the data set is small and the graph schema simplistic, the load strategy might be to utilize the Gremlin terminal to load the data.

MongoDB as a Data Source

Consider a scenario where the source data resides in MongoDB. The source data itself contains information which indicates a “follows” relationship between two users, similar to the concept of a user following another user on Twitter. Unlike graphs, document stores, such as MongoDB, do not maintain a notion of linked objects and therefore make it difficult to represent the network of users for analytical purposes.

The MongoDB data model consists of databases and collections, where a database is a set of collections and a collection contains a set of documents. The data for these “follows” relationships resides in a database called “network” and is in a collection called “follows.” The individual documents in that collection look like this:

{ "_id" : ObjectId("4ff74c4ae4b01be7d54cb2d3"), "followed" : "1", "followedBy" : "3", "createdAt" : ISODate("2013-01-01T20:36:26.804Z") }
{ "_id" : ObjectId("4ff74c58e4b01be7d54cb2d4"), "followed" : "2", "followedBy" : "3", "createdAt" : ISODate("2013-01-15T20:36:40.211Z") }
{ "_id" : ObjectId("4ff74d13e4b01be7d54cb2dd"), "followed" : "1", "followedBy" : "2", "createdAt" : ISODate("2013-01-07T20:39:47.283Z") }

This kind of data set translates easily to a graph structure. The following diagram expresses how the document data in MongoDB would be expressed as a graph. Follows Graph

To begin the graph loading process, the Gremlin terminal needs to have access to a client library for MongoDB. GMongo is just such a library and provides an expressive syntax for working with MongoDB in Groovy. The GMongo jar file and its dependency, the Mongo Java Driver jar, must be placed in the GREMLIN_HOME/lib directory. With those files in place, start Gremlin with:

GREMLIN_HOME/bin/gremlin.sh

Gremlin automatically imports a number of classes during its initialization process. The GMongo classes will not be part of those default imports. Classes from external libraries must be explicitly imported before they can be utilized. The following code demonstrates the import of GMongo into the terminal session and then the initialization of connectivity to the running MongoDB “network” database.

gremlin> import com.gmongo.GMongo
==>import com.tinkerpop.gremlin.*
...
==>import com.gmongo.GMongo
gremlin> mongo = new GMongo()    
==>com.gmongo.GMongo@6d1e7cc6
gremlin> db = mongo.getDB("network")
==>network

At this point, it is possible to issue any number of MongoDB commands to bring that data into the terminal.

gremlin> db.follows.findOne().followed
==>followed=1
gremlin> db.follows.find().limit(1)         
==>{ "_id" : { "$oid" : "4ff74c4ae4b01be7d54cb2d3"} , "followed" : "1" , "followedBy" : "3" , "createdAt" : { "$date" : "2013-01-01T20:36:26.804Z"}}

The steps for loading the data to a Blueprints-enabled graph (in this case, a local Titan instance) are as follows.

gremlin> g = TitanFactory.open('/tmp/titan')              
==>titangraph[local:/tmp/titan]
gremlin> // first grab the unique list of user identifiers
gremlin> x=[] as Set; db.follows.find().each{x.add(it.followed); x.add(it.followedBy)}
gremlin> x
==>1
==>3
==>2
gremlin> // create a vertex for the unique list of users
gremlin> x.each{g.addVertex(it)}
==>1
==>3
==>2
gremlin> // load the edges
gremlin> db.follows.find().each{g.addEdge(g.v(it.followedBy),g.v(it.followed),'follows',[followsTime:it.createdAt.getTime()])} 
gremlin> g.V
==>v[1]
==>v[3]
==>v[2]
gremlin> g.E
==>e[2][2-follows->1]
==>e[1][3-follows->2]
==>e[0][3-follows->1]
gremlin> g.e(2).map
==>{followsTime=1341607187283} 

This method for graph-related ETL is lightweight and low-effort, making it a fit for a variety of use cases that stem from the need to quickly get data into a graph for ad-hoc analysis.

MySQL as a Data Source

MySQL

The process for extracting data from MySQL is not so different from MongoDB. Assume that the same “follows” data is in MySQL in a four column table called “follows.”

id followed followed_by created_at
10001 1 3 2013-01-01T20:36:26.804Z
10002 2 3 2013-01-15T20:36:40.211Z
10003 1 2 2013-01-07T20:39:47.283Z

Aside from some field name formatting changes and the “id” column being a long value as opposed to a MongoDB identifier, the data is the same as the previous example and has the same problems for network analytics as MongoDB did.

Groovy SQL is straightforward in its approach to accessing data over JDBC. To make use of it inside of the Gremlin terminal, the MySQL JDBC driver jar file must be placed in the GREMLIN_HOME/lib directory. Once that file is in place, start the Gremlin terminal and execute the following commands:

gremlin> import groovy.sql.Sql
...
gremlin> sql = Sql.newInstance("jdbc:mysql://localhost/network", "username","password", "com.mysql.jdbc.Driver")
...
gremlin> g = TitanFactory.open('/tmp/titan')              
==>titangraph[local:/tmp/titan]
gremlin> // first grab the unique list of user identifiers
gremlin> x=[] as Set; sql.eachRow("select * from follows"){x.add(it.followed); x.add(it.followed_by)}
gremlin> x
==>1
==>3
==>2
gremlin> // create a vertex for the unique list of users
gremlin> x.each{g.addVertex(it)}
==>1
==>3
==>2
gremlin> // load the edges
gremlin> sql.eachRow("select * from follows"){g.addEdge(g.v(it.followed_by),g.v(it.followed),'follows',[followsTime:it.created_at.getTime()])} 
gremlin> g.V
==>v[1]
==>v[3]
==>v[2]
gremlin> g.E
==>e[2][2-follows->1]
==>e[1][3-follows->2]
==>e[0][3-follows->1]
gremlin> g.e(2).map
==>{followsTime=1341607187283}

Aside from some data access API differences, there is little separating the script to load the data from MongoDB and the script to load data from MySQL. Both examples demonstrate options for data integration that carry little cost and effort.

Polyglot Queries

A graph database is likely accompanied by other data sources, which together represent the total data strategy for an organization. With a graph established and populated with data, engineers and scientists can utilize the Gremlin terminal to query the graph and develop algorithms that will become the basis for future application services. An issue arises when the graph does not contain all the data that the Gremlin user needs to do their work.

In these cases, it is possible to use the Gremlin terminal to execute what can be thought of as a polyglot query. A polyglot query blends data together from a variety of data sources and data storage types to produce a single result set. The concept of the polyglot query can be demonstrated by extending upon the last scenario where “follows” data was migrated to a graph from MongoDB. Assume that there is another collection in MongoDB called “profiles”, which contains the user demographics data, such as name, age, etc. Using the Gremlin terminal, this “missing data” can be made part of the analysis.

gremlin> // a simple query within the graph
gremlin> g.v(1).in    
==>v[3]
==>v[2]
gremlin> // a polyglot query that incorporates data from the graph and MongoDB
gremlin> g.v(1).in.transform{[userId:it.id,userName:db.profiles.findOne(uid:it.id).name]}
==>{userId=3, userName=willis}
==>{userId=2, userName=arnold}

The first Gremlin statement above represents a one-step traversal, which simply asks to see the users who follow vertex “1.” Although it is now clear how many users follow this vertex, the results are not terribly meaningful. They are only a list of vertex identifiers, and given the example thus far, there is no way to enrich those results because those identifiers are all the data the graph contains. To really understand these results, it would be good to grab the name of each user from the “profiles” collection in MongoDB and blend that attribute into the output. The second line of Gremlin, the polyglot query, does just that. It expands that limited view of the data by performing the same traversal and then reaching out to MongoDB to find each user’s name in the “profiles” collection.

Polyglot Query

The anatomy of the polyglot query is as such:

  • g.v(1).in – get the incoming vertices to vertex 1
  • transform{...} – for each incoming vertex, process it with a closure that produces a map (i.e. set of key/value pairs) for each vertex
  • [userId:it.id, – use the “id” of the vertex as the value of the “userId” key in the map
  • userName:db.profiles.findOne(uid:it.id).name] – blend in the user’s name by querying MongoDB with findOne() to look up a document in the “profiles” collection, grabbing the value of the “name” key from that document and making that the value of the “userName” field in the output

With the name of the users included in the results, the final output becomes more user friendly, perhaps allowing greater insights to surface.

Conclusion

Loading data to the graph and gathering data not available in the graph itself are two examples of the flexibility of the Gremlin terminal, but other use cases exist.

  • Write the output of an algorithm to a file or database for ad-hoc analysis in other tools like Microsoft Excel, R or Business Intelligence reporting tools.
  • Read text-based data files from the file system (e.g. CSV files) to generate graph data.
  • Traversals that build in-memory maps of significant size could benefit from using MapDB, which has Map implementations backed by disk or off-heap memory.
  • Validate traversals and algorithms before committing to a particular design, by building a small “throwaway” graph from a subset of external data that is relevant to what will be tested. This approach is also relevant to basic ad-hoc analysis of data that may not yet be in a graph, but would benefit from a graph data structure and the related toolsets available.
  • Not all graph data requires a graph database. Gremlin supports GraphML, GraphSON, and GML as file-based graph formats. They can be readily inserted into an in-memory TinkerGraph (see the sketch after this list). Utilize Gremlin to analyze these graphs using path expressions in ways not possible with typical graph analysis tools like iGraph, NetworkX, JUNG, etc.
  • “Data debugging” is possible given Gremlin’s rapid turnaround between query and result. Traversing the graph from the Gremlin terminal to make sure the data was loaded correctly is important for ensuring that the data was properly curated.
  • Access to data need not be limited to locally accessible files and databases. The same techniques for writing and reading data to and from those resources can be applied to third-party web services and other APIs, using Groovy’s HTTPBuilder.
  • Pull data into a graph to output as GraphML or other format, which can be visualized in Cytoscape, Gephi or other graph visualization tools.
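As a small example of the file-based workflow mentioned in the list above, a GraphML dump can be pulled into an in-memory TinkerGraph, analyzed with path expressions, and written back out (a sketch; graph.xml and graph-out.xml are hypothetical file names, and the imports are usually already available in the Gremlin terminal):

import com.tinkerpop.blueprints.impls.tg.TinkerGraph
import com.tinkerpop.blueprints.util.io.graphml.GraphMLReader
import com.tinkerpop.blueprints.util.io.graphml.GraphMLWriter

g = new TinkerGraph()
GraphMLReader.inputGraph(g, new FileInputStream('graph.xml'))
g.V.out.name[0..4]                                            // ad-hoc analysis over the in-memory graph
GraphMLWriter.outputGraph(g, new FileOutputStream('graph-out.xml'))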

Gremlin Running The power and flexibility of Gremlin and Groovy make it possible to seamlessly interact with disparate data. This capability enables analysts, engineers and scientists to utilize the Gremlin terminal as a lightweight workbench in a lab of data, making it possible to do rapid, ad-hoc analysis centered around graph data structures. Moreover, as algorithms are discovered, designed and tested, those Gremlin traversals can ultimately be deployed into the production system.

Authors


Stephen Mallette Marko A. Rodriguez

Big Graph Data on Hortonworks Data Platform

hortonworks-aurelius-header

This is an archival repost of a blog post that was originally published on Hortonworks’ blog.

The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

HDP Monitor In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ Big Graph Data technology suite in concert with Hortonworks Data Platform. For a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

Aurelius Graph Cluster The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly-scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play within HDP are diagrammed below. Aurelius and HDP Integration

A Graph Representation of GitHub

Octocat socialite GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.

GitHub Octocat

GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance. GitHub Schema

Deploying a Graph-Based GitHub

Amazon EC2 To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a 4 m1.xlarge machine cluster on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph — though growing). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt
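For readers unfamiliar with BatchGraph, the load script invoked above does something along the following lines (a simplified sketch, not the actual ImportGitHubArchive.groovy; the tab-separated file layout and property names are assumptions):

import com.tinkerpop.blueprints.util.wrappers.batch.BatchGraph
import com.tinkerpop.blueprints.util.wrappers.batch.VertexIDType

g  = TitanFactory.open('bin/hbase.local')
bg = new BatchGraph(g, VertexIDType.STRING, 10000L)        // commit every 10,000 mutations
new File('vertices.txt').splitEachLine('\t') { id, type, name ->
    v = bg.addVertex(id)
    v.setProperty('type', type)
    v.setProperty('name', name)
}
new File('edges.txt').splitEachLine('\t') { outId, label, inId ->
    bg.addEdge(null, bg.getVertex(outId), bg.getVertex(inId), label)
}
bg.shutdown()                                              // flush the final batch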

Titan: Distributed Graph Database

Titan: A Distributed Graph Database Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible because of numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

Github Gremlin Traversal

The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Given that many people may commit to a repository and that a collaborator typically pushes numerous commits, a maximum of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500].name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko and traverses to all the repositories that Marko watches. It then moves to the other users (not Marko) who watch those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount.cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.

Faunus: Graph Analytics Engine

Faunus: Graph Computing with Hadoop Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or none at all. To determine the distribution of contributions, Faunus can be used to compute the out degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs — real-world graphs are not random structures and are composed of a few “hubs” and numerous “satellites.”
github-pushed-out-degree-distribution

Hortonworks with Gremlin More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information, so that computational resources are not wasted loading needless aspects of the graph. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph two times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class).order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...

Conclusion

Aurelius This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site.” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman, V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08: Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas Marko A. Rodriguez

Faunus Provides Big Graph Data Analytics

Faunus is an Apache 2 licensed distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster. Faunus makes global graph scans efficient because it leverages sequential disk reads/writes in concert with various on-disk compression techniques. Moreover, for non-enumerative calculations, Faunus scales linearly in the face of combinatorial explosions. To substantiate these claims, this post presents a series of analyses using a graph representation of Wikipedia (as provided by DBpedia version 3.7). The DBpedia knowledge graph is stored in a 7-machine m1.xlarge Titan/HBase cluster on Amazon EC2 and then batch processed using Faunus/Hadoop. Within the Aurelius Graph Cluster, Faunus provides Big Graph Data analytics.

Ingesting DBpedia into Titan

The DBpedia knowledge base currently describes 3.77 million things, out of which 2.35 million are classified in a consistent Ontology, including 764,000 persons, 573,000 places (including 387,000 populated places), 333,000 creative works (including 112,000 music albums, 72,000 films and 18,000 video games), 192,000 organizations (including 45,000 companies and 42,000 educational institutions), 202,000 species and 5,500 diseases. (via DBpedia.org)

DBpedia is a Linked Data effort focused on providing a machine-consumable representation of Wikipedia. The n-triple format distributed by DBpedia can be easily mapped to the property graph model supported by many graph computing systems including Faunus. The data is ingested into a 7 m1.xlarge Titan/HBase cluster on Amazon EC2 using the BatchGraph wrapper of the Blueprints graph API.
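
For illustration, such an ingest could be sketched in Gremlin/Groovy as follows. The triple parsing, file name, HBase address, and direct predicate-to-edge-label mapping are simplifying assumptions, not the exact loader used for this post.

// A minimal ingest sketch, assuming a whitespace-separated triple file
// ('dbpedia.nt'), a locally reachable HBase instance, and predicates used
// directly as edge labels. Illustrative only.
import org.apache.commons.configuration.BaseConfiguration
import com.thinkaurelius.titan.core.TitanFactory
import com.tinkerpop.blueprints.util.wrappers.batch.BatchGraph
import com.tinkerpop.blueprints.util.wrappers.batch.VertexIDType

conf = new BaseConfiguration()
conf.setProperty('storage.backend', 'hbase')
conf.setProperty('storage.hostname', '127.0.0.1')       // assumption: local HBase
graph = TitanFactory.open(conf)
bg = new BatchGraph(graph, VertexIDType.STRING, 10000)   // buffer 10k mutations per commit

new File('dbpedia.nt').eachLine { line ->
    def tokens = line.split(/\s+/)
    if (tokens.length < 3) return                        // skip blank or malformed lines
    def (s, p, o) = [tokens[0], tokens[1], tokens[2]]
    def outV = bg.getVertex(s) ?: bg.addVertex(s)
    def inV  = bg.getVertex(o) ?: bg.addVertex(o)
    bg.addEdge(null, outV, inV, p)                       // the predicate becomes the edge label
}
bg.shutdown()                                            // flush the final buffer and commit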

Faunus’ Integration with Titan

On each region server in the Titan/HBase cluster there exists a Hadoop datanode and task tracker. Faunus uses Hadoop to execute breadth-first representations of Gremlin queries/traversals by compiling them down to a chain of MapReduce jobs. Hadoop’s SequenceFile format serves as the intermediate HDFS data format between jobs (i.e. traversal steps). Within the SequenceFile, Faunus leverages compression techniques such as variable-width encoding and prefix compression schemes to ensure a small HDFS footprint. Global analyses of the graph can execute more quickly than is possible from a graph database such as Titan because the SequenceFile format does not maintain the data structures necessary for random read/write access and, being immutable, can be laid out sequentially on disk.
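
For reference, a Faunus properties file of the kind opened in the session below contains entries along the following lines. This listing is reconstructed from the getProperties() output shown in the session; it is not a verbatim copy of bin/titan-hbase.properties, which would also carry the Titan/HBase connection settings.

# Reconstructed from the getProperties() output below; connection settings omitted
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=dbpedia
faunus.output.location.overwrite=true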

ubuntu@ip-10-140-13-228:~/faunus$ bin/gremlin.sh 

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-hbase.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g.getProperties()
==>faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
==>faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
==>faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
==>faunus.output.location=dbpedia
==>faunus.output.location.overwrite=true
gremlin> g._() 
12/11/09 15:17:45 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/11/09 15:17:45 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.IdentityMap.Map]
12/11/09 15:17:50 INFO mapred.JobClient: Running job: job_201211081058_0003
...
gremlin> hdfs.ls()
==>rwxr-xr-x ubuntu supergroup 0 (D) dbpedia
gremlin>

The first step in any repeated analysis of a graph using Faunus is to pull the requisite data from its source location. For the examples in this post, the graph source is Titan/HBase. In the code snippet above, the identity function (g._()) is evaluated, which simply maps the Titan/HBase representation of DBpedia over to an HDFS SequenceFile. This process takes approximately 16 minutes. The chart below presents the average number of bytes per minute read from and written to the cluster’s disks during two distinct phases of processing.

  1. On the left is the ingestion of the raw DBpedia data into Titan via BatchGraph. Numerous low-volume writes occur over a long period of time.
  2. On the right is Faunus’ mapping of the Titan DBpedia graph to a SequenceFile in HDFS. Fewer, higher-volume reads/writes occur over a shorter period of time.

The plot reiterates the known result that sequential reads from disk are nearly 1.5x faster than random reads from memory and 4-5 orders of magnitude faster than random reads from disk (see The Pathologies of Big Data). Faunus capitalizes on these features of the memory hierarchy so as to ensure rapid full graph scans.

Faunus’ Dataflows within HDFS: Graph and SideEffect

Faunus has two parallel data flows: graph and sideeffect. Each MapReduce job reads the graph, mutates it in some way, and then writes it back to HDFS as graph* (or to its ultimate sink location). The most prevalent mutation to graph* is the propagation of traversers (i.e. the state of the computation). The graph SequenceFile encodes not only the graph data, but also computational metadata such as which traversers are at which elements (vertices/edges). Other mutations are more structural in nature, such as property updates and/or edge creation (e.g. graph rewriting). The second data flow captures step-specific statistics about the graph, which are stored in sideeffect*. Side-effects include, for example:

  • aggregates: counts, groups, sets, etc.
  • graph data: element identifiers, properties, labels, etc.
  • traversal data: enumeration of paths.
  • derivations: functional transformations of graph data.

gremlin> g.getProperties()
==>faunus.graph.input.format=org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
==>faunus.input.location=dbpedia/job-0
==>faunus.graph.output.format=com.thinkaurelius.faunus.formats.noop.NoOpOutputFormat
==>faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
==>faunus.output.location=output
==>faunus.output.location.overwrite=true
gremlin> hdfs.ls('dbpedia/job-0')
==>rw-r--r-- ubuntu supergroup 426590846 graph-m-00000
==>rw-r--r-- ubuntu supergroup 160159134 graph-m-00001
...
gremlin> g.E.label.groupCount()
...
gremlin> hdfs.ls('output/job-0')
==>rw-r--r-- ubuntu supergroup 37 sideeffect-r-00000
==>rw-r--r-- ubuntu supergroup 18 sideeffect-r-00001
...
gremlin> hdfs.head('output/job-0')
==>deathplace	144374
==>hasBroader	1463237
==>birthplace	561837
==>page	8824974
==>primarytopic	8824974
==>subject	13610094
==>wikipageredirects	5074113
==>wikiPageExternalLink	6319697
==>wikipagedisambiguates	1004742
==>hasRelated	28748
==>wikipagewikilink	145877010

The Traversal Mechanics of Faunus

It is important to understand how Faunus stores computation within the SequenceFile. When the step g.V is evaluated, a single traverser (a long value of 1) is placed on each vertex in the graph. When count() is evaluated, the traverser values across the graph are summed together and returned. A similar process occurs for g.E, except that a single traverser is added to each edge in the graph.

gremlin> g.V.count()
==>30962172
gremlin> g.E.count()
==>191733800
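
To make these mechanics concrete, the toy Groovy sketch below mimics the same idea on a small in-memory graph: every vertex carries a long traverser count, and an out() step pushes those counts along out-edges rather than materializing individual paths. It is illustrative only (the adjacency map and vertex names are invented) and is not the Faunus implementation.

// Toy illustration of traverser counting (not Faunus code): each vertex
// holds a long traverser count; out() pushes counts along out-edges
// instead of enumerating concrete paths.
def adjacency = [a: ['b', 'c'], b: ['c'], c: ['a']]              // hypothetical toy graph

def counts = adjacency.keySet().collectEntries { [(it): 1L] }    // g.V: one traverser per vertex

def outStep = { Map current ->
    def next = [:].withDefault { 0L }
    current.each { vertex, count ->
        adjacency[vertex].each { neighbor -> next[neighbor] += count }
    }
    next
}

counts = outStep(outStep(counts))     // g.V.out.out
println counts.values().sum()         // number of length-2 paths; no path objects were created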

If only the number of traversers at a particular element is required (i.e. a count, as above), as opposed to the specific traverser instances themselves (and their respective path histories), then the time it takes to perform a combinatorial computation scales linearly with the number of MapReduce iterations. The Faunus/Gremlin traversals below count (not enumerate) the number of 0-, 1-, 2-, 3-, 4-, and 5-step paths in the DBpedia graph. Note that the runtimes grow roughly linearly, at approximately 15–20 minutes per additional traversal step, even though the results compound exponentially such that, in the last example, it is determined that there are 251 quadrillion length-5 paths in the DBpedia graph.

gremlin> g.V.count() // 2.5 minutes
==>30962172
gremlin> g.V.out.count() // 17 minutes
==>191733800
gremlin> g.V.out.out.count() // 35 minutes
==>27327666320
gremlin> g.V.out.out.out.count() // 50 minutes
==>5429258407462
gremlin> g.V.out.out.out.out.count() // 70 minutes 
==>1148261617434916
gremlin> g.V.out.out.out.out.out.count() // 85 minutes
==>251818304970074185

While this result might seem outlandish, it is possible to analytically estimate the empirically derived path counts. The average degree of the vertices in the graph is 6, but the total number of 5-step paths is much more sensitive to the connectivity of high-degree vertices. When analyzing only the top 25% most connected vertices (the 200k vertices shown in red below the blue line), the average degree is 260. This yields an estimated path count on the order of 200,000 × 260^5 ≈ 2.4 × 10^17, which is consistent with the actual 5-path count calculated by Faunus. Both the computed and analytic results demonstrate a feature of natural graphs that all graph analysts should be aware of: combinatorial explosions abound (see Loopy Lattices).
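
Since the Gremlin REPL is a Groovy shell, this back-of-the-envelope figure can be sanity-checked with plain long arithmetic (a hypothetical console line, not part of the original session):

gremlin> 200000L * 260L * 260L * 260L * 260L * 260L  // 200k high-degree roots, ~260^5 paths from each
==>237627520000000000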

gremlin> g.V.sideEffect('{it.outDegree = it.outE.count()}').outDegree.groupCount()
12/11/11 18:36:16 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
...
==>1001	6
==>101	4547
==>1016	10
==>1022	5
==>1037	9
gremlin>

Conclusion

Faunus is a freely available, Apache 2 licensed, distributed graph analytics engine. It is currently in its 0.1-alpha stage with a 0.1 release planned for Winter 2012/2013. Faunus serves as one of the OLAP components of the Aurelius Graph Cluster.

In the world of graph computing, no one solution will meet all computing needs. Titan supports use cases in which thousands of concurrent users are executing short, ego-centric traversals over a single massive-scale graph. Faunus, on the other hand, supports global traversals of the graph in use cases such as offline data science and/or production-oriented batch processing. Finally, Fulgora will serve as an in-memory graph processor for heavily threaded, iterative graph and machine learning algorithms. Together, the components of the Aurelius Graph Cluster provide integrated coverage of a broad range of graph computing problems.

Related Material

Jacobs, A., “The Pathologies of Big Data,” Communications of the ACM, 7(6), July 2009.

Norton, B., Rodriguez, M.A., “Loopy Lattices,” Aurelius Blog, April 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Lin, J., Schatz, M., “Design Patterns for Efficient Graph Algorithms in MapReduce,” Mining and Learning with Graphs Proceedings, 2010.

Authors


Marko A. Rodriguez Stephen Mallette Vadas Gintautas Matthias Broecheler
