Skip to content

Latest commit

 

History

History
334 lines (256 loc) · 9.37 KB

File metadata and controls

334 lines (256 loc) · 9.37 KB

First: Get the sample code! https://github.com/Synata/SparkGraphAnalysis

You'll need to download spark from http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.4.tgz

Unzip it, and just take the spark_assembly jar from lib/

Or, if that link doesn't work, go to: https://spark.apache.org/downloads.html and select Pre-built for Hadoop 2.4.


Make sure Mongo is running at localhost:27017

To start with, we need some data to graph analyze. The easiest way to get some relevant graph data is your email!

We'll be using OAuth / API calls to get the data. You'll need an access token. The easiest way to get your Gmail Access Token is with the Gmail API Playground. Go to: https://developers.google.com/gmail/api/v1/reference/users/messages/list and under "Try It" click the "Authorize Oauth" button. Open the developer console, and find the request that ends with /me - Look for the header:

"Authorization: Bearer <TOKEN>" 

Copy and paste into the command:

The usage is:

sbt "run load <Gmail Access Token>"

This will dump your gmail and load metadata into Mongo.


We're going to be working on the analyzer, so add this to the Analyzer class:

//Add to analyzer class
private def getContext = {
	val driverPort = 7777
	val driverHost = "localhost"
	val master = "local"
	val app = "Spark Graph Analysis"
   
	val conf = new SparkConf() // skip loading external settings
      			.set("spark.logConf", "true")
      			.set("spark.driver.host", s"$driverHost")
      			.set("spark.driver.port", s"$driverPort")
      			.set("spark.cores.max", "2")
      			.set("spark.executor.memory", "4g")
      			.set("spark.eventLog.enabled", "false")
	val context = new SparkContext(master, app, conf)
	
	// Context setup code will go here
	
	context
}

Now, add this to the exec() function.

val context = getContext
try {
	val rdd = context.parallelize(List(1, 2, 3, 4))
	val arr = rdd.collect()
	println("Array: ")
	arr.foreach(m =>
		println(m)
	)
} finally { 
	context.stop()
}

Test that it’s all working!

sbt "run analyze"

Ok, small tricky part - we need to create a distributable jar for spark. We're using an SBT Plugin called Assembly:

sbt assembly

This creates a jar that we'll be using shortly.

Setup the full Spark Context

//Add to the getContext call
val jarFile = "target/scala-2.10/SparkGraphAnalysis-assembly-1.0.jar"
val mongoDriverFile = "lib/mongo-java-driver-2.12.4.jar"
val mongoHadoopFile = "lib/mongo-hadoop-core-1.3.0.jar"

context.addJar(jarFile)
context.addJar(mongoDriverFile)
context.addJar(mongoHadoopFile)

Test it out, make sure it’s finding the jars

sbt "run analyze"

Let’s add some Mongo In: Replace hardcoded rdd-

//Replace the try block of the exec function
val mongoUri = s"mongodb://localhost"
val hadoopConfig = new Configuration()

hadoopConfig.set("mongo.input.uri", s"$mongoUri/graph.data")
hadoopConfig.set("mongo.output.uri", s"$mongoUri/analysis.output")


val rdd = context.newAPIHadoopRDD(hadoopConfig, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

So, the really cool park about spark and graphx, is you can do neat munging. Our goal here is to create a graph of people that we can then analyze, but we’re starting with just a simple list of emails. Let’s start with creating our nodes

First, we need to add a helper function to the analyzer object

  // Add to analyzer object
  def parseObj(obj: BSONObject): Email = {
    val from = if(obj.containsField("From")) obj.get("From").asInstanceOf[BasicDBList].map(_.toString).toList else List()
    val to = if(obj.containsField("To")) obj.get("To").asInstanceOf[BasicDBList].map(_.toString).toList else List()
    val cc = if(obj.containsField("CC")) obj.get("CC").asInstanceOf[BasicDBList].map(_.toString).toList else List()
    val bcc = if(obj.containsField("BCC")) obj.get("BCC").asInstanceOf[BasicDBList].map(_.toString).toList else List()

    val subject = obj.get("Subject").toString
    val id = obj.get("_id").toString
    Email(id, subject, from, to, cc, bcc)
  }

Let's start our analysis effort by simply creating a list of unique email addresses.

//Add to try block of exec function
	val mapped = rdd.map(m => Analyzer.parseObj(m._2))
	val emailListRDD = mapped.flatMap(m => m.from ::: m.to ::: m.cc ::: m.bcc)
			.distinct()
        		.zipWithIndex()
        		.map(m => (m._2, m._1))
        		
        // Just to prove we're doing something - we'll remove this.
	emailListRDD.collect()
		.foreach(m => {
			println(m._2)
		})

Let's try it out, and dump the email addresses:

sbt "run analyze"

Alright, here's where the magic happens! Let's actually make a graph!

First, add the following code to the analyzer object

//Goes in the Analyzer Object
  def parseEdge(obj: Email, lookup: Map[String, Long]): List[Edge[String]] = {
    obj.from.flatMap { f =>
      val fId = lookup(f)
      val to = obj.to.map(t => Edge(fId, lookup(t), "SentEmailTo"))
      val cc = obj.cc.map(t => Edge(fId, lookup(t), "CCed"))
      val bcc = obj.bcc.map(t => Edge(fId, lookup(t), "BCCed"))
      to ::: cc ::: bcc
    }
  }

And add this to the exec() function.

// This goes in the exec function
    val emailLookup = emailListRDD.collect().toMap
    val idLookup = emailLookup.map(m => (m._2, m._1)).toMap

    val edges = mapped.flatMap(m => Analyzer.parseEdge(m, idLookup))

    val g = Graph(emailListRDD, edges)

    Analyzer.outputGraph("MainGraph.gml", g)

Run the analysis, and you'll be able to view your email graph. I use Gephi for visualization.

sbt "run analyze"

Ok, now the magic happens – we’re going to use two out of the box graph analysis techniques with

First, everyone here has heard of pagerank:

//Page Rank
//Add to the analyzer object
def calculatePageRankAndGetSelf(g: Graph[String, String], emailListRDD: RDD[(Long, String)]): (Long, String) = {
    val pageRanked = g.pageRank(0.01).vertices

    val ranked = pageRanked.join(emailListRDD)
      .sortBy(m => m._2._1, ascending = false)

    val prOutput = ranked.take(100) //Only grab the top 100
    prOutput.foreach { m =>
      println(s"${m._2._2}: ${m._2._1}")
    }

    val me = (prOutput.head._1, prOutput.head._2._2)

    println(s"I am: $me")

    me
}
//Add to the exec function
val me = calculatePageRankAndGetSelf(g, emailListRDD)

Let's look at strongly connected components - this is a method for community finding


 //Add to the Analyzer object
def calculateStronglyConnected(me: (Long, String), g: Graph[String, String], emailLookup: Map[Long, String]) = {
    val stronglyConnected = g.filter(m => m, vpred = (id, m: String) => id != me._1).stronglyConnectedComponents(10)

    val strongGraph = stronglyConnected.collectEdges(EdgeDirection.Out)

    strongGraph.collect().foreach { e =>
      val email = emailLookup(e._1)
      println(s"$email is strongly connected to:")
      val connected = e._2.map { inner =>
        if(inner.srcId == e._1) {
          emailLookup(inner.dstId)
        } else {
          emailLookup(inner.srcId)
        }
      }
      connected.distinct.foreach(c => println("\t" + c))
    }
  }
//Add to the exec function
calculateStronglyConnected(me, g, emailLookup)

Finally, let's write our own algorithm

First, we need some way to track messageing counts

//Add near the top of the analyzer file
case class CountTracker(inFromSource: Int, outToSource: Int) {
  def getRatio = if(outToSource == 0) Double.NegativeInfinity else inFromSource.toDouble / outToSource.toDouble
  def total = inFromSource + outToSource
  def incIn: CountTracker = copy(inFromSource + 1, outToSource)
  def incOut: CountTracker = copy(inFromSource, outToSource + 1)
  def merge(toMerge: CountTracker): CountTracker = {
    val out = CountTracker(toMerge.inFromSource + inFromSource, toMerge.outToSource + outToSource)
    out
  }
}
//Add to the Analyzer Object
def calculateCRS(me: (Long, String), g: Graph[String, String], emailListRDD: RDD[(Long, String)]) = {
    //Pregel
    def processMessages = (src: VertexId, currentCount: CountTracker, message: CountTracker) => {
      currentCount.merge(message)
    }

    def sendMessages = (triplet: EdgeTriplet[CountTracker, String]) => {
      if(triplet.srcId == me._1) {
        //Send to dest
        Iterator((triplet.dstId, CountTracker(1, 0)))
      } else if(triplet.dstId == me._1) {
        //Send to src
        Iterator((triplet.srcId, CountTracker(0, 1)))
      } else {
        Iterator.empty
      }
    }

    def mergeMessages = (a: CountTracker, b: CountTracker) => {
      a.merge(b)
    }

    val initialGraph = g.mapVertices((id, _) => CountTracker(0,0))
    val crs = initialGraph.pregel(CountTracker(0,0), activeDirection = EdgeDirection.Both, maxIterations = 1)(
      processMessages, // Vertex Program
      sendMessages,
      mergeMessages
    )

    val orderedCRS = crs.vertices.join(emailListRDD)
      .map(m => {
      val ratio = m._2._1.getRatio
      val total = m._2._1.total
      val mult = if(ratio.isInfinite) Double.NegativeInfinity else ratio * total
      (m._2._2, ratio, math.abs(1 - ratio), mult)
    }).filter(m => m._2 != Double.NegativeInfinity && m._2 != 0D)


    println(orderedCRS.sortBy(m => m._3).collect.mkString("\n"))

    println("------------")

    println(orderedCRS.sortBy(m => m._4, ascending = true).collect.mkString("\n"))
  }
//Add to the exec function
calculateCRS(me, g, emailListRDD)