這篇文章翻譯自 http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html。以前翻譯過關於 cascalog 的文章(Cascalog 入門(1),Cascalog 入門(2))。Cascalog 是基於 Cascading,PigPen 是基於 Apache Pig,二者是比較類似的東西。如下進入正文:html
咱們今天很高興向全世界發佈了 PigPen,這是一個爲 Clojure 準備的 Map-Reduce,它會最終被編譯成 Apache Pig,可是你並不須要很是瞭解 Pig。java
注意:若是你對 Clojure 不是很熟悉,咱們強烈推薦你試下這裏,這裏 或者 這裏 的教程來了解一些 基礎。git
<!--more-->github
若是你會 Clojure,你就已經會 PigPen 了web
PigPen 的主要目標是要把語言帶出等式的行列。PigPen 的操做符設計的和 Clojure 裏儘量的類似,沒有特殊的用戶自定義函數(UDFs)。只須要定義函數(匿名的或者命名的),而後你就能像在 Clojure 程序裏同樣使用它們。express
這裏有個經常使用的 word count 的例子:apache
(require '[pigpen.core :as pig]) (defn word-count [lines] (->> lines (pig/mapcat #(-> % first (clojure.string/lower-case) (clojure.string/replace #"[^\w\s]" "") (clojure.string/split #"\s+"))) (pig/group-by identity) (pig/map (fn [[word occurrences]] [word (count occurrences)]))))
這段代碼定義了一個函數,這個函數返回一個 PigPen 的查詢表達式。這個查詢接受一系列的行做爲輸入,返回每一個單詞出現的次數。你能夠看到這只是一個 word count 的邏輯,並無設計到一些外部的東西,好比數據從哪裏來的,會產生哪些輸出。編程
固然。PigPen 的查詢是寫成函數的組合——數據輸入、輸出。只須要寫一次,不須要處處複製、粘貼。數據結構
如今咱們利用以上定義的 word-count 函數,加上 load 和 store 命令,組成一個 PigPen 的查詢:閉包
(defn word-count-query [input output] (->> (pig/load-tsv input) (word-count) (pig/store-tsv output)))
這個函數返回查詢的 PigPen 表示,他本身不會作什麼,咱們須要從本地執行它,或者生成一個腳本(以後會講)。
利用 PigPen,你能夠 mock 輸入數據來爲你的查詢寫單元測試。不再須要交叉着手指想象提交到 cluster 上後會發生什麼,也不須要截出部分文件來測試輸入輸出。
Mock 數據真的很容易,經過 pig/return 和 pig/constantly,你能夠在你的腳本里注入任意的數據做爲起始點。
一個經常使用的模式是利用 pig/take 來從實際數據源中抽樣出幾行,用 pig/return 把結果包一層,就獲得了 mock 數據。
(use 'clojure.test) (deftest test-word-count (let [data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])] (is (= (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]))))
pig/dump 操做符會在本地執行查詢。
向你的查詢傳參數很麻煩,全部函數範圍內的變量或者 let 的綁定在函數裏均可用。
(defn reusable-fn [lower-bound data] (let [upper-bound (+ lower-bound 10)] (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))
注意 lower-bound 和 upper-bound 在生成腳本的時候就有了,在 cluster 上執行函數的時候也能使用。
只要告訴 PigPen 哪裏會把一個查詢寫成一個 Pig 腳本:
(pig/write-script "word-count.pig" (word-count-query "input.tsv" "output.tsv"))
這樣你就能獲得一個能夠提交到 cluster 上運行的 Pig 腳本。這個腳本會用到 pigpen.jar,這是一個加入全部依賴的 uberjar,因此要保證這個 jar 也一塊兒被提交了。還能夠把你的整個 project 打包成一個 uberjar 而後提交,提交以前記得先重命名。怎麼打包成 uberjar 請參照教程。
以前看到,咱們能夠用 pig/dump 來本地運行查詢,返回 Clojure 數據:
=> (def data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])) #'pigpen-demo/data => (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]
若是你如今就像開始,請參照 getting started & tutorials。
Map-Reduce 對於處理單臺機器搞不定的數據是頗有用,有了 PigPen,你能夠像在本地處理數據同樣處理海量數據。Map-Reduce 經過把數據分散到可能成千上萬的集羣節點來達到這一目的,這些節點每一個都會處理少許的數據,全部的處理都是並行的,這樣完成一個任務就比單臺機器快得多。像 join 和 group 這樣的操做,須要多個節點數據集的協調,這種狀況會經過公共的 join key 把數據分到同一個分區計算,join key 的同一個值會送到同一個指定的機器。一旦機器上獲得了全部可能的值,就能作 join 的操做或者作其餘有意思的事。
想看看 PigPen 怎麼作 join 的話,就來看看 pig/cogroup 吧。cogroup 接受任意數量的數據集而後根據一個共同的 key 來分組。假設咱們有這樣的數據:
foo: {:id 1, :a "abc"} {:id 1, :a "def"} {:id 2, :a "abc"} bar: [1 42] [2 37] [2 3.14] baz: {:my_id "1", :c [1 2 3]]}
若是想要根據 id 分組,能夠這樣:
(pig/cogroup (foo by :id) (bar by first) (baz by #(-> % :my_id Long/valueOf)) (fn [id foos bars bazs] ...))
前三個參數是要 join 的數據集,每個都會指定一個函數來從數據源中選出 key。最後的一個參數是一個函數,用來把分組結果結合起來。在咱們的例子中,這個函數會被調用兩次:
[1 ({:id 1, :a "abc"}, {:id 1, :a "def"}) ([1 42]) ({:my_id "1", :c [1 2 3]]})] [2 ({:id 2, :a "abc"}) ([2 37] [2 3.14]) ()]
這把全部 id 爲 1 的值和 id 爲 2 的值結合在了一塊兒。不一樣的鍵值被獨立的分配到不一樣的機器。默認狀況下,key 能夠不在數據源中出現,可是有選項能夠指定必須出現。
Hadoop 提供了底層的接口作 map-reduce job,但即使如此仍是有限制的,即一次只會運行一輪 map-reduce,沒有數據流和複雜查詢的概念。Pig 在 Hadoop 上抽象出一層,但到目前爲止,它仍舊只是一門腳本語言,你仍是須要用 UDF 來對數據作一些有意思的事情。PigPen 更進一步的作了抽象,把 map-reduce 作成了一門語言。
若是你剛接觸 map-reduce,咱們推薦你看下這裏。
注意:PigPen 不是 一個 Clojure 對 Pig 腳本的封裝,頗有可能產生的腳本是人看不懂的。
PigPen 設計的和 Clojure 儘量保持一致。Map-Reduce 是函數式編程,那爲何不利用一門已存在的強大的函數式編程語言呢?這樣不光學習曲線低,並且大多數概念也能更容易的應用到大數據上。
在 PigPen 中,查詢被當作 expression tree 處理,每一個操做符都被表示須要的行爲信息的 map,這些 map 能夠嵌套在一塊兒組成一個複雜查詢的樹形表式。每一個命令包含了指向祖命令的引用。在執行的時候,查詢樹會被轉化成一個有向無環的查詢圖。這能夠很容易的合併重複的命令,優化相關命令的順序,而且能夠利用 debug 信息調試查詢。
去重 當咱們把查詢表示成操做圖的時候,去重是一件很麻煩的事。Clojure 提供了值相等的操做,即若是連個對象的內容相同,它們就相等。若是兩個操做有相同的表示,那它們徹底相同,因此在寫查詢的時候不用擔憂重複的命令,它們在執行以前都會被優化。
舉個例子,假設咱們有這樣兩個查詢:
(let [even-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter even?) (pig/store-clj "even-squares.clj")) odd-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter odd?) (pig/store-clj "odd-squares.clj"))] (pig/script even-squares odd-squares))
在這個查詢中,咱們從一個文件加載數據,計算每一個數的平方,而後分紅偶數和奇數,操做圖看起來是這樣: 在此輸入圖片描述
這符合咱們的查詢,可是作了不少額外的工做。咱們加載了 input.clj
兩次,全部數的平方也都計算了兩次。這看上去可能沒有不少工做,可是當你對不少數據作這樣的事情,簡單的操做累加起來就不少。爲了優化這個查詢,咱們能夠找出相同的操做。看第一眼發現咱們計算平方的操做多是一個候選,可是他們有不一樣的父節點,所以不能把他們合併在一塊兒。可是咱們能夠把加載函數合併,由於他們沒有父節點,並且他們加載相同的文件。
如今咱們的圖看起來是這樣:
如今咱們值加載一次數據,這會省一些時間,但仍是要計算兩次平方。由於咱們如今只有一個加載的命令,咱們的 map 操做如今相同,能夠合併:
這樣咱們就獲得了一個優化過的查詢,每一個操做都是惟一的。由於咱們每次只會合併一個命令,咱們不會修改查詢的邏輯。你能夠很容易的生成查詢,而不用擔憂重複的執行,PigPen 對重複的部分只會執行一次。
序列化 當咱們用 Clojure 處理完數據之後,數據必須序列化成二進制字節,Pig 才能在集羣的機器間傳數據。這對 PigPen 是一個很昂貴可是必須的過程。幸運的是一個腳本中常常有不少連續的操做能夠合成一個操做,這對於沒必要要的序列化和反序列化節省了不少時間。例如,任意連續的 map,filter 和 mapcat 操做均可以被重寫成一個單獨的 mapcat 操做。
咱們經過一些例子來講明:
在這個例子中,咱們從一個序列化的值(藍色)4開始,對它反序列化(橙色),執行咱們的 map 函數,而後再把它序列化。
如今咱們來試一個稍微複雜一點的(更現實的)例子。在這個例子中,咱們執行一個 map,一個 mapcat 和一個 filter 函數。
若是你之前沒用過 mapcat,我能夠告訴你這是對一個值運行一個函數而後返回一串值的操做。那個序列會被 flatten,每一個值都會傳給下一步使用。在 Clojure 裏,那是 map 和 concat 聯合以後的結果,在 Scala 裏,這叫作 flatMap,而在 C# 裏叫 selectMany。
在下圖中,左邊的流程是咱們優化以前的查詢,右邊的是優化以後的。和第一個例子同樣,咱們一樣從 4 開始,計算平方,而後對這個值作減一的操做,返回自己和加一的操做。Pig 會獲得這個值的集合而後作 flatten,使每一個值都成爲下一步的輸入。注意在和 Pig 交互的時候咱們要序列化和反序列化。第三步,也就是最後一步對數據進行過濾,在這個例子中咱們只保留奇數值。如圖所示,咱們在任意兩步之間都序列化和反序列化數據。
右邊的圖顯示了優化後的結果。每一個操做都返回了一個元素序列。map 操做返回一個只有單元素 16 的序列,mapcat 也同樣,過濾操做返回 0 元素或單元素的序列。經過是這些命令保持一致,咱們能夠很容易的把他們合併到一塊兒。咱們在一套命令中flattrn 了更多的值序列,可是在步驟之間沒有序列化的消耗。雖然卡起來更復雜,可是這個優化是每一個步驟都執行的更快了。
交互式開發,測試,以及可調試性是 PigPen 的關鍵功能。若是你有一個一次運行好幾天的 job,那你最不想看到的是跑了十一個小時後冒出來一個 bug。PigPen 有個基於 rx 的本地運行模式。這可讓咱們對查詢寫單元測試。這樣咱們能夠更有把握的知道運行的時候不會掛掉,而且能返回期待的值。更牛逼的是這個功能可讓咱們進行交互式的開發。
一般狀況下,咱們剛開始會從數據源中選一些記錄來作單元測試。由於 PigPen 在 REPL 中返回數據,咱們不須要額外構造測試數據。這樣,經過 REPL,咱們能夠根據須要對 mock 數據作 map,filter,join 和 reduce 的操做。每一個步驟均可以驗證結果是否是咱們想要的。這種方法相對於寫一長串腳本而後憑空想象能產生更可靠的數據。還有一個有用的地方是能夠把複雜的查詢寫成幾個較小的函數單元。Map-reduce 查詢隨着數據源的量級可能產生劇烈的增長或減小。當你把腳本做爲一個總體測試的時候,你可能要讀一大堆數據,最後產生一小撮數據。經過把查詢細化成較小的單元,你能夠對讀 100 行,產生 2 行這樣子來測試一個單元,而後測試第二個單元的時候能夠用這兩行做爲模板來產生 100 多個數據。
調試模式對於解決異常頗有用,啓用後會在正常輸出的同時,把腳本中每一個操做的結果寫到磁盤上。這對於像 Hadoop 這樣的環境頗有用,在這種狀況下,你無法單步跟蹤代碼,並且每一個步驟均可能花好幾個小時。調試模式還能夠可視化流程圖。這樣能夠可視化的把執行計劃的和實際操做的輸出關聯起來。
要啓用調試模式,請參考 pig/write-script 和 pig/generate-script 的選項,這會在指定的目錄下寫額外的調試輸出。
啓用調試模式的例子:
(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)
要啓用可視化模式,能夠看看 pig/show 和 pig/dump&show。
可視化的例子:
(pig/show my-pigpen-query) ;; Shows a graph of the query (pig/dump&show my-pigpen-query) ;; Shows a graph and runs it locally
PigPen 有個好用的功能是能夠很容易的建立本身的操做符。例如,咱們能夠定義像求差集和交集這樣的集合和多集合的操做符,這些只是像 co-group
這樣的操做符的變體,可是若是能定義,測試它們,而後不再去想這些邏輯怎麼實現的,那就更好了。
這對更復雜的操做也是頗有用的。對於集合數據咱們有 sum
,avg
,min
,max
,sd
和 quantiles
這些可重用的統計操做符,還有 pivot
這樣的操做符能夠把多維數據分組而後對每組計數。
這些操做自己都是簡單的操做,可是當你把它們從你的查詢中抽象出來以後,你的查詢也會變的簡單不少。這時候你能夠花更多的時間去想怎麼解決問題,而不是每次都重複寫基本的統計方法。
咱們選擇 Pig 是由於咱們不想把 Pig 已有的優化的邏輯重寫一遍,不考慮語言層面的東西的話,Pig 在移動大數據方面作得很好。咱們的策略是利用 Pig 的 DataByteArray 二進制格式來移動序列化的 Clojure 數據。在大多數狀況下,Pig 不須要知道數據的底層展示形式。Byte array 能夠很快的作比較,這樣對於 join 和 group 操做,Pig 只須要簡單的比較序列化的二進制,若是序列化的輸出一致,在 Clojure 中值就相等。不過這對於數據排序不適用。二進制的排序其實沒什麼用,並且和原始數據的排序結果也不同。要想排序,還得把數據轉化回去,並且只能對簡單類型排序。這也是 Pig 強加給 PigPen 的爲數很少的一個缺陷。
咱們在決定作 PigPen 以前也評估過其餘語言。第一個要求就是那必須是一門編程語言,並非一種腳本語言加上一堆 UDF。咱們簡單看過 Scalding,它看上去頗有前途,可是咱們的團隊主要是用的 Clojure。 能夠這麼說,PigPen 對於 Clojure 就像是 Scalding 對於 Scala。Cascalog 是用 Clojure 寫 map-reduce 一般會用的語言,可是從過去的經驗來看,Cascalog 對於平常工做其實沒什麼用,你須要學一套複雜的新語法和不少概念,經過變量名對齊來作隱式 join 也不是理想的方案,若是把操做符順序弄錯了會形成很大的性能問題,Cascalog 會 flatten 數據結果(這可能很浪費),並且組合查詢讓人感受很彆扭。
咱們也考慮過對 PigPen 用一門宿主語言。這樣也能在 Hive 之上構建相似的抽象,可是對每一箇中間產物都定義 schema 跟 Clojure 的理念不符。並且 Hive 相似與 SQL,使得從功能性語言翻譯更難。像 SQL 和 Hive 這樣的關係模型語言與像 Clojure 和 Pig 這樣的功能性語言之間有着巨大的差。最後,最直接的解決辦法就是在 Pig 之上作一層抽象。