3. 圖算法

3.1 PageRank排名算法

  3.1.1 算法概述 

      PageRank,即網頁排名,又稱網頁級別、Google 左側排名或佩奇排名算法

      是Google 創始人拉里·佩奇和謝爾蓋·布林於1997 年構建早期的搜索系統原型時提出的鏈 接分析算法,在揉合了諸如Title 標識和Keywords 標識等全部其它因素以後,Google 經過PageRank 來調整結果,使那些更具「等級/重要性」的網頁在搜索結果中另網站排名得到提高,從而提升搜索結果的相關性和質量apache

  3.1.2 從入鏈數量到PageRank  

      PageRank的計算基於如下兩個基本假設:瀏覽器

      數量假設:在Web 圖模型中,若是一個頁面節點接收到的其餘網頁指向的入鏈數量越多,那麼這個頁面越重要。ide

      質量假設:指向頁面A的入鏈質量不一樣,質量高的頁面會經過連接向其餘頁面傳遞更多的權重。因此越是質量高的頁面指向頁面A,則頁面A越重要。網站

      利用以上兩個假設,PageRank算法剛開始賦予每一個網頁相同的重要性得分,經過迭代遞歸計算來更新每一個頁面節點的PageRank得分,直到得分穩定爲止。PageRank計算得出的結果是網頁的重要性評價,這和用戶輸入的查詢是沒有任何關係的,即算法是主題無關的ui

  3.1.3 PageRank算法原理 

      PageRank的計算充分利用了兩個假設:數量假設和質量假設。步驟以下:搜索引擎

      1) 在初始階段:網頁經過連接關係構建起Web圖,每一個頁面設置相同的PageRank值,經過若干輪的計算,會獲得每一個頁面所得到的最終PageRank值。隨着每一輪的計算進行,網頁當前的PageRank值會不斷獲得更新atom

      2) 在一輪中更新頁面PageRank得分的計算方法: 在一輪更新頁面PageRank得分的計算中,每一個頁面將其當前的PageRank值平均分配到本頁面包含的出鏈上,這樣每一個連接即得到了相應的權值。而每一個頁面將全部指向本頁面的入鏈所傳入的權值求和,便可獲得新的PageRank得分。當每一個頁面都得到了更新後的PageRank值,就完成了一輪PageRank計 算spa

  3.1.3.1 基本思想: 

      若是網頁T存在一個指向網頁A的鏈接,則代表T的全部者認爲A比較重要,從而把T的一部分重要性得分賦予A。這個重要性得分值爲:PR(T)/L(T)scala

      其中PR(T)爲T的PageRank值,L(T)爲T的出鏈數

      則A的PageRank值爲一系列相似於T的頁面重要性得分值的累加

      即一個頁面的得票數由全部鏈向它的頁面的重要性來決定,到一個頁面的超連接至關於對該頁投一票。一個頁面的PageRank是由全部鏈向它的頁面(鏈入頁面)的重要性通過遞歸算法獲得的。一個有較多鏈入的頁面會有較高的等級,相反若是一個頁面沒有任何鏈入頁面,那麼它沒有等級

      咱們設向量B爲第1、第二...第 N 個網頁的網頁排名

        page72image39748416

      矩陣A表明網頁之間的權重輸出關係,其中amn表明第m個網頁向第n個網頁的輸出權重

        page72image39762976

      輸出權重計算較爲簡單:假設m一共有10個出鏈,指向n的一共有2個,那麼m向n輸出的權重就爲2/10

      如今問題變爲:A是已知的,咱們要經過計算獲得B。 假設Bi是第i次迭代的結果,那麼

        page72image39434256

      初始假設全部網頁的排名都是 1/N (N爲網頁總數量),即

        page73image39747216

      經過上述迭代計算,最終Bi會收斂,即Bi無限趨近於B,此時B = B × A

  3.1.3.2 具體示例 

      假設有網頁A、B、C、D,它們之間的連接關係以下圖所示

        

      計算B1以下:

        

      不斷迭代,計算結果以下:

第 1 次迭代: 0.125, 0.333, 0.083, 0.458 
第 2 次迭代: 0.042, 0.500, 0.042, 0.417 
第 3 次迭代: 0.021, 0.431, 0.014, 0.535 
第 4 次迭代: 0.007, 0.542, 0.007, 0.444 
第 5 次迭代: 0.003, 0.447, 0.002, 0.547 
第 6 次迭代: 0.001, 0.549, 0.001, 0.449 
第 7 次迭代: 0.001, 0.449, 0.000, 0.550
第 8 次迭代: 0.000, 0.550, 0.000, 0.450 
第 9 次迭代: 0.000, 0.450, 0.000, 0.550
第 10次迭代: 0.000, 0.550, 0.000, 0.450

      咱們能夠發現,A和C的權重變爲0,而B和D的權重也趨於在0.5附近擺動。從圖中也能夠觀察出:A和C之間有互相連接,但它們又把權重輸出給了B和D,而B和D 之間互相連接,並不向A或C輸出任何權重,因此長此以往權重就都轉移到B和D了

  3.1.3.3 PageRank的改進 

    上面是最簡單正常的狀況,考慮一下兩種特殊狀況:

      

    第一種狀況是,B存在導向本身的連接,迭代計算過程是:

第 1 次迭代: 0.125, 0.583, 0.083, 0.208
第 2 次迭代: 0.042, 0.833, 0.042, 0.083 
第 3 次迭代: 0.021, 0.931, 0.014, 0.035 
第 4 次迭代: 0.007, 0.972, 0.007, 0.014 
第 5 次迭代: 0.003, 0.988, 0.002, 0.006 
第 6 次迭代: 0.001, 0.995, 0.001, 0.002 
第 7 次迭代: 0.001, 0.998, 0.000, 0.001 
第 8 次迭代: 0.000, 0.999, 0.000, 0.000 
第 9 次迭代: 0.000, 1.000, 0.000, 0.000 
第10次迭代: 0.000, 1.000, 0.000, 0.000 
... ...

      咱們發現最終B權重變爲1,其它全部網頁的權重都變爲了0

      第二種狀況是B是孤立於其它網頁的,既沒有入鏈也沒有出鏈,迭代計算過程是:

第 1 次迭代: 0.125, 0.000, 0.125, 0.250 
第 2 次迭代: 0.063, 0.000, 0.063, 0.125 
第 3 次迭代: 0.031, 0.000, 0.031, 0.063 
第 4 次迭代: 0.016, 0.000, 0.016, 0.031 
第 5 次迭代: 0.008, 0.000, 0.008, 0.016 
第 6 次迭代: 0.004, 0.000, 0.004, 0.008 
第 7 次迭代: 0.002, 0.000, 0.002, 0.004
第 8 次迭代: 0.001, 0.000, 0.001, 0.002 
第 9 次迭代: 0.000, 0.000, 0.000, 0.001 
第10次迭代: 0.000, 0.000, 0.000, 0.000 
... ...

      咱們發現全部網頁權重都變爲了 0

      出現這種狀況是由於上面的數學模型出現了問題,該模型認爲上網者從一個網頁瀏覽下一個網頁都是經過頁面的超連接。想象一下正常的上網情景,其實咱們在看完一個網頁後,可能直接在瀏覽器輸入一個網址,而不經過上一個頁面的超連接
      咱們假設每一個網頁被用戶經過直接訪問方式的機率是相等的,即1/N,N 爲網頁

      總數,設矩陣e 以下:

        page75image39817280

      設用戶經過頁面超連接瀏覽下一網頁的機率爲 α,則直接訪問的方式瀏覽下一個網 頁的機率爲 1 - α,改進上一節的迭代公式爲:

        page75image39822064

      一般狀況下設 α 爲 0.8,上一節」具體示例」的計算變爲以下:

        

      迭代過程以下:

第 1 次迭代: 0.150, 0.317, 0.117, 0.417 
第 2 次迭代: 0.097, 0.423, 0.090, 0.390 
第 3 次迭代: 0.086, 0.388, 0.076, 0.450
第 4 次迭代: 0.080, 0.433, 0.073, 0.413 
第 5 次迭代: 0.079, 0.402, 0.071, 0.447 
第 6 次迭代: 0.079, 0.429, 0.071, 0.421 
第 7 次迭代: 0.078, 0.408, 0.071, 0.443 
第 8 次迭代: 0.078, 0.425, 0.071, 0.426 
第 9 次迭代: 0.078, 0.412, 0.071, 0.439 
第10次迭代: 0.078, 0.422, 0.071, 0.428 
第11次迭代: 0.078, 0.414, 0.071, 0.437 
第12次迭代: 0.078, 0.421, 0.071, 0.430 
第13次迭代: 0.078, 0.415, 0.071, 0.436 
第14次迭代: 0.078, 0.419, 0.071, 0.431 
第15次迭代: 0.078, 0.416, 0.071, 0.435 
第16次迭代: 0.078, 0.419, 0.071, 0.432 
第17次迭代: 0.078, 0.416, 0.071, 0.434 
第18次迭代: 0.078, 0.418, 0.071, 0.432 
第19次迭代: 0.078, 0.417, 0.071, 0.434
第20次迭代: 0.078, 0.418, 0.071, 0.433 
... ...

  3.1.3.4 修正PageRank計算公式:  

      因爲存在一些出鏈爲0,也就是那些不連接任何其餘網頁的網,也稱爲孤立網頁,使得不少網頁能被訪問到。所以須要對PageRank 公式進行修正,即在簡單公式的基礎上增長了阻尼係數(damping factor)q,q 通常取值q=0.85

      其意義是,在任意時刻,用戶到達某頁面後並繼續向後瀏覽的機率。1- q= 0.15 就是用戶中止點擊,隨機跳到新URL 的機率)的算法被用到了全部頁面上,估算頁面可能被上網者放入書籤的機率。

      最後,即全部這些被換算爲一個百分比再乘上一個係數q。因爲下面的算法,沒有頁面的PageRank 會是0。因此,Google 經過數學系統給了每一個頁面一個最小值

        page77image39728128

      這個公式就是.S Brin 和L. Page 在《The Anatomy of a Large- scale Hypertextual Web Search Engine Computer Networks and ISDN Systems 》定義的公式。

      因此一個頁面的PageRank 是由其餘頁面的PageRank 計算獲得。Google 不斷的重複計算每一個頁面的PageRank。若是給每一個頁面一個隨機PageRank 值(非0),那麼通過不斷的重複計算,這些頁面的PR 值會趨向於正常和穩定。這就是搜索引擎使用它的緣由

      首先求完整的公式:

      Arvind Arasu 在《Junghoo Cho Hector Garcia - Molina, Andreas Paepcke, Sriram Raghavan. Searching the Web》 更加準確的表達爲:

        page77image39730416

      page77image39841984是被研究的頁面,page77image39729376是鏈入頁面的數量,page77image39726256是鏈出頁面的數量,而N是全部頁面的數量

      PageRank值是一個特殊矩陣中的特徵向量。這個特徵向量爲:

        page78image39745552

      R是以下等式的一個解:

        page78image39746176

      若是網頁i有指向網頁j的一個連接,則

        page78image39744512

      不然 page78image39746384=0

  3.1.4 Spark GraphX實現 

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

3.2 廣度優先遍歷(參考)  

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
val vprog = { (id: VertexId, attr: Double, msg: Double) => math.min(attr,msg) }
val sendMessage = { (triplet: EdgeTriplet[Double, Int]) =>
  var iter:Iterator[(VertexId, Double)] = Iterator.empty
  val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
  val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
  if(!(isSrcMarked && isDstMarked)){
    if(isSrcMarked){
      iter = Iterator((triplet.dstId,triplet.srcAttr+1))
    }else{
      iter = Iterator((triplet.srcId,triplet.dstAttr+1))
    }
  }
  iter
}
val reduceMessage = { (a: Double, b: Double) => math.min(a,b) }
val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(vprog, sendMessage, reduceMessage) 
println(bfs.vertices.collect.mkString("\n"))

3.3 單源最短路徑(參考)  

import scala.reflect.ClassTag
import org.apache.spark.graphx._
/**
  * Computes shortest paths to the given set of landmark vertices, returning a
graph where each
  * vertex attribute is a map containing the shortest-path distance to each
reachable landmark. */
object ShortestPaths {
  /** Stores a map from the vertex id of a landmark to the distance to that
landmark. */
  type SPMap = Map[VertexId, Int]
  private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = (spmap1.keySet ++ spmap2.keySet).map {
    k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
  }.toMap
/**
  * Computes shortest paths to the given set of landmark vertices. *
  * @tparam ED the edge attribute type (not used in the computation) *
  * @param graph the graph for which to compute the shortest paths
  * @param landmarks the list of landmark vertex ids. Shortest paths will be
computed to each
  * landmark.
  *
  * @return a graph where each vertex attribute is a map containing the
shortest-path distance to
  * each reachable landmark vertex.
  */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]):
  Graph[SPMap, ED] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap() 
    }
    val initialMessage = makeMap()
    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { addMaps(attr, msg)
    }
    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }
    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

3.4 連通圖(參考)  

import scala.reflect.ClassTag
import org.apache.spark.graphx._
/** Connected components algorithm. */
object ConnectedComponents {
  /**
  * Compute the connected component membership of each vertex and return a graph with the vertex
  * value containing the lowest vertex id in the connected component containing that vertex.
  * @tparam VD the vertex attribute type (discarded in the computation)
  * @tparam ED the edge attribute type (preserved in the computation)
  * @param graph the graph for which to compute the connected components
  * @param maxIterations the maximum number of iterations to run for
  * @return a graph with vertex attributes containing the smallest vertex in each connected component
  */

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                      maxIterations: Int): Graph[VertexId, ED] = {
    require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
      s" but got ${maxIterations}")
    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) { Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) { Iterator((edge.srcId, edge.dstAttr))
      } else { Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    val pregelGraph = Pregel(ccGraph, initialMessage,
      maxIterations, EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
    ccGraph.unpersist()
    pregelGraph
  } // end of connectedComponents
/**
  * Compute the connected component membership of each vertex and return a
graph with the vertex
  * value containing the lowest vertex id in the connected component
containing that vertex.
  *
  * @tparam VD the vertex attribute type (discarded in the computation) 
  * @tparam ED the edge attribute type (preserved in the computation)
  * @param graph the graph for which to compute the connected components
  * @return a graph with vertex attributes containing the smallest vertex in eac connected component
  */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  run(graph, Int.MaxValue) }
}

3.5 三角計數(參考)  

import scala.reflect.ClassTag
import org.apache.spark.graphx._
/**
  * Compute the number of triangles passing through each vertex.
  *
  * The algorithm is relatively straightforward and can be computed in three
steps:
  *
  * <ul>
  * <li> Compute the set of neighbors for each vertex.</li>
  * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
  * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
  * </ul>
  *
  * There are two implementations. The default `TriangleCount.run`
implementation first removes
  * self cycles and canonicalizes the graph to ensure that the following
conditions hold:
  *<ul>
  * <li> There are no self edges </li>
  * <li> All edges are oriented src > dst </li>
  * <li> There are no duplicate edges </li>
  * </ul>
  * However, the canonicalization procedure is costly as it requires repartitioning the graph.
  * If the input data is already in "canonical form" with self cycles removed then the
  * `TriangleCount.runPreCanonicalized` should be used instead.
  * {{{
  * val canonicalGraph = graph.mapEdges(e =>
1).removeSelfEdges().canonicalizeEdges()
  * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
  * }}}
  *
  */
object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Transform the edge data something cheap to shuffle and then canonicalize
    val canonicalGraph = graph.mapEdges(e =>
    true).removeSelfEdges().convertToCanonicalEdges()
    // Get the triangle counts
    val counters = runPreCanonicalized(canonicalGraph).vertices
    // Join them bath with the original graph
    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
    optCounter.getOrElse(0)
    }
  }

  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Construct set representations of the neighborhoods
    val nbrSets: VertexRDD[VertexSet] =
      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        val set = new VertexSet(nbrs.length)
        var i = 0
        while (i < nbrs.length) {
          // prevent self cycle
          if (nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }
    // join the sets with the graph
    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }

    // Edge function computes intersection of smaller vertex with larger vertex
    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
      val (smallSet, largeSet) =
        if (ctx.srcAttr.size < ctx.dstAttr.size) {
          (ctx.srcAttr, ctx.dstAttr)
        } else {
          (ctx.dstAttr, ctx.srcAttr)
        }
      val iter = smallSet.iterator
      var counter: Int = 0
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }

    // compute the intersection along edges
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
    // Merge counters with the graph and divide by two since each triangle is counted twice
    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // This algorithm double counts each triangle so the final count should be even
      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
      dblCount / 2
    } 
  }
}
相關文章
相關標籤/搜索