Pig是一種數據流編程語言,由一系列操做和變換構成,每個操做或者變換都對輸入進行處理,而後產生輸出結果,總體操做表示一個數據流。Pig的執行環境將數據流翻譯爲可執行的內部表示,在Pig內部,這些變換操做被轉換爲一系列的MapReduce做業。java
Pig自身有許多個方法,有時候須要咱們本身定製特定的處理方法即UDF。apache
UDF具體的步驟以下:編程
第一步,繼承計算類或者過濾類或者加載類或者存儲類,重寫裏面的須要實現的方法,將寫好的類進行打包生成jar文件。諸如命名爲example.jarless
第二步,進入Pig的grunt中,利用register將打包的文件註冊進入Pig中。進入Pig的grunt中,當前本地路徑就是用戶輸入Pig時候所在的路徑。打包文件必定要加上它所在的路徑。如register example.jar。編程語言
第三步,直接使用該自定義的UDF,在使用的過程當中須要加上該類的權限定包名,若是這裏example.jar的包結構爲com.whut.FilterFunct。則引用的時候就是com.whut.FilterFunct(參數)。注意類的名稱就是使用時候的方法名,必需要區分大小寫。ide
第四步,爲本身的UDF定義別名,這樣使用的時候就不準要加包名了,如函數
define Goog com.whut.FilterFunct()。這樣使用的時候就直接利用Goog了。grunt
自定義過濾UDF:oop
過濾UDF須要繼承FilterFunc。實現其exec方法。該方法返回的是boolean型。在對溫度統計的時候,就能夠利用過濾UDF來過濾是否正確的氣溫。this
package whut; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.pig.FilterFunc; import org.apache.pig.FuncSpec; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; //刪除記錄中不符合要求的記錄 //pig的自定義函數,過濾函數 public class IsGoodQuality extends FilterFunc{ @Override public Boolean exec(Tuple tuple) throws IOException { // TODO Auto-generated method stub if(tuple ==null ||tuple.size()==0) return false; try{ Object obj=tuple.get(0); if(obj==null) return false; //這裏強制轉換爲一個××× int i=(Integer)obj; return i==0 ||i==1 || i==2 || i==3; }catch(ExecException e) { throw new IOException(e); } } }
這裏的參數是一個元組,能夠包含多個輸入參數,在方法中直接利用get(索引位置)來直接獲取。
自定義加載函數UDF
在Pig中常常會使用到加載外部文件,通常使用Load進行加載,如Load 'input/tempdata' as (a:chararray,b:int) 。這裏默認使用了內部加載存儲函數,PigStorage。
即Load 'input/tempdata' using PigStorage() as (a:chararray,b:int)。這裏PigStorage默認的每一行的字段分割符是製表符,固然也能夠傳遞一個本身的字段分割符號。有時候每一行是一串字符串,想從中取出某一個字段,則就須要本身定義一個加載函數。如下面這個文件爲例子。
aaaaa1990aaaaaa0039a bbbbb1991bbbbbb0045a ccccc1992cccccc0011c ddddd1993dddddd0043d eeeee1994eeeeee0047e aaaaa1990aaaaaa0037a bbbbb1991bbbbbb0027a ccccc1992cccccc0032c ddddd1993dddddd0090d eeeee1994eeeeee0091e aaaaa1980aaaaaa0041a bbbbb1981bbbbbb0050a ccccc1992cccccc0020c ddddd1993dddddd0033d eeeee1984eeeeee0061e aaaaa1980aaaaaa0054a bbbbb1991bbbbbb0075a ccccc1982cccccc0011c ddddd1993dddddd0003d eeeee1974eeeeee0041e aaaaa1990aaaaaa0039a bbbbb1961bbbbbb0041a ccccc1972cccccc0070c ddddd1993dddddd0042d eeeee1974eeeeee0043e aaaaa1990aaaaaa0034a bbbbb1971bbbbbb0025a ccccc1992cccccc0056c ddddd1993dddddd0037d eeeee1984eeeeee0038e aaaaa1990aaaaaa0049a bbbbb1991bbbbbb0011a ccccc1962cccccc0012c ddddd1993dddddd0023d eeeee1984eeeeee0031e aaaaa1980aaaaaa0094a bbbbb1971bbbbbb0045a ccccc1992cccccc0041c ddddd1993dddddd0003d eeeee1984eeeeee0081e aaaaa1960aaaaaa0099a bbbbb1971bbbbbb0050a ccccc1952cccccc0055c ddddd1963dddddd0043d eeeee1994eeeeee0041e aaaaa1990aaaaaa0031a bbbbb1991bbbbbb0020a ccccc1952cccccc0030c ddddd1983dddddd0013d eeeee1974eeeeee0061e aaaaa1980aaaaaa0071a bbbbb1961bbbbbb0060a ccccc1992cccccc0080c ddddd1953dddddd0033d eeeee1964eeeeee0051e aaaaa1960aaaaaa0024a bbbbb1951bbbbbb0035a ccccc1952cccccc0048c ddddd1953dddddd0053d eeeee1954eeeeee0048e
爲了從中取出年份和溫度,則就須要本身定義加載函數,這裏每一列序號以0開始。自定義加載函數須要繼承LoadFunc。具體的代碼以下。
package whut; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.pig.LoadFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; class Range { //列的索引以0開始 //字段分割的列的位置 private int start; private int end; //根據輸入來解析 //字符串格式必須是(2~3,5~6) public static List<Range> parse(String cutStr)throws Exception { List<Range> rangeList=new ArrayList<Range>(); //首先要判斷是否格式正確 boolean state=cutStr.matches("\\d+~\\d+(,\\d+~\\d+)*"); if(!state) { throw new Exception("InputForat Error:\n" + "Usage:number~number,number~number;Such 2~7,10~19"); } //先截取幾個字段的列起止位置如2~8 String[] splits=cutStr.split(","); //遍歷長度設置Range for(int i=0;i<splits.length;i++) { Range range=new Range(); String sub=splits[i]; String[] subSplits=sub.split("~"); int subStart=Integer.parseInt(subSplits[0]); int subEnd=Integer.parseInt(subSplits[1]); if(subStart>subEnd) throw new Exception("InputForat Error:\n" + "Detail:first number must less than second number"); range.setStart(subStart); range.setEnd(subEnd); rangeList.add(range); } return rangeList; } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } public String getSubString(String inStr) { String res=inStr.substring(start, end); return res; } } //定義加載函數,從每一行字符串提出年份,溫度 public class LineLoadFunc extends LoadFunc{ private static final Log LOG=LogFactory.getLog(LineLoadFunc.class); //負責產生元組的各個字段 private final TupleFactory tupleFactory=TupleFactory.getInstance(); //負責讀取輸入記錄 private RecordReader reader; //存每一個字段的集合 private List<Range> ranges; //傳遞參數設置列的位置分割 public LineLoadFunc(String cutPattern)throws Exception { ranges=Range.parse(cutPattern); } //設置文件的加載位置 @Override public void setLocation(String location, Job job) throws IOException { FileInputFormat.setInputPaths(job, location); } //設置加載文件的輸入文件格式 //爲每個分片創建一個RecordReader @Override public InputFormat getInputFormat() throws IOException { return new TextInputFormat(); } @Override public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { this.reader=reader; } @Override public Tuple getNext() throws IOException { // TODO Auto-generated method stub try{ if(!reader.nextKeyValue()) return null; //TextInputFormat //key:LongWritable,value:Text Text value=(Text)reader.getCurrentValue(); String line=value.toString(); //設置每個元組有幾個字段 Tuple tuple=tupleFactory.newTuple(ranges.size()); for(int i=0;i<ranges.size();i++) { Range range=ranges.get(i); if(range.getEnd()>line.length()) { throw new ExecException("InputFormat:Error\n" + "field length more than total length"); } //必須使用DataByteArray來構造字段的類型 tuple.set(i, new DataByteArray(range.getSubString(line))); } return tuple; }catch(InterruptedException e) { throw new ExecException(); } } }
具體使用的方法就是按照剛纔所說的步驟進行的。