Pig自定義過濾UDF和加載UDF

        Pig是一種數據流編程語言,由一系列操做和變換構成,每個操做或者變換都對輸入進行處理,而後產生輸出結果,總體操做表示一個數據流。Pig的執行環境將數據流翻譯爲可執行的內部表示,在Pig內部,這些變換操做被轉換爲一系列的MapReduce做業。java

      Pig自身有許多個方法,有時候須要咱們本身定製特定的處理方法即UDF。apache

      UDF具體的步驟以下編程

第一步,繼承計算類或者過濾類或者加載類或者存儲類,重寫裏面的須要實現的方法,將寫好的類進行打包生成jar文件。諸如命名爲example.jarless

第二步,進入Pig的grunt中,利用register將打包的文件註冊進入Pig中。進入Pig的grunt中,當前本地路徑就是用戶輸入Pig時候所在的路徑。打包文件必定要加上它所在的路徑。如register example.jar。編程語言

第三步,直接使用該自定義的UDF,在使用的過程當中須要加上該類的權限定包名,若是這裏example.jar的包結構爲com.whut.FilterFunct。則引用的時候就是com.whut.FilterFunct(參數)。注意類的名稱就是使用時候的方法名,必需要區分大小寫。ide

第四步,爲本身的UDF定義別名,這樣使用的時候就不準要加包名了,如函數

define Goog com.whut.FilterFunct()。這樣使用的時候就直接利用Goog了。grunt

自定義過濾UDF:oop

        過濾UDF須要繼承FilterFunc。實現其exec方法。該方法返回的是boolean型。在對溫度統計的時候,就能夠利用過濾UDF來過濾是否正確的氣溫。this


package whut;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
//刪除記錄中不符合要求的記錄
//pig的自定義函數,過濾函數
public class IsGoodQuality extends FilterFunc{
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // TODO Auto-generated method stub
        if(tuple ==null ||tuple.size()==0)
        return false;
        try{
            Object obj=tuple.get(0);
            if(obj==null)
                return false;
            //這裏強制轉換爲一個×××
            int i=(Integer)obj;
            return i==0 ||i==1 || i==2 || i==3;
        }catch(ExecException e)
        {
            throw new IOException(e);
        }
    }
}

        這裏的參數是一個元組,能夠包含多個輸入參數,在方法中直接利用get(索引位置)來直接獲取。

自定義加載函數UDF

      在Pig中常常會使用到加載外部文件,通常使用Load進行加載,如Load 'input/tempdata' as (a:chararray,b:int) 。這裏默認使用了內部加載存儲函數,PigStorage。

Load 'input/tempdata' using PigStorage()  as (a:chararray,b:int)。這裏PigStorage默認的每一行的字段分割符是製表符,固然也能夠傳遞一個本身的字段分割符號。有時候每一行是一串字符串,想從中取出某一個字段,則就須要本身定義一個加載函數。如下面這個文件爲例子。

aaaaa1990aaaaaa0039a
bbbbb1991bbbbbb0045a
ccccc1992cccccc0011c
ddddd1993dddddd0043d
eeeee1994eeeeee0047e
aaaaa1990aaaaaa0037a
bbbbb1991bbbbbb0027a
ccccc1992cccccc0032c
ddddd1993dddddd0090d
eeeee1994eeeeee0091e
aaaaa1980aaaaaa0041a
bbbbb1981bbbbbb0050a
ccccc1992cccccc0020c
ddddd1993dddddd0033d
eeeee1984eeeeee0061e
aaaaa1980aaaaaa0054a
bbbbb1991bbbbbb0075a
ccccc1982cccccc0011c
ddddd1993dddddd0003d
eeeee1974eeeeee0041e
aaaaa1990aaaaaa0039a
bbbbb1961bbbbbb0041a
ccccc1972cccccc0070c
ddddd1993dddddd0042d
eeeee1974eeeeee0043e
aaaaa1990aaaaaa0034a
bbbbb1971bbbbbb0025a
ccccc1992cccccc0056c
ddddd1993dddddd0037d
eeeee1984eeeeee0038e
aaaaa1990aaaaaa0049a
bbbbb1991bbbbbb0011a
ccccc1962cccccc0012c
ddddd1993dddddd0023d
eeeee1984eeeeee0031e
aaaaa1980aaaaaa0094a
bbbbb1971bbbbbb0045a
ccccc1992cccccc0041c
ddddd1993dddddd0003d
eeeee1984eeeeee0081e
aaaaa1960aaaaaa0099a
bbbbb1971bbbbbb0050a
ccccc1952cccccc0055c
ddddd1963dddddd0043d
eeeee1994eeeeee0041e
aaaaa1990aaaaaa0031a
bbbbb1991bbbbbb0020a
ccccc1952cccccc0030c
ddddd1983dddddd0013d
eeeee1974eeeeee0061e
aaaaa1980aaaaaa0071a
bbbbb1961bbbbbb0060a
ccccc1992cccccc0080c
ddddd1953dddddd0033d
eeeee1964eeeeee0051e
aaaaa1960aaaaaa0024a
bbbbb1951bbbbbb0035a
ccccc1952cccccc0048c
ddddd1953dddddd0053d
eeeee1954eeeeee0048e

       爲了從中取出年份和溫度,則就須要本身定義加載函數,這裏每一列序號以0開始。自定義加載函數須要繼承LoadFunc。具體的代碼以下。

package whut;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
class Range
{
    //列的索引以0開始
    //字段分割的列的位置
    private int start;
    private int end;
    //根據輸入來解析
    //字符串格式必須是(2~3,5~6)
    public static List<Range> parse(String cutStr)throws Exception
    {
        List<Range> rangeList=new ArrayList<Range>();
        //首先要判斷是否格式正確
        boolean state=cutStr.matches("\\d+~\\d+(,\\d+~\\d+)*");
        if(!state)
        {
          throw new Exception("InputForat Error:\n" +
            "Usage:number~number,number~number;Such 2~7,10~19");
        }
        //先截取幾個字段的列起止位置如2~8  
        String[] splits=cutStr.split(",");
        //遍歷長度設置Range
        for(int i=0;i<splits.length;i++)
        {
            Range range=new Range();
            String sub=splits[i];
            String[] subSplits=sub.split("~");
            int subStart=Integer.parseInt(subSplits[0]);
            int subEnd=Integer.parseInt(subSplits[1]);
            if(subStart>subEnd)
            throw new Exception("InputForat Error:\n" +
                    "Detail:first number must less than second number");
            range.setStart(subStart);
            range.setEnd(subEnd);
            rangeList.add(range);
        }
        return rangeList;
    }
    public int getStart() {
        return start;
    }
    public void setStart(int start) {
        this.start = start;
    }
    public int getEnd() {
        return end;
    }
    public void setEnd(int end) {
        this.end = end;
    }
              
    public String getSubString(String inStr)
    {
        String res=inStr.substring(start, end);
        return res;
    }
}
//定義加載函數,從每一行字符串提出年份,溫度
public class LineLoadFunc extends LoadFunc{
              
    private static final Log LOG=LogFactory.getLog(LineLoadFunc.class);
    //負責產生元組的各個字段
    private final TupleFactory tupleFactory=TupleFactory.getInstance();
    //負責讀取輸入記錄
    private RecordReader reader;
    //存每一個字段的集合
    private List<Range> ranges;
    //傳遞參數設置列的位置分割
    public LineLoadFunc(String cutPattern)throws Exception
    {
        ranges=Range.parse(cutPattern);
    }
    //設置文件的加載位置
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
    //設置加載文件的輸入文件格式
    //爲每個分片創建一個RecordReader
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        this.reader=reader;
    }
    @Override
    public Tuple getNext() throws IOException {
        // TODO Auto-generated method stub
        try{
            if(!reader.nextKeyValue())
                return null;
            //TextInputFormat
            //key:LongWritable,value:Text
            Text value=(Text)reader.getCurrentValue();
            String line=value.toString();
            //設置每個元組有幾個字段
            Tuple tuple=tupleFactory.newTuple(ranges.size());
            for(int i=0;i<ranges.size();i++)
            {
                Range range=ranges.get(i);
                if(range.getEnd()>line.length())
                {
                    throw new ExecException("InputFormat:Error\n" +
                            "field length more than total length");
                }
                //必須使用DataByteArray來構造字段的類型
                tuple.set(i, new DataByteArray(range.getSubString(line)));
            }
            return tuple;
        }catch(InterruptedException e)
        {
            throw new ExecException();
        }
    }
}


           具體使用的方法就是按照剛纔所說的步驟進行的。

相關文章
相關標籤/搜索