hive不支持多個字符做爲分隔符的解決方案

題記:java

  近期在作某個大型銀行的大數據項目,當在處理非結構化數據時,卻發現他們給的數據並不符合hive和pig的處理要求,數據每行必須須要多個分割符才能完美處理,一下午也沒有想到完美的辦法解決,今天從新審視了一下整個過程。看來hive的命令行無法搞定了。因而乎,只能經過代碼來搞定。apache

一、從新實現hive的InputFormat了,別急放碼過來ide

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat  extends TextInputFormat implements JobConfigurable{

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(
			InputSplit genericSplit, JobConf job, Reporter reporter)
			throws IOException {
		 reporter.setStatus(genericSplit.toString());
	       return new MyRecordReader((FileSplit) genericSplit, job);
	}
	
}

二、仔細看看下面的方法,不解釋,本身領悟。oop

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;


public class MyRecordReader implements RecordReader<LongWritable, Text>{
	
	private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;
	
    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }
    
    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }
    
    // 構造方法
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // 建立壓縮器
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // 打開文件系統
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
 
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
 
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }
	
    @Override
	public void close() throws IOException {
    	if (lineReader != null)
            lineReader.close();
	}

	@Override
	public LongWritable createKey() {
		return new LongWritable();
	}

	@Override
	public Text createValue() {
		 return new Text();
	}

	@Override
	public long getPos() throws IOException {
		  return pos;
	}

	@Override
	public float getProgress() throws IOException {
	   if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
	}

	@Override
	public boolean next(LongWritable key, Text value) throws IOException {
		 while (pos < end) {
	            key.set(pos);
	            int newSize = lineReader.readLine(value, maxLineLength,
	                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
	                            maxLineLength));
	            // 把字符串中的"##"轉變爲"#"
	            String strReplace = value.toString().replaceAll("\\s+", "\001");
	            Text txtReplace = new Text();
	            txtReplace.set(strReplace);
	            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
	            if (newSize == 0)
	                return false;
	            pos += newSize;
	            if (newSize < maxLineLength)
	                return true;
	        }
	        return false;
	    }
	}

三、處理實例:以下大數據

     

數據處理要求:
    
    12 afd   fewf	fewfe  we
    76 vee   ppt	wfew  wefw
    83 tyutr   ppt	wfew  wefw
    45 vbe   ppt	wfew  wefw
    565 wee   ppt	wfew  wefw
    12 sde   ppt	wfew  wefw
注意:字段之間的空格不一致

一、建表:
    create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
注意:輸出咱可沒有重寫哦

二、加載數據:
    LOAD DATA LOCAL INPATH'/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

三、看看的成果:
    select * from micmiu_blog;

本身去試試,不解釋
相關文章
相關標籤/搜索