HBase Write Optimization: Fast Data Loading with BulkLoad

1. Why use BulkLoad? What is wrong with writing to HBase through TableOutputFormat?

Let's first look at the normal HBase write path.

When MapReduce writes to HBase it usually uses TableOutputFormat: the reduce phase builds Put objects and writes them straight into HBase. For large volumes of data this is inefficient (HBase blocks writes and performs frequent flush, split and compact operations, causing heavy I/O), and it threatens the stability of the HBase nodes (long GC pauses slow responses, nodes time out and drop out of the cluster, and a chain of failures can follow). HBase also supports a bulk load path: since HBase stores its data on HDFS in a well-defined format, the idea is to generate the persistent HFile files directly on HDFS and then move them into place, which loads very large amounts of data quickly. Combined with MapReduce this is efficient and convenient, does not consume region resources or add write load, and for large data volumes it greatly improves write throughput while lowering the write pressure on the HBase nodes.
Generating HFiles first and then bulk-loading them into HBase, instead of calling TableOutputFormat directly, has the following benefits:
(1) It removes the insert pressure on the HBase cluster.
(2) It speeds up the job and shortens its execution time.
At the moment this approach only works when the table has a single column family; newer HBase versions remove the single-column-family restriction.

2. BulkLoad workflow and practice

The bulk load approach needs two jobs working together:
(1) The first job runs the original business logic, but instead of writing its results to HBase through TableOutputFormat, it writes them to an intermediate directory on HDFS (for example middata).
(2) The second job takes the first job's output (middata) as input and converts it into HBase's underlying storage format, HFile.
(3) BulkLoad is then called to import the HFiles produced by the second job into the target HBase table (either from Java or from the command line, as sketched below).
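
A minimal command-line sketch using the LoadIncrementalHFiles tool that ships with HBase, assuming the hbase launcher script is on the PATH and the target table already exists; the directory and table name are placeholders:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <HFile output directory> <table name>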

The corresponding example code is given below:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GeneratePutHFileAndBulkLoadToHBase {

	public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
	{

		private Text wordText=new Text();
		private IntWritable one=new IntWritable(1);
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Tokenize the line and emit (word, 1) for each word
			String line=value.toString();
			String[] wordArray=line.split(" ");
			for(String word:wordArray)
			{
				wordText.set(word);
				context.write(wordText, one);
			}
			
		}
	}
	
	public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{

		private IntWritable result=new IntWritable();
		protected void reduce(Text key, Iterable<IntWritable> valueList,
				Context context)
				throws IOException, InterruptedException {
			// Sum the counts for each word
			int sum=0;
			for(IntWritable value:valueList)
			{
				sum+=value.get();
			}
			result.set(sum);
			context.write(key, result);
		}
		
	}
	
	public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
	{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Parse one line of the first job's output: <word>\t<count>
			String wordCountStr=value.toString();
			String[] wordCountArray=wordCountStr.split("\t");
			String word=wordCountArray[0];
			int count=Integer.valueOf(wordCountArray[1]);
			
			// Build the HBase row key from the word
			byte[] rowKey=Bytes.toBytes(word);
			ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
			byte[] family=Bytes.toBytes("cf");
			byte[] qualifier=Bytes.toBytes("count");
			byte[] hbaseValue=Bytes.toBytes(count);
			// A Put can carry multiple columns of a column family; with a single column, a KeyValue can be used instead:
			// KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
			Put put=new Put(rowKey);
			put.add(family, qualifier, hbaseValue);
			context.write(rowKeyWritable, put);
			
		}
		
	}
	
	public static void main(String[] args) throws Exception {
        Configuration hadoopConfiguration=new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();
		
        //The first job is a plain MapReduce job that writes its output to the specified (intermediate) directory
        Job job=new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        //Submit the first job
        int wordCountJobResult=job.waitForCompletion(true)?0:1;
        
        //The second job takes the first job's output as its input. Only a mapper is needed: it parses each line of that output and converts it into the key/value form HBase expects.
        Job convertWordCountJobOutputToHFileJob=new Job(hadoopConfiguration, "wordCount_bulkload");
        
        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
		//The reducer class does not need to be set: configureIncrementalLoad chooses KeyValueSortReducer or PutSortReducer based on the map output value class
		//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);
        
        //Use the first job's output as the second job's input
        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));
        //Create the HBase configuration
        Configuration hbaseConfiguration=HBaseConfiguration.create();
        //Create the target table handle
        HTable wordCountTable =new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);
       
        //Submit the second job
        int convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(true)?0:1;
        
        //After the second job finishes, bulk-load the MapReduce results into HBase
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
        //The first argument is the second job's output directory (where the HFiles are stored); the second is the target table
        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);
        
        //Finally exit with the second job's status
        System.exit(convertWordCountJobOutputToHFileJobResult);
		
	}

}
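
Both HFileOutputFormat.configureIncrementalLoad and doBulkLoad above assume that the target table (word_count with column family cf in this example) already exists, because the HFile partitioning is derived from the table's region boundaries. Below is a minimal sketch for creating the table with the old HBaseAdmin API matching the HBase generation used above; the class name is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWordCountTable {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HBaseAdmin admin = new HBaseAdmin(conf);
		// Table and column family names match the example above
		HTableDescriptor tableDescriptor = new HTableDescriptor("word_count");
		tableDescriptor.addFamily(new HColumnDescriptor("cf"));
		admin.createTable(tableDescriptor);
		admin.close();
	}
}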

For example, suppose the raw input data directory is /rawdata/test/wordcount/20131212,

the intermediate results are saved under /middata/test/wordcount/20131212,
and the final HFiles are written to /resultdata/test/wordcount/20131212.
The job above is then run as follows:
hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212

3. Notes and caveats

(1) The HFile approach is the fastest of all the loading options, but it comes with a precondition: the data is being imported for the first time and the table is empty. If the table already contains data, importing HFiles into it can trigger split operations.
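
For a large first-time load into a brand-new table, one way to keep the generated HFiles aligned with region boundaries and avoid splits right after the load is to pre-split the table. A minimal sketch reusing the HBaseAdmin approach from the earlier sketch; the class name and split keys are purely illustrative and should be chosen from the actual row key distribution:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePreSplitTable {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HBaseAdmin admin = new HBaseAdmin(conf);
		HTableDescriptor tableDescriptor = new HTableDescriptor("word_count");
		tableDescriptor.addFamily(new HColumnDescriptor("cf"));
		// Illustrative split points: four regions covering the key space
		byte[][] splitKeys = new byte[][] {
				Bytes.toBytes("g"), Bytes.toBytes("n"), Bytes.toBytes("t")
		};
		admin.createTable(tableDescriptor, splitKeys);
		admin.close();
	}
}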

(2) In the final output, whether it comes from the map or the reduce side, the key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
Otherwise an error like the following is thrown:

java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
(3) In the final output, the value type is either KeyValue or Put, and the corresponding sorter is KeyValueSortReducer or PutSortReducer respectively. This sort reducer does not have to be set explicitly, because the source of configureIncrementalLoad already makes the decision:
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(PutSortReducer.class);
} else {
	LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
(4) In the MapReduce example, job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat can only organize a single column family into HFiles per job, so multiple column families require multiple jobs. Newer HBase versions have removed this limitation.

(5) In the MapReduce example the generated HFiles end up on HDFS, with one subdirectory per column family under the output path. Bulk-loading the HFiles into HBase effectively moves the files into the HBase regions, so after the load the column-family subdirectories under the output path no longer contain the HFiles.

(6) The final reduce stage does not call setNumReduceTasks, because the number of reduce tasks is configured automatically by the framework according to the number of regions of the target table.

(7) In the configuration below, the commented-out lines are optional either way: reading the source shows that configureIncrementalLoad already applies all of the fixed configuration (output format, output key/value classes, sort reducer, and the TotalOrderPartitioner with a partitions file derived from the table's region boundaries); only the parts that are not fixed need to be set by hand.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HFileOutput {

	// Example placeholders; replace with your own table, family and path names
	private static final String INPUT_TABLE = "input_table";
	private static final String OUTPUT_TABLE = "output_table";
	private static final byte[] INPUT_FAMILY = Bytes.toBytes("cf");
	private static final byte[] OUTPUT_FAMILY = Bytes.toBytes("cf");
	private static final String OUTPUT_PATH = "/tmp/hfile_output";

	// Job configuration
	public static Job configureJob(Configuration conf) throws IOException {
		Job job = new Job(conf, "countUnite1");
		job.setJarByClass(HFileOutput.class);
		//job.setNumReduceTasks(2);
		//job.setOutputKeyClass(ImmutableBytesWritable.class);
		//job.setOutputValueClass(KeyValue.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);

		Scan scan = new Scan();
		scan.setCaching(10);
		scan.addFamily(INPUT_FAMILY);
		TableMapReduceUtil.initTableMapperJob(INPUT_TABLE, scan,
				HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
		// If no reducer is set here, configureIncrementalLoad picks
		// KeyValueSortReducer or PutSortReducer automatically
		job.setReducerClass(HFileOutputReducer.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);
		HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, OUTPUT_TABLE));
		HFileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		//FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // equivalent to the line above
		return job;
	}

	public static class HFileOutputMapper extends
			TableMapper<ImmutableBytesWritable, LongWritable> {
		public void map(ImmutableBytesWritable key, Result values,
				Context context) throws IOException, InterruptedException {
			// Mapper logic goes here; this placeholder just emits the row key with a count of 1
			context.write(key, new LongWritable(1));
		}
	}

	public static class HFileOutputReducer extends
			Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
		public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
			// Reducer logic goes here; this placeholder sums the counts for a row key
			long count = 0;
			for (LongWritable value : values) {
				count += value.get();
			}
			KeyValue kv = new KeyValue(key.get(), OUTPUT_FAMILY,
					Bytes.toBytes("count"), Bytes.toBytes(count));
			context.write(key, kv);
		}
	}
}

4. References:

1. A comparison of several ways of loading data into HBase

http://blog.csdn.net/kirayuan/article/details/6371635

2. Generating HFiles with MapReduce and bulk-loading them into HBase, with source code analysis

http://blog.pureisle.net/archives/1950.html

3. Generating HFiles with MapReduce and bulk-loading them into HBase

http://shitouer.cn/2013/02/hbase-hfile-bulk-load/
