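In big-data processing, joining multiple tables is a very common operation. In a distributed system, however, it is expensive: the data is spread across nodes, so a shuffle is normally required before the join, which generates heavy network I/O and makes the job slow.
This post introduces the map-side join. It applies when a large table is joined with a small table, where "small" means the table is small enough to be loaded into memory. The join operator then runs on the map side, without going through the shuffle and reduce stages, so it is very efficient.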
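To make the idea concrete before turning to Spark, here is a minimal plain-Scala sketch of the same technique. It is an illustration under assumptions, not Spark's implementation: the names `bigTablePartitions`, `smallTable`, and `joinedPartitions` are hypothetical, and the nested `Seq` merely stands in for partitions of a large table held on different nodes.

```scala
// Plain-Scala sketch of a map-side (hash) join; no Spark required.
// Each inner Seq stands in for one partition of the large table (hypothetical data).
val bigTablePartitions: Seq[Seq[(String, Int)]] = Seq(
  Seq(("DL", 418), ("AA", 1250)),
  Seq(("VX", 12), ("DL", 424))
)

// The small table fits in memory, so every "node" can hold a full copy as a hash map.
val smallTable: Map[String, String] = Map(
  "AA" -> "American Airlines",
  "DL" -> "Delta Airlines",
  "VX" -> "Virgin America"
)

// Each partition is joined independently through a local lookup.
// No row moves between partitions, which is why no shuffle is needed.
val joinedPartitions: Seq[Seq[(String, Int)]] =
  bigTablePartitions.map { partition =>
    partition.flatMap { case (code, flightNo) =>
      smallTable.get(code).map(name => (name, flightNo))
    }
  }
```

The partition layout is unchanged after the join: rows are only rewritten in place, never regrouped by key.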
Here is example code in Spark:
// Fact table
val flights = sc.parallelize(List(
  ("SEA", "JFK", "DL", "418", "7:00"),
  ("SFO", "LAX", "AA", "1250", "7:05"),
  ("SFO", "JFK", "VX", "12", "7:05"),
  ("JFK", "LAX", "DL", "424", "7:10"),
  ("LAX", "SEA", "DL", "5737", "7:10")))

// Dimension table
val airports = sc.parallelize(List(
  ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
  ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
  ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
  ("SFO", "San Francisco International Airport", "San Francisco", "CA")))

// Dimension table
val airlines = sc.parallelize(List(
  ("AA", "American Airlines"),
  ("DL", "Delta Airlines"),
  ("VX", "Virgin America")))
The three tables need to be joined into the following format:
Seattle New York Delta Airlines 418 7:00
San Francisco Los Angeles American Airlines 1250 7:05
San Francisco New York Virgin America 12 7:05
New York Los Angeles Delta Airlines 424 7:10
Los Angeles Seattle Delta Airlines 5737 7:10
The fact table is very large, while the two dimension tables are relatively small, so we can load the small tables into memory:
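// Keep only (airport code -> city), collect to the driver, and broadcast to the executors
val airportsMap = sc.broadcast(airports.map { case (a, b, c, d) => (a, c) }.collectAsMap)
// (airline code -> airline name)
val airlinesMap = sc.broadcast(airlines.collectAsMap)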
Below is the map-side join:
flights.map { case (a, b, c, d, e) =>
  (airportsMap.value(a),
   airportsMap.value(b),
   airlinesMap.value(c),
   d, e)
}.collect
The output:
res: Array[(String, String, String, String, String)] = Array(
(Seattle, New York, Delta Airlines, 418, 7:00),
(San Francisco, Los Angeles, American Airlines, 1250, 7:05),
(San Francisco, New York, Virgin America, 12, 7:05),
(New York, Los Angeles, Delta Airlines, 424, 7:10),
(Los Angeles, Seattle, Delta Airlines, 5737, 7:10))
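One caveat: the lookups in the map-side join above throw a `NoSuchElementException` if a flight refers to an airport or airline code that is missing from the dimension tables. A possible variant, sketched here under assumptions, drops such rows instead, which gives inner-join semantics. The helper `joinFlight` and the aliases `Flight` and `Lookup` are hypothetical names introduced for illustration; they are not part of the original code.

```scala
// Hypothetical helper: resolves one fact row against the two in-memory lookup tables.
type Flight = (String, String, String, String, String)
// collectAsMap returns a scala.collection.Map, so accept that general type.
type Lookup = scala.collection.Map[String, String]

// Returns None when any code is missing, instead of throwing.
def joinFlight(airports: Lookup, airlines: Lookup)(flight: Flight): Option[Flight] = {
  val (origin, dest, carrier, number, time) = flight
  for {
    originCity  <- airports.get(origin)
    destCity    <- airports.get(dest)
    airlineName <- airlines.get(carrier)
  } yield (originCity, destCity, airlineName, number, time)
}
```

In spark-shell this could presumably be applied as `flights.flatMap(f => joinFlight(airportsMap.value, airlinesMap.value)(f)).collect`, so that rows with unknown codes are skipped. Whether silently dropping rows is acceptable depends on the data; substituting a default value through `getOrElse` would be the left-outer-join alternative.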