Google的工程師爲了方便本身對MapReduce的實現開發了一個叫作Sawzall的工具,而後發表了幾篇論文放在網上,Apache根據論文開發出了相似Sawzall的Pig語言,因此說Pig就是Sawzall的山寨版。php
Pig 是一個高級過程語言,適合於使用 Hadoop 和 MapReduce 平臺來查詢大型半結構化數據集。經過容許對分佈式數據集進行相似SQL的查詢,Pig能夠簡化Hadoop的使用。它提供的SQL-LIKE語言叫Pig Latin,該語言的編譯器會把類SQL的數據分析請求轉換爲一系列通過優化處理的MapReduce運算。Pig爲複雜的海量數據並行計算提供了一個簡單的操做和編程接口。html
在MapReduce框架中有map和reduce兩個函數,若是親手開發一個MapReduce應用,須要從編寫代碼,編譯,部署,放在Hadoop上執行這個MapReduce程序須要耗費必定時間,有了Pig後不只僅能夠簡化你對MapReduce的開發,並且還能夠對不一樣的數據之間進行轉換。java
Pig能夠純本地運行,下載Pig包後解壓,執行「bin/pig -x local」命令直接運行,很是簡單,這就是local模式,可是生產環境中每每不這樣使用,而是將Pig與hdfs/hadoop集羣環境進行對接。因此說Pig就是對mapreduce算法(框架)實現的一套shell腳本,相似SQL語句,在Pig中稱之爲Pig Latin,在這套腳本中能夠對加載進來的數據進行排序、過濾、求和、分組(group by)、關聯(Joining),Pig也能夠由用戶自定義一些函數對數據集進行操做,即UDF(user-defined functions)。linux
通過Pig Latin的轉換後變成了一道MapReduce的做業,經過MapReduce多個線程、進程或者獨立系統並行執行處理的結果集進行分類和概括。Map() 和 Reduce()兩個函數會並行運行,即便不是在同一的系統的同一時刻也在同時運行一套任務,當全部的處理都完成以後,結果將被排序,格式化,而且保存到一個文件。Pig利用MapReduce將計算分紅兩個階段,第一個階段分解成爲小塊而且分佈到每個存儲數據的節點上進行執行,對計算的壓力進行分散,第二個階段聚合第一個階段執行的這些結果,這樣能夠達到很是高的吞吐量,經過很少的代碼和工做量就可以驅動上千臺機器並行計算,充分利用計算機的資源,克服了運行中的瓶頸。正則表達式
1.2 應用場景算法
Apache Pig是一個基於Hadoop的大規模數據分析平臺,用Pig對TB級別的數據進行查詢很是輕鬆,而且這些海量的數據都是非結構化的數據,例如:一堆文件多是log4j輸出日誌而且存放於跨越多個計算機的多個磁盤上,用來記錄上千臺在線服務器的健康狀態日誌,交易日至,IP訪問記錄,應用服務日誌等等。一般須要統計或者抽取這些記錄,或者查詢異常記錄,對這些記錄造成一些報表,將數據轉化爲有價值的信息,這樣的話查詢會較爲複雜,此時相似MySQL這樣的產品就不必定能知足對速度、執行效率上的需求,而用Apache的Pig能夠實現這樣的目標。shell
1.3 與hive比較數據庫
雖然pig與hive都是一種大規模數據分析工具,但Hive彷佛有點數據庫的影子,而Pig則是一個對MapReduce實現的工具(腳本)。二者都擁有本身的表達語言,其目的是將MapReduce的實現進行簡化,而且讀寫操做數據最終都是存儲在HDFS分佈式文件系統上。看起來Pig和Hive有些相似的地方,但也有些不一樣,它們的比較以下圖所示。apache
1. Language編程
在Hive中能夠執行插入/刪除等操做,可是Pig沒有發現有能夠插入數據的方法;
2. Schemas
Hive中至少還有一個「表」的概念,可是Pig中基本沒有表的概念,所謂的表創建在Pig Latin腳本中,更不用提metadata了;
2. Partitions
Pig中沒有表的概念,因此說到分區對於Pig來講基本免談,hive中有分區的概念;
3. Server
Hive能夠依託於Thrift啓動一個服務器,提供遠程調用,Pig沒有;
4. Shell
在Pig能夠執行ls 、cat這樣很經典的linux shell命令,可是在Hive中沒有;
5. Web Interface
Hive有,Pig無;
6. JDBC/ODBC
Pig無,Hive有。
2 產品架構
Pig由5個主要的部分構成,以下圖所示:
1.Pig本身實現的一套框架對輸入、輸出的人機交互部分的實現,就是Pig Latin;
2.Zebra是Pig與HDFS/Hadoop的中間層、Zebra是MapReduce做業編寫的客戶端,Zerbra用結構化的語言實現了對hadoop物理存儲元數據的管理,也是對Hadoop的數據抽象層,在Zebra中有2個核心的類 TableStore(寫)/TableLoad(讀)對Hadoop上的數據進行讀寫操做;
3.Pig中的Streaming主要分爲4個組件:Pig Latin、邏輯層(Logical Layer)、物理層(Physical Layer)、Streaming具體實現(Implementation),Streaming會建立一個Map/Reduce做業,並把它發送給合適的集羣,同時監視這個做業的在集羣環境中的整個執行過程;
4.MapReduce是每臺機器上進行分佈式計算的框架(算法);
5.HDFS最終存儲數據的部分。
Hive最新版本是apache-hive-0.13.1-bin.tar.gz,下載地址爲http://apache.fayea.com/apache-mirror/hive/hive-0.13.1/。
下載後,選個目錄,解壓,命令以下:
# tar -zxvf hive-0.10.0.tar.gz
關係(relation)、包(bag)、元組(tuple)、字段(field)、數據(data)的關係
一個關係(relation)是一個包(bag),更具體地說,是一個外部的包(outer bag)。
一個包(bag)是一個元組(tuple)的集合。在pig中表示數據時,用大括號{}括起來的東西表示一個包——不管是在教程中的實例演示,仍是在pig交互模式下的輸出,都遵循這樣的約定。
一個元組(tuple)是若干字段(field)的一個有序集(ordered set)。在pig中表示數據時,用小括號()括起來的東西表示一個元組。
一個字段是一塊數據(data)。
「元組」這個詞很抽象,能夠把它想像成關係型數據庫表中的一行,它含有一個或多個字段,其中,每個字段能夠是任何數據類型,而且能夠有或者沒有數據。
「關係」能夠比喻成關係型數據庫的一張表,而上面說了,「元組」能夠比喻成數據表中的一行,在關係型數據庫中,同一張表中的每一行都有固定的字段數,但pig中的「關係」與「元組」之間,「關係」並不要求每個「元組」都含有相同數量的字段,而且也不會要求各「元組」中在相同位置處的字段具備相同的數據類型。
一個計算多維度組合下平均值的實際例子
假設有數據文件:a.txt(各數值之間是以tab分隔的):
[root@localhost pig]$ cat a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
問題以下:怎樣求出在第2、3、4列的全部組合的狀況下,最後兩列的平均值分別是多少?
例如,第2、3、4列有一個組合爲(1,2,3),即第一行和最後一行數據。對這個維度組合來講,最後兩列的平均值分別爲:
(4.2+1.4)/2=2.8
(9.8+0.2)/2=5.0
而對於第2、3、4列的其餘全部維度組合,都分別只有一行數據,所以最後兩列的平均值其實就是它們自身。
特別地,組合(7,9,9)有兩行記錄:第3、四行,可是第三行數據的最後兩列沒有值,所以它不該該被用於平均值的計算,也就是說,在計算平均值時,第三行是無效數據。因此(7,9,9)組合的最後兩列的平均值爲2.6和6.2。
如今用pig來算一下,而且輸出最終的結果。
先進入本地調試模式(pig -x local),再依次輸入以下pig代碼:
A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
B = GROUP A BY (col2, col3, col4);
C = FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);
DUMP C;
pig輸出結果以下:
((1,2,3),2.8,5.0)
((1,2,5),7.7,5.9)
((3,0,5),3.5,2.1)
((7,9,9),2.6,6.2)
下面,依次來看看每一句pig代碼分別獲得了什麼樣的數據。
①加載a.txt文件,並指定每一列的數據類型分別爲chararray(字符串),int,int,int,double,double。同時,給予每一列別名,分別爲col1,col2,……,col6。這個別名在後面的數據處理中會用到——若是你不指定別名,那麼在後面的處理中,就只能使用索引($0,$1,……)來標識相應的列了,這樣可讀性會變差,所以,在列固定的狀況下,仍是指定別名的好。
將數據加載以後,保存到變量A中,A的數據結構以下:
A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}
可見,A是用大括號括起來的東西。根據本文前面的說法,A是一個包(bag)。A與想像中的樣子是一致的,也就是與前面打印出來的a.txt文件的內容是同樣的,仍是一行一行的相似於「二維表」的數據。
②按照A的第2、3、4列,對A進行分組。pig會找出全部第2、3、4列的組合,並按照升序進行排列,而後將它們與對應的包A整合起來,獲得以下的數據結構:
B: {group: (col2: int,col3: int,col4: int),A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}}
可見,A的第2、3、4列的組合被pig賦予了一個別名:group,同時B的每一行其實就是由一個group和若干個A組成的——注意,是若干個A。這裏之因此只顯示了一個A,是由於這裏表示的是數據結構,而不表示具體數據有多少組。
實際的數據爲:
((1,2,3),{(a,1,2,3,4.2,9.8),(a,1,2,3,1.4,0.2)})
((1,2,5),{(a,1,2,5,7.7,5.9)})
((3,0,5),{(a,3,0,5,3.5,2.1)})
((7,9,9),{(b,7,9,9,,),(a,7,9,9,2.6,6.2)})
可見,與前面所說的同樣,組合(1,2,3)對應了兩行數據,組合(7,9,9)也對應了兩行數據。
③計算每一種組合下的最後兩列的平均值。
③根據上面獲得的B的數據,能夠把B想像成一行一行的數據(只不過這些行不是對稱的),FOREACH的做用是對B的每一行數據進行遍歷,而後進行計算。
GENERATE能夠理解爲要生成什麼樣的數據,這裏的group就是上一步操做中B的第一項數據(即pig爲A的第2、3、4列的組合賦予的別名),因此它告訴了在數據集C的每一行裏,第一項就是B中的group——相似於(1,2,5)這樣的元組)。
而AVG(A.col5)這樣的計算,則是調用了pig的一個求平均值的函數AVG,用於對A的名爲col5的列求平均值。在加載數據到A的時候,咱們已經給每一列起了個別名,col5就是倒數第二列。
這裏的A.col5,指的是B的每一行中的A,而不是包含所有數據的那個A。拿B的第一行來舉例:
((1,2,3),{(a,1,2,3,4.2,9.8),(a,1,2,3,1.4,0.2)})
遍歷到B的這一行時,要計算AVG(A.col5),pig會找到(a,1,2,3,4.2,9.8)中的4.2,以及(a,1,2,3,1.4,0.2)中的1.4,加起來除以2,就獲得了平均值。
同理,咱們也知道了AVG(A.col6)是怎麼算出來的。但還有一點要注意的:對(7,9,9)這個組,它對應的數據(b,7,9,9,,)裏最後兩列是無值的,這是由於咱們的數據文件對應位置上不是有效數字,而是兩個「-」,pig在加載數據的時候自動將它置爲空了,而且計算平均值的時候,也不會把這一組數據考慮在內(至關於忽略這組數據的存在)。
因此C的數據結構是這樣的:
C: {group: (col2: int,col3: int,col4: int),double,double}
④DUMP C就是將C中的數據輸出到控制檯。若是要輸出到文件,須要使用:
STORE C INTO 'output';
這樣pig就會在當前目錄下新建一個「output」目錄(該目錄必須事先不存在),並把結果文件放到該目錄下。
若是要實現相同的功能,用Java或C++寫一個Map-Reduce應用程序須要很長時間,僅僅寫一個build.xml或者Makefile,所需的時間就是寫這段pig代碼的幾十倍了!
正由於pig有如此優點,它才獲得了普遍應用。
在SQL語句中,要統計表中數據的行數,很簡單:
SELECT COUNT(*) FROM table_name WHERE condition
在pig中,也有一個COUNT函數,與SQL不一樣,下面舉例說明。
假設要計算數據文件a.txt的行數:
[root@localhost pig]$ cat a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
正確的作法是:
A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
B = GROUP A ALL;
C = FOREACH B GENERATE COUNT(A.col2);
DUMP C;
輸出結果:
(6)
代表有6行數據。
在這個例子中,統計COUNT(A.col2)和COUNT(A)的結果是同樣的,可是,若是col2這一列中含有空值:
[root@localhost pig]$ cat a.txt
a 1 2 3 4.2 9.8
a 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
則如下pig程序及執行結果爲:
grunt> A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
grunt> B = GROUP A ALL;
grunt> C = FOREACH B GENERATE COUNT(A.col2);
grunt> DUMP C;
(5)
可見,結果爲5行。那是由於你LOAD數據的時候指定了col2的數據類型爲int,而a.txt的第二行數據是空的,所以數據加載到A之後,有一個字段就是空的:
grunt> DUMP A;
(a,1,2,3,4.2,9.8)
(a,,0,5,3.5,2.1)
(b,7,9,9,,)
(a,7,9,9,2.6,6.2)
(a,1,2,5,7.7,5.9)
(a,1,2,3,1.4,0.2)
在COUNT的時候,null的字段不會被計入在內,因此結果是5。
從字面上看,flatten就是「弄平」的意思,下面經過一個具體實例瞭解flatten在pig中的做用。
仍是採用前面的a.txt數據文件來講明:
[root@localhost pig]$ cat a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
若是按照前文的作法,計算多維度組合下的最後兩列的平均值,則:
grunt> A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B GENERATE group, AVG(A.col5), AVG(A.col6);
grunt> DUMP C;
((1,2,3),2.8,5.0)
((1,2,5),7.7,5.9)
((3,0,5),3.5,2.1)
((7,9,9),2.6,6.2)
可見,輸出結果中,每一行的第一項是一個tuple(元組),來試試看FLATTEN的做用:
grunt> A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B GENERATE FLATTEN(group), AVG(A.col5), AVG(A.col6);
grunt> DUMP C;
(1,2,3,2.8,5.0)
(1,2,5,7.7,5.9)
(3,0,5,3.5,2.1)
(7,9,9,2.6,6.2)
被FLATTEN的group原本是一個元組,如今變成了扁平的結構了。按照pig文檔的說法,FLATTEN用於對元組(tuple)和包(bag)「解嵌套」(un-nest):
在有的時候,不「解嵌套」的數據結構是不利於觀察的,輸出這樣的數據可能不利於外圍數程序的處理(例如,pig將數據輸出到磁盤後,還須要用其餘程序作後續處理,而對一個元組,輸出的內容裏是含括號的,這就在處理流程上又要多一道去括號的工序),所以,FLATTEN提供了一個在某些狀況下能夠清楚、方便地分析數據的機會。
用於GROUP的key若是多於一個字段(正如本文前面的例子),則GROUP以後的數據的key是一個元組(tuple),不然它就是與用於GROUP的key相同類型的東西。
GROUP的結果是一個關係(relation),在這個關係中,每一組包含一個元組(tuple),這個元組包含兩個字段:(1)第一個字段被命名爲「group」——這一點很是容易與GROUP關鍵字相混淆,但請區分開來。該字段的類型與用於GROUP的key類型相同。(2)第二個字段是一個包(bag),它的類型與被GROUP的關係的類型相同。
用含有null的字段來GROUP,結果也會將null計入,不會忽略。
假設有數據文件 a.txt 內容爲:
1 2 5
1 3
1 3
6 9 8
其中,每兩列數據之間是用tab分割的,第二行的第2列、第三行的第3列沒有內容(也就是說,加載到Pig裏以後,對應的數據會變成null),若是把這些數據按第1、第2列來GROUP的話,第1、2列中含有null的行會被忽略嗎?
來作一下試驗:
A = LOAD 'a.txt' AS (col1:int, col2:int, col3:int);
B = GROUP A BY (col1, col2);
DUMP B;
輸出結果爲:
((1,2),{(1,2,5)})
((1,3),{(1,3,)})
((1,),{(1,,3)})
((6,9),{(6,9,8)})
從上面的結果(第三行)可見,原數據中第1、2列裏含有null的行也被計入在內了,也就是說,GROUP操做是不會忽略null的,這與COUNT有所不一樣。
假設你有以下數據文件:
[root@localhost ~]# cat 3.txt
1 9
2 2
3 3
4 0
1 9
1 9
4 0
如今要找出第1、2列的組合中,每一種的個數分別爲多少,例如,(1,9)組合有3個,(4,0)組合有兩個,依此類推。
顯而易見,只須要用GROUP就能夠輕易完成這個任務。因而寫出以下代碼:
A = LOAD '3.txt' AS (col1: int, col2: int);
B = GROUP A ALL;
C = FOREACH B GENERATE group, COUNT(A);
DUMP C;
惋惜,結果不是咱們想要的:
(all,7)
咱們的本意是按全部列來GROUP,因而使用了GROUP ALL,可是這實際上變成了統計行數,下面的代碼就是一段標準的統計數據行數的代碼:
A = LOAD '3.txt' AS (col1: int, col2: int);
B = GROUP A ALL;
C = FOREACH B GENERATE COUNT(A);
DUMP C;
所以,上面的 C = FOREACH B GENERATE group, COUNT(A) 也無非就是多打印了一個group的名字(all)而已——group的名字被設置爲「all」,這是Pig幫你作的。
正確的作法很簡單,只須要按全部字段GROUP,就能夠了:
A = LOAD '3.txt' AS (col1: int, col2: int);
B = GROUP A BY (col1, col2);
C = FOREACH B GENERATE group, COUNT(A);
DUMP C;
結果以下:
((1,9),3)
((2,2),1)
((3,3),1)
((4,0),2)
這與前面分析的正確結果是同樣的。
仍是假設有以下數據:
[root@localhost pig]$ cat a.txt
a 1 2 3 4.2 9.8
a 3 0 5 3.5 2.1
b 7 9 9 - -
a 7 9 9 2.6 6.2
a 1 2 5 7.7 5.9
a 1 2 3 1.4 0.2
若是按照如下方式來加載數據:
A = LOAD 'a.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double);
那麼獲得的A的數據結構爲:
grunt> DESCRIBE A;
A: {col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double}
若是你要把A看成一個元組(tuple)來加載:
A = LOAD 'a.txt' AS (T : tuple (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double));
也就是想要獲得這樣的數據結構:
grunt> DESCRIBE A;
A: {T: (col1: chararray,col2: int,col3: int,col4: int,col5: double,col6: double)}
那麼,上面的方法將獲得一個空的A:
grunt> DUMP A;
()
()
()
()
()
()
那是由於數據文件a.txt的結構不適合於這樣加載成元組(tuple)。
若是有數據文件b.txt:
[root@localhost pig]$ cat b.txt
(a,1,2,3,4.2,9.8)
(a,3,0,5,3.5,2.1)
(b,7,9,9,-,-)
(a,7,9,9,2.6,6.2)
(a,1,2,5,7.7,5.9)
(a,1,2,3,1.4,0.2)
則使用上面所說的加載方法及結果爲:
grunt> A = LOAD 'b.txt' AS (T : tuple (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double));
grunt> DUMP A;
((a,1,2,3,4.2,9.8))
((a,3,0,5,3.5,2.1))
((b,7,9,9,,))
((a,7,9,9,2.6,6.2))
((a,1,2,5,7.7,5.9))
((a,1,2,3,1.4,0.2))
可見,加載的數據的結構確實被定義成了元組(tuple)。
在多維度組合下,如何計算某個維度組合裏的不重複記錄的條數
以數據文件 c.txt 爲例:
[root@localhost pig]$ cat c.txt
a 1 2 3 4.2 9.8 100
a 3 0 5 3.5 2.1 200
b 7 9 9 - - 300
a 7 9 9 2.6 6.2 300
a 1 2 5 7.7 5.9 200
a 1 2 3 1.4 0.2 500
問題:如何計算在第2、3、4列的全部維度組合下,最後一列不重複的記錄分別有多少條?例如,第2、3、4列有一個維度組合是(1,2,3),在這個維度維度下,最後一列有兩種值:100 和 500,所以不重複的記錄數爲2。同理可求得其餘的記錄條數。
pig代碼及輸出結果以下:
grunt> A = LOAD 'c.txt' AS (col1:chararray, col2:int, col3:int, col4:int, col5:double, col6:double, col7:int);
grunt> B = GROUP A BY (col2, col3, col4);
grunt> C = FOREACH B {D = DISTINCT A.col7; GENERATE group, COUNT(D);};
grunt> DUMP C;
((1,2,3),2)
((1,2,5),1)
((3,0,5),1)
((7,9,9),1)
來看看每一步分別生成了什麼樣的數據:
①LOAD不用說了,就是加載數據;
②GROUP也不用說了,和前文所說的同樣。GROUP以後獲得了這樣的數據:
grunt> DUMP B;
((1,2,3),{(a,1,2,3,4.2,9.8,100),(a,1,2,3,1.4,0.2,500)})
((1,2,5),{(a,1,2,5,7.7,5.9,200)})
((3,0,5),{(a,3,0,5,3.5,2.1,200)})
((7,9,9),{(b,7,9,9,,,300),(a,7,9,9,2.6,6.2,300)})
其實到這裏,咱們肉眼就能夠看出來最後要求的結果是什麼了,固然,必需要由pig代碼來完成,要否則怎麼應對海量數據?
③這裏的FOREACH與前面有點不同,這就是所謂的「嵌套的FOREACH」。
而後再解釋一下:FOREACH是對B的每一行進行遍歷,其中,B的每一行裏含有一個包(bag),每個包中含有若干元組(tuple)A,所以,FOREACH 後面的大括號裏的操做,實際上是對所謂的「內部包」(inner bag)的操做(詳情請參看FOREACH的說明),在這裏,指定了對A的col7這一列進行去重,去重的結果被命名爲D,而後再對D計數(COUNT),就獲得了咱們想要的結果。
④輸出結果數據,與前文所述的差很少。
這樣就達成了目的。
另外,DISTINCT操做用於去重,正由於它要把數據集合到一塊兒,才知道哪些數據是重複的,所以,它會產生reduce過程。同時,在map階段,它也會利用combiner來先去除一部分重複數據以加快處理速度。
pig中能夠嵌套使用shell進行輔助處理,下面,就以一個實際的例子來講明。
假設在某一步pig處理後,獲得了相似於下面 b.txt 中的數據:
[root@localhost pig]$ cat b.txt
1 5 98 = 7
34 8 6 3 2
62 0 6 = 65
問題:如何將數據中第4列中的「=」符號所有替換爲9999?
pig代碼及輸出結果以下:
grunt> A = LOAD 'b.txt' AS (col1:int, col2:int, col3:int, col4:chararray, col5:int);
grunt> B = STREAM A THROUGH `awk '{if($4 == "=") print $1"\t"$2"\t"$3"\t9999\t"$5; else print $0}'`;
grunt> DUMP B;
(1,5,98,9999,7)
(34,8,6,3,2)
(62,0,6,9999,65)
來看看這段代碼是如何作到的:
①加載數據。
②經過「STREAM … THROUGH …」的方式,能夠調用一個shell語句,用該shell語句對A的每一行數據進行處理。此處的shell邏輯爲:當某一行數據的第4列爲「=」符號時,將其替換爲「9999」;不然就照原樣輸出這一行。
③輸出B,可見結果正確。
如何統計一個字符串中包含的指定字符數?這能夠不算是個Pig的問題了,能夠把它認爲是一個shell的問題。從本文前面部分咱們已經知道,Pig中能夠用 STREAM ... THROUGH 來調用shell進行輔助數據處理。
假設有文本文件:
[root@localhost ~]$ cat 1.txt
123 abcdef:243789174
456 DFJKSDFJ:3646:555558888
789 yKDSF:00000%0999:2343324:11111:33333
如今要統計:每一行中,第二列裏所包含的冒號(「:」)分別爲多少?代碼以下:
A = LOAD '1.txt' AS (col1: chararray, col2: chararray);
B = STREAM A THROUGH `awk -F":" '{print NF-1}'` AS (colon_count: int);
DUMP B;
結果爲:
(1)
(2)
(4)
假設pig腳本輸出的文件是經過外部參數指定的,則此參數不能寫死,須要傳入。在pig中,使用傳入的參數以下所示:
STORE A INTO '$output_dir';
則這個「output_dir」就是個傳入的參數。在調用這個pig腳本的shell腳本中,能夠這樣傳入參數:
pig -param output_dir="/home/my_ourput_dir/" my_pig_script.pig
這裏傳入的參數「output_dir」的值爲「/home/my_output_dir/」。
與GROUP操做符同樣,COGROUP也是用來分組的,不一樣的是,COGROUP能夠按多個關係中的字段進行分組。
仍是以一個實例來講明,假設有如下兩個數據文件:
[root@localhost pig]$ cat a.txt
uidk 12 3
hfd 132 99
bbN 463 231
UFD 13 10
[root@localhost pig]$ cat b.txt
908 uidk 888
345 hfd 557
28790 re 00000
如今用pig作以下操做及獲得的結果爲:
grunt> A = LOAD 'a.txt' AS (acol1:chararray, acol2:int, acol3:int);
grunt> B = LOAD 'b.txt' AS (bcol1:int, bcol2:chararray, bcol3:int);
grunt> C = COGROUP A BY acol1, B BY bcol2;
grunt> DUMP C;
(re,{},{(28790,re,0)})
(UFD,{(UFD,13,10)},{})
(bbN,{(bbN,463,231)},{})
(hfd,{(hfd,132,99)},{(345,hfd,557)})
(uidk,{(uidk,12,3)},{(908,uidk,888)})
每一行輸出的第一項都是分組的key,第二項和第三項分別都是一個包(bag),其中,第二項是根據前面的key找到的A中的數據包,第三項是根據前面的key找到的B中的數據包。
來看看第一行輸出:「re」做爲group的key時,其找不到對應的A中的數據,所以第二項就是一個空的包「{}」,「re」這個key在B中找到了對應的數據(28790 re 00000),所以第三項就是包{(28790,re,0)}。
其餘輸出數據也相似。
假設有以下數據:
[root@localhost]# cat a.txt
1 3 4 7
1 3 5 4
2 7 0 5
9 8 6 6
如今咱們要統計第1、2列的不一樣組合有多少種,對本例來講,組合有三種:
1 3
2 7
9 8
也就是說咱們要的答案是3。
先寫出所有的Pig代碼:
A = LOAD 'a.txt' AS (col1:int, col2:int, col3:int, col4:int);
B = GROUP A BY (col1, col2);
C = GROUP B ALL;
D = FOREACH C GENERATE COUNT(B);
DUMP D;
而後再來看看這些代碼是如何計算出上面的結果的:
①第一行代碼加載數據,沒什麼好說的。
②第二行代碼,獲得第1、2列數據的全部組合。B的數據結構爲:
grunt> DESCRIBE B;
B: {group: (col1: int,col2: int),A: {col1: int,col2: int,col3: int,col4: int}}
把B DUMP出來,獲得:
((1,3),{(1,3,4,7),(1,3,5,4)})
((2,7),{(2,7,0,5)})
((9,8),{(9,8,6,6)})
很是明顯,(1,3),(2,7),(9,8)的全部組合已經被排列出來了,這裏獲得了若干行數據。下一步要作的是統計這樣的數據一共有多少行,也就獲得了第1、2列的組合有多少組。
③第三和第四行代碼,就實現了統計數據行數的功能。
這裏須要特別說明的是:
a)爲何倒數第二句代碼中是COUNT(B),而不是COUNT(group)?
咱們是對C進行FOREACH,因此要先看看C的數據結構:
grunt> DESCRIBE C;
C: {group: chararray,B: {group: (col1: int,col2: int),A: {col1: int,col2: int,col3: int,col4: int}}}
可見,能夠把C想像成一個map的結構,key是一個group,value是一個包(bag),它的名字是B,這個包中有N個元素,每個元素都對應到②中所說的一行。根據②的分析,咱們就是要統計B中元素的個數,所以,這裏是COUNT(B)。
b)COUNT函數的做用是統計一個包(bag)中的元素的個數:
COUNT
Computes the number of elements in a bag.
從C的數據結構看,B是一個bag,因此COUNT函數是能夠用於它的。
若是你試圖把COUNT應用於一個非bag的數據結構上,會發生錯誤,例如:
java.lang.ClassCastException: org.apache.pig.data.BinSedesTuple cannot be cast to org.apache.pig.data.DataBag
這是把Tuple傳給COUNT函數時發生的錯誤。
假設有int a = 3 和 int b = 2兩個數,在大多數編程語言裏,a/b獲得的是1,想獲得正確結果1.5的話,須要轉換爲float再計算。在Pig中其實和這種狀況同樣,下面就拿幾行數據來作個實驗:
[root@localhost ~]# cat a.txt
3 2
4 5
在Pig中:
grunt> A = LOAD 'a.txt' AS (col1:int, col2:int);
grunt> B = FOREACH A GENERATE col1/col2;
grunt> DUMP B;
(1)
(0)
可見,不加類型轉換的計算結果是取整以後的值。
那麼,轉換一下試試:
grunt> A = LOAD 'a.txt' AS (col1:int, col2:int);
grunt> B = FOREACH A GENERATE (float)(col1/col2);
grunt> DUMP B;
(1.0)
(0.0)
這樣轉換仍是不行的,這與大多數編程語言的結果一致——它只是把取整以後的數再轉換爲浮點數,所以固然不行。
grunt> A = LOAD 'a.txt' AS (col1:int, col2:int);
grunt> B = FOREACH A GENERATE (float)col1/col2;
grunt> DUMP B;
(1.5)
(0.8)
或者這樣也行:
grunt> A = LOAD 'a.txt' AS (col1:int, col2:int);
grunt> B = FOREACH A GENERATE col1/(float)col2;
grunt> DUMP B;
(1.5)
(0.8)
所以,在pig作除法運算的時候,須要注意這一點。
假設有兩個數據文件爲:
[root@localhost ~]# cat 1.txt
0 3
1 5
0 8
[root@localhost ~]# cat 2.txt
1 6
0 9
如今要求出:在第一列相同的狀況下,第二列的和分別爲多少?
例如,第一列爲 1 的時候,第二列有5和6兩個值,和爲11。同理,第一列爲0的時候,第二列的和爲 3+8+9=20。
計算此問題的Pig代碼以下:
A = LOAD '1.txt' AS (a: int, b: int);
B = LOAD '2.txt' AS (c: int, d: int);
C = UNION A, B;
D = GROUP C BY $0;
E = FOREACH D GENERATE FLATTEN(group), SUM(C.$1);
DUMP E;
輸出爲:
(0,20)
(1,11)
咱們來看看每一步分別作了什麼:
①第1行、第2行代碼分別加載數據到關係A、B中,沒什麼好說的。
②第3行代碼,將關係A、B合併起來了。合併後的數據結構爲:
grunt> DESCRIBE C;
C: {a: int,b: int}
其數據爲:
grunt> DUMP C;
(0,3)
(1,5)
(0,8)
(1,6)
(0,9)
③第4行代碼按第1列(即$0)進行分組,分組後的數據結構爲:
grunt> DESCRIBE D;
D: {group: int,C: {a: int,b: int}}
其數據爲:
grunt> DUMP D;
(0,{(0,9),(0,3),(0,8)})
(1,{(1,5),(1,6)})
④最後一行代碼,遍歷D,將D中每一行裏的全部bag(即C)的第2列(即$1)進行累加,就獲得了想要的結果。
下面介紹在Pig中使用正則表達式對字符串進行匹配的方法:
假設你有以下數據文件:
[root@localhost ~]# cat a.txt
1 http://ui.qq.com/abcd.html
5 http://tr.qq.com/743.html
8 http://vid.163.com/trees.php
9 http:auto.qq.com/us.php
如今要找出該文件中,第二列符合「*//*.qq.com/*」模式的全部行(此處只有前兩行符合條件),怎麼作?
Pig代碼以下:
A = LOAD 'a.txt' AS (col1: int, col2: chararray);
B = FILTER A BY col2 matches '.*//.*\\.qq\\.com/.*';
DUMP B;
matches關鍵字對 col2 進行了正則匹配,它使用的是Java格式的正則表達式匹配規則。
.表示任意字符,*表示字符出現任意次數;\.對.進行了轉義,表示匹配.這個字符;/就是表示匹配/這個字符。
這裏須要注意的是,在引號中,用於轉義的字符\須要打兩個才能表示一個,因此上面的\\.就是與正則中的\.是同樣的,即匹配 . 這個字符。因此,若是你要匹配數字的話,應該用這種寫法(\d表示匹配數字,在引號中必須用\\d):
B = FILTER A BY (col matches '\\d.*');
(1,http://ui.qq.com/abcd.html)
(5,http://tr.qq.com/743.html)
可見結果是正確的。
在處理數據時,若是你想提取出一個日期字符串的年份,例如提取出「2011-10-26」中的「2011」,能夠用內置函數 SUBSTRING 來實現:
SUBSTRING
Returns a substring from a given string.
Syntax
SUBSTRING(string, startIndex, stopIndex)
下面舉一個例子。假設有數據文件:
[root@localhost ~]# cat a.txt
2010-05-06 abc
2008-06-18 uio
2011-10-11 tyr
2010-12-23 fgh
2011-01-05 vbn
第一列是日期,如今要找出全部不重複的年份有哪些,能夠這樣作:
A = LOAD 'a.txt' AS (dateStr: chararray, flag: chararray);
B = FOREACH A GENERATE SUBSTRING(dateStr, 0, 4);
C = DISTINCT B;
DUMP C;
輸出結果爲:
(2008)
(2010)
(2011)
可見達到了想要的效果。
上面的代碼太簡單了,沒必要多言,惟一須要說明一下的是 SUBSTRING 函數,它的第一個參數是要截取的字符串,第二個參數是起始索引(從0開始),第三個參數是結束索引。
假設有如下數據文件:
[root@localhost ~]# cat 1.txt
abc 123
cde 456
fgh 789
ijk 200
如今要把第一列和第二列做爲字符串拼接起來,例如第一行會變成「abc123」,那麼使用CONCAT這個求值函數(eval function)就能夠作到:
A = LOAD '1.txt' AS (col1: chararray, col2: int);
B = FOREACH A GENERATE CONCAT(col1, (chararray)col2);
DUMP B;
輸出結果爲:
(abc123)
(cde456)
(fgh789)
(ijk200)
注意這裏故意在加載數據的時候把第二列指定爲int類型,這是爲了說明數據類型不一致的時候CONCAT會出錯:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1045: Could not infer the matching function for org.apache.pig.builtin.CONCAT as multiple or none of them fit. Please use an explicit cast.
因此在後面CONCAT的時候,對第二列進行了類型轉換。
另外,若是數據文件內容爲:
[root@localhost ~]# cat 1.txt
5 123
7 456
8 789
0 200
那麼,若是對兩列整數CONCAT:
A = LOAD '1.txt' AS (col1: int, col2: int);
B = FOREACH A GENERATE CONCAT(col1, col2);
一樣也會出錯:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1045: Could not infer the matching function for org.apache.pig.builtin.CONCAT as multiple or none of them fit. Please use an explicit cast.
要拼接幾個字符串用CONCAT 套 CONCAT 就能夠了。
假設有如下兩個數據文件:
[root@localhost ~]# cat 1.txt
123
456
789
200
以及:
[root@localhost ~]# cat 2.txt
200
333
789
如今要找出兩個文件中,相同數據的行數,也就是所謂的求兩個數據集的重合。
用關係操做符JOIN,能夠達到這個目的。在處理海量數據時,常常會有求重合的需求。因此JOIN是Pig中一個極其重要的操做。
在本例中,兩個文件中有兩個相同的數據行:789以及200,所以,結果應該是2。
咱們先來看看正確的代碼:
A = LOAD '1.txt' AS (a: int);
B = LOAD '2.txt' AS (b: int);
C = JOIN A BY a, B BY b;
D = GROUP C ALL;
E = FOREACH D GENERATE COUNT(C);
DUMP E;
①第1、二行是加載數據。
②第三行按A的第1列、B的第二列進行「結合」,JOIN以後,a、b兩列不相同的數據就被剔除掉了。C的數據結構爲:
C: {A::a: int,B::b: int}
C的數據爲:
(200,200)
(789,789)
③因爲咱們要統計的是數據行數,因此上面的Pig代碼中的第4、5行就進行了計數的運算。
④若是文件 2.txt 多一行數據「200」,結果爲3行。這個時候C的數據爲:
(200,200)
(200,200)
(789,789)
因此若是你要去除重複的,還須要用DISTINCE對C處理一下:
A = LOAD '1.txt' AS (a: int);
B = LOAD '2.txt' AS (b: int);
C = JOIN A BY a, B BY b;
uniq_C = DISTINCT C;
D = GROUP uniq_C ALL;
E = FOREACH D GENERATE COUNT(uniq_C);
DUMP E;
這樣獲得的結果就是2。
尤爲須要注意的是,若是JOIN的兩列具備不一樣的數據類型,是會失敗的。例如如下代碼:
A = LOAD '1.txt' AS (a: int);
B = LOAD '2.txt' AS (b: chararray);
C = JOIN A BY a, B BY b;
D = GROUP C ALL;
E = FOREACH D GENERATE COUNT(C);
DUMP E;
在語法上是沒有錯誤的,可是一運行就會報錯:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1107: Cannot merge join keys, incompatible types
這是由於a、b具備不一樣的類型:int和chararray。
不能對同一個關係(relation)進行JOIN,以下所示:
假設有以下文件:
[root@localhost ~]# cat 1.txt
1 a
2 e
3 v
4 n
我想對第一列這樣JOIN:
A = LOAD '1.txt' AS (col1: int, col2: chararray);
B = JOIN A BY col1, A BY col1;
那麼當你試圖 DUMP B 的時候,會報以下的錯:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1108: Duplicate schema alias: A::col1 in "B"
這是由於Pig會弄不清JOIN以後的字段名——兩個字段均爲A::col1,使得一個關係(relation)中出現了重複的名字,這是不容許的。
要解決這個問題,只需將數據LOAD兩次,而且給它們起不一樣的名字就能夠了:
grunt> A = LOAD '1.txt' AS (col1: int, col2: chararray);
grunt> B = LOAD '1.txt' AS (col1: int, col2: chararray);
grunt> C = JOIN A BY col1, B BY col1;
grunt> DESCRIBE C;
C: {A::col1: int,A::col2: chararray,B::col1: int,B::col2: chararray}
grunt> DUMP C;
(1,a,1,a)
(2,e,2,e)
(3,v,3,v)
(4,n,4,n)
從上面C的schema,能夠看出來,若是對同一個關係A的第一列進行JOIN,會致使schema中出現相同的字段名,因此固然會出錯。
初次使用JOIN時,通常人使用的都是所謂的「內部的JOIN」(inner JOIN),也即相似於 C = JOIN A BY col1, B BY col2 這樣的JOIN。Pig也支持「外部的JOIN」(outer JOIN),下面就舉一個例子。
假設有文件:
[root@localhost ~]# cat 1.txt
1 a
2 e
3 v
4 n
以及:
[root@localhost ~]# cat 2.txt
9 a
2 e
3 v
0 n
如今來對這兩個文件的第一列做一個outer JOIN:
grunt> A = LOAD '1.txt' AS (col1: int, col2: chararray);
grunt> B = LOAD '2.txt' AS (col1: int, col2: chararray);
grunt> C = JOIN A BY col1 LEFT OUTER, B BY col1;
grunt> DESCRIBE C;
C: {A::col1: int,A::col2: chararray,B::col1: int,B::col2: chararray}
grunt> DUMP C;
(1,a,,)
(2,e,2,e)
(3,v,3,v)
(4,n,,)
在outer JOIN中,「OUTER」關鍵字是能夠省略的。從上面的結果,能夠看到:若是換成一個inner JOIN,則兩個輸入文件的第1、第四行都不會出如今結果中(由於它們的第一列不相同),而在LEFT OUTER JOIN中,文件1.txt的第1、四行卻被輸出了,因此這就是LEFT OUTER JOIN的特色:對左邊的記錄來講,即便它與右邊的記錄不匹配,它也會被包含在輸出數據中。
同理可知RIGHT OUTER JOIN的功能——把上面的 LEFT 換成 RIGHT,結果以下:
(,,0,n)
(2,e,2,e)
(3,v,3,v)
(,,9,a)
可見,與左邊的記錄不匹配的右邊的記錄被保存了下來,而左邊的記錄沒有保存下來(兩個逗號代表其爲空),這就是RIGHT OUTER JOIN的效果,與預期的同樣。
關於OUTERJOIN的用處,舉一個例子:能夠用來求「不在某數據集中的那些數據(即:不重合的數據)」。仍是以上面的兩個數據文件爲例,如今我要求出 1.txt 中,第一列不在 2.txt 中的第一列的那些記錄,1和4這兩個數字在 2.txt 的第一列裏沒有出現,而2和3出現了,所以,咱們要找的記錄就是:
1 a
4 n
要實現這個效果,Pig代碼及結果爲:
grunt> A = LOAD '1.txt' AS (col1: int, col2: chararray);
grunt> B = LOAD '2.txt' AS (col1: int, col2: chararray);
grunt> C = JOIN A BY col1 LEFT OUTER, B BY col1;
grunt> DESCRIBE C;
C: {A::col1: int,A::col2: chararray,B::col1: int,B::col2: chararray}
grunt> D = FILTER C BY (B::col1 is null);
grunt> E = FOREACH D GENERATE A::col1 AS col1, A::col2 AS col2;
grunt> DUMP E;
(1,a)
(4,n)
可見,確實找出了「不重合的記錄」。在做海量數據分析時,這種功能是極爲有用的。
最後來一個總結:
假設有兩個數據集(在1.txt和2.txt中),分別都只有1列,則以下代碼:
A = LOAD '1.txt' AS (col1: chararray);
B = LOAD '2.txt' AS (col1: chararray);
C = JOIN A BY col1 LEFT OUTER, B BY col1;
D = FILTER C BY (B::col1 is null);
E = FOREACH D GENERATE A::col1 AS col1;
DUMP E;
計算結果爲:在A中,但不在B中的記錄。
使用三目運算符「 ? : 」必須加括號。
假設有如下數據文件:
[root@localhost ~]# cat a.txt
5 8 9
6 0
4 3 1
其中,第二行的第二列數據是有缺失的,所以,加載數據以後,它會成爲null。順便廢話一句,在處理海量數據時,數據有缺失是常常遇到的現象。
如今,若是要把全部缺失的數據填爲 -1, 可使用三目運算符來操做:
A = LOAD 'a.txt' AS (col1:int, col2:int, col3:int);
B = FOREACH A GENERATE col1, ((col2 is null)? -1 : col2), col3;
DUMP B;
輸出結果爲:
(5,8,9)
(6,-1,0)
(4,3,1)
((col2 is null)? -1 : col2) 的含義不用解釋你也知道,就是當col2爲null的時候將其置爲-1,不然就保持原來的值,可是注意,它最外面是用括號括起來的,若是去掉括號,寫成 (col2 is null)? -1 : col2,那麼就會有語法錯誤:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1000: Error during parsing. Encountered " "is" "is "" at line 1, column 36.
Was expecting one of (後面省略)
錯誤提示有點不直觀。因此,有時候使用三目運算符是必需要使用括號的。
數據文件以下:
[root@localhost ~]# cat 1.txt
1 (4,9)
5
8 (3,0)
5 (9,2)
6
這些數據的佈局比較怪,要把它加載成什麼樣的schema呢?答:第一列爲一個int,第二列爲一個tuple,此tuple又含兩個int,而且有幾行第二列數據是有缺失的。
問題:怎樣求在第一列數據相同的狀況下,第二列數據中的第一個整數的和分別爲多少?
例如,第一列爲1的數據只有一行(即第一行),所以,第二列的第一個整數的和就是4。
可是對最後一行,也就是第一列爲6時,因爲其第二列數據缺失,咱們但願它輸出的結果是0。
先來看看Pig代碼:
A = LOAD '1.txt' AS (a:int, b:tuple(x:int, y:int));
B = FOREACH A GENERATE a, FLATTEN(b);
C = GROUP B BY a;
D = FOREACH C GENERATE group, SUM(B.x);
DUMP D;
結果爲:
(1,4)
(5,9)
(6,)
(8,3)
(5,9) 這一行是由數據文件 1.txt 的第 2、4行計算獲得的,其中,第2行數據有缺失,但這並不影響求和計算,由於另外一行數據沒有缺失。你能夠這樣想:一個包(bag)中有多個數,當其中一個爲null,而其餘不爲null時,把它們相加會自動忽略null。
然而,第三行 (6,) 是否是太刺眼了?沒錯,由於數據文件 1.txt 的最後一行缺失了第二列,因此,在 SUM(B.x) 中的 B.x 爲null就會致使計算結果爲null,從而什麼也輸出不了。
這就與咱們指望的輸出有點不一樣了。咱們但願這種缺失的數據不要空着,而是輸出0。
想法1:
D = FOREACH C GENERATE group, ((IsEmpty(B.x)) ? 0 : SUM(B.x));
輸出結果爲:
(1,4)
(5,9)
(6,)
(8,3)
可見行不通。從這個結果咱們知道,IsEmpty(B.x) 爲false,即B.x不是empty的,因此不能這樣作。
想法2:
D = FOREACH C GENERATE group, ((B.x is null) ? 0 : SUM(B.x));
輸出結果仍是與上面同樣!仍然行不通。這更奇怪了:B.x既非empty,也非null,那麼它是什麼狀況?按照個人理解,當group爲6時,它應該是一個非空的包(bag),裏面有一個null的東西,因此,這個包不是empty的,它也非null。我不知道這樣理解是否正確,可是它看上去就像是這樣的。
想法3:
D = FOREACH C GENERATE group, SUM(B.x) AS s;
E = FOREACH D GENERATE group, ((s is null) ? -1 : s);
DUMP E;
輸出結果爲:
(1,4)
(5,9)
(6,-1)
(8,3)
可見達到了咱們想要的結果。這與本文前面部分的作法是一致的,即:先獲得含null的結果,再把這個結果中的null替換爲指定的值。
「Scalars can be only used with projections」錯誤的緣由
在這裏,用一個簡單的例子給你們用演示一下產生這個錯誤的緣由之一。
假設有以下數據文件:
[root@localhost ~]$ cat 1.txt
a 1
b 8
c 3
c 3
d 6
d 3
c 5
e 7
如今要統計:在第1列的每一種組合下,第二列爲3和6的數據分別有多少條?
例如,當第1列爲 c 時,第二列爲3的數據有2條,爲6的數據有0條;當第1列爲d時,第二列爲3的數據有1條,爲6的數據有1條。其餘的依此類推。
Pig代碼以下:
A = LOAD '1.txt' AS (col1:chararray, col2:int);
B = GROUP A BY col1;
C = FOREACH B {
D = FILTER A BY col2 == 3;
E = FILTER A BY col2 == 6;
GENERATE group, COUNT(D), COUNT(E);
};
DUMP C;
輸出結果爲:
(a,0,0)
(b,0,0)
(c,2,0)
(d,1,1)
(e,0,0)
可見結果是正確的。
那麼,若是我在上面的代碼中,把「D = FILTER A BY col2 == 3」不當心寫成了「D = FILTER B BY col2 == 3」,就確定會獲得「Scalars can be only used with projections」的錯誤提示。
嵌套的(nested)FOREACH和內部的(inner)FOREACH是一個意思,正如上面所示,一個FOREACH能夠對每一條記錄施以多種不一樣的關係操做,而後再GENERATE獲得想要的結果,這就是嵌套的/內部的FOREACH。
如何在Pig中使用中文做爲FILTER的條件?
數據文件 data.txt 內容爲(每一列之間以TAB爲分隔符):
1 北京市 a
2 上海市 b
3 北京市 c
4 北京市 f
5 天津市 e
Pig腳本文件 test.pig 內容爲:
A = LOAD 'data.txt' AS (col1: int, col2: chararray, col3: chararray);
B = FILTER A BY (col2 == '北京市');
DUMP B;
首先,這兩個文件的編碼都是UTF-8(無BOM),在Linux命令行下,直接以本地模式執行Pig腳本 test.pig:
pig -x local test.pig
獲得的輸出結果爲:
(1,北京市,a)
(3,北京市,c)
(4,北京市,f)
可見結果是正確的。
可是,若是在grunt交互模式下,把 test.pig 的內容粘貼進去執行,是得不到任何輸出結果的:
grunt> A = LOAD 'data.txt' AS (col1: int, col2: chararray, col3: chararray);
grunt> B = FILTER A BY (col2 == '北京市');
grunt> DUMP B;
可使用中文做爲FILTER的條件,只要不在交互模式下執行你的Pig腳本便可。
1. 要將Pig job的優先級設爲HIGH,只需在Pig腳本的開頭加上一句:
set job.priority HIGH;
便可將Pig job的優先級設爲高了。
2. 設置Pig job的job name
在Pig腳本開頭加上一句:
set job.name 'My-Job-Name';
那麼,執行該Pig腳本以後,在Hadoop的Job Tracker中看到的「Name」就是「My-Job-Name」了。
若是不設置,顯示的name是相似於「Job6245768625829738970.jar」這樣的東西,job多的時候徹底沒有標識度,建議必定要設置一個特殊的job name。
3. UDF是區分大小寫的
由於UDF是由Java類來實現的,因此區分大小寫,就這麼簡單。
4. Pig中的下面operator(操做符)會觸發reduce過程:
①GROUP:因爲GROUP操做會將全部具備相同key的記錄收集到一塊兒,因此數據若是正在map中處理的話,就會觸發shuffle→reduce的過程。
②ORDER:因爲須要將全部相等的記錄收集到一塊兒(才能排序),因此ORDER會觸發reduce過程。同時,除了那個Pig job以外,Pig還會添加一個額外的M-R job到數據流程中,由於Pig須要對數據集作採樣,以肯定數據的分佈狀況,從而解決數據分佈嚴重不均的狀況下job效率過於低下的問題。
③DISTINCT:因爲須要將記錄收集到一塊兒,才能肯定它們是否是重複的,所以DISTINCT會觸發reduce過程。固然,DISTINCT也會利用combiner在map階段就把重複的記錄移除。
④JOIN:JOIN用於求重合,因爲求重合的時候,須要將具備相同key的記錄收集到一塊兒,所以,JOIN會觸發reduce過程。
⑤LIMIT:因爲須要將記錄收集到一塊兒,才能統計出它返回的條數,所以,LIMIT會觸發reduce過程。
⑥COGROUP:與GROUP相似(參看本文前面的部分),所以它會觸發reduce過程。
⑦CROSS:計算兩個或多個關係的叉積。
在master上運行Mapreduce沒有問題,可是在slave節點上運行會報以下錯誤:
[root@fk01 mapreduce]# hadoop jar hadoop-mapreduce-examples-2.2.0.jar
wordcount /input /ouput3
14/08/21 10:41:18 WARN util.NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes where
applicable
14/08/21 10:41:18 INFO client.RMProxy: Connecting to ResourceManager at
/0.0.0.0:8032
14/08/21 10:41:19 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:20 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:21 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:22 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:23 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:24 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
14/08/21 10:41:25 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
......................
解決辦法爲在yare-site.xml裏添加以下信息:
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8031</value>
</property>
在Hadoop2.x.0和pig0.13.0運行過程當中,hadoop和pig grunt均運行正常可是dump數據報下面的錯誤:
ERROR 1066: Unable to open iterator for alias actor
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias actor
at org.apache.pig.PigServer.openIterator(PigServer.java:880)
at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:774)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:372)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:198)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173)
at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:69)
at org.apache.pig.Main.run(Main.java:541)
at org.apache.pig.Main.main(Main.java:156)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.io.IOException: Job terminated with anomalous status FAILED
at org.apache.pig.PigServer.openIterator(PigServer.java:872)
... 12 more
致使問題出現的緣由是pig已經編譯的jar文件和hadoop的版本不兼容致使的,能夠採用從新編譯的方法解決問題,以下所示:
(1) cd /${PIG_HOME}
(2) mv pig-0.10.1-withouthadoop.jar pig-0.10.1-withouthadoop.jar.bak
(3) mv pig-0.10.1.jar pig-0.10.1.jar.bak
(4) ant clean jar-withouthadoop -Dhadoopversion=23
編譯完成後將在${PIG_HOME}/build目錄下生成:
pig-0.12.0-SNAPSHOT-core.jar, pig-0.12.0-SNAPSHOT-withouthadoop.jar
(5) 將上一步生成的兩個文件Copy至${PIG_HOME}下,並進行更名:
pig-0.12.0-SNAPSHOT-core.jar --> pig-0.12.0.jar
pig-0.12.0-SNAPSHOT-withouthadoop.jar --> pig-0.12.0-withouthadoop.jar
本文主要參考下面的博文:
http://www.codelast.com/?p=3621