這篇文章翻譯自 http://nathanmarz.com/blog/introducing-cascalog-a-clojure-based-query-language-for-hado.html。原文做者是寫 Storm 和 Cascalog 項目的發起人。翻譯這篇文章也爲了下次須要參考的時候能有個中文版本,畢竟中文的看起來更快一些。html
如下進入正文。git
好吧,咱們如今來看看 Cascalog 裏都有哪些東西,我會經過一系列實例來講明。這些實例都用了 Cascalog 項目中的 「playground」 ,因此我建議你把 Cascalog 的代碼下載下來,而後在 REPL 環境裏跟着我一塊兒作(跟着 README 作,只須要花幾分鐘就能搞定)。github
首先啓動 REPL 環境,加載 「playground」:數據庫
lein repl user=> (use 'cascalog.playground) (bootstrap)
這會把咱們運行實例須要的全部東西都加載進來。你能夠在 playground.clj 裏看到咱們等會查詢須要用到的數據集。咱們先來執行一條查詢來找出全部 25 歲的人:bootstrap
user=> (?<- (stdout) [?person] (age ?person 25))
這個查詢能夠當作 「Find all ?person for which ?person has an age that is equal to 25」。在做業運行的時候,你能夠看到 Hadoop 的 log,幾秒鐘以後這個查詢的結果就會被打印出來。函數
而後再來一個範圍查詢找出數據集中全部年齡小於 30 歲的人:oop
user=> (?<- (stdout) [?person] (age ?person ?age) (< ?age 30))
這也很簡單,此次咱們是把年齡綁定到 ?age,而後再加上一個約束說這個 ?age 要比 30 小。翻譯
而後再來一個查詢,此次咱們會在結果裏包含 ages 和 people:code
user=> (?<- (stdout) [?person ?age] (age ?person ?age) (< ?age 30))
咱們要作的只是在查詢的 vector 裏面加上 ?age。orm
再來一個查詢找出 Emily 關注的全部男性(male people):
user=> (?<- (stdout) [?person] (follows "emily" ?person) (gender ?person "m"))
你可能沒注意,這實際上是一個 Join 操做,兩個 ?person 是同一個東西,而因爲 「follows」 和 「gender」 是兩個獨立的數據源,Cascalog 會用一個 Join 操做來解析這個查詢。
<!--more-->
而後咱們更具體的看一下查詢的結構,以解析如下查詢爲例:
user=> (?<- (stdout) [?person ?a2] (age ?person ?age) (< ?age 30) (* 2 ?age :> ?a2))
咱們用的操做符是 ?<-
,這個操做符會定義個查詢而後執行。?<-
實際上是對 <-
(建立查詢的操做符) 和 ?-
(執行查詢的操做符) 的包裝。咱們會在後面建立更復雜的查詢的時候再來看看怎麼用。
首先,咱們要在查詢裏說明想把結果發送到哪裏,在這裏咱們用了 (stdout)
,(stdout)
會建立一個 Cascalog 的 tap,這個 tap 在查詢結束以後把內容寫到標準輸出。任意的 Cascalog tap 均可以做爲輸出,也就是說,你能夠把輸出的數據寫到任意的文件格式(如 Sequence file, 普通文本,等等)。
在定義完的輸出以後,還須要在一個 Clojure vector 裏定義查詢的結果變量,這裏咱們用了 ?person 和 ?a2。
接下來,要定義一些 「謂詞」 來定義和約束結果變量。一共有三種謂詞:
1 生成器(Generators):生成器是一個數據源,包含兩種:
<-
定以的查詢2 操做(Operations):全部變量的一個隱式關係,能夠是綁定新變量的函數,或者是一個過濾器(filter)。
3 累加器(Aggregators):count
,sum
,min
,max
,等等
謂詞有一個名字,一串輸入變量和一串輸出變量,咱們上面的謂詞有:
:>
關鍵字用來分隔輸入變量和輸出變量,若是沒有指定 :>
關鍵字,則變量會被當作操做(operations)的輸入或者生成器(generators)和累加器(aggregators)的輸出。
在 playground.clj 中,age
謂詞指向一個 tap,因此它是一個生成器,生成了 ?person 和 ?age。
謂詞 <
是一個 Clojure 函數,因爲咱們沒有指定輸出變量,這個謂詞將做爲一個過濾器,會過濾掉全部的 ?age 小於 30 的記錄。若是咱們在這裏指定:
(< ?age 30 :> ?young)
那麼 <
將做爲一個函數,並將一個 boolean 類型的變量綁定到 ?young,表示年齡是否小於 30。
謂詞的順序沒有關係。Cascalog 是純定義型的。
變量是以 ?
或 !
開始的符號,若是有時不須要輸出變量的值能夠用 _
符號來略過。查詢裏的其餘部分都會被求值而後做爲常量插入,這個功能叫 「常量替換」,到目前爲止咱們已經用了不少。若是把常量做爲輸出變量,會對函數的結果作一些過濾,好比:
(* 4 ?v2 :> 100)
這裏有兩個常量:4 和 100。4 替換了一個輸入變量,而 100 做爲過濾條件,將只保留乘 4 後等於 100 的 ?v2 的值。字符串,數字,其餘基本單元以及任意在 Hadoop serializers 註冊過的 Object 均可以做爲常量。
再回到例子,咱們來找出 follow 關係中關注比本身年齡小的人:
user=> (?<- (stdout) [?person1 ?person2] (age ?person1 ?age1) (follows ?person1 ?person2) (age ?person2 ?age2) (< ?age2 ?age1))
再執行一遍加上年齡差:
user=> (?<- (stdout) [?person1 ?person2 ?delta] (age ?person1 ?age1) (follows ?person1 ?person2) (age ?person2 ?age2) (- ?age2 ?age1 :> ?delta) (< ?delta 0))
如今來看一下咱們的第一個累加器,咱們來找出年齡小於 30 歲的人的數量:
user=> (?<- (stdout) [?count] (age _ ?a) (< ?a 30) (c/count ?count))
這個查詢會算出記錄中全部人的數量,咱們能夠按分組累計數量,好比,要找出每一個人關注的人的數量能夠這樣:
user=> (?<- (stdout) [?person ?count] (follows ?person _) (c/count ?count))
由於咱們在查詢的結果變量裏定義了 ?person,Cascalog 會按照 ?person 分組,而後對每一個分組執行 c/count
累加器。
你也能夠在一個查詢中使用多個累加器,它們會對同一個記錄的分組執行。例子:經過 count
和 sum
的組合來獲得一個國家的平均年齡:
user=> (?<- (stdout) [?country ?avg] (location ?person ?country _ _) (age ?person ?age) (c/count ?count) (c/sum ?age :> ?sum) (div ?sum ?count :> ?avg))
注意,咱們對最後累計後的結果用了 div
操做。依賴與累加器的輸出變量的操做都是在累加器運行完以後再執行的。
接下來來寫一個統計一組句子中每一個次出現的次數,首先用這個查詢定義一個自定義操做:
user=> (defmapcatop split [sentence] (seq (.split sentence "\\s+"))) user=> (?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))
defmapcatop split
定義的操做把只有一個字段的 sentence 做爲輸入,輸出 0 個或多個元組。
deffilterop
定義返回布爾值的過濾操做,表示這個元組會不會被過濾掉。
defmapop
定義的函數只返回一個元組。
defaggregateop
定義一個累加器。
這些操做也能夠在 Cascalog 的 workflow API 中被直接使用。
咱們的 word count 的查詢還有一個問題,就是在會區分大小寫,咱們能夠這樣修改:
user=> (defn lowercase [w] (.toLowerCase w)) user=> (?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word1) (lowercase ?word1 :> ?word) (c/count ?count))
就如你所看到的,普通的 Clojure 函數也能夠被當作操做來使用。Clojure 函數在沒有輸入參數的時候是一個過濾器,在有輸入參數的時候是一個 map 操做。想要輸出 0 個或多個元組必須用 defmapcatop
。
這還有一個查詢,會統計按年齡和性別分組後各組人的數量:
user=> (defn agebucket [age] (find-first (partial <= age) [17 25 35 45 55 65 100 200])) user=> (?<- (stdout) [?bucket ?gender ?count] (age ?person ?age) (gender ?person ?gender) (agebucket ?age :> ?bucket) (c/count ?count))
Cascalog 有個叫 「非空變量」 的功能,可讓你更優雅的處理空值。咱們到目前爲止一直在用非空變量。以 ?
開頭的變量都是非空變量,以 !
開頭的變量是能夠爲空的變量,當非空變量被綁上控制時,Cascalog 會過濾掉。
咱們比較下面兩個查詢來看看非空變量的效果:
user=> (?<- (stdout) [?person ?city] (location ?person _ _ ?city)) user=> (?<- (stdout) [?person !city] (location ?person _ _ !city))
第二個查詢在結果集中會包含一些空值。
最後,咱們來看一看由子查詢組成的複雜查詢。先查出一對 follow 關係中的兩我的都 follow 2 個以上人的集合:
user=> (let [many-follows (<- [?person] (follows ?person _) (c/count ?c) (> ?c 2))] (?<- (stdout) [?person1 ?person2] (many-follows ?person1) (many-follows ?person2) (follows ?person1 ?person2)))
這裏咱們用 let
定義一個 many-follow 子查詢,這個子查詢使用 <-
來定義。而後就能夠在 let
的 body 裏使用 many-follow 子查詢。
還能夠運行包含多個輸出的查詢。若是咱們還想要上面查詢中 many-follow 子查詢的結果,能夠這樣寫:
user=> (let [many-follows (<- [?person] (follows ?person _) (c/count ?c) (> ?c 2)) active-follows (<- [?p1 ?p2] (many-follows ?p1) (many-follows ?p2) (follows ?p1 ?p2))] (?- (stdout) many-follows (stdout) active-follows))
這裏咱們先定義了兩個查詢,可是沒有執行。在最後才用查詢操做符 ?-
來順序執行兩個查詢。