Foreword:
I have recently been working on a big data project for a large bank. While handling their unstructured data I found that it does not meet the input requirements of Hive or Pig: each row would need more than one delimiter before it could be parsed cleanly. I spent a whole afternoon without finding a good solution, so today I went back over the whole process. The Hive command line alone clearly cannot handle this, which means it has to be solved in code.
1. Re-implement Hive's InputFormat. Don't worry, the code follows:
package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }

}
2. Take a careful look at the methods below. No line-by-line explanation here; work through them yourself.
package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }

    // Constructor used by MyHiveInputFormat
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // Set up the compression codec factory
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // Open the file system and the split's file
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            // Collapse every run of whitespace into Hive's default field
            // delimiter \001 so the default SerDe can split the columns
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            if (newSize == 0)
                return false;
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
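The key step to notice is the replaceAll call in next(): it collapses every run of whitespace into Hive's default field delimiter \001, so the table's default SerDe can split the columns even though the original spacing is inconsistent. Below is a minimal sketch of what that call does to one sample row; the class name DelimiterDemo is made up for illustration and is not part of the code above.

package hiveStream;

public class DelimiterDemo {
    public static void main(String[] args) {
        String rawLine = "12   afd  fewf   fewfe we";          // spacing varies, as in the sample data
        String normalized = rawLine.replaceAll("\\s+", "\001"); // same call as in MyRecordReader.next()
        // Print with a visible '|' in place of the unprintable \001
        System.out.println(normalized.replace('\001', '|'));    // -> 12|afd|fewf|fewfe|we
    }
}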
3. A worked example:
The data to be processed (note that the amount of whitespace between fields is not consistent):

12 afd fewf fewfe we
76 vee ppt wfew wefw
83 tyutr ppt wfew wefw
45 vbe ppt wfew wefw
565 wee ppt wfew wefw
12 sde ppt wfew wefw

Step 1. Create the table (the jar containing hiveStream.MyHiveInputFormat has to be on Hive's classpath first, e.g. via ADD JAR):

create table micmiu_blog(author int, category string, url string, town string, oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

Note that only the input format is customized; the output format is not overridden.

Step 2. Load the data:

LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

Step 3. Check the result:

select * from micmiu_blog;

Go and try it yourself.
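If you want to exercise the two classes outside of Hive first, a small local driver can push a file through MyHiveInputFormat the same way Hive's map tasks would. The sketch below is only an illustration and not part of the original post: the class name InputFormatDriver is made up, and it assumes the same old org.apache.hadoop.mapred API that the classes above already use.

package hiveStream;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical local driver: reads the file given as args[0] through
// MyHiveInputFormat and prints each record, making the \001 delimiter visible.
public class InputFormatDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        FileInputFormat.setInputPaths(conf, new Path(args[0]));

        MyHiveInputFormat inputFormat = new MyHiveInputFormat();
        inputFormat.configure(conf); // initialize the inherited compression codec factory

        for (InputSplit split : inputFormat.getSplits(conf, 1)) {
            RecordReader<LongWritable, Text> reader =
                    inputFormat.getRecordReader(split, conf, Reporter.NULL);
            LongWritable key = reader.createKey();
            Text value = reader.createValue();
            while (reader.next(key, value)) {
                // Show the invisible \001 delimiter as '|'
                System.out.println(key + "\t" + value.toString().replace('\001', '|'));
            }
            reader.close();
        }
    }
}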