A Solution for Multiple Delimiters in Hive

Foreword:

  I have recently been working on a big-data project for a large bank. While processing unstructured data, I found that the data they supplied did not meet Hive's or Pig's parsing requirements: each line needs multiple delimiters before it can be parsed cleanly. I spent a whole afternoon without finding a satisfying solution, so today I went back over the whole problem. The Hive command line alone clearly cannot handle this, so the only way out is code.
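The core of the fix, before the full classes below: rewrite each raw line so that any run of delimiter characters becomes Hive's single default field separator \001, which a plain Hive table can then split on. A minimal standalone sketch of that rewrite (an illustration only, not part of the project code):

import java.util.regex.Pattern;

// Collapse runs of whitespace into Hive's default field delimiter \001
public class DelimiterDemo {
    private static final Pattern WS = Pattern.compile("\\s+");

    public static void main(String[] args) {
        String raw = "12 afd   fewf   fewfe  we";
        String cooked = WS.matcher(raw).replaceAll("\001");
        // Print with '|' standing in for the invisible \001 character
        System.out.println(cooked.replace('\001', '|')); // 12|afd|fewf|fewfe|we
    }
}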

 

1. First, re-implement Hive's InputFormat. Without further ado, here is the code.

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        // Report progress, then hand each split to our custom reader
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }

}
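For Hive to see this class, the compiled classes must be on Hive's classpath. One common way (the jar name and path here are placeholders, adjust to your build) is to register the jar in the Hive session before creating the table in step 3:

-- package hiveStream.MyHiveInputFormat and hiveStream.MyRecordReader into a jar,
-- then register it in the current Hive session
add jar /path/to/hiveStream.jar;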

2. Study the class below carefully. The heavy lifting happens in next(), which rewrites each line's delimiters before handing the record to Hive.

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;


public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset; // track the current read position from the start
        this.end = endOffset;
    }

    // Constructor invoked by MyHiveInputFormat for each split
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Look up a compression codec for this file, if any
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file on its file system
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Back up one byte and skip the partial first line;
                // it belongs to the previous split
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0)
                return false;
            // Collapse every run of whitespace into Hive's default
            // field delimiter \001
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            value.set(strReplace);
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
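To sanity-check the reader outside Hive, a local smoke test along these lines should work (the sample file path is an assumption; this is a sketch, not part of the original post):

package hiveStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;

// Reads a local file through MyRecordReader and prints each rewritten line
public class MyRecordReaderSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/test.txt"); // assumed sample file
        FileSystem fs = path.getFileSystem(conf);
        long len = fs.getFileStatus(path).getLen();
        // One split covering the whole file; no host hints needed locally
        FileSplit split = new FileSplit(path, 0, len, (String[]) null);

        MyRecordReader reader = new MyRecordReader(split, conf);
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            // Make the invisible \001 delimiter visible as '|'
            System.out.println(value.toString().replace('\001', '|'));
        }
        reader.close();
    }
}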

3. A worked example, as follows.

The data to be processed:

    12 afd   fewf   fewfe  we
    76 vee   ppt    wfew  wefw
    83 tyutr   ppt  wfew  wefw
    45 vbe   ppt    wfew  wefw
    565 wee   ppt   wfew  wefw
    12 sde   ppt    wfew  wefw

Note: the amount of whitespace between fields is inconsistent.

Step 1: create the table.

    create table micmiu_blog(author int, category string, url string, town string, oop string)
    stored as
    inputformat 'hiveStream.MyHiveInputFormat'
    outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

Note: we did not override the output format.

Step 2: load the data.

    LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

Step 3: check the result.

    select * from micmiu_blog;

Give it a try yourself.
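If everything is wired up correctly, the whitespace runs have already been collapsed to \001 by the time Hive splits the rows, so the select should return one value per column; judging from the sample input above, the first line should come back as author=12, category=afd, url=fewf, town=fewfe, oop=we.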