如何給Apache Pig自定義UDF函數?

近日因爲工做所需,須要使用到Pig來分析線上的搜索日誌數據,散仙本打算使用hive來分析的,但因爲種種緣由,沒有用成,而Pig(pig0.12-cdh)散仙一直沒有接觸過,因此只能臨陣磨槍了,花了兩天時間,大體看完了pig官網的文檔,在看文檔期間,也是邊實戰邊學習,這樣以來,對pig的學習,會更加容易,固然本篇不是介紹如何快速學好一門框架或語言的文章,正如標題所示,散仙打算介紹下如何在Pig中,使用用戶自定義的UDF函數,關於學習經驗,散仙會在後面的文章裏介紹。 



一旦你學會了UDF的使用,就意味着,你能夠以更加靈活的方式來使用Pig,使它擴展一些爲咱們的業務場景定製的特殊功能,而這些功能,在通用的pig裏是沒有的,舉個例子: 

你從HDFS上讀取的數據格式,若是使用默認的PigStorage()來加載,存儲可能只支持有限的數據編碼和類型,若是咱們定義了一種特殊的編碼存儲或序列化方式,那麼當咱們使用默認的Pig來加載的時候,就會發現加載不了,這時候咱們的UDF就派上用場了,咱們只須要自定義一個LoadFunction和一個StoreFunction就能夠解決,這種問題。 
html


本篇散仙根據官方文檔的例子,來實戰一下,並在hadoop集羣上使用Pig測試經過: 
咱們先來看下定義一個UDF擴展類,須要幾個步驟:
java

序號 步驟 說明node

1 在eclipse裏新建一個java工程,並導入pig的核心包 java項目數據庫

2 新建一個包,繼承特定的接口或類,重寫自定義部分 核心業務apache

3 編寫完成後,使用ant打包成jar 編譯時須要pig依賴,但不用把pig的jar包打入UDF中app

4 把打包完成後的jar上傳到HDFS上 pig運行時候須要加載使用框架

5 在pig腳本里,註冊咱們自定義的udf的jar包 注入運行時環境eclipse

6 編寫咱們的核心業務pig腳本運行 測試是否運行成功ide




項目工程截圖以下: 

函數


核心代碼以下: 

Java代碼  收藏代碼

  1. package com.pigudf;  

  2.   

  3. import java.io.IOException;  

  4.   

  5. import org.apache.pig.EvalFunc;  

  6. import org.apache.pig.data.Tuple;  

  7. import org.apache.pig.impl.util.WrappedIOException;  

  8. /** 

  9.  * 自定義UDF類,對字符串轉換大寫 

  10.  * @author qindongliang 

  11.  * */  

  12. public class MyUDF extends EvalFunc<String> {  

  13.   

  14.     @Override  

  15.     public String exec(Tuple input) throws IOException {  

  16.           

  17.          //判斷是否爲null或空,就跳過  

  18.         if(input==null||input.size()==0){  

  19.             return null;  

  20.         }  

  21.         try{  

  22.             //獲取第一個元素  

  23.             String str=(String) input.get(0);  

  24.             //轉成大寫返回  

  25.             return str.toUpperCase();  

  26.               

  27.         }catch(Exception e){  

  28.             throw WrappedIOException.wrap("Caught exception processing input row ",e);  

  29.         }  

  30.     }  

  31.       

  32.   

  33. }  



關於打包的ant腳本,散仙會在文末上傳附件,下面看下造的一些測試數據(注意,文件必定要上傳到HDFS上,除非你是local模式): 



Java代碼  收藏代碼

  1. grunt> cat s.txt  

  2. zhang san,12  

  3. Song,34  

  4. long,34  

  5. abC,12  

  6. grunt>   




咱們在看下,操做文件和jar包是放在一塊兒的: 

Java代碼  收藏代碼

  1. grunt> ls  

  2. hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295  

  3. hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36  

  4. grunt>   



最後,咱們看下pig腳本的定義: 

Pig代碼  收藏代碼

  1. --註冊自定義的jar包  

  2. REGISTER pudf.jar;   

  3. --加載測試文件的數據,逗號做爲分隔符  

  4. a = load 's.txt' using PigStorage(',');     

  5. --遍歷數據,對name列轉成大寫  

  6. b =  foreach a generate com.pigudf.MyUDF((chararray)$0);   

  7. --啓動MapReduce的Job進行數據分析  

  8. dump b  


最後,咱們看下結果,只要過程不出現異常和任務失敗,就證實咱們的udf使用成功: 

Java代碼  收藏代碼

  1. Counters:  

  2. Total records written : 4  

  3. Total bytes written : 64  

  4. Spillable Memory Manager spill count : 0  

  5. Total bags proactively spilled: 0  

  6. Total records proactively spilled: 0  

  7.   

  8. Job DAG:  

  9. job_1419419533357_0147  

  10.   

  11.   

  12. 2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!  

  13. 2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS  

  14. 2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.  

  15. 2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  

  16. 2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  

  17. (ZHANG SAN,12)  

  18. (SONG,34)  

  19. (LONG,34)  

  20. (ABC,12)  


結果沒問題,咱們的UDF加載執行成功,若是咱們還想將咱們的輸出結果直接寫入到HDFS上,能夠在pig腳本的末尾,去掉dump命令,加入 store e into '/tmp/dongliang/result/'; 將結果存儲到HDFS上,固然咱們能夠自定義存儲函數,將結果寫入數據庫,Lucene,Hbase等關係型或一些NOSQL數據庫裏。

相關文章
相關標籤/搜索