近日因爲工做所需,須要使用到Pig來分析線上的搜索日誌數據,散仙本打算使用hive來分析的,但因爲種種緣由,沒有用成,而Pig(pig0.12-cdh)散仙一直沒有接觸過,因此只能臨陣磨槍了,花了兩天時間,大體看完了pig官網的文檔,在看文檔期間,也是邊實戰邊學習,這樣以來,對pig的學習,會更加容易,固然本篇不是介紹如何快速學好一門框架或語言的文章,正如標題所示,散仙打算介紹下如何在Pig中,使用用戶自定義的UDF函數,關於學習經驗,散仙會在後面的文章裏介紹。
一旦你學會了UDF的使用,就意味着,你能夠以更加靈活的方式來使用Pig,使它擴展一些爲咱們的業務場景定製的特殊功能,而這些功能,在通用的pig裏是沒有的,舉個例子:
你從HDFS上讀取的數據格式,若是使用默認的PigStorage()來加載,存儲可能只支持有限的數據編碼和類型,若是咱們定義了一種特殊的編碼存儲或序列化方式,那麼當咱們使用默認的Pig來加載的時候,就會發現加載不了,這時候咱們的UDF就派上用場了,咱們只須要自定義一個LoadFunction和一個StoreFunction就能夠解決,這種問題。
html
本篇散仙根據官方文檔的例子,來實戰一下,並在hadoop集羣上使用Pig測試經過:
咱們先來看下定義一個UDF擴展類,須要幾個步驟:java
序號 步驟 說明node
1 在eclipse裏新建一個java工程,並導入pig的核心包 java項目數據庫
2 新建一個包,繼承特定的接口或類,重寫自定義部分 核心業務apache
3 編寫完成後,使用ant打包成jar 編譯時須要pig依賴,但不用把pig的jar包打入UDF中app
4 把打包完成後的jar上傳到HDFS上 pig運行時候須要加載使用框架
5 在pig腳本里,註冊咱們自定義的udf的jar包 注入運行時環境eclipse
6 編寫咱們的核心業務pig腳本運行 測試是否運行成功ide
項目工程截圖以下:
函數
核心代碼以下:
package com.pigudf;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
/**
* 自定義UDF類,對字符串轉換大寫
* @author qindongliang
* */
public class MyUDF extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
//判斷是否爲null或空,就跳過
if(input==null||input.size()==0){
return null;
}
try{
//獲取第一個元素
String str=(String) input.get(0);
//轉成大寫返回
return str.toUpperCase();
}catch(Exception e){
throw WrappedIOException.wrap("Caught exception processing input row ",e);
}
}
}
關於打包的ant腳本,散仙會在文末上傳附件,下面看下造的一些測試數據(注意,文件必定要上傳到HDFS上,除非你是local模式):
grunt> cat s.txt
zhang san,12
Song,34
long,34
abC,12
grunt>
咱們在看下,操做文件和jar包是放在一塊兒的:
grunt> ls
hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3> 1295
hdfs://dnode1:8020/tmp/udf/s.txt<r 3> 36
grunt>
最後,咱們看下pig腳本的定義:
--註冊自定義的jar包
REGISTER pudf.jar;
--加載測試文件的數據,逗號做爲分隔符
a = load 's.txt' using PigStorage(',');
--遍歷數據,對name列轉成大寫
b = foreach a generate com.pigudf.MyUDF((chararray)$0);
--啓動MapReduce的Job進行數據分析
dump b
最後,咱們看下結果,只要過程不出現異常和任務失敗,就證實咱們的udf使用成功:
Counters:
Total records written : 4
Total bytes written : 64
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_1419419533357_0147
2014-12-30 18:10:24,394 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2014-12-30 18:10:24,395 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-12-30 18:10:24,396 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2014-12-30 18:10:24,405 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-30 18:10:24,405 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(ZHANG SAN,12)
(SONG,34)
(LONG,34)
(ABC,12)
結果沒問題,咱們的UDF加載執行成功,若是咱們還想將咱們的輸出結果直接寫入到HDFS上,能夠在pig腳本的末尾,去掉dump命令,加入 store e into '/tmp/dongliang/result/'; 將結果存儲到HDFS上,固然咱們能夠自定義存儲函數,將結果寫入數據庫,Lucene,Hbase等關係型或一些NOSQL數據庫裏。