使用GraphFrames進行飛通常的圖計算

GraphFrame是將Spark中的Graph算法統一到DataFrame接口的Graph操做接口。支持多種語言,能夠經過Python使用。html

本博客包括 On-Time Flight Performance with GraphFrames notebook 的完整內容,其中包括一些擴展功能,您能夠經過 Databricks Community Edition免費試用(加入 beta waitlist)git

Graphframes是開源項目,資源以下:github


介紹

圖結構是一個解決不少數據問題的直觀的方法。不管是遍歷社會網絡,餐館推薦,或者是飛行路徑,均可以經過圖結構的上下文來快速地理解所面臨的問題: 頂點(Vertices)、邊(edges)和屬性(properties)。 例如,飛行數據的分析是一個經典的圖論問題,機場用 vertices表明,飛行路線用 edges 來表明。同時,這裏有不少屬性與飛行路線有關,好比離港延誤、飛機的類型和裝載能力等等。算法

在這篇文章中,咱們使用 GraphFrames (參見最近的介紹: Introducing GraphFrames) 經過Databricks notebooks 進行快速而簡便的飛行數據分析,這個數據以graph的結構進行組織。shell

由於咱們在使用 graph structures, 咱們能夠簡單地提出幾個在表格數據結構下不是那麼直觀看見的問題,好比:structural motifs, airport ranking(使用 PageRank),城市之間的最短路徑等等。GraphFrames提高了DataFrame API的分佈式計算和表達的能力,簡化了Spark SQL engine的查詢而且提高了性能。除此以外,GraphFrames所帶來的圖論分析能力能夠用於 Python、Scala和Java等多種語言環境。c#

安裝 GraphFrames Spark軟件包

爲了使用 GraphFrames, 你須要首先安裝 GraphFrames Spark Packages。在Databricks中安裝軟件包是一個簡單的過程( 參見: few simple steps )(join the beta waitlist here  to try for yourself).網絡

注意, 爲了在spark-shell, pyspark, or spark-submit引用GraphFrames,需按下面的方法啓動Spark的環境:數據結構

$SPARK_HOME/bin/spark-shell --packages graphframes:graphframes:0.1.0-spark1.6

 

準備 Flight Datasets

組成airports的圖數據集(vertices)的兩個部分在這裏: OpenFlights Airport, airline 和 route data ,departuredelays dataset (edges) 在  Airline On-Time Performance and Causes of Flight Delays: On_Time Data分佈式

在安裝GraphFrames Spark軟件包後(參考 GraphFrames Spark Package), 您能夠import 建立vertices, edges, 和 GraphFrame (在 PySpark中) 以下所示:ide

1

2

3

4

5

6

7

8

9

# Import graphframes (from Spark-Packages)

from     graphframes     import     *

# Create Vertices (airports) and Edges (flights)

tripVertices=airports.withColumnRenamed("IATA","id").distinct()

tripEdges=departureDelays.select("tripid","delay","src","dst","city_dst"  ,"state_dst")

# This GraphFrame builds upon the vertices and edges based on our trips (flights)

tripGraph=GraphFrame(tripVertices, tripEdges)

例如, tripEdges包含的飛行數據有出發地的 IATA airport code (src) 和目的地IATA airport code (dst), city (city_dst),  state (state_dst) 以及departure delays (delay)。

tripEdges

在tripGraph上簡單查詢

如今你已經建立本身的 tripGraph GraphFrame, 能夠執行幾個鍵大的查詢,來快速地遍歷和理解你的GraphFrame數據。例如, 爲了瞭解GraphFrame中的機場和路線信息, 運行下面的 PySpark代碼。

1

2

print("Airports: %d" % tripGraph.vertices.count())

print("Trips: %d"     %     tripGraph.edges.count())

將返回輸出:

Airports: 279
Trips: 1361141

由於GraphFrames 是基於Spark中的DataFrame的Graphs數據結構, 您能夠編寫和使用DataFrame API的高級和複雜的查詢表達式。  例如, 下面的查詢容許咱們在flights (edges)過濾出從 SFO airport出發的 delayed flights (delay > 0)。這裏還能夠計算和排序平均 delay的時間, 能夠回答這些問題:從SFO出發的那些航班有顯著的延誤?

2

3

4

5

tripGraph.edges\

       .filter("src='SFO' and delay > 0")\

       .groupBy("src","dst")\

       .avg("delay")\

       .sort(desc("avg(delay)"))

查看輸出結果, 您能夠快速發如今本數據集中從SFO出發的顯著高於平均延誤水平的機場: Will Rogers World Airport (OKC), Jackson Hole (JAC), 和 Colorado Springs (COS) 。

SFO-significant-delays

經過 Databricks notebooks, 咱們能夠快速進行地圖上的可視化: 從SEA 出發的航班到那些州是有顯著的延誤的(高於正常值)?

SEA-delays-by-state-map

 

使用Motif finding理解飛行延誤

爲了更容易地理解城市機場和航線之間的複雜關係, 咱們使用Motif進一步挖掘機場airports (i.e. vertices)和航線flights (i.e. edges)之間的關係 . DataFrame的結果中column names經過motif keys給出。

例如, 提出問題 What delays might we blame on SFO?, 您能夠建立出簡化的motif,以下。

1

2

3

motifs =  tripGraphPrime.find("(a)-[ab]->(b); (b)-[bc]->(c)"     )\

.filter("(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000")

display(motifs)

與 SFO 鏈接的城市 (b), 咱們看到全部的航線 [ab] 從origin city (a) 鏈接到SFO (b) 優先於飛行 [bc] 到其餘目的地城市 (c). 咱們過濾出航線 ([ab] or [bc]) 超過500分鐘而且第二航線(bc)在第一次飛行後大概一天內出現。

下面是一個從查詢中節略的子集,列分別是對應的motif keys。

a ab b bc c
Houston (IAH) IAH -> SFO (-4)
[1011126]
San Francisco (SFO) SFO -> JFK (536)
[1021507]
New York (JFK)
Tuscon (TUS) TUS -> SFO (-5)
[1011126]
San Francisco (SFO) SFO -> JFK (536)
[1021507]
New York (JFK)

經過這個motif finding查詢, 咱們快速肯定了that passengers in this dataset left Houston and Tuscon for San Francisco on time or a little early [1011126].  But for any of those passengers that were flying to New York through this connecting flight in SFO [1021507], they were delayed by 536 minutes.

使用PageRank發現最重要的機場

由於GraphFrames創建在GraphX之上, 這裏有幾個內置的算法咱們能夠當即利用這個優點。 PageRank在 Google Search Engine 中普遍使用,由 Larry Page建立。搜索Wikipedia的解釋:

PageRank 的工做原理是對到頁面的鏈接的數量和質量進行計數, 從而估計該頁面的重要性。 缺省的假定是:越是重要的網站接收到的其它網站的連接就越多。

雖然上面的例子是關於網頁的,但這一極好的理念能夠用於任何圖結構,而不論是來自網頁、, 自行車站點, 或機場 airports,而且這一界面很是簡單,就像調用一個方法同樣。 您可能注意到,GraphFrames將返回 PageRank 結果,做爲新的column追加到vertices DataFrame,在運行這個算法後簡單地繼續咱們的分析。

在數據集中,這裏有大量的不一樣機場的飛行和連接數量,咱們使用 PageRank 算法在Spark中遞歸地遍歷graph數據結構,計算出機場有多重要的一個估計值。

1

2

3

4

# Determining Airport ranking of importance using pageRank

ranks = tripGraph.pageRank(resetProbability = 0.15, maxIter =  5 )

display(ranks.vertices.orderBy(\

    ranks.vertices.pagerank.desc()).limit(20  ))

下面的圖表顯示,經過PageRank算法,Atlanta能夠考慮爲是最爲重要的機場,這是基於不一樣 vertices (i.e. airports)的connections (i.e. flights)質量做出的推斷 ; 與相應的事實是比較符合的(參見 Atlanta is the busiest airport in the world by passenger traffic)。

airport-ranking-pagerank-id

肯定flight connections

不一樣城市之間有多個航班,您可使用 GraphFrames.bfs (Breadth First Search,廣度優先搜索) 方法去找到兩個城市間的最短路徑。下面的查詢嘗試發現San Francisco (SFO) 和 Buffalo (BUF) 爲1的最大路徑長度 (i.e direct flight)。 結果集爲空 (i.e. no direct flights between SFO and BUF).

1

2

3

4

5

filteredPaths =  tripGraph.bfs(

       fromExpr = "id = 'SFO'" , toExpr =  "id = 'BUF'",

       maxPathLength = 1 )

display(filteredPaths)

所以擴展查詢爲 maxPathLength = 2, 有一個以上連接的 flight(在SFO和BUF)。

1

2

3

4

5

filteredPaths=tripGraph.bfs(

       fromExpr= "id = 'SFO'",

       toExpr   = "id = 'BUF'",

       maxPathLength= 2)

display(filteredPaths)

從SFO 到 BUF 的結果集表格簡略以下。

from v1 to
SFO MSP (Minneapolis) BUF
SFO EWR (Newark) BUF
SFO JFK (New York) BUF
SFO ORD (Chicago) BUF
SFO ATL (Atlanta) BUF
SFO LAS (Las Vegas) BUF
SFO BOS (Boston) BUF

使用D3可視化飛行路線

爲了實現一個功能強大的航線和連接的可視化效果,咱們利用Databricks notebook在 Airports D3 visualization 中的方法。經過連接GraphFrames, DataFrames, 以及 D3 可視化工具, 咱們可視化顯示全部的飛行連接,以下所示。藍色圓圈表明vertices (i.e. airports),圓圈的大小表明 邊的數量 (i.e. flights) ,即進出港的航線。黑線是邊 (i.e. flights) 以及相應到定點 (i.e. airports)的鏈接.  注意,有一些邊到了屏幕外面, 表明是到Hawaii 和 Alaska的頂點 (i.e. airports) 。

airports-d3-m

 

下一步: 本身試一試

你能夠看到完整的代碼: On-Time Flight Performance with GraphFrames notebook ,其中包括更多的擴展例子。你能夠 import 這個 notebook文件到您的 Databricks 帳戶中。執行 notebook 能夠採用這一些步驟: simple few steps

Graphframes是開源項目,更深刻的應用參考以下資源:

Graphframes的源碼工程:https://github.com/graphframes/graphframes

Graphframes的文檔工程:http://graphframes.github.io/user-guide.html

相關文章
相關標籤/搜索