Educating the Planet with Pearson

Pearson is striving to accomplish the ambitious goal of providing an education to anyone, anywhere on the planet. New data processing technologies and theories in education are moving much of the learning experience into the digital space, in particular into massive open online courses (MOOCs). Two years ago, Pearson contacted Aurelius about applying graph theory and network science to this burgeoning space. A prototype proved promising in that it added novel, automated intelligence to the online education experience. However, at the time, no scalable, open-source graph database existed on the market. Titan was therefore forged to meet the requirement of representing all universities, students, their resources, courses, etc. within a single, unified graph. Moreover, beyond representation, the graph needed to support sub-second, complex graph traversals (i.e., queries) while sustaining at least 1 billion transactions a day. Pearson asked Aurelius a simple question: “Can Titan be used to educate the planet?” This post is Aurelius’ answer.

Data Loading Benchmark

Pearson provides free online education through its OpenClass platform. OpenClass is currently in beta and has been adopted by ~7,000 institutions. To meet the expected growth beyond beta, the platform must be built on a scalable database system. Moreover, that database system must support advanced algorithms beyond simple get/put semantics. The latter requirement was demonstrated via the initial prototype. To address the former, a simulation of a worldwide education environment was created in Titan to alleviate Pearson’s scalability concerns.

Entity                     Count
Students                   3.47 billion
Teachers                   183 million
Courses                    788 million
Universities               1.2 million
Concepts                   9 thousand
Educational artifacts      ~1.5 billion
Total vertices             6.24 billion
Total edges                121 billion

The simulated world was a graph containing 6.24 billion vertices and 121 billion edges. The edges represent students enrolled in courses, people discussing content, content referencing concepts, teachers teaching courses, material contained in activity streams, universities offering courses, and so forth. Various techniques were leveraged to ensure that the generated data was consistent with a real-world instance. For example, people's names were generated by sampling the cross product of the first and last names in the US Census Bureau dataset. Gaussian distributions were applied to determine how many courses a student should be enrolled in (mean of 8) and how many courses a teacher should teach (mean of 4). The course names and descriptions were drawn from the raw MIT OpenCourseWare data dumps. Course names were appended with tokens such as “101,” “1B,” “Advanced,” etc. in order to increase the diversity of the offerings. Student comments in discussions were sampled snippets of text from the electronic books provided by Project Gutenberg. University names were generated from publicly available city name and location datasets in the CommonHubData project. Finally, concepts were linked to materials using OpenCalais. The final raw education dataset was 10 terabytes in size.
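The sampling steps above can be sketched in a few lines of Python. This is a minimal illustration, not the generator that was actually used. The name lists are tiny hypothetical stand-ins for the Census data. The standard deviation (2) is an assumption, because only the means are given in the text.

```python
import random

# Hypothetical stand-ins for the US Census Bureau first- and last-name lists.
FIRST_NAMES = ["Mary", "James", "Linda", "Robert", "Patricia"]
LAST_NAMES = ["Smith", "Johnson", "Williams", "Garcia"]


def sample_name(rng: random.Random) -> str:
    # Choosing a first and a last name independently and uniformly is equivalent
    # to sampling uniformly from the cross product of the two lists.
    return f"{rng.choice(FIRST_NAMES)} {rng.choice(LAST_NAMES)}"


def sample_count(rng: random.Random, mean: float, stddev: float, minimum: int = 1) -> int:
    # Draw from a Gaussian, round to a whole number, and clamp at `minimum`
    # so that no one is assigned zero (or a negative number of) courses.
    return max(minimum, round(rng.gauss(mean, stddev)))


def enroll_students(num_students: int, course_ids: list, rng: random.Random):
    """Yield (student_id, name, course_id) enrollment edges.

    The mean of 8 courses per student is from the text; stddev=2 is an assumption.
    """
    for student_id in range(num_students):
        name = sample_name(rng)
        k = min(len(course_ids), sample_count(rng, mean=8, stddev=2))
        # sample() draws without replacement, so a student never gets a course twice.
        for course_id in rng.sample(course_ids, k):
            yield student_id, name, course_id


if __name__ == "__main__":
    rng = random.Random(7)
    courses = [f"course-{i}" for i in range(50)]
    for edge in list(enroll_students(2, courses, rng))[:5]:
        print(edge)
```

For teachers, `sample_count(rng, mean=4, stddev=1)` would play the same role. That standard deviation is again an assumption.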

A 121 billion edge graph is too large to fit within the confines of a single machine. Fortunately, Titan/Cassandra is a distributed graph database able to represent a graph across a multi-machine cluster. The Amazon EC2 cluster utilized for the simulation was composed of 16 hi1.4xlarge machines. The specification of the machines is itemized below.

  • 60.5 GiB of memory
  • 35 EC2 Compute Units (16 virtual cores and 64-bit)
  • 2 SSD-based volumes each with 1024 GB of instance storage
  • I/O Performance: Very High (10 Gigabit Ethernet)

The 10 terabyte, 121 billion edge graph was loaded into the cluster in 1.48 days at a rate of approximately 1.2 million edges a second with 0 failed transactions. These numbers were made possible by new developments in Titan 0.3.0 whereby graph partitioning is achieved using a domain-based, byte-ordered partitioner. With respect to education, the dataset maintains a structure where most edges are intra-university rather than inter-university (i.e., students and teachers typically interact with others at their own university, and even more so within their own courses). As such, domain partitioning is possible: vertices within the same university (i.e., graph community) are more likely to be co-located on the same physical machine.
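The following Python sketch shows why a domain prefix combined with a byte-ordered partitioner co-locates a community. It is not Titan 0.3.0's actual vertex-id layout or partitioner code. The key format, the MD5-derived one-byte prefix, and the function names are assumptions made for illustration. The 16 tokens mirror the `Token(bytes[...])` column of the nodetool output below.

```python
import bisect
import hashlib

# Sixteen byte-ordered tokens with first bytes 0x00, 0x10, ..., 0xf0, mirroring the
# token column of the 16-machine cluster (sorted, as required by bisect).
TOKENS = [bytes([i * 0x10]) + bytes(7) for i in range(16)]


def vertex_key(university_id: int, local_id: int) -> bytes:
    """Hypothetical key layout: a 1-byte domain prefix derived from the
    university, followed by an 8-byte id local to that university."""
    prefix = hashlib.md5(str(university_id).encode("utf-8")).digest()[0]
    return bytes([prefix]) + local_id.to_bytes(8, "big")


def owner(key: bytes) -> int:
    """Index of the node owning `key`: as in Cassandra, the node with token T
    owns keys in (previous token, T], wrapping past the last token to node 0."""
    return bisect.bisect_left(TOKENS, key) % len(TOKENS)


if __name__ == "__main__":
    for university in range(3):
        nodes = {owner(vertex_key(university, i)) for i in range(1000)}
        print(f"university {university}: vertices on nodes {sorted(nodes)}")
```

Because every key of a university shares the same leading byte, all of that university's vertices fall into the same token range. Most intra-university edges therefore stay on one machine. The trade-off is that a very large or very busy university concentrates on a single node.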

Once in Titan, the raw 10 terabyte dataset shrank to about 5 terabytes. This reduction came from data-agnostic Snappy compression and from Titan-specific graph compression techniques (e.g., “id deltas,” type definitions, and Kryo serialization). After the data was loaded, it was immediately backed up to Amazon S3; using Titan’s parallel backup strategy, this took 1.4 hours. The nodetool statistics for the Titan/Cassandra cluster are provided below.

Address         Rack    Status   State    Load        Token
10.244.196.10   rack1   Up       Normal   329.44 GB   Token(bytes[c000000000000000])
10.244.194.142  rack1   Up       Normal   348.62 GB   Token(bytes[3000000000000000])
10.244.196.111  rack1   Up       Normal   330.86 GB   Token(bytes[b000000000000000])
10.244.195.155  rack1   Up       Normal   333.57 GB   Token(bytes[a000000000000000])
10.244.195.243  rack1   Up       Normal   330.91 GB   Token(bytes[9000000000000000])
10.244.195.209  rack1   Up       Normal   326.57 GB   Token(bytes[f000000000000000])
10.244.195.93   rack1   Up       Normal   355.26 GB   Token(bytes[4000000000000000])
10.244.195.179  rack1   Up       Normal   325.73 GB   Token(bytes[e000000000000000])
10.244.195.57   rack1   Up       Normal   351.47 GB   Token(bytes[1000000000000000])
10.244.196.27   rack1   Up       Normal   332.87 GB   Token(bytes[d000000000000000])
10.244.196.93   rack1   Up       Normal   351.81 GB   Token(bytes[2000000000000000])
10.244.195.6    rack1   Up       Normal   331.56 GB   Token(bytes[8000000000000000])
10.244.195.7    rack1   Up       Normal   327.55 GB   Token(bytes[0000000000000000])
10.244.196.84   rack1   Up       Normal   345.2 GB    Token(bytes[5000000000000000])
10.244.195.8    rack1   Up       Normal   351.26 GB   Token(bytes[6000000000000000])
10.244.194.178  rack1   Up       Normal   338.07 GB   Token(bytes[7000000000000000])
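A quick sanity check is to parse the Load and Token columns and confirm that the cluster stayed balanced. The Python sketch below does this with the output above pasted in as a string; the parsing regex is an assumption about this particular layout. The loads sum to about 5.41 TB, consistent with the ~5 TB figure. The largest node holds roughly 9% more than the smallest.

```python
import re

NODETOOL = """\
10.244.196.10   rack1   Up       Normal   329.44 GB   Token(bytes[c000000000000000])
10.244.194.142  rack1   Up       Normal   348.62 GB   Token(bytes[3000000000000000])
10.244.196.111  rack1   Up       Normal   330.86 GB   Token(bytes[b000000000000000])
10.244.195.155  rack1   Up       Normal   333.57 GB   Token(bytes[a000000000000000])
10.244.195.243  rack1   Up       Normal   330.91 GB   Token(bytes[9000000000000000])
10.244.195.209  rack1   Up       Normal   326.57 GB   Token(bytes[f000000000000000])
10.244.195.93   rack1   Up       Normal   355.26 GB   Token(bytes[4000000000000000])
10.244.195.179  rack1   Up       Normal   325.73 GB   Token(bytes[e000000000000000])
10.244.195.57   rack1   Up       Normal   351.47 GB   Token(bytes[1000000000000000])
10.244.196.27   rack1   Up       Normal   332.87 GB   Token(bytes[d000000000000000])
10.244.196.93   rack1   Up       Normal   351.81 GB   Token(bytes[2000000000000000])
10.244.195.6    rack1   Up       Normal   331.56 GB   Token(bytes[8000000000000000])
10.244.195.7    rack1   Up       Normal   327.55 GB   Token(bytes[0000000000000000])
10.244.196.84   rack1   Up       Normal   345.2 GB    Token(bytes[5000000000000000])
10.244.195.8    rack1   Up       Normal   351.26 GB   Token(bytes[6000000000000000])
10.244.194.178  rack1   Up       Normal   338.07 GB   Token(bytes[7000000000000000])
"""

# Address, Rack, Status, State, Load (GB), Token(bytes[hex])
LINE = re.compile(r"^(\S+)\s+\S+\s+\S+\s+\S+\s+([\d.]+) GB\s+Token\(bytes\[([0-9a-f]+)\]\)")


def parse(text: str):
    """Return (address, load_gb, token_bytes) for every node line."""
    rows = []
    for line in text.splitlines():
        m = LINE.match(line)
        if m:
            rows.append((m.group(1), float(m.group(2)), bytes.fromhex(m.group(3))))
    return rows


def summarize(rows):
    loads = [load for _, load, _ in rows]
    return {
        "nodes": len(rows),
        "total_gb": sum(loads),
        "mean_gb": sum(loads) / len(loads),
        "max_over_min": max(loads) / min(loads),
    }


if __name__ == "__main__":
    print(summarize(parse(NODETOOL)))
```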

Transactional Benchmark

The purpose of the second half of the experiment was to subject the 121 billion edge education graph to numerous concurrent transactions. These transactions simulate users interacting with the graph: solving educational problems, adding more content, discussing ideas with one another, etc. To put the 16-machine hi1.4xlarge cluster under heavy load, 80 m1.medium machines were spawned. These 80 machines simulate the application servers that query the graph and provide users with the front-end experience. Each machine maintained 30 threads in a “while(true)-loop,” each thread randomly selecting 1 of the 16 transactional templates below and executing it. Thus, 2,400 concurrent threads (80 × 30) communicated with the 16-machine Titan cluster. The transaction templates, along with their mean runtimes and standard deviations, are presented in the table below. Note that many of the transactions are “complex” in that they embody a series of actions taken by a user. In a real-world setting, these would typically be broken up into smaller behavioral units (i.e., multiple individual transactions).
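The per-machine driver described above can be sketched in Python as a pool of threads. Each thread repeatedly picks a template at random, runs it, and records the latency. This is not the benchmark's actual harness. The templates here are hypothetical no-op stand-ins for the Gremlin queries, and the loop is bounded rather than `while(true)`.

```python
import random
import threading
import time
from collections import Counter


def make_templates():
    """Hypothetical stand-ins for the transaction templates; in the benchmark,
    each one issued a Gremlin query against the Titan cluster."""
    def noop():
        pass
    return [("scommentread", noop), ("sstream", noop), ("saddfollow", noop)]


def run_load(num_threads: int = 30, iterations: int = 100, seed: int = 0):
    templates = make_templates()
    counts = Counter()
    latencies_ms = []
    lock = threading.Lock()

    def worker(thread_seed: int):
        rng = random.Random(thread_seed)  # per-thread RNG avoids sharing state
        local_counts, local_latencies = Counter(), []
        for _ in range(iterations):  # bounded stand-in for while(true)
            name, tx = rng.choice(templates)
            start = time.perf_counter()
            tx()
            local_latencies.append((time.perf_counter() - start) * 1000.0)
            local_counts[name] += 1
        with lock:  # merge once per thread to keep lock contention low
            counts.update(local_counts)
            latencies_ms.extend(local_latencies)

    threads = [threading.Thread(target=worker, args=(seed + i,)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts, latencies_ms


if __name__ == "__main__":
    counts, latencies = run_load()
    print(dict(counts), f"{sum(latencies) / len(latencies):.4f} ms mean")
```

`rng.choice` picks uniformly. The per-template counts in the table below are far from equal, so the real mix was presumably weighted. In that case, `rng.choices(templates, weights=...)` would be the closer model.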

name             # of tx     avg (ms)   std dev (ms)   description
scommentread     25550909    211.07     45.56          student reads the most recent comments for their courses
reccourse        5149469     467.37     178.20         student gets recommended courses to take
scommentshare    12825909    394.15     57.98          student reads comments in courses and shares a comment
scontent         20567687    279.32     81.83          student retrieves all content for a single course in their course list
saddfollow       12826912    193.72     22.77          student follows another student
scourses         7720965     233.38     79.44          student retrieves a list of all their courses with descriptions
classmates       12849769    96.96      22.27          student retrieves a list of all their classmates
sprofile         7689669     53.74      22.61          student retrieves their profile
recfriend2       5178945     155.75     44.60          student is recommended people to follow (version 1)
scourseactivity  10371133    565.76     189.80         student retrieves the top 10 most recent activities in their courses
scommentadd      5182558     281.90     44.33          student reads course comments and comments at some depth in the discussion tree
recfriend1       5189435     241.33     256.48         student is recommended people to follow (version 2)
sreshare         12850473    284.07     68.20          student reads their stream and shares an item with followers
ssharelink       5140363     261.58     35.75          student shares a link with followers
sdiscussadd      2604696     246.35     34.64          student browses courses and then adds a new discussion topic to a course
sstream          76301001    224.93     84.48          student reads their personal stream

The transactional benchmark ran for 6.25 hours and executed 228 million complex transactions at a rate of 10,267 tx/sec. Given the consistent behavior over those 6.25 hours, it is inferred that Titan can serve ~887 million transactions a day (10,267 tx/sec × 86,400 sec). In a real-world production system, each of these complex transactions would typically be split into several smaller transactions. A system of this form should therefore be able to sustain more than 1 billion transactions a day.
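The throughput figures can be checked directly against the table. In the Python sketch below, the per-template counts sum to about 228 million. Dividing that total by exactly 6.25 hours gives about 10,133 tx/sec, slightly below the quoted 10,267 tx/sec; the gap presumably reflects rounding of the run duration. Projecting 10,267 tx/sec over a day gives the ~887 million figure.

```python
# Transaction counts per template, copied from the table above.
TX_COUNTS = {
    "scommentread": 25550909, "reccourse": 5149469, "scommentshare": 12825909,
    "scontent": 20567687, "saddfollow": 12826912, "scourses": 7720965,
    "classmates": 12849769, "sprofile": 7689669, "recfriend2": 5178945,
    "scourseactivity": 10371133, "scommentadd": 5182558, "recfriend1": 5189435,
    "sreshare": 12850473, "ssharelink": 5140363, "sdiscussadd": 2604696,
    "sstream": 76301001,
}
RUN_HOURS = 6.25
REPORTED_RATE = 10_267  # tx/sec, as quoted in the text
SECONDS_PER_DAY = 86_400

total = sum(TX_COUNTS.values())
rate_over_run = total / (RUN_HOURS * 3600)
per_day = REPORTED_RATE * SECONDS_PER_DAY

print(f"total transactions: {total:,}")              # 227,999,893
print(f"rate over 6.25 h:   {rate_over_run:,.0f} tx/sec")  # 10,133
print(f"daily projection:   {per_day:,} tx/day")      # 887,068,800
```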

Conclusion

A first attempt at this benchmark was made at the turn of the year 2012. That benchmark was for a 260 billion edge graph (7 billion students) constructed using the same graph generation techniques described in this post. The graph data was loaded, but the transactional benchmark was never executed because Amazon EC2 could not fulfill the requisite cluster architecture (all 32 high I/O machines in the same placement group). Thus, the smaller-scale benchmark presented here reflects what was possible given EC2’s resources at the time.

While this benchmark demonstrates that Titan can handle the data scale and transactional load of “99%” of applications developed today, Aurelius is not yet satisfied with the presented transactional results. When the transactional load was reduced to ~5,000 tx/sec, query runtimes dropped to the 100 millisecond range. The desired result was to achieve ~100 ms query times at 10k tx/sec and ~250 ms at 25k tx/sec. Fortunately, much was learned about Titan/Cassandra during this exploration. Advances in both inter-cluster connection management and machine-aware traversal forwarding are currently in development in order to reach the desired scaling goals.

A scalable graph database is only half the story, and it is the half presented here. Once scaling has been accomplished, the next problem is extracting knowledge from data. For the last 2 years, Aurelius has been working with Pearson to develop novel algorithms for the education space that move beyond the typical activities seen in current web systems: profiles, streams, recommendations, etc. Online education is ripe for automating much of the intelligence currently contributed by students and teachers. With a collective graphical model, Aurelius’ algorithms move beyond creating a dynamic interface to providing computational human augmentation. These techniques generalize and should prove fruitful in other domains.

Acknowledgements

This report was made possible due to funding provided by Pearson Education. Over the 5 months that this benchmark was developed, the following individuals provided support: Pavel Yaskevich (Cassandra tuning and Titan development) and Stephen Mallette/Blake Eggleston (RexPro development and testing). Various issues in Cassandra were identified at scale, and the Cassandra community readily accepted patches and quickly released newer versions in support of the effort. Note that Titan is an open source, Apache 2 licensed graph database. The Titan community has supported the project via patch submissions and testing across use cases and cluster configurations. Finally, Steve Hill of Pearson has done much to ensure Titan’s success, pushing Aurelius to answer the question: “Can Titan educate the planet?”

Authors

Matthias Broecheler, Dan LaRocque, Marko A. Rodriguez

Titan Server: From a Single Server to a Highly Available Cluster

Titan is a distributed graph database capable of storing graphs on the order of hundreds of billions of edges while, at the same time, supporting billions of real-time graph traversals a day. Most graph applications will never approach Titan’s high-end performance limits. This does not mean that Titan is unsuitable for graph applications at a smaller scale (billions of edges and below). The purpose of this post is to introduce Titan from the perspective of a team of engineers developing a new graph-based application. These engineers will initially develop and test their codebase using a single Titan Server. When the application matures and is ready for production use, a highly available setup is deployed. Finally, as the application becomes more popular and the data size and transactional load increase, a fully distributed cluster is leveraged. Growing a Titan database from a single server to a cluster is simply a matter of configuration. In this way, Titan gracefully scales to accommodate the changing requirements of a graph application.

Titan Single Server

Augustus and Tiberius are two software engineers who have designed an application that represents the Gods and people of Rome within a graph of familial relationships: a genealogy application. The intention is that Roman scholars will use the application to better understand the social fabric of their great Empire. Although the application is initially meant for a single user, the two engineers decide to leverage Titan as the backend graph database. For one, Titan is completely free for any use (Apache 2 licensed), and two, it supports both single-server and distributed deployments. The latter is important to them because the Greek Oracle of Delphi foretold that a genealogy graph would one day be used online by everyone throughout the Roman Empire.

$ wget http://s3.thinkaurelius.com/downloads/titan/titan-cassandra-0.3.0.zip
$ unzip titan-cassandra-0.3.0.zip
$ cd titan-cassandra-0.3.0
$ sudo bin/titan.sh config/titan-server-rexster.xml config/titan-server-cassandra.properties
13/03/27 12:40:32 INFO service.CassandraDaemon: JVM vendor/version: Java HotSpot(TM) 64-Bit Server VM/1.7.0_12-ea
13/03/27 12:40:32 INFO service.CassandraDaemon: Heap size: 40566784/477233152
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Loading settings from file:/Users/marko/software/aurelius/titan/config/cassandra.yaml
13/03/27 12:40:32 INFO config.DatabaseDescriptor: Global memtable threshold is enabled at 151MB
13/03/27 12:40:32 INFO service.CacheService: Initializing key cache with capacity of 2 MBs.
13/03/27 12:40:35 INFO server.RexProRexsterServer: RexPro serving on port: [8184]
13/03/27 12:40:35 INFO server.HttpRexsterServer: Rexster Server running on: [http://localhost:8182]
13/03/27 12:40:35 INFO server.ShutdownManager: Bound shutdown socket to /127.0.0.1:8183. Starting listener thread for shutdown requests.
...

Users without wget can use curl -O or download from the Titan download page.

The above sequence of 4 shell commands downloads and starts up a Titan Server on the localhost. Titan Server embeds both Cassandra and (a lightweight version of) Rexster within the same JVM. Titan Server exposes the following language-agnostic endpoints for developers to communicate with the graph:

  1. A RESTful endpoint available at http://localhost:8182/graphs.
  2. A RexPro binary protocol endpoint available on port 8184.
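As a rough illustration of the RESTful endpoint, here is a Python client sketch using only the standard library. The `/graphs` listing is from the text above. The `/graphs/<name>/tp/gremlin?script=...` path is Rexster's Gremlin extension. The graph name `"graph"` is an assumption about Titan Server's default configuration and should be checked against the `/graphs` listing.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://localhost:8182"


def graphs_url(base: str = BASE_URL) -> str:
    # Lists the graphs served by this Rexster/Titan Server instance.
    return f"{base}/graphs"


def gremlin_url(script: str, graph: str = "graph", base: str = BASE_URL) -> str:
    # Rexster's Gremlin extension; the graph name "graph" is an assumption.
    return f"{base}/graphs/{quote(graph)}/tp/gremlin?script={quote(script)}"


def get_json(url: str, timeout: float = 5.0) -> dict:
    # Plain HTTP GET; Rexster answers with a JSON document.
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


if __name__ == "__main__":
    print(gremlin_url("g.V.count()"))
    # With a Titan Server running locally:
    # print(get_json(graphs_url()))
```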


Titan Server is configured via two primary files: titan-server-rexster.xml (shown below) and cassandra.yaml (discussed in the next section). These files are located in the config/ directory of the titan-cassandra-x.y.z distribution.

<rexster>
    <http>
        <server-port>8182</server-port>
        <server-host>0.0.0.0</server-host>
        <base-uri>http://localhost</base-uri>
        <character-set>UTF-8</character-set>
        <enable-jmx>false</enable-jmx>
        <max-post-size>2097152</max-post-size>
        <max-header-size>8192</max-header-size>
        <upload-timeout-millis>30000</upload-timeout-millis>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </http>
    <rexpro>
        <server-port>8184</server-port>
        <server-host>0.0.0.0</server-host>
        <session-max-idle>1790000</session-max-idle>
        <session-check-interval>3000000</session-check-interval>
        <connection-max-idle>180000</connection-max-idle>
        <connection-check-interval>3000000</connection-check-interval>
        <enable-jmx>false</enable-jmx>
        <thread-pool>
            <worker>
                <core-size>8</core-size>
                <max-size>8</max-size>
            </worker>
            <kernal>
                <core-size>4</core-size>
                <max-size>4</max-size>
            </kernal>
        </thread-pool>
        <io-strategy>leader-follower</io-strategy>
    </rexpro>
    <security>
        <authentication>
            <type>none</type>
        </authentication>
    </security>
    <shutdown-port>8183</shutdown-port>
    <shutdown-host>127.0.0.1</shutdown-host>
</rexster>
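A deployment script might read the endpoint ports from this file rather than hard-coding them. The Python sketch below does that with the standard-library XML parser. It uses a trimmed copy of the configuration above, keeping only the elements it reads, and the function names are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of titan-server-rexster.xml keeping only the elements read below.
SAMPLE_CONFIG = """\
<rexster>
    <http>
        <server-port>8182</server-port>
        <server-host>0.0.0.0</server-host>
    </http>
    <rexpro>
        <server-port>8184</server-port>
        <server-host>0.0.0.0</server-host>
    </rexpro>
    <shutdown-port>8183</shutdown-port>
    <shutdown-host>127.0.0.1</shutdown-host>
</rexster>
"""


def endpoint_ports(xml_text: str) -> dict:
    """Return the HTTP, RexPro and shutdown ports declared in a Rexster config."""
    root = ET.fromstring(xml_text)
    return {
        "http": int(root.findtext("http/server-port")),
        "rexpro": int(root.findtext("rexpro/server-port")),
        "shutdown": int(root.findtext("shutdown-port")),
    }


def ports_are_distinct(ports: dict) -> bool:
    # Two endpoints bound to the same port would fail at startup.
    return len(set(ports.values())) == len(ports)


if __name__ == "__main__":
    ports = endpoint_ports(SAMPLE_CONFIG)
    print(ports, "distinct" if ports_are_distinct(ports) else "CONFLICT")
```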

NOTE: Along with the above endpoints, Titan Server also exposes a JVM native serialization interface that can be used by JVM languages. This interface, for example, is the means by which Faunus/Hadoop interacts with Titan Server for global graph analytics. For more information on this endpoint, see Using Cassandra.

Titan Highly Available

The genealogy application was showing promise as a single-user system for studying the genetic history of the Roman people and Gods. Due to the positive response, Augustus and Tiberius decide that a multi-user online genealogy service would be a successful product.

// how many siblings did jupiter have?
g.V('name','jupiter').out('brother','sister').count() 
// who is caesar's grandmother?
g.V('name','caesar').out('mother').out('mother').name 
// who are the in-laws of marcus' sons? (their wives' fathers and mothers)
g.V('name','marcus').in('father').has('gender','M').out('married').out('father','mother').name
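To make the semantics of `out()`, `in()` and `has()` concrete, here is a toy in-memory analogue in Python. It is neither Gremlin nor Titan. The class, its methods, and the family data are all hypothetical. It reproduces the three queries above, including the fact that `has('gender','M')` restricts the last query to marcus's sons.

```python
from collections import defaultdict


class ToyGraph:
    """A tiny labeled, directed graph that mimics a few Gremlin steps."""

    def __init__(self):
        self.props = {}                   # vertex -> property dict
        self.out_adj = defaultdict(list)  # (vertex, label) -> heads of outgoing edges
        self.in_adj = defaultdict(list)   # (vertex, label) -> tails of incoming edges

    def add(self, tail, label, head):
        self.out_adj[(tail, label)].append(head)
        self.in_adj[(head, label)].append(tail)

    def out(self, vertices, *labels):
        return [w for v in vertices for lab in labels for w in self.out_adj[(v, lab)]]

    def in_(self, vertices, *labels):
        return [w for v in vertices for lab in labels for w in self.in_adj[(v, lab)]]

    def has(self, vertices, key, value):
        return [v for v in vertices if self.props.get(v, {}).get(key) == value]


def build_example() -> ToyGraph:
    g = ToyGraph()  # hypothetical family data
    g.props.update({"lucius": {"gender": "M"}, "tullia": {"gender": "F"}})
    for sibling, rel in [("neptune", "brother"), ("pluto", "brother"), ("juno", "sister")]:
        g.add("jupiter", rel, sibling)
    g.add("caesar", "mother", "aurelia")
    g.add("aurelia", "mother", "rutilia")
    g.add("lucius", "father", "marcus")  # children point to their father
    g.add("tullia", "father", "marcus")
    g.add("lucius", "married", "cornelia")
    g.add("cornelia", "father", "gaius")
    g.add("cornelia", "mother", "livia")
    g.add("tullia", "married", "quintus")
    g.add("quintus", "father", "titus")
    return g


if __name__ == "__main__":
    g = build_example()
    print(len(g.out(["jupiter"], "brother", "sister")))          # sibling count
    print(g.out(g.out(["caesar"], "mother"), "mother"))          # grandmother
    sons = g.has(g.in_(["marcus"], "father"), "gender", "M")
    print(g.out(g.out(sons, "married"), "father", "mother"))     # sons' in-laws
```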

As it currently stands, the genealogy data set is approximately 1 billion edges, so it can be stored and processed on a single machine. For a single-user application, a single Titan Server suffices. However, with multiple users, it is important that the system is robust and can serve numerous concurrent requests. If the application is backed by a single server and that server goes down, the application becomes unusable. To ensure 1.) no single point of failure and 2.) support for more transactions per second, Augustus and Tiberius deploy 3 machines, each with a Titan Server installed.

The team updates the default config/cassandra.yaml file of each Titan Server in two ways. First, they change the address properties that default to localhost (e.g., listen_address and rpc_address) to the IP address of the machine. Second, they add a seed IP address for discoverability (see Multinode Cluster). Next, they start each Titan Server one after the other (titan.sh). To verify that the servers clustered together properly, they use Cassandra’s nodetool.

apache-cassandra-1.2.3$ bin/nodetool ring

Datacenter: datacenter1
==========
Replicas: 1

Address         Rack        Status State   Load            Owns                Token
                                                                               57715295010532946864463892271081778854
10.223.14.57    rack1       Up     Normal  93.06 KB        49.28%              141555886663081320436455748965948652071
10.174.123.131  rack1       Up     Normal  59.73 KB        33.44%              28311611028231080169766921879398209884
10.196.0.207    rack1       Up     Normal  9.43 KB         17.28%              57715295010532946864463892271081778854
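The Owns column follows directly from the tokens. With the RandomPartitioner, tokens lie in the range 0 to 2^127, and each node owns the span from the previous token (clockwise) up to its own. The Python sketch below reproduces the 49.28% / 33.44% / 17.28% split from the three tokens above; the function name is hypothetical.

```python
RING_SIZE = 2 ** 127  # RandomPartitioner tokens lie in [0, 2**127)


def ownership(tokens: dict) -> dict:
    """Percentage of the ring owned by each node, given node -> token."""
    ordered = sorted(tokens.items(), key=lambda kv: kv[1])
    if len(ordered) == 1:
        return {ordered[0][0]: 100.0}
    result = {}
    for i, (node, token) in enumerate(ordered):
        prev = ordered[i - 1][1]  # for i == 0 this wraps to the last token
        span = (token - prev) % RING_SIZE
        result[node] = 100 * span / RING_SIZE
    return result


if __name__ == "__main__":
    ring = {
        "10.223.14.57": 141555886663081320436455748965948652071,
        "10.174.123.131": 28311611028231080169766921879398209884,
        "10.196.0.207": 57715295010532946864463892271081778854,
    }
    for node, pct in ownership(ring).items():
        print(f"{node:16} {pct:.2f}%")
```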

Finally, on one of the servers, the cassandra-cli tool is used to update the replication factor of the titan keyspace.

apache-cassandra-1.2.3$ bin/cassandra-cli -h 10.174.123.131

[default@unknown] update keyspace titan with strategy_options = {replication_factor:3};
a3b7e1a3-4a88-3769-8c5e-90cda4fec0e1
[default@unknown] show schema titan;
create keyspace titan
  with placement_strategy = 'SimpleStrategy'
  and strategy_options = {replication_factor : 3}
  and durable_writes = true;

With a replication factor of 3, each of the 3 Titan Servers is the primary host of approximately one-third of the vertices in the graph. At the same time, each server maintains a replica of the primary data of the other two. In this way, a highly available, master-master setup is rendered with no single point of failure. If one of the database machines goes down, the other two are able to serve the primary data of the dead machine. If two of the machines go down, the remaining machine can still serve data, albeit not with the same throughput possible when all three machines are available. With full master-master replication, the entire graph is duplicated on every server, and each server can support both reads and writes to the graph.

Titan Clustered

The following summer, the prophecy of the Oracle of Delphi comes true. An announcement is made in the Roman forum about the utility of the online genealogy application. Immediately, the plebeians of Rome join the site. They feverishly add their family histories and traverse the graph to learn more about their genetic past. This spike in usage puts an excessive amount of strain on the servers: with so many concurrent users, the three machines run at peak CPU and disk I/O utilization trying to process requests.

To remedy the situation, 6 more Titan Server machines are added to the cluster for a total of 9 machines. The token ring is rebalanced to ensure that each server maintains a relatively equal share of the graph. A fair partition of the token space 0 to 2^127 (the range used by Cassandra’s RandomPartitioner) into 9 parts is listed below (see token ring calculator).

0
18904575940052135809661593108510408704
37809151880104271619323186217020817408
56713727820156407428984779325531226112
75618303760208543238646372434041634816
94522879700260688493040931281842470912
113427455640312814857969558651062452224
132332031580364960112364117498863288320
151236607520417086477292744868083269632
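Evenly spaced tokens can be computed exactly with integer arithmetic, as in the Python sketch below (function name hypothetical). The values listed above agree with these only to about 16 significant digits. This suggests they were produced with floating-point arithmetic, but the resulting ranges still differ by a negligible fraction of the ring, so either set balances the cluster in practice.

```python
RING_SIZE = 2 ** 127  # RandomPartitioner tokens lie in [0, 2**127)


def balanced_tokens(num_nodes: int, ring_size: int = RING_SIZE) -> list:
    # Exact integer arithmetic: token i = floor(i * ring_size / num_nodes).
    return [i * ring_size // num_nodes for i in range(num_nodes)]


if __name__ == "__main__":
    for i, token in enumerate(balanced_tokens(9)):
        print(i, token)
```

Each resulting token would then be assigned to one machine with `nodetool move`, as shown below.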

Each machine has its token updated using the following nodetool command. By repartitioning the token ring, the 3 original servers transfer data to the newly on-boarded servers in order to distribute the data load as specified by each server’s location in the token space (each vertex key is hashed with MD5 to a token between 0 and 2^127).

apache-cassandra-1.2.3$ bin/nodetool -h 10.223.14.57 move 0
apache-cassandra-1.2.3$ bin/nodetool -h 10.174.123.131 move 18904575940052135809661593108510408704
apache-cassandra-1.2.3$ bin/nodetool -h 10.196.0.207 move 37809151880104271619323186217020817408
...


With the replication factor still set to 3, no server maintains a full replica of the graph. Instead, each server stores only a third of the graph (3/9 of the token ranges: its own primary range plus replicas of two neighboring ranges). At this point, no single server has a full picture of the graph. However, because there are more servers, more transactions can be served and more data can be stored. Augustus and Tiberius have successfully grown their single-user graph application into a distributed system that stores and processes a massive genealogy graph represented across a cluster of Titan Server machines.
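The 3/3 versus 3/9 arithmetic follows from how SimpleStrategy places replicas. Each range is stored on the node that owns it and on the next `rf - 1` nodes clockwise around the ring. The Python sketch below models that rule on a ring of equal-sized ranges; it is a simplification, and the function names are hypothetical.

```python
from collections import Counter


def replica_nodes(primary: int, num_nodes: int, rf: int) -> list:
    """SimpleStrategy-style placement: the owning node plus the next
    rf - 1 nodes clockwise around the ring."""
    return [(primary + k) % num_nodes for k in range(min(rf, num_nodes))]


def ranges_held(num_nodes: int, rf: int) -> Counter:
    """How many of the num_nodes equal-sized ranges each node stores."""
    held = Counter()
    for primary in range(num_nodes):
        held.update(replica_nodes(primary, num_nodes, rf))
    return held


if __name__ == "__main__":
    for n in (3, 9):
        held = ranges_held(n, rf=3)
        print(f"{n} nodes, rf=3: each node stores {held[0]}/{n} of the graph")
```

With 3 nodes, every node holds every range, which is the fully replicated, master-master setup. With 9 nodes, each node holds 3 of the 9 ranges, so no single server sees the whole graph.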

Conclusion

Titan was developed from the outset to support OLTP distributed graph storage and processing. While it is important that a graph database can scale indefinitely, fewer than 1% of applications written today will ever leverage near-trillion-edge graphs. The other 99% of applications will store and process million- and billion-edge graphs. Titan is able to meet the requirements of both segments of the graph application space. Furthermore, Titan scales gracefully as developers move from a single-server prototype, to a highly available production system, to, ultimately, a fully distributed cluster sustaining the size and workload requirements seen by the top 1% of applications.

Acknowledgements

Stephen Mallette and Blake Eggleston are the developers of Rexster’s RexPro. Their efforts were a driving force behind the development of Titan Server.

Authors


Marko A. Rodriguez, Matthias Broecheler

Polyglot Persistence and Query with Gremlin


Complex data storage architectures are rarely confined to a single database. In these environments, data is highly disparate: it exists in many forms, is aggregated and duplicated at different levels, and, in the worst case, its meaning is not clearly understood. Environments featuring disparate data can present challenges to those seeking to integrate it for purposes of analytics, ETL (Extract-Transform-Load) and other business services. Having easy ways to work with data across these types of environments enables the rapid engineering of data solutions.

Data disparity often arises from the need to store data in different database types, so as to take advantage of the specific benefits that each type offers. Examples of different database types include (please see Getting and Putting Data from a Database):

  • Relational Database: A relational database, such as MySQL, Oracle or Microsoft SQL Server, organizes data into tables with rows and columns, using a schema to help govern data integrity.
  • Document Store: A document-oriented database such as MongoDB, CouchDB, or RavenDB, organizes data into the concept of a document, which is typically semi-structured as nested maps and encoded to some format such as JSON.
  • Graph Database: A graph is a data structure that organizes data into the concepts of vertices and edges. Vertices might be thought of as “dots” and edges might be thought of as “lines”, where the lines connect those dots via some relationship. Graphs represent a very natural way to model real-world relationships between different entities. Examples of graph databases are Titan, Neo4j, OrientDB, Dex and InfiniteGraph.

Gremlin is a domain-specific language (DSL) for traversing graphs. It is built using the metaprogramming facilities of Groovy, a dynamic programming language for the Java Virtual Machine (JVM). In the same way that Gremlin builds on Groovy, Groovy builds on Java, providing an extended API and programmatic shortcuts that cut down on the verbosity of Java itself.

Gremlin comes equipped with a terminal, also known as a REPL or CLI, which provides an interface through which the programmer can interactively traverse the graph. Given Gremlin’s role as a DSL for graphs, interacting with a graph is the typical use of the terminal. However, since the Gremlin terminal is actually a Groovy terminal, the full power of Groovy is available as well:

  • Access to the full APIs for Java and Groovy
  • Access to external JARs (i.e. 3rd party libraries)
  • Gremlin and Groovy’s syntactic sugar
  • An extensible programming environment via metaprogramming

With these capabilities in hand, Gremlin presents a way to interact with a multi-database environment with great efficiency. The following sections detail two different use cases, where Gremlin acts as an ad-hoc data workbench for rapid development of integrated database solutions centered around a graph.

Polyglot Persistence


Loading data into a graph from a different data source might take some careful planning. The formulation of a load strategy is highly dependent on the size of the data, its source format, the complexity of the graph schema and other environmental factors. In cases where the complexity of the load is low, such as when the data set is small and the graph schema simple, the load strategy might be to use the Gremlin terminal to load the data.

MongoDB as a Data Source

Consider a scenario where the source data resides in MongoDB. The source data itself contains information which indicates a “follows” relationship between two users, similar to the concept of a user following another user on Twitter. Unlike graphs, document stores, such as MongoDB, do not maintain a notion of linked objects and therefore make it difficult to represent the network of users for analytical purposes.

The MongoDB data model consists of databases and collections, where a database is a set of collections and a collection contains a set of documents. The data for these “follows” relationships resides in a database called “network” and is in a collection called “follows.” The individual documents in that collection look like this:

{ "_id" : ObjectId("4ff74c4ae4b01be7d54cb2d3"), "followed" : "1", "followedBy" : "3", "createdAt" : ISODate("2013-01-01T20:36:26.804Z") }
{ "_id" : ObjectId("4ff74c58e4b01be7d54cb2d4"), "followed" : "2", "followedBy" : "3", "createdAt" : ISODate("2013-01-15T20:36:40.211Z") }
{ "_id" : ObjectId("4ff74d13e4b01be7d54cb2dd"), "followed" : "1", "followedBy" : "2", "createdAt" : ISODate("2013-01-07T20:39:47.283Z") }

This kind of data set translates easily to a graph structure. The following diagram shows how the document data in MongoDB would be expressed as a graph.

Figure: Follows Graph

To begin the graph loading process, the Gremlin terminal needs to have access to a client library for MongoDB. GMongo is just such a library and provides an expressive syntax for working with MongoDB in Groovy. The GMongo jar file and its dependency, the Mongo Java Driver jar, must be placed in the GREMLIN_HOME/lib directory. With those files in place, start Gremlin with:

GREMLIN_HOME/bin/gremlin.sh

Gremlin automatically imports a number of classes during its initialization process. The GMongo classes will not be part of those default imports. Classes from external libraries must be explicitly imported before they can be utilized. The following code demonstrates the import of GMongo into the terminal session and then the initialization of connectivity to the running MongoDB “network” database.

gremlin> import com.gmongo.GMongo
==>import com.tinkerpop.gremlin.*
...
==>import com.gmongo.GMongo
gremlin> mongo = new GMongo()    
==>com.gmongo.GMongo@6d1e7cc6
gremlin> db = mongo.getDB("network")
==>network

At this point, it is possible to issue any number of MongoDB commands to bring that data into the terminal.

gremlin> db.follows.findOne().followed
==>1
gremlin> db.follows.find().limit(1)         
==>{ "_id" : { "$oid" : "4ff74c4ae4b01be7d54cb2d3"} , "followed" : "1" , "followedBy" : "3" , "createdAt" : { "$date" : "2013-01-01T20:36:26.804Z"}}

The steps for loading the data to a Blueprints-enabled graph (in this case, a local Titan instance) are as follows.

gremlin> g = TitanFactory.open('/tmp/titan')              
==>titangraph[local:/tmp/titan]
gremlin> // first grab the unique list of user identifiers
gremlin> x=[] as Set; db.follows.find().each{x.add(it.followed); x.add(it.followedBy)}
gremlin> x
==>1
==>3
==>2
gremlin> // create a vertex for the unique list of users
gremlin> x.each{g.addVertex(it)}
==>1
==>3
==>2
gremlin> // load the edges
gremlin> db.follows.find().each{g.addEdge(g.v(it.followedBy),g.v(it.followed),'follows',[followsTime:it.createdAt.getTime()])} 
gremlin> g.V
==>v[1]
==>v[3]
==>v[2]
gremlin> g.E
==>e[2][2-follows->1]
==>e[1][3-follows->2]
==>e[0][3-follows->1]
gremlin> g.e(2).map
==>{followsTime=1341607187283} 

This method for graph-related ETL is lightweight and low-effort, making it a fit for a variety of use cases that stem from the need to quickly get data into a graph for ad-hoc analysis.
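The same two-pass pattern (collect the unique user identifiers, create one vertex per identifier, then add one edge per document) is not tied to Gremlin. As a rough sketch, assuming plain Python dicts in place of the MongoDB documents (filled with the same three records shown in the MySQL table below) and a hypothetical `SimpleGraph` class in place of a Blueprints graph, the logic looks like this:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical stand-ins for the "follows" documents.
FOLLOWS = [
    {"followed": "1", "followedBy": "3",
     "createdAt": datetime(2013, 1, 1, 20, 36, 26, 804000, tzinfo=timezone.utc)},
    {"followed": "2", "followedBy": "3",
     "createdAt": datetime(2013, 1, 15, 20, 36, 40, 211000, tzinfo=timezone.utc)},
    {"followed": "1", "followedBy": "2",
     "createdAt": datetime(2013, 1, 7, 20, 39, 47, 283000, tzinfo=timezone.utc)},
]


class SimpleGraph:
    """Hypothetical minimal directed property graph; not a Blueprints or Titan API."""

    def __init__(self):
        self.vertices = set()
        self.edges = []  # (edge_id, out_vertex, label, in_vertex, properties)

    def add_vertex(self, vertex_id):
        self.vertices.add(vertex_id)

    def add_edge(self, out_v, in_v, label, properties):
        # Mirrors g.v(...) failing when a vertex was never created.
        if out_v not in self.vertices or in_v not in self.vertices:
            raise KeyError(f"missing endpoint for edge {out_v}->{in_v}")
        self.edges.append((len(self.edges), out_v, label, in_v, dict(properties)))


def load_follows(docs):
    graph = SimpleGraph()
    # Pass 1: the unique set of user identifiers on either side of a "follows" record.
    user_ids = set()
    for doc in docs:
        user_ids.add(doc["followed"])
        user_ids.add(doc["followedBy"])
    for user_id in sorted(user_ids):
        graph.add_vertex(user_id)
    # Pass 2: one edge per record, from the follower to the followed user, with the
    # creation time stored as epoch milliseconds (what Date.getTime() returns).
    for doc in docs:
        millis = (doc["createdAt"] - EPOCH) // timedelta(milliseconds=1)
        graph.add_edge(doc["followedBy"], doc["followed"], "follows", {"followsTime": millis})
    return graph


graph = load_follows(FOLLOWS)
print(sorted(graph.vertices))   # ['1', '2', '3']
print(graph.edges[0])           # (0, '3', 'follows', '1', {'followsTime': 1357072586804})
```

Creating every vertex before any edge is what lets the second pass assume both endpoints exist, which is the ordering the Gremlin session relies on when it calls g.v(...) inside addEdge.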

MySQL as a Data Source

The process for extracting data from MySQL is not so different from MongoDB. Assume that the same “follows” data is in MySQL in a four-column table called “follows.”

id     followed  followed_by  created_at
10001  1         3            2013-01-01T20:36:26.804Z
10002  2         3            2013-01-15T20:36:40.211Z
10003  1         2            2013-01-07T20:39:47.283Z

Aside from some field name formatting changes and the “id” column being a long value as opposed to a MongoDB identifier, the data is the same as the previous example and has the same problems for network analytics as MongoDB did.

Groovy SQL is straightforward in its approach to accessing data over JDBC. To make use of it inside of the Gremlin terminal, the MySQL JDBC driver jar file must be placed in the GREMLIN_HOME/lib directory. Once that file is in place, start the Gremlin terminal and execute the following commands:

gremlin> import groovy.sql.Sql
...
gremlin> sql = Sql.newInstance("jdbc:mysql://localhost/network", "username","password", "com.mysql.jdbc.Driver")
...
gremlin> g = TitanFactory.open('/tmp/titan')              
==>titangraph[local:/tmp/titan]
gremlin> // first grab the unique list of user identifiers
gremlin> x=[] as Set; sql.eachRow("select * from follows"){x.add(it.followed); x.add(it.followed_by)}
gremlin> x
==>1
==>3
==>2
gremlin> // create a vertex for the unique list of users
gremlin> x.each{g.addVertex(it)}
==>1
==>3
==>2
gremlin> // load the edges
gremlin> sql.eachRow("select * from follows"){g.addEdge(g.v(it.followed_by),g.v(it.followed),'follows',[followsTime:it.created_at.getTime()])} 
gremlin> g.V
==>v[1]
==>v[3]
==>v[2]
gremlin> g.E
==>e[2][2-follows->1]
==>e[1][3-follows->2]
==>e[0][3-follows->1]
gremlin> g.e(2).map
==>{followsTime=1341607187283}

Aside from some data access API differences, there is little separating the script to load the data from MongoDB and the script to load data from MySQL. Both examples demonstrate options for data integration that carry little cost and effort.
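To illustrate that only the data access layer changes, here is a sketch that swaps in Python's standard-library sqlite3 module for MySQL over JDBC; the in-memory database is an assumption made only so the example runs without a server. Unlike the Gremlin script, it asks the database for the unique identifiers with a UNION query rather than collecting them in a client-side set:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(iso_text):
    # %z accepts a literal "Z" suffix on Python 3.7+; %f right-pads "804" to microseconds.
    parsed = datetime.strptime(iso_text, "%Y-%m-%dT%H:%M:%S.%f%z")
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def extract_follows(conn):
    # Let the database compute the unique user identifiers: UNION removes duplicates.
    user_ids = [row[0] for row in conn.execute(
        "SELECT followed FROM follows UNION SELECT followed_by FROM follows ORDER BY 1")]
    # One (follower, followed, followsTime) triple per row, in row-id order.
    edges = [(by, followed, epoch_millis(created))
             for by, followed, created in conn.execute(
                 "SELECT followed_by, followed, created_at FROM follows ORDER BY id")]
    return user_ids, edges


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE follows (id INTEGER PRIMARY KEY, followed INTEGER, "
             "followed_by INTEGER, created_at TEXT)")
conn.executemany("INSERT INTO follows VALUES (?, ?, ?, ?)", [
    (10001, 1, 3, "2013-01-01T20:36:26.804Z"),
    (10002, 2, 3, "2013-01-15T20:36:40.211Z"),
    (10003, 1, 2, "2013-01-07T20:39:47.283Z"),
])
user_ids, edges = extract_follows(conn)
print(user_ids)   # [1, 2, 3]
```

Letting the database deduplicate means the client never builds its own set of raw identifiers; whether that matters in practice depends on table size and indexing on the real MySQL server.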

Polyglot Queries

A graph database is likely accompanied by other data sources, which together represent the total data strategy for an organization. With a graph established and populated with data, engineers and scientists can utilize the Gremlin terminal to query the graph and develop algorithms that will become the basis for future application services. An issue arises when the graph does not contain all the data that the Gremlin user needs to do their work.

In these cases, it is possible to use the Gremlin terminal to execute what can be thought of as a polyglot query. A polyglot query blends data together from a variety of data sources and data storage types to produce a single result set. The concept of the polyglot query can be demonstrated by extending the last scenario, where “follows” data was migrated to a graph from MongoDB. Assume that there is another collection in MongoDB called “profiles”, which contains the user demographics data, such as name, age, etc. Using the Gremlin terminal, this “missing data” can be made part of the analysis.

gremlin> // a simple query within the graph
gremlin> g.v(1).in    
==>v[3]
==>v[2]
gremlin> // a polyglot query that incorporates data from the graph and MongoDB
gremlin> g.v(1).in.transform{[userId:it.id,userName:db.profiles.findOne(uid:it.id).name]}
==>{userId=3, userName=willis}
==>{userId=2, userName=arnold}

The first Gremlin statement above represents a one-step traversal, which simply asks to see the users who follow vertex “1.” Although it is now clear how many users follow this vertex, the results are not terribly meaningful. They are only a list of vertex identifiers, and given the example thus far, there is no way to expand on those results, because those identifiers are all the data the graph holds about each user. To really understand these results, it would be good to grab the name of each user from the “profiles” collection in MongoDB and blend that attribute into the output. The second line of Gremlin, the polyglot query, does just that. It performs the same traversal and then reaches out to MongoDB to find each user’s name in the “profiles” collection, expanding the otherwise limited view of the data.

Polyglot Query

The anatomy of the polyglot query is as follows:

  • g.v(1).in – get the incoming vertices to vertex 1
  • transform{...} – for each incoming vertex, process it with a closure that produces a map (i.e. set of key/value pairs) for each vertex
  • [userId:it.id, – use the “id” of the vertex as the value of the “userId” key in the map
  • userName:db.profiles.findOne(uid:it.id).name] – blend in the user’s name by querying MongoDB with findOne() to look up a document in the “profiles” collection, grabbing the value of the “name” key from that document and making that the value of the “userName” key in the output

With the name of the users included in the results, the final output becomes more user friendly, perhaps allowing greater insights to surface.
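The anatomy above amounts to a join performed one traverser at a time. A minimal Python sketch, with in-memory lists standing in for both the graph's “follows” edges and the MongoDB “profiles” collection (the `find_one` helper is hypothetical and only mimics findOne()):

```python
# Hypothetical stand-ins: (follower, followed) edges and the "profiles" documents.
FOLLOWS_EDGES = [("3", "1"), ("3", "2"), ("2", "1")]
PROFILES = [{"uid": "3", "name": "willis"}, {"uid": "2", "name": "arnold"}]


def find_one(collection, **criteria):
    """Return the first document whose fields equal all criteria, else None."""
    for doc in collection:
        if all(doc.get(k) == v for k, v in criteria.items()):
            return doc
    return None


def in_vertices(edges, vertex_id):
    """Equivalent of g.v(vertex_id).in: vertices with an edge pointing at vertex_id."""
    return [out_v for out_v, in_v in edges if in_v == vertex_id]


def polyglot_followers(edges, profiles, vertex_id):
    results = []
    for follower in in_vertices(edges, vertex_id):
        profile = find_one(profiles, uid=follower)
        # Blend the graph-side id with the document-side name; tolerate a missing profile.
        results.append({"userId": follower,
                        "userName": profile["name"] if profile else None})
    return results


print(polyglot_followers(FOLLOWS_EDGES, PROFILES, "1"))
# [{'userId': '3', 'userName': 'willis'}, {'userId': '2', 'userName': 'arnold'}]
```

Each follower triggers its own lookup, much like the closure in the Gremlin version. For larger result sets, a single batched query (for example, MongoDB's `$in` operator over all follower ids) would likely be cheaper than one round trip per vertex.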

Conclusion

Loading data into the graph and gathering data not available in the graph itself are two examples of the flexibility of the Gremlin terminal, but other use cases exist:

  • Write the output of an algorithm to a file or database for ad-hoc analysis in other tools like Microsoft Excel, R or Business Intelligence reporting tools.
  • Read text-based data files from the file system (e.g. CSV files) to generate graph data.
  • Traversals that build in-memory maps of significant size could benefit from using MapDB, which has Map implementations backed by disk or off-heap memory.
  • Validate traversals and algorithms before committing to a particular design, by building a small “throwaway” graph from a subset of external data that is relevant to what will be tested. This approach is also relevant to basic ad-hoc analysis of data that may not yet be in a graph, but would benefit from a graph data structure and the related toolsets available.
  • Not all graph data requires a graph database. Gremlin supports GraphML, GraphSON, and GML as file-based graph formats. They can be readily inserted into an in-memory TinkerGraph. Utilize Gremlin to analyze these graphs using path expressions in ways not possible with typical graph analysis tools like igraph, NetworkX, JUNG, etc.
  • “Data debugging” is possible given Gremlin’s rapid turnaround between query and result. Traversing the graph from the Gremlin terminal to confirm that the data was loaded correctly is important for ensuring that the data was properly curated.
  • Access to data need not be limited to locally accessible files and databases. The same techniques for writing and reading data to and from those resources can be applied to third-party web services and other APIs, using Groovy’s HTTPBuilder.
  • Pull data into a graph to output as GraphML or another format, which can be visualized in Cytoscape, Gephi or other graph visualization tools.
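Several of these uses combine naturally. As one hedged example, the sketch below reads CSV rows (the column names follower, followed and followsTime are assumptions for illustration) and writes a directed GraphML document using only Python's standard library, producing output that graph visualization tools such as Gephi or Cytoscape can typically import:

```python
import csv
import io
import xml.etree.ElementTree as ET

NS = "http://graphml.graphdrawing.org/xmlns"
ET.register_namespace("", NS)   # serialize GraphML as the default namespace


def tag(name):
    return f"{{{NS}}}{name}"


def csv_to_graphml(csv_text):
    """Turn follower,followed,followsTime rows into a directed GraphML document string."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    root = ET.Element(tag("graphml"))
    # Declare the edge attributes before the graph element, as GraphML expects.
    ET.SubElement(root, tag("key"), {"id": "label", "for": "edge",
                                     "attr.name": "label", "attr.type": "string"})
    ET.SubElement(root, tag("key"), {"id": "followsTime", "for": "edge",
                                     "attr.name": "followsTime", "attr.type": "long"})
    graph = ET.SubElement(root, tag("graph"), {"id": "G", "edgedefault": "directed"})
    for node_id in sorted({r["follower"] for r in rows} | {r["followed"] for r in rows}):
        ET.SubElement(graph, tag("node"), {"id": node_id})
    for i, row in enumerate(rows):
        edge = ET.SubElement(graph, tag("edge"), {"id": f"e{i}", "source": row["follower"],
                                                  "target": row["followed"]})
        ET.SubElement(edge, tag("data"), {"key": "label"}).text = "follows"
        ET.SubElement(edge, tag("data"), {"key": "followsTime"}).text = row["followsTime"]
    return ET.tostring(root, encoding="unicode")


print(csv_to_graphml("follower,followed,followsTime\n3,1,1357072586804\n"))
```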

The power and flexibility of Gremlin and Groovy make it possible to seamlessly interact with disparate data. This capability enables analysts, engineers and scientists to utilize the Gremlin terminal as a lightweight workbench in a lab of data, making it possible to do rapid, ad-hoc analysis centered around graph data structures. Moreover, as algorithms are discovered, designed and tested, those Gremlin traversals can ultimately be deployed into the production system.

Authors


Stephen Mallette Marko A. Rodriguez

Big Graph Data on Hortonworks Data Platform

This is an archival repost of a blog post that was originally published on Hortonworks’ blog.

The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ Big Graph Data technology suite in concert with Hortonworks Data Platform. For a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and under heavy transactional load, a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play within HDP are diagrammed below.

Aurelius and HDP Integration

A Graph Representation of GitHub

GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.

GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.

GitHub Schema

Deploying a Graph-Based GitHub

To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a cluster of 4 m1.xlarge machines on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

The 5,830 hourly GitHub Archive files from mid-March 2012 to mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique SHA identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph, though a growing one). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch
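To make the push-event mapping described above concrete, here is a sketch of how a single event might be turned into vertices and edges. The event layout (actor, actor_email, repo_url, commits, and so on) is a hypothetical simplification and not the exact GitHub Archive JSON format; the githubarchive-parser documentation describes what the real parser handles.

```python
def parse_push_event(event):
    """Map one simplified push event to vertex dicts and (out, label, in) edge triples."""
    user = {"type": "User", "key": event["actor"], "email": event.get("actor_email")}
    repo = {"type": "Repository", "key": event["repo_url"], "language": event.get("language")}
    vertices, edges = [user, repo], []
    for commit in event["commits"]:
        vertices.append({"type": "Commit", "key": commit["sha"], "timestamp": event["created_at"]})
        edges.append((user["key"], "pushed", commit["sha"]))   # user -pushed-> commit
        edges.append((commit["sha"], "to", repo["key"]))        # commit -to-> repository
    return vertices, edges


# Hypothetical example event with two commits.
event = {"actor": "okram", "actor_email": "okram@example.com",
         "repo_url": "https://github.com/example/repo", "language": "Java",
         "created_at": "2012-11-01T00:00:00Z", "commits": [{"sha": "abc"}, {"sha": "def"}]}
vertices, edges = parse_push_event(event)
print([v["type"] for v in vertices])   # ['User', 'Repository', 'Commit', 'Commit']
```

Because the same users and repositories appear in many events, a full parser would also need to deduplicate vertices by key before loading them.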

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single-threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible by numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is output, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

Github Gremlin Traversal
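The okram traversal is a two-hop walk followed by a group-and-count. A small Python sketch of the same computation, assuming hypothetical adjacency lists keyed by (vertex, edge label), which loosely resemble how a vertex-centric index lets a traversal jump straight to the edges with a given label:

```python
from collections import Counter


def repos_pushed_to(out_edges, repo_names, start):
    """Sketch of out('pushed').out('to').github_name.groupCount, sorted by decreasing count."""
    counts = Counter()
    for commit in out_edges.get((start, "pushed"), []):   # user -> commits
        for repo in out_edges.get((commit, "to"), []):    # commit -> repository
            counts[repo_names[repo]] += 1
    return counts.most_common()


# Hypothetical toy data: three commits, two of them to the same repository.
out_edges = {
    ("okram", "pushed"): ["c1", "c2", "c3"],
    ("c1", "to"): ["r1"],
    ("c2", "to"): ["r1"],
    ("c3", "to"): ["r2"],
}
repo_names = {"r1": "blueprints", "r2": "gremlin"}
print(repos_pushed_to(out_edges, repo_names, "okram"))   # [('blueprints', 2), ('gremlin', 1)]
```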

The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Because many people may commit to a repository, and a collaborator has typically pushed numerous commits, a maximum of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500].name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137
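The 2500-path cap can be mimicked with a lazy generator: candidate collaborators are produced one path at a time, and `itertools.islice` stops the walk once the limit is reached, so the combinatorial path space is never fully enumerated. A sketch, assuming hypothetical adjacency maps keyed by (vertex, edge label) for both edge directions:

```python
from collections import Counter
from itertools import islice


def collaborators(out_edges, in_edges, start, limit=2500, top=5):
    """Sketch of out('pushed').out('to').in('to').in('pushed').hasNot(start), capped at `limit` paths."""
    def people():
        for commit in out_edges.get((start, "pushed"), []):
            for repo in out_edges.get((commit, "to"), []):
                for other_commit in in_edges.get((repo, "to"), []):
                    for person in in_edges.get((other_commit, "pushed"), []):
                        if person != start:   # hasNot('name', start)
                            yield person
    # islice stops pulling from the generator after `limit` people, so the
    # nested loops above never run to completion on a large graph.
    return Counter(islice(people(), limit)).most_common(top)


out_edges = {("okram", "pushed"): ["c1"], ("c1", "to"): ["r1"]}
in_edges = {
    ("r1", "to"): ["c1", "c2", "c3", "c4"],
    ("c1", "pushed"): ["okram"],
    ("c2", "pushed"): ["lvca"],
    ("c3", "pushed"): ["spmallette"],
    ("c4", "pushed"): ["lvca"],
}
print(collaborators(out_edges, in_edges, "okram"))   # [('lvca', 2), ('spmallette', 1)]
```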

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. It then moves to everyone else (not Marko) who watches those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount.cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85

A few self-describing traversals rooted at okram are presented above. Finally, note that Titan is optimized for local/ego-centric traversals. That is, a traversal starts from a particular source vertex (or small set of vertices) and uses some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.
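The co-watch recommendation earlier in this section is a good example of such an ego-centric computation: it only touches the repositories that one user watches and the other watchers of those repositories. A set-based Python sketch, assuming a hypothetical mapping from each user to the set of repositories they watch:

```python
from collections import Counter


def recommend_follows(watched, me, k=5):
    """Rank other users by the number of repositories they watch in common with `me`."""
    # Invert user -> repos into repo -> watchers (the in('watched') direction).
    watchers = {}
    for user, repos in watched.items():
        for repo in repos:
            watchers.setdefault(repo, set()).add(user)
    scores = Counter()
    for repo in watched.get(me, set()):      # out('watched')
        for other in watchers[repo]:         # in('watched')
            if other != me:                  # hasNot('name', me)
                scores[other] += 1
    return scores.most_common(k)


watched = {"okram": {"a", "b", "c"}, "spmallette": {"a", "b", "c"}, "x": {"a"}, "y": {"d"}}
print(recommend_follows(watched, "okram"))   # [('spmallette', 3), ('x', 1)]
```

Because watched repositories are stored as sets here, each shared repository contributes exactly one point; a path-counting traversal gives the same score as long as there is at most one watched-edge per user and repository.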

Faunus: Graph Analytics Engine

Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses, which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or nothing at all. To determine the distribution of contributions, Faunus can be used to compute the out-degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs — real-world graphs are not random structures and are composed of few “hubs” and numerous “satellites.”
GitHub pushed-edge out-degree distribution (log-scaled axes)
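The degree distribution itself is a simple two-level count: first the out-degree per vertex, then the number of vertices per degree. A single-machine Python sketch, assuming edges given as hypothetical (out, label, in) triples:

```python
from collections import Counter


def out_degree_distribution(vertices, edges, label):
    """Return {out_degree: number_of_vertices} for edges with the given label."""
    degree = Counter(out_v for out_v, edge_label, _ in edges if edge_label == label)
    # Vertices without matching edges get degree 0, as outE(label).count() would report.
    return Counter(degree[v] for v in vertices)


vertices = ["u1", "u2", "u3"]
edges = [("u1", "pushed", "c1"), ("u1", "pushed", "c2"), ("u2", "pushed", "c3"), ("c1", "to", "r1")]
print(sorted(out_degree_distribution(vertices, edges, "pushed").items()))   # [(0, 1), (1, 1), (2, 1)]
```

Plotting these (degree, frequency) pairs on log-scaled axes produces the kind of chart described above. Faunus performs the computation as MapReduce jobs instead, so the whole edge list never has to fit on one machine.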

More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information, so that computational resources are not wasted loading needless aspects of the graph. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph twice and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class).order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...
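The keep-then-traverse pattern can be illustrated on a toy graph. In this sketch (all identifiers hypothetical), `keep_subgraph` drops every edge that is not a watched-edge and every vertex that is not a User or Repository, and `step` moves per-vertex traverser counts one hop, so the per-repository path totals are computed without materializing individual paths:

```python
from collections import Counter


def keep_subgraph(vertices, edges, keep_label, keep_types):
    """Keep only edges with keep_label whose endpoints have a type in keep_types."""
    kept_vertices = {v: t for v, t in vertices.items() if t in keep_types}
    kept_edges = [(o, lbl, i) for o, lbl, i in edges
                  if lbl == keep_label and o in kept_vertices and i in kept_vertices]
    return kept_vertices, kept_edges


def step(counts, edges, direction):
    """Move traverser counts one hop: 'out' follows out->in, 'in' follows in->out."""
    moved = Counter()
    for o, _, i in edges:
        src, dst = (o, i) if direction == "out" else (i, o)
        if counts[src]:
            moved[dst] += counts[src]
    return moved


def cowatch_path_counts(vertices, edges):
    kept_vertices, kept_edges = keep_subgraph(vertices, edges, "watched", {"User", "Repository"})
    counts = Counter({v: 1 for v in kept_vertices})   # g.V: one traverser per vertex
    for direction in ("in", "out", "in", "out"):      # in.out.in.out over 'watched'
        counts = step(counts, kept_edges, direction)
    return counts.most_common()                       # ordered by decreasing path count


vertices = {"u1": "User", "u2": "User", "r1": "Repository", "r2": "Repository", "c1": "Commit"}
edges = [("u1", "watched", "r1"), ("u1", "watched", "r2"), ("u2", "watched", "r1"),
         ("u1", "pushed", "c1")]
print(cowatch_path_counts(vertices, edges))   # [('r1', 8), ('r2', 5)]
```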

Conclusion

This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations recognize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman, V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas Marko A. Rodriguez

Faunus Provides Big Graph Data Analytics

Faunus is an Apache 2 licensed distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster. Faunus makes global graph scans efficient because it leverages sequential disk reads/writes in concert with various on-disk compression techniques. Moreover, for non-enumerative calculations, Faunus is able to scale linearly in the face of combinatorial explosions. To substantiate these claims, this post presents a series of analyses using a graph representation of Wikipedia (as provided by DBpedia version 3.7). The DBpedia knowledge graph is stored in a Titan/HBase cluster of 7 m1.xlarge machines on Amazon EC2 and then batch processed using Faunus/Hadoop. Within the Aurelius Graph Cluster, Faunus provides Big Graph Data analytics.

Ingesting DBpedia into Titan

The DBpedia knowledge base currently describes 3.77 million things, out of which 2.35 million are classified in a consistent Ontology, including 764,000 persons, 573,000 places (including 387,000 populated places), 333,000 creative works (including 112,000 music albums, 72,000 films and 18,000 video games), 192,000 organizations (including 45,000 companies and 42,000 educational institutions), 202,000 species and 5,500 diseases. (via DBpedia.org)

DBpedia is a Linked Data effort focused on providing a machine-consumable representation of Wikipedia. The n-triple format distributed by DBpedia can be easily mapped to the property graph model supported by many graph computing systems, including Faunus. The data is ingested into a Titan/HBase cluster of 7 m1.xlarge machines on Amazon EC2 using the BatchGraph wrapper of the Blueprints graph API.

Faunus’ Integration with Titan

Each region server in the Titan/HBase cluster also runs a Hadoop datanode and task tracker. Faunus uses Hadoop to execute breadth-first representations of Gremlin queries/traversals by compiling them down to a chain of MapReduce jobs. Hadoop’s SequenceFile format serves as the intermediate HDFS data format between jobs (i.e. traversal steps). Within the SequenceFile, Faunus leverages compression techniques such as variable-width encoding and prefix compression schemes to ensure a small HDFS footprint. Global analyses of the graph can execute more quickly than is possible in a graph database such as Titan, because the SequenceFile format does not maintain the data structures necessary for random read/write access and, because of its immutable nature, can more easily be laid out sequentially on disk.

ubuntu@ip-10-140-13-228:~/faunus$ bin/gremlin.sh 

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-hbase.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g.getProperties()
==>faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
==>faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
==>faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
==>faunus.output.location=dbpedia
==>faunus.output.location.overwrite=true
gremlin> g._() 
12/11/09 15:17:45 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/11/09 15:17:45 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.IdentityMap.Map]
12/11/09 15:17:50 INFO mapred.JobClient: Running job: job_201211081058_0003
...
gremlin> hdfs.ls()
==>rwxr-xr-x ubuntu supergroup 0 (D) dbpedia
gremlin>

The first step in any repeated analysis of a graph using Faunus is to pull the requisite data from a source location. For the examples in this post, the graph source is Titan/HBase. In the code snippet above, the identity function (g._()) is evaluated, which simply maps the Titan/HBase representation of DBpedia over to an HDFS SequenceFile. This process takes approximately 16 minutes. The chart below presents the average number of bytes per minute read from and written to the cluster’s disks during two distinct phases of processing.

  1. On the left is the ingestion of the raw DBpedia data into Titan via BatchGraph. Numerous low-volume writes occur over a long period of time.
  2. On the right is Faunus’ mapping of the Titan DBpedia graph to a SequenceFile in HDFS. Fewer high volume reads/writes occur over a shorter period of time.

The plot is consistent with the known result that sequential reads from disk are nearly 1.5x faster than random reads from memory and 4-5 orders of magnitude faster than random reads from disk (see The Pathologies of Big Data). Faunus capitalizes on these features of the memory hierarchy to ensure rapid full graph scans.
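The variable-width encoding and prefix compression mentioned above can be sketched in a few lines. This is a generic illustration, not Faunus' actual SequenceFile layout: it delta-encodes a sorted adjacency list (a close relative of prefix compression, since nearby ids share most of their high-order bits) and stores each gap as an unsigned LEB128-style varint.

```python
def encode_varint(n):
    """Unsigned LEB128-style varint: 7 payload bits per byte, high bit set if more bytes follow."""
    if n < 0:
        raise ValueError("only non-negative integers are supported")
    out = bytearray()
    while True:
        low7, n = n & 0x7F, n >> 7
        if n:
            out.append(low7 | 0x80)
        else:
            out.append(low7)
            return bytes(out)


def decode_varints(data):
    values, current, shift = [], 0, 0
    for byte in data:
        current |= (byte & 0x7F) << shift
        if byte & 0x80:
            shift += 7
        else:
            values.append(current)
            current, shift = 0, 0
    return values


def encode_adjacency(neighbor_ids):
    """Sort the ids, replace each with its gap from the previous one, and varint-encode the gaps."""
    out, previous = bytearray(), 0
    for vid in sorted(neighbor_ids):
        out += encode_varint(vid - previous)
        previous = vid
    return bytes(out)


def decode_adjacency(data):
    ids, total = [], 0
    for gap in decode_varints(data):
        total += gap
        ids.append(total)
    return ids


neighbors = [1000010, 1000000, 1000004]
encoded = encode_adjacency(neighbors)
print(len(encoded), decode_adjacency(encoded))   # 5 [1000000, 1000004, 1000010]
```

For these three nearby ids the encoding uses 5 bytes instead of the 24 bytes of three fixed-width 64-bit longs, because only the first id needs more than one byte.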

Faunus’ Dataflows within HDFS: Graph and SideEffect

Faunus has two parallel data flows: graph and sideeffect. Each MapReduce job reads the graph, mutates it in some way, and then writes it back to HDFS as graph* (or to its ultimate sink location). The most prevalent mutation to graph* is the propagation of traversers (i.e. the state of the computation). The graph SequenceFile encodes not only the graph data, but also computational metadata such as which traversers are at which elements (vertices/edges). Other mutations are more structural in nature like property updates and/or edge creation (e.g. graph rewriting). The second data flow is a step-specific statistic about the graph that is stored in sideeffect*. Side-effects include, for example:

  • aggregates: counts, groups, sets, etc.
  • graph data: element identifiers, properties, labels, etc.
  • traversal data: enumeration of paths.
  • derivations: functional transformations of graph data.
gremlin> g.getProperties()
==>faunus.graph.input.format=org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
==>faunus.input.location=dbpedia/job-0
==>faunus.graph.output.format=com.thinkaurelius.faunus.formats.noop.NoOpOutputFormat
==>faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
==>faunus.output.location=output
==>faunus.output.location.overwrite=true
gremlin> hdfs.ls('dbpedia/job-0')
==>rw-r--r-- ubuntu supergroup 426590846 graph-m-00000
==>rw-r--r-- ubuntu supergroup 160159134 graph-m-00001
...
gremlin> g.E.label.groupCount()
...
gremlin> hdfs.ls('output/job-0')
==>rw-r--r-- ubuntu supergroup 37 sideeffect-r-00000
==>rw-r--r-- ubuntu supergroup 18 sideeffect-r-00001
...
gremlin> hdfs.head('output/job-0')
==>deathplace	144374
==>hasBroader	1463237
==>birthplace	561837
==>page	8824974
==>primarytopic	8824974
==>subject	13610094
==>wikipageredirects	5074113
==>wikiPageExternalLink	6319697
==>wikipagedisambiguates	1004742
==>hasRelated	28748
==>wikipagewikilink	145877010
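The label counts above are a groupCount side-effect. The Python sketch below is a rough, single-process illustration of how such a count can be computed in MapReduce style: a map phase emits one (label, 1) pair per edge, a sort stands in for Hadoop's shuffle, and a reduce phase sums the pairs per label. The function names and the toy edges are hypothetical; this is not Faunus' actual implementation.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(edges):
    """Emit one (label, 1) pair per edge, as a mapper would for each edge it reads."""
    for _src, label, _dst in edges:
        yield label, 1


def shuffle(pairs):
    """Sort pairs by key so all values for a label are adjacent (stand-in for shuffle/sort)."""
    return sorted(pairs, key=itemgetter(0))


def reduce_phase(sorted_pairs):
    """Sum the counts per label, as a reducer would."""
    return {label: sum(count for _label, count in group)
            for label, group in groupby(sorted_pairs, key=itemgetter(0))}


def group_count_labels(edges):
    return reduce_phase(shuffle(map_phase(edges)))


if __name__ == "__main__":
    toy_edges = [                      # hypothetical (source, label, target) triples
        ("a", "subject", "b"),
        ("a", "page", "c"),
        ("b", "subject", "c"),
        ("c", "subject", "a"),
    ]
    for label, count in group_count_labels(toy_edges).items():
        print(label, count, sep="\t")  # tab-separated, like the sideeffect output above
```

In a real cluster the map and reduce phases run on different machines and a combiner can pre-sum pairs locally; the sketch only shows the data flow.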

The Traversal Mechanics of Faunus

It is important to understand how Faunus stores computation within the SequenceFile. When the step g.V is evaluated, a single traverser (a long value of 1) is placed on each vertex in the graph. When count() is evaluated, the traversers in the graph are summed together and the total is returned. A similar process occurs for g.E, except that a single traverser is added to each edge in the graph.

gremlin> g.V.count()
==>30962172
gremlin> g.E.count()
==>191733800

If only the number of traversers at each element is required (i.e. a count) as opposed to the specific traverser instances themselves and their respective path histories, then the time it takes to carry out a combinatorial computation can scale linearly with the number of MapReduce iterations. The Faunus/Gremlin traversals below count (not enumerate) the number of 0-, 1-, 2-, 3-, 4-, and 5-step paths in the DBpedia graph. Note that the runtimes grow linearly, at approximately 15 minutes per traversal step, even though the results grow exponentially: in the last example, Faunus determines that there are roughly 251 quadrillion 5-step paths in the DBpedia graph.

gremlin> g.V.count() // 2.5 minutes
==>30962172
gremlin> g.V.out.count() // 17 minutes
==>191733800
gremlin> g.V.out.out.count() // 35 minutes
==>27327666320
gremlin> g.V.out.out.out.count() // 50 minutes
==>5429258407462
gremlin> g.V.out.out.out.out.count() // 70 minutes 
==>1148261617434916
gremlin> g.V.out.out.out.out.out.count() // 85 minutes
==>251818304970074185
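Why the runtimes above grow linearly can be sketched in a few lines. Assuming each step simply moves per-vertex traverser counts along out-edges and sums the counts that land on the same vertex (the count-only mode described above), one step costs a single pass over the edges no matter how large the counts become. The Python below is a single-machine illustration of that idea with a hypothetical count_paths function, not Faunus' code; note that Python integers are unbounded, whereas the post describes traversers as long values.

```python
from collections import defaultdict


def count_paths(vertices, edges, steps):
    """Count (without enumerating) the directed paths with `steps` edges.

    Every vertex starts with a traverser count of 1 (like g.V). Each step pushes
    every vertex's count along its out-edges and sums what arrives at each vertex
    (like .out), so each step is one pass over the edges regardless of the counts.
    """
    out_adj = defaultdict(list)
    for src, dst in edges:
        out_adj[src].append(dst)
    counts = {v: 1 for v in vertices}
    for _ in range(steps):
        arriving = defaultdict(int)
        for v, c in counts.items():
            for w in out_adj[v]:
                arriving[w] += c
        counts = arriving
    return sum(counts.values())  # like .count()


if __name__ == "__main__":
    V = ["a", "b", "c"]
    E = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "a")]
    print([count_paths(V, E, k) for k in range(4)])  # [3, 4, 5, 7]
```

As in the DBpedia session, the 0-step count equals the number of vertices and the 1-step count equals the number of edges.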

While this result might seem outlandish, the empirically derived path count can be estimated analytically. The average degree of the vertices in the graph is 6, but the total number of 5-step paths is much more sensitive to the connectivity of high-degree vertices. When analyzing only the top 25% most connected vertices (the 200k vertices shown in red below the blue line), the average degree is 260. This yields an estimated path count of:

$$200{,}000 \times 260^{5} \approx 2.4 \times 10^{17}$$

This number is consistent with the actual 5-step path count calculated by Faunus (about 2.5 × 10^17). Both the computed and the analytic results demonstrate a feature of natural graphs that all graph analysts should be aware of: combinatorial explosions abound (see Loopy Lattices).

gremlin> g.V.sideEffect('{it.outDegree = it.outE.count()}').outDegree.groupCount()
12/11/11 18:36:16 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
...
==>1001	6
==>101	4547
==>1016	10
==>1022	5
==>1037	9
gremlin>

Conclusion

Faunus is a freely available, Apache 2 licensed, distributed graph analytics engine. It is currently in its 0.1-alpha stage with a 0.1 release planned for Winter 2012/2013. Faunus serves as one of the OLAP components of the Aurelius Graph Cluster.

In the world of graph computing, no one solution will meet all computing needs. Titan supports use cases in which thousands of concurrent users are executing short, ego-centric traversals over a single massive-scale graph. Faunus, on the other hand, supports global traversals of the graph in use cases such as offline data science and/or production-oriented batch processing. Finally, Fulgora will serve as an in-memory graph processor for heavily threaded, iterative graph and machine learning algorithms. Together, the Aurelius Graph Cluster provides integrated solution coverage to various graph computing problems.

Related Material

Jacobs, A., “The Pathologies of Big Data,” Communications of the ACM, 7(6), July 2009.

Norton, B., Rodriguez, M.A., “Loopy Lattices,” Aurelius Blog, April 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Lin, J., Schatz, M., “Design Patterns for Efficient Graph Algorithms in MapReduce,” Mining and Learning with Graphs Proceedings, 2010.

Authors


Marko A. Rodriguez Stephen Mallette Vadas Gintautas Matthias Broecheler

A Solution to the Supernode Problem

In graph theory and network science, a “supernode” is a vertex with a disproportionately high number of incident edges. While supernodes are rare in natural graphs (as statistically demonstrated with power-law degree distributions), they show up frequently during graph analysis. This is because supernodes are connected to so many other vertices that they lie on numerous paths in the graph. Therefore, an arbitrary traversal is likely to touch a supernode. In graph computing, supernodes can lead to system performance problems. Fortunately, for property graphs, there is a theoretical and applied solution to this problem.

Supernodes in the Real-World

Peer-to-Peer File Sharing

At the turn of the millennium, online file sharing was supported by services like Napster and Gnutella. Unlike Napster, Gnutella is a true peer-to-peer system in that it has no central file index. Instead, a client’s search is sent to its adjacent clients. If those clients don’t have the file, then the request propagates to their adjacent clients, and so forth. As in any natural graph, a supernode is only a few steps away. Therefore, in many peer-to-peer networks, supernode clients are quickly inundated with search requests, which in effect subjects them to a denial of service (DoS).

Social Network Celebrities

President Barack Obama currently has 21,322,866 followers on Twitter. When Obama tweets, that tweet must register in the activity streams of 21+ million accounts. The Barack Obama vertex is considered a supernode. As an opposing example, when Stephen Mallette tweets, only 59 streams need to be updated. Twitter realizes this discrepancy and maintains different mechanisms for handling “the Obamas” (i.e. the celebrities) and “the Stephens” (i.e. the plebeians) of the Twitter-sphere.

Blueprints and Vertex Queries

Blueprints is a Java interface for graph-based software. Various graph databases, in-memory graph engines, and batch-analytics frameworks make use of Blueprints. In June 2012, Blueprints 2.x was released with support for “vertex queries.” A vertex query is best explained with an example.

Suppose there is a vertex named Dan. Incident to Dan are 1,110 edges. These edges denote the people Dan knows (10 edges), the things he likes (100 edges), and the tweets he has tweeted (1000 edges). If Dan wants a list of all the people he knows and incident edges are not indexed by label, then Dan would have to iterate through all 1,110 edges to find the 10 people he knows. However, if Dan’s edges are indexed by edge label, then a hash lookup on the label knows would immediately yield the 10 people: O(n) vs. O(1), where n is the number of edges incident to Dan.
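A minimal Python sketch of Dan's situation, assuming a toy in-memory vertex that keeps its incident edges both as one flat list and in a dictionary keyed by edge label (the class and method names are hypothetical, not Blueprints or Titan API). The scan touches all 1,110 edges; the indexed lookup is a single hash probe followed by reading only the matching edges, so strictly speaking it costs O(1) plus O(k) for the k edges returned.

```python
from collections import defaultdict


class StarVertex:
    """Toy vertex storing its incident edges twice: flat, and partitioned by label."""

    def __init__(self, name):
        self.name = name
        self.all_edges = []                      # (label, neighbor) pairs, unindexed
        self.edges_by_label = defaultdict(list)  # label -> neighbors, a hash on label

    def add_edge(self, label, neighbor):
        self.all_edges.append((label, neighbor))
        self.edges_by_label[label].append(neighbor)

    def neighbors_by_scan(self, label):
        # Unindexed: every incident edge is inspected, O(n).
        return [nb for lbl, nb in self.all_edges if lbl == label]

    def neighbors_by_label_index(self, label):
        # Indexed: one hash lookup, then only the matching edges are read.
        return list(self.edges_by_label.get(label, ()))


if __name__ == "__main__":
    dan = StarVertex("dan")
    for label, n in [("knows", 10), ("likes", 100), ("tweets", 1000)]:
        for i in range(n):
            dan.add_edge(label, f"{label}-{i}")
    print(len(dan.all_edges), len(dan.neighbors_by_label_index("knows")))  # 1110 10
```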

The idea of partitioning edges by discriminating qualities can be taken a step further in property graphs. Property graphs support key/value pairs on vertices and edges. For example, a knows-edge can have a type-property with possible values of “work,” “family,” and “favorite” and a since-property specifying when the relationship began. Similarly, likes-edges can have a 1-to-5 rating-property and tweet-edges can have a timestamp denoting when the tweet was tweeted. Blueprints’ Query allows the developer to specify constraints on the incident edges to be retrieved. For example, to get all of Dan’s highly rated items, the following Blueprints code is evaluated.

dan.query().labels("likes").interval("rating",4,6).vertices()
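Blueprints documents interval(key, start, end) as a half-open range (start inclusive, end exclusive), which is why the query above uses 4 and 6 to get ratings of 4 and 5. The Python sketch below imitates what a vertex-centric index can do with such a query, assuming the likes-edges are kept sorted by rating: two binary searches find the range boundaries, and only the matching edges are read. SortedLikes is a hypothetical name, and list insertion is O(n), which is fine for a sketch but not how a storage engine would maintain the order.

```python
import bisect


class SortedLikes:
    """Toy vertex-centric index: likes-edges kept sorted by their rating property."""

    def __init__(self):
        self._ratings = []   # sorted ratings
        self._targets = []   # liked items, parallel to self._ratings

    def add(self, rating, target):
        pos = bisect.bisect_right(self._ratings, rating)
        self._ratings.insert(pos, rating)
        self._targets.insert(pos, target)

    def interval(self, start, end):
        """Items with start <= rating < end (half-open, like Blueprints' interval)."""
        lo = bisect.bisect_left(self._ratings, start)  # O(log n)
        hi = bisect.bisect_left(self._ratings, end)    # O(log n)
        return self._targets[lo:hi]                    # O(k) for k results


if __name__ == "__main__":
    likes = SortedLikes()
    for item, rating in [("a", 5), ("b", 2), ("c", 4), ("d", 1), ("e", 5)]:
        likes.add(rating, item)
    print(sorted(likes.interval(4, 6)))  # ['a', 'c', 'e']
```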

Titan and Vertex-Centric Indices

Blueprints only provides the interface for representing vertex queries. It is up to the underlying graph system to use the specified constraints to their advantage. The distributed graph database Titan makes extensive use of vertex-centric indices for fine-grained retrieval of edge data from both disk and memory. To demonstrate the effectiveness of these indices, a benchmark is provided using Titan/BerkeleyDB (an ACID variant of Titan — see Titan’s storage overview).

Ten Titan/BerkeleyDB instances are created, each with a person-vertex named Dan. Five of those instances have vertex-centric indices and five do not. Within each group, the five instances differ in the number of edges incident to Dan, as listed below.

total incident edges | knows-edges | likes-edges | tweets-edges
111                  | 1           | 10          | 100
1,110                | 10          | 100         | 1,000
11,100               | 100         | 1,000       | 10,000
111,000              | 1,000       | 10,000      | 100,000
1,110,000            | 10,000      | 100,000     | 1,000,000

The Gremlin/Groovy script to generate the aforementioned star-graphs is provided below, where i is the variable defining the size of the resultant graph.

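g = TitanFactory.open('/tmp/supernode')
// index configuration snippet goes here for Titan w/ vertex-centric indices
g.createKeyIndex('name',Vertex.class)
g.addVertex([name:'dan'])

// i sets the graph size: Dan receives i knows-edges, i*10 likes-edges, and i*100 tweets-edges
// (i = 1, 10, 100, 1000, 10000 for the five graphs in the table above)
// stopTx is not defined in the original snippet; this is an assumed helper that
// commits the running transaction every 10,000 edges
stopTx = { graph, n -> if (n % 10000 == 0) graph.stopTransaction(SUCCESS) }
r = new Random(100)
types = ['work','family','favorite']
(1..i).each{g.addEdge(g.V('name','dan').next(),g.addVertex(),'knows',[type:types.get(r.nextInt(3)),since:it]); stopTx(g,it)}
// ratings run from 1 to 5 (r.nextInt(5) alone would yield 0 to 4)
(1..(i*10)).each{g.addEdge(g.V('name','dan').next(),g.addVertex(),'likes',[rating:r.nextInt(5)+1]); stopTx(g,it)}
(1..(i*100)).each{g.addEdge(g.V('name','dan').next(),g.addVertex(),'tweets',[time:it]); stopTx(g,it)}
g.stopTransaction(SUCCESS) // commit the remaining edges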

For the 5 Titan/BerkeleyDB instances with vertex-centric indices, the following code fragment was evaluated. This code defines the indices (see Titan’s type configurations).

type = g.makeType().name('type').simple().functional(false).dataType(String.class).makePropertyKey()
since = g.makeType().name('since').simple().functional(false).dataType(Integer.class).makePropertyKey()
rating = g.makeType().name('rating').simple().functional(false).dataType(Integer.class).makePropertyKey()
time = g.makeType().name('time').simple().functional(false).dataType(Integer.class).makePropertyKey()
g.makeType().name('knows').primaryKey(type,since).makeEdgeLabel()
g.makeType().name('likes').primaryKey(rating).makeEdgeLabel()
g.makeType().name('tweets').primaryKey(time).makeEdgeLabel()

Next, three traversals rooted at Dan are presented. The first gets all the people Dan knows of a particular randomly chosen type (e.g. family members). The second returns all of the things that Dan has highly rated (i.e. 4 or 5 star ratings). The third retrieves Dan’s 10 most recent tweets. Finally, note that Gremlin compiles each expression to an appropriate vertex query (see Gremlin’s traversal optimizations).

g.V('name','dan').outE('knows').has('type',types.get(r.nextInt(3))).inV
g.V('name','dan').outE('likes').interval('rating',4,6).inV
g.V('name','dan').outE('tweets').has('time',T.gt,(i*100)-10).inV

The traversals above were each run 25 times with the database restarted after each query in order to demonstrate response times with cold JVM caches. Note that in-memory, warm-cache response times show a similar pattern (albeit relatively faster). The averaged results are plotted below where the y-axis is on a log scale. The green, red, and blue colors denote the first, second and third queries, respectively. Moreover, there is a light and a dark version of each color. The light version is Titan/BerkeleyDB without vertex-centric indices and the dark version is Titan/BerkeleyDB with vertex-centric indices.

Perhaps the most impressive result is the retrieval of Dan’s 10 most recent tweets (blue). With vertex-centric indices (dark blue), as the number of Dan’s tweets grows to 1 million, the time it takes to get the top 10 stays constant at around 1.5 milliseconds. Without indices, the query time grows in proportion to the amount of data and ultimately reaches 13 seconds (light blue). That is a difference of nearly 4 orders of magnitude in response time for the same result set. This example demonstrates how useful vertex-centric indices are for activity stream-type systems.

The plot on the right displays the number of vertices returned by each query over each graph size. As expected, the number of tweets stays constant at 10 while the number of knows and likes vertices retrieved grows in proportion to the graph size. Although each query returns the same data on the same graph with and without indices, retrieving that data is faster with vertex-centric indices.

Finally, Titan also supports composite key indices. The type definition fragment above assigns knows-edges a composite primary key of type and since. Therefore, retrieving Dan’s 10 most recent coworkers is more efficient than fetching all of Dan’s coworkers and then sorting them on since in memory. The interested reader can explore the runtimes of such composite vertex-centric queries by augmenting the provided code snippets.
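To make the composite-key argument concrete, here is a Python sketch that assumes knows-edges are stored sorted by the pair (type, since), roughly as a primary key of (type, since) would order them. All edges of one type then form a contiguous run, so two binary searches locate that run and the k most recent coworkers are simply its last k entries; no edges of other types are read and nothing is sorted at query time. KnowsIndex and most_recent are hypothetical names, not Titan API.

```python
import bisect


class KnowsIndex:
    """Toy stand-in for a vertex-centric index on knows-edges whose primary key is
    the composite (type, since): edges are kept sorted by that pair."""

    def __init__(self, edges):
        # edges: iterable of (type, since, person) triples
        ordered = sorted(edges, key=lambda e: (e[0], e[1]))
        self._keys = [(t, s) for t, s, _ in ordered]
        self._people = [p for _, _, p in ordered]

    def most_recent(self, rel_type, k):
        """Up to k people of rel_type with the largest since values, newest first."""
        lo = bisect.bisect_left(self._keys, (rel_type,))                # run start
        hi = bisect.bisect_left(self._keys, (rel_type, float("inf")))   # run end
        start = max(lo, hi - k)
        return self._people[start:hi][::-1]


if __name__ == "__main__":
    index = KnowsIndex([
        ("work", 1, "ann"), ("family", 2, "bob"), ("work", 5, "cat"),
        ("work", 3, "dov"), ("favorite", 4, "eve"),
    ])
    print(index.most_recent("work", 2))  # ['cat', 'dov']
```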

Conclusion

A supernode is only a problem when the discriminating information between edges is ignored. If all edges are treated equally, then linear O(n) searches through the incident edge set of a vertex are required. However, when indices and sort orders are used, O(log(n)) and O(1) lookups can be achieved. The presented results demonstrate 2-5x faster retrievals for the presented knows/likes queries and up to 10,000x faster retrievals for the tweets query when vertex-centric indices are employed. Now consider a traversal of more than a single hop: the runtimes compound in a combinatoric manner, and compounding at 1 millisecond vs. 10 seconds leads to astronomical differences in overall traversal runtime.

The graph database Titan can scale to support hundreds of billions of edges (via Apache Cassandra and HBase). Vertices with a million or more incident edges are frequent in such massive graphs. In the world of Big Graph Data, it is important to store and retrieve data from disk and memory efficiently. With Titan, edge filtering is pushed down to the disk-level so only requisite data is actually fetched and brought into memory. Vertex-centric queries and indices overcome the supernode problem by intelligently leveraging the label and property information of the edges incident to a vertex.

Related Material

Rodriguez, M.A., Broecheler, M., “Titan: The Rise of Big Graph Data,” Public Lecture at Jive Software, Palo Alto, 2012.

Broecheler, M., LaRocque, D., Rodriguez, M.A., “Titan Provides Real-Time Big Graph Data,” Aurelius Blog, August 2012.

Authors

Matthias Broecheler Marko A. Rodriguez

Deploying the Aurelius Graph Cluster

The Aurelius Graph Cluster is a cluster of interoperable graph technologies that can be deployed on a multi-machine compute cluster. This post demonstrates how to set up the cluster on Amazon EC2 (a popular cloud service provider) with the following graph technologies:

Titan is an Apache2-licensed distributed graph database that leverages existing persistence technologies such as Apache HBase and Cassandra. Titan implements the Blueprints graph API and therefore supports the Gremlin graph traversal/query language. [OLTP]

Faunus is an Apache2-licensed batch analytics, graph computing framework based on Apache Hadoop. Faunus leverages the Blueprints graph API and exposes Gremlin as its traversal/query language. [OLAP]

Please note the date of this publication. Newer versions of the technologies discussed, as well as other deployment techniques, may exist. Finally, all commands refer to an example cluster; any use of these commands should be adapted to the specific cluster being computed on.

Cluster Configuration

The examples in this post assume the reader has access to an Amazon EC2 account. The first step is to create a machine instance that has, at minimum, Java 1.6+ on it. This instance is used to spawn the graph cluster. The name given to this instance is agc-master and it is a modest m1.small machine. On agc-master, Apache Whirr 0.8.0 is downloaded and unpacked.

~$ ssh ubuntu@ec2-184-72-209-80.compute-1.amazonaws.com
...
ubuntu@ip-10-117-55-34:~$ wget http://www.apache.org/dist/whirr/whirr-0.8.0/whirr-0.8.0.tar.gz
ubuntu@ip-10-117-55-34:~$ tar -xzf whirr-0.8.0.tar.gz

Whirr is a cloud-service-agnostic tool that simplifies the creation and destruction of a compute cluster. A Whirr “recipe” (i.e. a properties file) describes the machines in a cluster and their respective services. The recipe used in this post is provided below and saved to a text file named agc.properties on agc-master. The recipe defines a cluster of five m1.large machines running HBase 0.94.1 and Hadoop 1.0.3 (see whirr.instance-templates). HBase will serve as the persistence engine for Titan and Hadoop will serve as the batch computing engine for Faunus.

whirr.cluster-name=agc
whirr.instance-templates=1 zookeeper+hadoop-namenode+hadoop-jobtracker+hbase-master,4 hadoop-datanode+hadoop-tasktracker+hbase-regionserver
whirr.provider=aws-ec2
whirr.identity=${env:AWS_ACCESS_KEY_ID}
whirr.credential=${env:AWS_SECRET_ACCESS_KEY}
whirr.hardware-id=m1.large
whirr.image-id=us-east-1/ami-da0cf8b3
whirr.location-id=us-east-1
whirr.hbase.tarball.url=http://archive.apache.org/dist/hbase/hbase-0.94.1/hbase-0.94.1.tar.gz
whirr.hadoop.tarball.url=http://archive.apache.org/dist/hadoop/core/hadoop-1.0.3/hadoop-1.0.3.tar.gz
hbase-site.dfs.replication=2

From agc-master, the following commands will launch the previously described cluster. Note that the first two lines require specific Amazon EC2 account information. When the launch completes, the Amazon EC2 web admin console will show the 5 m1.large machines.

ubuntu@ip-10-117-55-34:~$ export AWS_ACCESS_KEY_ID= # requires account specific information
ubuntu@ip-10-117-55-34:~$ export AWS_SECRET_ACCESS_KEY= # requires account specific information
ubuntu@ip-10-117-55-34:~$ ssh-keygen -t rsa -P ''
ubuntu@ip-10-117-55-34:~$ whirr-0.8.0/bin/whirr launch-cluster --config agc.properties

The deployed cluster is diagrammed on the right where each machine maintains its respective software services. The sections to follow will demonstrate how to load and then process graph data within the cluster. Titan will serve as the data source for Faunus’ batch analytic jobs.

Loading Graph Data into Titan

Titan is a highly scalable, distributed graph database that leverages existing persistence engines. Titan 0.1.0 supports Apache Cassandra (AP), Apache HBase (CP), and Oracle BerkeleyDB (CA). Each of these backends emphasizes a different aspect of the CAP theorem. For the purpose of this post, Apache HBase is utilized and therefore, Titan is consistent (C) and partition tolerant (P).

For the sake of simplicity, the single zookeeper+hadoop-namenode+hadoop-jobtracker+hbase-master machine will be used for cluster interactions. Its IP address can be found in the Whirr instance metadata on agc-master. This machine is used because numerous services are already installed on it (e.g. the HBase shell, Hadoop, etc.), so no manual software installation is required on agc-master.

ubuntu@ip-10-117-55-34:~$ more .whirr/agc/instances 
us-east-1/i-3c121b41	zookeeper,hadoop-namenode,hadoop-jobtracker,hbase-master	54.242.14.83	10.12.27.208
us-east-1/i-34121b49	hadoop-datanode,hadoop-tasktracker,hbase-regionserver	184.73.57.182	10.40.23.46
us-east-1/i-38121b45	hadoop-datanode,hadoop-tasktracker,hbase-regionserver	54.242.151.125	10.12.119.135
us-east-1/i-3a121b47	hadoop-datanode,hadoop-tasktracker,hbase-regionserver	184.73.145.69	10.35.63.206
us-east-1/i-3e121b43	hadoop-datanode,hadoop-tasktracker,hbase-regionserver	50.16.174.157	10.224.3.16
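Picking the addresses out of the instances file by hand is easy for five machines; the Python helper below does the same lookup, assuming the four-column layout shown above (instance id, comma-separated roles, public IP, private IP) holds for this Whirr version. find_role is a hypothetical name. The public address is the one used for ssh from agc-master, while the private address is the one later needed for Faunus' ZooKeeper quorum setting.

```python
from pathlib import Path


def find_role(instances_text, role):
    """Return (public_ip, private_ip) of the first instance that runs `role`.

    Assumes each line holds: instance id, comma-separated roles, public IP,
    private IP, separated by whitespace (tabs in the listing above).
    """
    for line in instances_text.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue  # skip blank or malformed lines
        _instance_id, roles, public_ip, private_ip = fields[:4]
        if role in roles.split(","):
            return public_ip, private_ip
    raise LookupError(f"no instance runs role {role!r}")


if __name__ == "__main__":
    # Whirr's metadata location for the cluster named agc, as listed on agc-master.
    instances = Path.home() / ".whirr" / "agc" / "instances"
    if instances.exists():
        print(find_role(instances.read_text(), "hbase-master"))
```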

Once in the machine via ssh, Titan 0.1.0 is downloaded, unzipped, and the Gremlin console is started.

ubuntu@ip-10-117-55-34:~$ ssh 54.242.14.83
...
ubuntu@ip-10-12-27-208:~$ wget https://github.com/downloads/thinkaurelius/titan/titan-0.1.0.zip
ubuntu@ip-10-12-27-208:~$ sudo apt-get install unzip
ubuntu@ip-10-12-27-208:~$ unzip titan-0.1.0.zip
ubuntu@ip-10-12-27-208:~$ cd titan-0.1.0/
ubuntu@ip-10-12-27-208:~/titan-0.1.0$ bin/gremlin.sh 

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> 

A toy graph of 1 million edges (and 1,000,001 vertices) is loaded into Titan using the Gremlin/Groovy script below (simply cut-and-paste the source into the Gremlin console and wait approximately 3 minutes). The code implements a preferential attachment algorithm. For an explanation of this algorithm, please see the second column of page 33 in Mark Newman’s article The Structure and Function of Complex Networks.

// connect Titan to HBase in batch loading mode
conf = new BaseConfiguration()
conf.setProperty('storage.backend','hbase')
conf.setProperty('storage.hostname','localhost')
conf.setProperty('storage.batch-loading','true');
g = TitanFactory.open(conf)

// preferentially attach a growing vertex set
size = 1000000; ids = [g.addVertex().id]; rand = new Random();
(1..size).each{
  v = g.addVertex();
  u = g.v(ids.get(rand.nextInt(ids.size())))
  g.addEdge(v,u,'linked');
  ids.add(u.id);
  ids.add(v.id);
  if(it % 10000 == 0) {
    g.stopTransaction(SUCCESS)
    println it
  }
}; g.shutdown()
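For readers who want to experiment with the growth process outside of Titan, the Python sketch below mirrors the Gremlin script's logic: the ids list holds one entry per edge endpoint (plus the seed vertex), so choosing uniformly from it picks an existing vertex with probability proportional to its degree (the seed vertex has one extra entry). Vertices are plain integers here and preferential_attachment is a hypothetical name; nothing is written to a database.

```python
import random


def preferential_attachment(size, seed=None):
    """Grow a graph one vertex at a time, linking each new vertex to an existing
    vertex chosen with probability proportional to its degree.

    Returns `size` directed edges (new_vertex, target) over vertices 0..size.
    """
    rng = random.Random(seed)
    ids = [0]      # the seed vertex; afterwards one entry per edge endpoint
    edges = []
    for v in range(1, size + 1):
        u = rng.choice(ids)   # degree-proportional choice among existing vertices
        edges.append((v, u))  # like g.addEdge(v, u, 'linked')
        ids.append(u)
        ids.append(v)
    return edges


if __name__ == "__main__":
    edges = preferential_attachment(10_000, seed=42)
    print(len(edges))  # 10000
```

With size = 1,000,000 this produces the same vertex and edge totals as the Titan load (1,000,001 vertices, 1,000,000 edges), although the random choices will of course differ.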

Batch Analytics with Faunus

Faunus is a Hadoop-based graph computing framework. It supports performant global graph analyses by making use of sequential reads from disk (see The Pathologies of Big Data). Faunus provides connectivity to Titan/HBase, Titan/Cassandra, any Rexster-fronted graph database, and to text/binary files stored in HDFS. From the zookeeper+hadoop-namenode+hadoop-jobtracker+hbase-master machine, Faunus 0.1-alpha is downloaded and unzipped. The provided titan-hbase.properties file should be updated to use hbase.zookeeper.quorum=10.12.27.208 instead of localhost, where 10.12.27.208 is the machine’s private IP address as listed in ~/.whirr/agc/instances on agc-master. Finally, the Gremlin console is started.

ubuntu@ip-10-12-27-208:~$ wget https://github.com/downloads/thinkaurelius/faunus/faunus-0.1-alpha.zip
ubuntu@ip-10-12-27-208:~$ unzip faunus-0.1-alpha.zip 
ubuntu@ip-10-12-27-208:~$ cd faunus-0.1-alpha/
ubuntu@ip-10-12-27-208:~/faunus-0.1-alpha$ vi bin/titan-hbase.properties 
ubuntu@ip-10-12-27-208:~/faunus-0.1-alpha$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> 

A few example Faunus jobs are provided below. The final job generates an in-degree distribution. The in-degree of a vertex is defined as the number of incoming edges to the vertex. The output states how many vertices (second column) have a particular in-degree (first column). For example, 167,050 vertices have exactly 1 incoming edge.

gremlin> g = FaunusFactory.open('bin/titan-hbase.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g.V.count() // how many vertices in the graph?
==>1000001
gremlin> g.E.count() // how many edges in the graph?
==>1000000
gremlin> g.V.out.out.out.count() // how many length 3 paths are in the graph?
==>988780
gremlin> g.V.sideEffect('{it.degree = it.inE.count()}').degree.groupCount // what is the graph's in-degree distribution?
==>1	167050
==>10	2305
==>100	6
==>108	3
==>119	3
==>122	3
==>133	1
==>144	2
==>155	1
==>166	2
==>18	471
==>188	1
==>21	306
==>232	1
==>254	1
==>...
gremlin> 
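The in-degree job above is a two-stage computation: the sideEffect step stores each vertex's in-degree, and groupCount then counts vertices per in-degree value. The Python sketch below reproduces those two stages on an in-memory edge list (in_degree_distribution is a hypothetical name, and this is not how Faunus distributes the work). Vertices without incoming edges are reported under degree 0.

```python
from collections import Counter


def in_degree_distribution(vertices, edges):
    """Return a Counter mapping in-degree -> number of vertices with that in-degree."""
    # Stage 1 (like sideEffect('{it.degree = it.inE.count()}')): in-degree per vertex.
    in_degree = Counter(dst for _src, dst in edges)
    # Stage 2 (like .degree.groupCount): how many vertices share each in-degree.
    return Counter(in_degree[v] for v in vertices)


if __name__ == "__main__":
    edges = [(1, 0), (2, 0), (3, 1)]   # toy (source, target) edges
    dist = in_degree_distribution(range(4), edges)
    for degree, n in sorted(dist.items()):
        print(degree, n, sep="\t")     # 0 2, then 1 1, then 2 1
```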

To conclude, the in-degree distribution result is pulled from Hadoop’s HDFS (stored in output/job-0). Next, scp is used to download the file to agc-master and then again to download the file to a local machine (e.g. a laptop). If the local machine has R installed, then the file can be plotted and visualized (see the final diagram below). The log-log plot demonstrates the known result that the preferential attachment algorithm generates a graph with a power-law degree distribution (i.e. “natural statistics”).

ubuntu@ip-10-12-27-208:~$ hadoop fs -getmerge output/job-0 distribution.txt
ubuntu@ip-10-12-27-208:~$ head -n5 distribution.txt 
1	167050
10	2305
100	6
108	3
119	3
ubuntu@ip-10-12-27-208:~$ exit
...
ubuntu@ip-10-117-55-34:~$ scp 54.242.14.83:~/distribution.txt .
ubuntu@ip-10-117-55-34:~$ exit
...
~$ scp ubuntu@ec2-184-72-209-80.compute-1.amazonaws.com:~/distribution.txt .
~$ R
> t = read.table('distribution.txt')
> plot(t,log='xy',xlab='in-degree',ylab='frequency')

Conclusion

The Aurelius Graph Cluster is used for processing massive-scale graphs, where massive-scale denotes a graph so large it does not fit within the resource confines of a single machine. In other words, the Aurelius Graph Cluster is all about Big Graph Data. The two cluster technologies explored in this post were Titan and Faunus. They serve two distinct graph computing needs. Titan supports thousands of concurrent real-time, topologically local graph interactions. Faunus, on the other hand, supports long running, topologically global graph analyses. In other words, they provide OLTP and OLAP functionality, respectively.

References

London, G., “Set Up a Hadoop/HBase Cluster on EC2 in (About) an Hour,” Cloudera Developer Center, October 2012.

Newman, M., “The Structure and Function of Complex Networks,” SIAM Review, volume 45, pages 167-256, 2003.

Jacobs, A., “The Pathologies of Big Data,” ACM Communications, volume 7, number 6, July 2009.

Authors

Marko A. Rodriguez Dan LaRocque
