Cascalog 入門(1)—— 運行於 Hadoop 的基於 Clojure 的查詢語言

運行於 Hadoop 的基於 Clojure 的查詢語言

這篇文章翻譯自 http://nathanmarz.com/blog/introducing-cascalog-a-clojure-based-query-language-for-hado.html。原文做者是寫 StormCascalog 項目的發起人。翻譯這篇文章也爲了下次須要參考的時候能有個中文版本,畢竟中文的看起來更快一些。html

如下進入正文。git

主要特點

  • 簡單: 函數,過濾器(filter)和累加器(aggregators)都用統一的語法,Join 操做都是隱式的,看起來很天然
  • 表現力強: 邏輯表現力強,很容易在查詢中運行任意的 Clojure 代碼
  • 交互性: 能夠在 Clojure REPL 中運行查詢語句
  • 可擴展性: Cascalog 的查詢語句會被解析爲一系列 MapReduce Job
  • 查詢任意數據: 經過 Cascalog 的 「Tap」 抽象能夠查詢 HDFS 文件,數據庫數據和(或)本地數據
  • 處理空值: 空值可能很麻煩,Cascalog 有一個叫 「非空變量」 的功能能夠簡化空值的處理
  • 與 Cascading 的互操做: 爲 Cascalog 定以的操做能夠用到 Cascading
  • 與 Coljure 的互操做: 可使用 Clojure 函數做爲 Cascalog 操做和過濾器,並且由於 Cascalog 是一種 Clojure DSL,能夠講 Cascalog 運用到其餘的 Clojure 代碼

好吧,咱們如今來看看 Cascalog 裏都有哪些東西,我會經過一系列實例來講明。這些實例都用了 Cascalog 項目中的 「playground」 ,因此我建議你把 Cascalog 的代碼下載下來,而後在 REPL 環境裏跟着我一塊兒作(跟着 README 作,只須要花幾分鐘就能搞定)。github

基本查詢

首先啓動 REPL 環境,加載 「playground」:數據庫

lein repl
user=> (use 'cascalog.playground) (bootstrap)

這會把咱們運行實例須要的全部東西都加載進來。你能夠在 playground.clj 裏看到咱們等會查詢須要用到的數據集。咱們先來執行一條查詢來找出全部 25 歲的人:bootstrap

user=> (?<- (stdout) [?person] (age ?person 25))

這個查詢能夠當作 「Find all ?person for which ?person has an age that is equal to 25」。在做業運行的時候,你能夠看到 Hadoop 的 log,幾秒鐘以後這個查詢的結果就會被打印出來。函數

而後再來一個範圍查詢找出數據集中全部年齡小於 30 歲的人:oop

user=> (?<- (stdout) [?person] (age ?person ?age) (< ?age 30))

這也很簡單,此次咱們是把年齡綁定到 ?age,而後再加上一個約束說這個 ?age 要比 30 小。翻譯

而後再來一個查詢,此次咱們會在結果裏包含 ages 和 people:code

user=> (?<- (stdout) [?person ?age] (age ?person ?age)
               (< ?age 30))

咱們要作的只是在查詢的 vector 裏面加上 ?age。orm

再來一個查詢找出 Emily 關注的全部男性(male people):

user=> (?<- (stdout) [?person] (follows "emily" ?person)
               (gender ?person "m"))

你可能沒注意,這實際上是一個 Join 操做,兩個 ?person 是同一個東西,而因爲 「follows」 和 「gender」 是兩個獨立的數據源,Cascalog 會用一個 Join 操做來解析這個查詢。

查詢的結構

<!--more-->

而後咱們更具體的看一下查詢的結構,以解析如下查詢爲例:

user=> (?<- (stdout) [?person ?a2] (age ?person ?age)
              (< ?age 30) (* 2 ?age :> ?a2))

咱們用的操做符是 ?<-,這個操做符會定義個查詢而後執行。?<- 實際上是對 <-(建立查詢的操做符) 和 ?-(執行查詢的操做符) 的包裝。咱們會在後面建立更復雜的查詢的時候再來看看怎麼用。

首先,咱們要在查詢裏說明想把結果發送到哪裏,在這裏咱們用了 (stdout)(stdout) 會建立一個 Cascalog 的 tap,這個 tap 在查詢結束以後把內容寫到標準輸出。任意的 Cascalog tap 均可以做爲輸出,也就是說,你能夠把輸出的數據寫到任意的文件格式(如 Sequence file, 普通文本,等等)。

在定義完的輸出以後,還須要在一個 Clojure vector 裏定義查詢的結果變量,這裏咱們用了 ?person 和 ?a2。

接下來,要定義一些 「謂詞」 來定義和約束結果變量。一共有三種謂詞:

1 生成器(Generators):生成器是一個數據源,包含兩種:

  • Cascading Tap:好比 HDFS 上的一份數據
  • 一個由 <- 定以的查詢

2 操做(Operations):全部變量的一個隱式關係,能夠是綁定新變量的函數,或者是一個過濾器(filter)。

3 累加器(Aggregators):countsumminmax,等等

謂詞有一個名字,一串輸入變量和一串輸出變量,咱們上面的謂詞有:

  • (age ?person ?age)
  • (< ?age 30)
  • (* 2 ?age :> ?a2)

:> 關鍵字用來分隔輸入變量和輸出變量,若是沒有指定 :> 關鍵字,則變量會被當作操做(operations)的輸入或者生成器(generators)和累加器(aggregators)的輸出。

在 playground.clj 中,age 謂詞指向一個 tap,因此它是一個生成器,生成了 ?person 和 ?age。

謂詞 < 是一個 Clojure 函數,因爲咱們沒有指定輸出變量,這個謂詞將做爲一個過濾器,會過濾掉全部的 ?age 小於 30 的記錄。若是咱們在這裏指定:

(< ?age 30 :> ?young)

那麼 < 將做爲一個函數,並將一個 boolean 類型的變量綁定到 ?young,表示年齡是否小於 30。

謂詞的順序沒有關係。Cascalog 是純定義型的。

變量和常量替換

變量是以 ?! 開始的符號,若是有時不須要輸出變量的值能夠用 _ 符號來略過。查詢裏的其餘部分都會被求值而後做爲常量插入,這個功能叫 「常量替換」,到目前爲止咱們已經用了不少。若是把常量做爲輸出變量,會對函數的結果作一些過濾,好比:

(* 4 ?v2 :> 100)

這裏有兩個常量:4 和 100。4 替換了一個輸入變量,而 100 做爲過濾條件,將只保留乘 4 後等於 100 的 ?v2 的值。字符串,數字,其餘基本單元以及任意在 Hadoop serializers 註冊過的 Object 均可以做爲常量。

再回到例子,咱們來找出 follow 關係中關注比本身年齡小的人:

user=> (?<- (stdout) [?person1 ?person2] 
    (age ?person1 ?age1) (follows ?person1 ?person2)
    (age ?person2 ?age2) (< ?age2 ?age1))

再執行一遍加上年齡差:

user=> (?<- (stdout) [?person1 ?person2 ?delta] 
    (age ?person1 ?age1) (follows ?person1 ?person2)
    (age ?person2 ?age2) (- ?age2 ?age1 :> ?delta)
    (< ?delta 0))

累加器(Aggregators)

如今來看一下咱們的第一個累加器,咱們來找出年齡小於 30 歲的人的數量:

user=> (?<- (stdout) [?count] (age _ ?a) (< ?a 30)
              (c/count ?count))

這個查詢會算出記錄中全部人的數量,咱們能夠按分組累計數量,好比,要找出每一個人關注的人的數量能夠這樣:

user=> (?<- (stdout) [?person ?count] (follows ?person _)
              (c/count ?count))

由於咱們在查詢的結果變量裏定義了 ?person,Cascalog 會按照 ?person 分組,而後對每一個分組執行 c/count 累加器。

你也能夠在一個查詢中使用多個累加器,它們會對同一個記錄的分組執行。例子:經過 countsum 的組合來獲得一個國家的平均年齡:

user=> (?<- (stdout) [?country ?avg] 
   (location ?person ?country _ _) (age ?person ?age)
   (c/count ?count) (c/sum ?age :> ?sum)
   (div ?sum ?count :> ?avg))

注意,咱們對最後累計後的結果用了 div 操做。依賴與累加器的輸出變量的操做都是在累加器運行完以後再執行的。

自定義操做

接下來來寫一個統計一組句子中每一個次出現的次數,首先用這個查詢定義一個自定義操做:

user=> (defmapcatop split [sentence]
       (seq (.split sentence "\\s+")))

user=> (?<- (stdout) [?word ?count] (sentence ?s)
              (split ?s :> ?word) (c/count ?count))

defmapcatop split 定義的操做把只有一個字段的 sentence 做爲輸入,輸出 0 個或多個元組。

deffilterop 定義返回布爾值的過濾操做,表示這個元組會不會被過濾掉。

defmapop 定義的函數只返回一個元組。

defaggregateop 定義一個累加器。

這些操做也能夠在 Cascalog 的 workflow API 中被直接使用。

咱們的 word count 的查詢還有一個問題,就是在會區分大小寫,咱們能夠這樣修改:

user=> (defn lowercase [w] (.toLowerCase w))

user=> (?<- (stdout) [?word ?count] 
        (sentence ?s) (split ?s :> ?word1)
        (lowercase ?word1 :> ?word) (c/count ?count))

就如你所看到的,普通的 Clojure 函數也能夠被當作操做來使用。Clojure 函數在沒有輸入參數的時候是一個過濾器,在有輸入參數的時候是一個 map 操做。想要輸出 0 個或多個元組必須用 defmapcatop

這還有一個查詢,會統計按年齡和性別分組後各組人的數量:

user=> (defn agebucket [age] 
        (find-first (partial <= age) [17 25 35 45 55 65 100 200]))

user=> (?<- (stdout) [?bucket ?gender ?count] 
        (age ?person ?age) (gender ?person ?gender)
        (agebucket ?age :> ?bucket) (c/count ?count))

非空變量

Cascalog 有個叫 「非空變量」 的功能,可讓你更優雅的處理空值。咱們到目前爲止一直在用非空變量。以 ? 開頭的變量都是非空變量,以 ! 開頭的變量是能夠爲空的變量,當非空變量被綁上控制時,Cascalog 會過濾掉。

咱們比較下面兩個查詢來看看非空變量的效果:

user=> (?<- (stdout) [?person ?city] (location ?person _ _ ?city))

user=> (?<- (stdout) [?person !city] (location ?person _ _ !city))

第二個查詢在結果集中會包含一些空值。

子查詢

最後,咱們來看一看由子查詢組成的複雜查詢。先查出一對 follow 關係中的兩我的都 follow 2 個以上人的集合:

user=> (let [many-follows (<- [?person] (follows ?person _)
                               (c/count ?c) (> ?c 2))]
        (?<- (stdout) [?person1 ?person2] (many-follows ?person1)
         (many-follows ?person2) (follows ?person1 ?person2)))

這裏咱們用 let 定義一個 many-follow 子查詢,這個子查詢使用 <- 來定義。而後就能夠在 let 的 body 裏使用 many-follow 子查詢。

還能夠運行包含多個輸出的查詢。若是咱們還想要上面查詢中 many-follow 子查詢的結果,能夠這樣寫:

user=> (let [many-follows (<- [?person] (follows ?person _)
                             (c/count ?c) (> ?c 2))
      active-follows (<- [?p1 ?p2] (many-follows ?p1)
                       (many-follows ?p2) (follows ?p1 ?p2))]
    (?- (stdout) many-follows (stdout) active-follows))

這裏咱們先定義了兩個查詢,可是沒有執行。在最後才用查詢操做符 ?- 來順序執行兩個查詢。

相關文章
相關標籤/搜索