做者:明風算法
(本文由團隊中梧葦和我一塊兒撰寫,並由團隊中的林嶽,巖岫,世儀等多人Review,發表於程序員的8月刊,因爲篇幅緣由,略做刪減,本文爲完整版)apache
對於網絡科學而言,世間萬物均可以抽象成點,而事物之間的關係均可以抽象成邊,並根據不一樣的應用場景,生成不一樣的網絡,所以整個世界均可以用一個巨大的複雜網絡來表明。有關複雜網絡和圖算法的研究,在最近的十幾年取得了巨大的進展,並在多個領域有重要的應用。編程
做爲最大的電商平臺,淘寶上數億買家和賣家,天天產生數百億的行爲,包括瀏覽,收藏,購買……構成由多個巨型的行爲網絡,一樣是複雜網絡和圖算法一展身手的好場景。可是儘管能夠根據具體的業務場景進行相應的過濾,減小參與運算的頂點和邊,最後生成的圖,每每仍是動輒上億個頂點的巨型圖,在其之上的計算,對於單機版的計算框架來講,是很難完成的任務。緩存
所以,一個強大的分佈式圖計算框架是必不可少的工具。它既要有良好的性能,又要有豐富的功能和運算符,可以在海量的數據上,自如地運行各類複雜的圖算法。在淘寶,咱們在今年開始嘗試使用Spark GraphX,作爲分佈式圖計算的平臺,進行了各類算法嘗試和生產應用,取得了不錯的效果。在此和各位分享一二。網絡
在Spark年幼的時候,0.5版本就已經帶了一個Bagel小模塊,提供了相似Pregel的功能,固然,這個版本還很是的原始,性能和功能都比較弱,屬於實驗型產品。到0.8版本的時候,鑑於業界對分佈式圖計算的需求日益見漲,Spark開始獨立一個分支:Graphx-Branch,作爲獨立的圖計算模塊,借鑑GraphLab,開始設計開發GraphX。在0.9版本中,這個模塊被正式集成到主幹,雖然是alpha版本,可是已經能夠開始進行試用,小麪包圈Bagel告別舞臺。1.0版本,GraphX正式投入生產使用。併發
值得留意的是,GraphX目前依然處於快速發展中,從0.8的分支,到0.9和1.0,每一個版本代碼都有很多的改進和重構,並根據觀察,在沒有改任何代碼邏輯和運行環境,只是升級版本,切換接口和從新編譯的狀況下,每一個版本可以有10-20%的性能提高。雖然和GraphLab的性能還有必定的差距,可是憑藉着Spark總體的一體化流水線處理,社區熱烈的活躍度以及快速改進速度,使得它具備強大的競爭力。app
在正式介紹GraphX以前,先看看通用的分佈式圖計算框架。簡單來講,分佈式圖計算框架的目的,就是將對於巨型圖的各類操做,包裝爲簡單的接口,讓分佈式存儲,並行計算等複雜問題對上層透明。從而使得複雜網絡和圖算法的工程師,能夠更加聚焦在圖相關的模型設計和使用上,而不用關心底層的分佈式細節。爲了實現該目的,須要解決兩個通用的問題。框架
巨型圖的存儲整體上有邊分割和點分割兩種存儲方式。2013年GraphLab2.0推出,將其存儲方式由邊分割變爲點分割,在性能上取得重大提高,目前基本上被業界普遍接受並使用。dom
邊分割(Edge Cut)
每一個頂點都存儲一次,可是有的邊會被打斷,被分到了兩臺機器上。這樣作的好處是節省存儲空間,壞處是對於圖進行基於邊的計算時,對於一條兩個頂點被分到不一樣機器上的邊來講,要跨機器通訊傳輸數據,內網通訊流量大。
點分割(Vertex Cut)
每一個邊都只存儲一次,都只會出如今一臺機器上。鄰居多的點會被複制到多臺機器上,增長存儲開銷,同時會引起數據同步的問題。好處是能夠大幅減小內網通訊量能夠大大下降。
本來兩種方法互有利弊,但如今是點分割佔上風,各類分佈式圖計算框架,都把本身底層的存儲形式變成了點分割。主要緣由有2個:
磁盤的價格降低,存儲空間不是問題了,可是內網的通訊資源沒有突破性進展,集羣計算時內網帶寬是寶貴的,時間比磁盤更珍貴,這點就相似於常見的空間換時間的策略。
在當前的應用場景中,絕大多數網絡都是「無尺度網絡」,遵循冪律分佈,不一樣點的鄰居數量很是懸殊,邊分割會使得那些多鄰居的點所相連的邊大多數都會被分到不一樣的機器上,這樣的數據分佈會使得內網帶寬更加捉襟見肘,因而邊分割的存儲方式就被漸漸拋棄了
目前的圖計算框架,基本上都是遵循BSP計算模式。BSP全稱Bulk Synchronous Parallell,由哈佛大學Leslie Valiant和牛津大學Bill McColl提出。在BSP中,一次計算過程由一系列全局超步組成,每個超步由併發計算,通信, 柵欄同步三個步驟組成。同步完成,標誌着該一個超步的完成,以及下一個超步的開始。
BSP模式很簡潔,基於BSP模式,目前有2種比較成熟的圖計算模型:
Pregel模型——「像頂點同樣思考」
2010年,Google的新的三架馬車Caffeine、Pregel、Dremel發佈。伴隨着Pregel,BSP模型被廣爲人知。聽說Pregel的名字是爲了記念歐拉的七橋問題,那七座橋所在的河流,就是叫Pregel。
Pregel借鑑MapReduce的思想,提出了"像頂點同樣思考(Think Like A Vertex)"的圖計算模式,讓用戶無需考慮並行分佈式計算的細節,只須要實現一個頂點更新函數,讓框架在遍歷頂點時進行調用便可。
常見的代碼模板以下所示:
void Compute(MessageIterator* msgs) {
//遍歷由頂點入邊傳入的消息列表
for (; !msgs->Done(); msgs->Next())
doSomething()
//生成新的頂點值
*MutableVertexValue() = ...
//生成沿頂點出邊發送的消息
SendMessageToAllNeighbors(...);
}
這個模型雖然簡潔,可是很容易發現它的缺陷。對於鄰居數不少的頂點,它須要處理的消息很是龐大,而在這個模式下,它們是沒法被併發處理的。因此對於符合冪律分佈的天然圖,這種計算模型下,很容易發生假死或者崩潰。
GAS模型——鄰居更新模型
相比於Pregel模型的消息通訊範式,GraphLab的GAS模型更偏向共享內存風格。它容許用戶的自定義函數訪問當前頂點的整個鄰域,能夠抽象成Gather,Apply,Scatter這三個階段,常被簡稱爲GAS。相應用戶須要實現的三個獨立的函數:gather、apply和scatter。
常見的代碼模板以下所示:
//從鄰居點和邊收集數據
Message gather(Vertex u, Edge uv, Vertex v) {
Message msg = ...
return msg
}
//彙總函數
Message sum(Message left, Message right) {
return left+right
}
//更新頂點Master
void apply(Vertex u, Message sum) {
u.value = ...
}
//更新鄰邊和鄰居點
void scatter(Vertex u, Edge uv, Vertex v) {
uv.value = ...
if ((|u.delta|>ε) Active(v)
}
因爲gather/scatter函數是以單條邊爲操做粒度,那麼對於一個頂點的衆多鄰邊,能夠分別由相應的worker獨立地調用gather/scatter函數。這一設計主要是爲了適應點分割的圖存儲模式,從而避免Pregel模型會遇到的問題。
在GraphX設計的時候,點分割和GAS都已經成熟了,因此GraphX一開始就站在了巨人的肩膀上,並在設計和編碼中,針對這些問題進行了優化,在功能和性能之間尋找最佳的平衡點。
每一個Spark子模塊,如同Spark自己同樣,都有一個核心的抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,擁有Table和Graph兩種視圖,而只須要一份物理存儲。而兩種視圖都有本身的獨有的操做符,從而得到靈活操做和執行效率。
如同Spark同樣,GraphX的代碼依然很是簡潔。核心的GraphX代碼只有3千多行,而在此之上實現的Pregel模型,只要短短的二十多行。GraphX的代碼結構總體以下:
總體仍是很清晰明瞭,其中大部分的impl包的實現,都是圍繞着Partition而優化和進行。這種某種程度上說明了,點分割的存儲和相應的計算優化,的確是圖計算框架的重點和難點。
GraphX的底層設計有幾個關鍵點
對Graph視圖的全部操做,最終都會被轉換成其關聯的Table視圖的RDD操做來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列RDD的轉換過程。所以,其實Graph最終是具有了的RDD的3個關鍵特性:Immutable,Distributed,Fault-Tolerant。其中最關鍵的是不可變(Immutable)性,全部圖的轉換和操做,邏輯上都是產生了一個新圖,物理上,Graphx會有必定程度的不變頂點和邊的複用優化,對用戶透明。
兩種視圖底層共用的物理數據,由RDD[VertexPartition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式存儲的,而是由VertexPartition/EdgePartition,在內部存儲一個帶索引結構的分片數據塊,以加速不一樣視圖下的遍歷速度。不變的索引結構在RDD轉換過程當中是共用的,下降了計算和存儲開銷。
圖的分佈式存儲採用點分割模式,並且使用partitionBy方法,由用戶指定不一樣的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會緩存本地邊的關聯點的Ghost副本。劃分策略的不一樣會影響到所須要緩存的Ghost副本數量,以及每一個EdgePartition分配的邊的均衡程度,須要根據圖的結構特徵進行選取最佳的Strategy。目前有EdgePartition2d,EdgePartition1d,RandomVertexCut,CanonicalRandomVertexCut這4種策略。目前試驗的結果,在淘寶大部分場景下,EdgePartition2d效果最好。
如同Spark同樣,GraphX的Graph類,提供了豐富的圖運算符,大體結構以下:
具體每一個方法的說明和用法,能夠在官方的GraphX Programming Guide找到每一個函數的詳細說明,就不一一列舉。重點講幾個須要注意的方法:
因爲一個圖,是由3個RDD組成的,因此會佔用更多的內存。相應圖的cache,unpersist和checkpoint,更須要留意使用技巧。出於最大限度的複用邊的理念,GraphX的默認接口,只提供了unpersistVertices的方法,若是要釋放邊,須要本身調用g.edges.unpersist()方法才能釋放,這個給用戶帶來了必定的不便,可是卻給GraphX的優化,提供便利和空間。
參考Graphx的Pregel代碼,對一個大圖,目前最佳的實踐是:
var g=...
var prevG: Graph[VD, ED] = null
while(...){
prevG = g
g = g.(………………)
g.cache()
prevG.unpersistVertices(blocking=false)
prevG.edges.unpersist(blocking=false)
}
大致之意,就是根據GraphX中graph的不變性,對g作了操做並賦回給g以後,g已經不是原來的g了,並且會在下一輪迭代使用,因此必須cache。另外,你必須先用prevG,保留住對原來的圖的引用,並在新圖產生以後,快速的將舊圖完全的釋放掉。不然一個大圖,幾輪迭代下來,就會有內存泄漏的問題,很快耗光做業內存。
mrTriplets的全稱是mapReduceTriplets,它是GraphX中最核心和強大的一個接口。Pregel也基於它而來,因此對它的優化,能很大程度上影響整個GraphX的性能。
mrTriplets運算符的簡化定義是:
def mapReduceTriplets[A](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A)
: VertexRDD[A]
它的計算過程以下:
map:應用於每個triplet上,生成一個或者多個消息, 消息以triplet關聯的兩個頂點中的任意一個或兩個爲目標頂點
reduce:應用於每個Vertex上,把發送給每個頂點的消息合併起來
mrTriplets最後返回的是一個VertexRDD[A], 它包含了每個頂點聚合以後的消息(類型爲A), 沒有接收到消息的頂點不會包含在返回的VertexRDD中。
在最近的版本,GraphX針對它進行了以下幾個優化,這些優化,對於Pregel以及全部上層算法工具包的性能,都有着重大的影響。其中包括:
Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets
在不少圖分析算法中,不一樣點的收斂速度變化很大。在迭代的後期,只有不多的點會有更新。所以對於沒有更新的點,下一次mrTriplets計算時EdgeRDD無需更新相應點值的本地緩存,可以大幅下降通訊開銷。
Indexing Active Edges
沒有更新的頂點在下一輪迭代時就不須要向鄰居從新發送消息。所以mrTriplets遍歷邊時,若是一條邊的鄰居點值在上一輪迭代時沒有更新,能夠直接跳過,避免了大量無用的計算和通訊。
Join Elimination
一個triplet是由一條邊和其兩個鄰居點組成的三元組,操做triplet的map函數經常只需訪問其兩個鄰居點值中的一個。例如在PageRank計算中,一個點值的更新只和其源頂點的值有關,而其所指向的目的頂點的值無關。那麼在mrTriplets計算中,就不須要VertexRDD和EdgeRDD的3-way join,而只須要2-way join。
全部的這些優化,都使得GraphX的性能,逐漸逼近GraphLab。雖然還有必定的差距,可是一體化的流水線服務,和豐富的編程接口,能夠彌補性能的稍微差距。
Graphx中的Pregel接口,並不嚴格遵循Pregel的模型,它是一個參考GAS改進的Pregel模型。定義以下:
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
這種基於mrTrilets方法的Pregel模型,和標準的Pregel的最大區別是,它的第2段參數體,接受的是3個函數參數,而不接受messageList。它不會在單個頂點上進行消息遍歷,而是會將頂點的多個ghost副本收到的消息聚合後,發送給master副本,再使用vprog函數來更新點值。消息的接收和發送,都是被自動並行化處理的,無需擔憂超級節點的問題。
常見的代碼模板以下所示:
//更新頂點
vprog(vid: Long, vert: Vertex, msg: Double): Vertex = {
v.score = msg + (1 - ALPHA) * v.weight
}
//發送消息
sendMsg(edgeTriplet: EdgeTriplet[…]): Iterator[(Long, Double)]
(destId, ALPHA * edgeTriplet.srcAttr.score * edgeTriplet.attr.weight)
}
//合併消息
mergeMsg(v1: Double, v2: Double): Double = {
v1+v2
}
能夠看到,GraphX設計這個模型的用意。它綜合了Pregel和GAS二者的優勢,即接口相對簡單,又保證性能,能夠應對點分割的圖存儲模式,勝任符合冪律分佈的天然圖的大型計算。另外值得注意的是,官方的Pregel版本是最簡單的一個版本,對於複雜的業務場景,根據這個版本擴展一個定製的Pregel,是很常見的作法。
GraphX也提供了一套圖算法,方便用戶對圖進行分析。目前最新版本,已經支持PageRank,數三角形,最大連通圖,最短路徑等6種經典的圖算法,這些算法的代碼實現,目的和重點在於通用性。若是要得到最佳性能,能夠參考其實現,進行修改和擴展,能夠知足業務需求。另外研讀這些代碼,也是理解GraphX編程的Best Practice的好方法,建議有興趣深刻研究分佈式圖算法開發的同窗都通讀一遍。
基本上,全部的關係,均可以從圖的角度來看待和處理,可是到底一個關係的價值多大?健康與否?適合用於什麼場景?不少時候是靠運營和產品憑感受來判斷和評估。如何將各類圖的指標精細化,規範化,對於產品和運營的構思進行數據上的預研指導,提供科學決策的依據,是圖譜體檢平臺設計的初衷和出發點。
基於這樣的出發點,藉助GraphX豐富的接口和工具包,針對淘寶內部林林總總的圖業務需求, 咱們開發一個圖譜體檢平臺。目前主要進行下列指標的檢查:
度分佈
度分佈是一個圖最基礎的指標,也是很是重要的一個指標。度分佈檢測的目的,主要是瞭解圖中"超級節點"的個數和規模,以及全部節點度的分佈曲線。超級節點的存在,對各類傳播算法,都會有重大的影響,不管是正面助力仍是反面的阻力,因此要預先對於這些數據量有個預估。藉助GraphX的最基本的圖信息接口:degrees: VertexRDD[Int],包括inDegrees和outDegrees,這個指標能夠輕鬆地計算出來,並進行各類各樣的統計。
二跳鄰居數
對於大部分社交關係來講,只得到一跳的度分佈是遠遠不夠的,另外一個重要的指標是二跳鄰居數。例如祕密App中,好友的好友的祕密,傳播的範圍更廣,信息量更豐富。所以二跳鄰居數的統計,是圖譜體檢中很重要的一個指標。二跳鄰居的計算GraphX沒有給出現成的接口,須要本身設計和開發。目前使用的方法是:
第一次遍歷,全部點往鄰居點傳播一個帶自身Id,生命值爲2的消息
第二次遍歷,全部點將收到的消息,往鄰居點再轉發一次,生命值爲1
最終統計全部點上,接收到的生命值爲1的Id,並進行分組彙總,獲得全部點的二跳鄰居
值得注意的是,進行這個計算以前,須要藉助度分佈,將圖中的超級節點去掉,不歸入二跳鄰居數的計算。不然這些超級節點一來會出如今第一輪傳播後,收到過多的消息而爆掉,二來它們參與計算,會影響和它們有一跳鄰居關係的頂點,致使它們不能獲得真正有效的二跳鄰居數。因此必須先篩選掉。
連通圖
檢測連通圖的目的,是弄清一個圖有幾個連通部分,以及每一個連通部分有多少頂點。這樣能夠將一個大圖分割爲多個小圖,並去掉零碎的連通部分,從而能夠在多個小子圖上,進行更加精細的操做。目前GraphX提供了ConnectedComponents和StronglyConnectedComponents算法,使用它們能夠快速的計算出相應的連通圖。
連通圖能夠進一步演化,變成社區發現算法,而該算法優劣的評判標準之一,是計算模塊的Q值,來查看所謂的modularity狀況。可是GraphX中仍是沒有對於Q值計算的函數,咱們已經實現了一個,後續會將這個實現提交到社區。
更多的指標,例如Triangle Count和K-Core,不管是藉助GraphX已有的函數,仍是本身從頭開發,都陸續在進行中。目前這個圖譜體檢平臺已經初具規模,經過平臺的創建和推廣,圖相關的產品和業務,逐漸走上「無數據,不討論,用指標來預估效果」的數據化運營之路,有效提升溝通效率,爲各類圖相關的業務開發走上科學化和系統化之路作好準備。
在圖譜體檢平臺的基礎上,咱們能夠了解到各類各樣關係的特色。不一樣的關係,都會有本身的強項和弱項,例若有些關係圖譜連通性好些,而有些關係圖譜的社交性好些,因此每每咱們須要使用關係A來豐富關係B。爲此,在圖譜體檢平臺之上,藉助GraphX,咱們開發了一個多圖合併工具,提供相似於圖的並集的概念,能夠快速的對指定的2個不一樣關係圖譜,進行合併,產生一個新的關係圖譜。
以用基於A關係的圖來擴充基於B關係的圖,生成擴充圖C爲例,融合算法基本思路以下:
若圖B中某邊的兩個頂點都在圖A中,則將該邊加入C圖(如BD邊)
若圖B中某邊的一個頂點在圖A中,另一個頂點不在,則將該邊和另外一頂點都加上(如CE邊和E點)
若圖A中某邊的兩個頂點都不在圖B中,則捨棄這條邊和頂點(如EF邊)
使用GraphX的outerJoinVertices等圖運算符,能夠很簡單地完成上述的操做。另外,在考慮圖合併的時候,也能夠考慮給不一樣的圖的邊加上不一樣的權重,綜合考慮點之間的不一樣關係的重要性。新產生的圖,會再進行一輪圖譜體檢,經過先後三個圖各個體檢指標的對比,能夠對於業務上線以後效果有個預估和判斷。若是不符合指望,能夠嘗試從新選擇擴充方案。
加權網絡上的能量傳播是經典的圖模型之一, 可用於用戶信譽度預測。模型的思路是:物以類聚,人以羣分。常和信譽度高的用戶進行交易的,信譽度天然較高,常和信譽度差的用戶有業務來往的,信譽度天然較低。模型不復雜,但淘寶全網有上億的用戶點和幾十億關係邊,要對如此規模的巨型圖進行能量傳播,並對邊的權重進行精細的調節,對圖計算框架的性能和功能都是巨大的考驗。藉助GraphX,咱們在這兩點之間取得了平衡,成功實現了該模型。
流程如圖4,先生成以用戶爲點、買賣關係爲邊的巨型圖initGraph,對選出種子用戶,分別賦予相同的初始正負能量值(TrustRank & BadRank),而後進行兩輪隨機遊走,一輪好種子傳播正能量(tr),一輪壞種子傳播負能量(br),而後正負能量相減獲得finalRank,根據finalRank判斷用戶的好壞。邊的初始傳播強度是0.85,這時AUC很低,須要再給每條邊,帶上一個由多個特徵(交易次數,金額……)組成的組合權重。每一個特徵,都有不一樣的獨立權重和偏移量。經過使用partialDerivativeAUC方法,在訓練集上計算AUC,而後對AUC求偏導,獲得每一個關係維度的獨立權重和偏移量,生成新的權重調節器(WeightAdjustor),對圖上全部邊上的權重更新,而後再進行新一輪大迭代,這樣一直到AUC穩定時,終止計算。
在接近全量的數據上進行3輪大迭代,每輪2+6次Pregel,每次Pregel大約30次小迭代後,最終的AUC從0.6提高到0.9,達到了不錯的用戶預測準確率。訓練時長在6個小時左右,不管在性能仍是準確率上,都超越業務方的指望。
通過半年多的嘗試,對於GraphX能夠勝任的圖計算的規模和性能,目前咱們都已經心中有數。以前一些想作,但由於沒有足夠的計算能力而不能實現的圖模型,現已經不是問題。咱們將會進一步將愈來愈多的圖模型,在GraphX上實現。
這些模型應用於用戶網絡的社區發現、用戶影響力、能量傳播、標籤傳播等,能夠提高用戶粘性和活躍度;而應用到推薦領域的標籤推理,人羣劃分、年齡段預測、商品交易時序跳轉,則能夠提高推薦的豐富度和準確性。複雜網絡和圖計算的天地廣闊無垠,有更多的未知等待咱們去探索和實踐,藉助Spark GraphX,將來咱們能夠迎接更大挑戰。