hive建立表指定分隔符,不支持多個字符做爲分隔符,若是想使用多個字符做爲分割符的話就須要實現InputFormat.主要重寫next方法,代碼以下 java
package hiveStream; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new MyRecordReader((FileSplit) genericSplit, job); } }
package hiveStream; import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.LineReader; public class MyRecordReader implements RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader lineReader; int maxLineLength; // 構造方法 public MyRecordReader(FileSplit inputSplit, Configuration job) throws IOException { maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE); start = inputSplit.getStart(); end = start + inputSplit.getLength(); final Path file = inputSplit.getPath(); // 建立壓縮器 compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打開文件系統 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(file); boolean skipFirstLine = false; if (codec != null) { lineReader = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } lineReader = new LineReader(fileIn, job); } if (skipFirstLine) { start += lineReader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public MyRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; this.start = offset; this.lineReader = new LineReader(in); this.pos = offset; this.end = endOffset; } public MyRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException { this.maxLineLength = job.getInt( "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE); this.lineReader = new LineReader(in, job); this.start = offset; this.end = endOffset; } @Override public void close() throws IOException { if (lineReader != null) lineReader.close(); } @Override public LongWritable createKey() { return new LongWritable(); } @Override public Text createValue() { return new Text(); } @Override public long getPos() throws IOException { return pos; } @Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } @Override public boolean next(LongWritable key, Text value) throws IOException { while (pos < end) { key.set(pos); int newSize = lineReader.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 把字符串中的"##"轉變爲"#" String strReplace = value.toString().replace("##", "#"); Text txtReplace = new Text(); txtReplace.set(strReplace); value.set(txtReplace.getBytes(), 0, txtReplace.getLength()); if (newSize == 0) return false; pos += newSize; if (newSize < maxLineLength) return true; } return false; } }
建表語句:自定義 outputformat/inputformat 後,在建表時須要指定 outputformat/inputformat sql
create external table testHiveInput( id int,name string,age int) row format delimited fields terminated by '|' stored as INPUTFORMAT 'hiveStream.MyHiveInputFormat' OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' location '/user/hdfs/hiveInput';
測試數據: apache
1##Tom##22
2##Jerry##22
3##Jeny##22 ide
測試代碼( 經過jdbc來查詢數據): oop
public static void testHive() throws Exception { String sql = "select id,name,age from testHiveInput"; Class.forName("org.apache.hive.jdbc.HiveDriver"); String url = "jdbc:hive2://xxx.xxx.xxx.xxx:10000"; Connection conn = DriverManager.getConnection(url, "hive", "passwd"); Statement stmt = conn.createStatement(); stmt.execute("add jar /usr/lib/hive/lib/hiveInput.jar"); ResultSet rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getString("id")+" "+ rs.getString("name") + " " + rs.getString("age")); } }