Preface

I have recently been working on a big data project for a large bank. While dealing with their unstructured data, I found that the files they provided do not meet the loading requirements of Hive or Pig: each row can only be parsed cleanly with more than one delimiter. I spent an afternoon without finding a good solution, so today I stepped back and re-examined the whole process. The Hive command line alone clearly cannot handle this, so the only option left is to solve it in code.
1. First, re-implement Hive's InputFormat. Don't worry, here is the code.
package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new MyRecordReader((FileSplit) genericSplit, job);
    }
}
2. Next, the custom RecordReader. Read the methods below carefully; the key step is in next(), where each raw line is rewritten before being handed to Hive.
package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset, Configuration job)
            throws IOException {
        this.maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    // Constructor used by MyHiveInputFormat: opens the split, handles compression,
    // and skips the partial first line of any split that does not start at offset 0.
    public MyRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();

        // Set up a decompressor if the file is compressed
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file on the underlying file system
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0)
                return false;
            // Normalize the line: collapse every run of whitespace into Hive's
            // default field delimiter '\001' so the table can be parsed normally.
            String strReplace = value.toString().replaceAll("\\s+", "\001");
            Text txtReplace = new Text();
            txtReplace.set(strReplace);
            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
            pos += newSize;
            if (newSize < maxLineLength)
                return true;
        }
        return false;
    }
}
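To make that rewriting step concrete, here is a minimal, self-contained sketch (not part of the original code) that exercises only the string normalization done in next(), without any of the Hadoop plumbing. The sample line mirrors the test data in the next section.

public class DelimiterNormalizeDemo {
    public static void main(String[] args) {
        // A raw line whose fields are separated by an inconsistent number of spaces
        String raw = "12   afd  fewf fewfe   we";
        // The same transformation used in MyRecordReader.next(): collapse every run of
        // whitespace into Hive's default field delimiter '\001' (Ctrl-A)
        String normalized = raw.replaceAll("\\s+", "\001");
        // Print '\001' as '|' so the result is visible: 12|afd|fewf|fewfe|we
        System.out.println(normalized.replace('\001', '|'));
    }
}

Five whitespace-separated tokens go in, five '\001'-separated fields come out, which is exactly what the five-column table below expects.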
3. A worked example
The data to be processed looks like this:

12 afd fewf fewfe we
76 vee ppt wfew wefw
83 tyutr ppt wfew wefw
45 vbe ppt wfew wefw
565 wee ppt wfew wefw
12 sde ppt wfew wefw

Note: the amount of whitespace between fields is not consistent.

Step 1. Create the table (the jar containing hiveStream.MyHiveInputFormat has to be registered with Hive first; see the session sketch after this section):

create table micmiu_blog(author int, category string, url string, town string, oop string)
stored as
  inputformat 'hiveStream.MyHiveInputFormat'
  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

Note: only the input format is overridden; the output side keeps Hive's stock HiveIgnoreKeyTextOutputFormat.

Step 2. Load the data:

LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;

Step 3. Check the result:

select * from micmiu_blog;

Try it yourself.
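For completeness, here is a rough sketch of how a Hive session would wire this together before running the statements above. The jar path and the property value are illustrative, not from the original post:

-- Register the jar that contains MyHiveInputFormat and MyRecordReader
-- (illustrative path; point it at wherever you deployed the jar)
add jar /path/to/hiveStream.jar;

-- Optional: cap the maximum line length read by MyRecordReader
-- (this is the property the reader looks up; the value here is only an example)
set mapred.mutilCharRecordReader.maxlength=1000000;

-- Then run the create table, LOAD DATA and select statements shown above.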