HBase寫入的各類方式總結匯總(代碼)

HBase寫入的各類方式總結匯總(代碼)

  本文主要是總結一下hbase幾種寫入常見的方式,以及涉及的應用場景,儘可能覆蓋平常業務中的使用場景,另外再總結一下其中涉及到的一些原理知識。也算是本身學習的彙總。hbase也接觸好久了,各類應用的場景也見到了不少。藉此機會好好總結一下。html

  hbase通常的插入過程都使用HTable對象,將數據封裝在Put對象中,Put在new建立的時候須要傳入rowkey,並將列族,列名,列值add進去。而後HTable調用put方法,經過rpc請求提交到Regionserver端。寫入的方式能夠分爲如下幾種java

  1. 單條put
  2. 批量put
  3. 使用Mapreduce
  4. bluckload

  

  HTable數據庫

  要向hbase中寫入就免不了要和HTable打交道,HTable負責向一張hbase表中讀或者寫數據,HTable對象是非線程安全的。多線程使用時須要注意,建立HTable對象時須要指定表名參數,HTable內部有一個LinkedList<Row>的隊列writeAsyncBuffer ,負責對寫入到hbase的數據在客戶端緩存,開啓緩存使用參數  table.setAutoFlushTo(false);  默認狀況不開啓每次put一條數據時,htable對象就會調用flushCommits方法向regserver中提交,開啓緩存則會比較隊列的大小,若是大於某個值則調用flushCommits,這個值默認是2m,能夠經過在hbase-site.xml中設置參數 "hbase.client.write.buffer"來調整,默認是2097152, 在關閉htable鏈接時,會隱式的調用flushCommits方法,保證數據徹底提交。提交時會根據rowkey定位該put應該提交到哪一個reginserver,而後每一個regionserver一組action發送出去,(多扯兩句,這裏和solr略微不一樣,solr能夠把請求發送到任一節點,節點判斷是否屬於當前節點,若是不符合則將請求發送全部節點,但同時也能夠實現和hbase相似的功能)apache

 

  單條put緩存

  最簡單基礎的寫入hbase,通常應用場景是線上業務運行時,記錄單條插入,如報文記錄,處理記錄,寫入後htable對象即釋放。每次提交就是一次rpc請求。安全

table.setAutoFlushTo(true);

複製代碼

1   /**
 2      * 插入一條記錄
 3      * rowkey 爲rk001 列族爲f1
 4      * 插入兩列  c1列   值爲001
 5      *          c2列   值爲002
 6      *
 7      */
 8     public void insertPut(){
 9         //Configuration 加載hbase的配置信息,HBaseConfiguration.create()是先new Configuration而後調用addResource方法將
10         //hbase-site.xml配置文件加載進來
11         Configuration conf = HBaseConfiguration.create();
12         try {
13             table = new HTable(conf,tableName);
14             table.setAutoFlushTo(true);//不顯示設置則默認是true
15 
16             String rowkey  = "rk001";
17             Put  put = new Put(rowkey.getBytes());
18             put.add(cf.getBytes(),"c1".getBytes(),"001".getBytes());
19             put.add(cf.getBytes(),"c2".getBytes(),"002".getBytes());
20             table.put(put);
21             table.close();//關閉hbase鏈接
22 
23         } catch (IOException e) {
24             e.printStackTrace();
25         }
26     }

複製代碼

  多條put多線程

  有了單條的put天然就想到這種方式實際上是低效的,每次只能提交一條記錄,有沒有上面方法能夠一次提交多條記錄呢?減小請求次數, 最簡單的方式使用List<Put>,這種方式操做時和單條put沒有區別,將put對象add到list中,而後調用put(List<Put>)方法,過程和單條put基本一致,應用場景通常在數據量稍多的環境下,經過批量提交減小請求次數架構

複製代碼

1     /**
 2      * 批量請求,一次提交兩條 
 3      */
 4     public void insertPuts() {
 5         Configuration conf = HBaseConfiguration.create();
 6         try {
 7             table = new HTable(conf, tableName);
 8             table.setAutoFlushTo(true);
 9             List<Put> lists = new ArrayList<Put>();
10 
11             String rowkey1 = "rk001";
12             Put put1 = new Put(rowkey1.getBytes());
13             put1.add(cf.getBytes(), "c1".getBytes(), "001".getBytes());
14             put1.add(cf.getBytes(), "c2".getBytes(), "002".getBytes());
15             lists.add(put1);
16 
17             String rowkey2 = "rk002";
18             Put put2 = new Put(rowkey2.getBytes());
19             put2.add(cf.getBytes(), "c1".getBytes(), "v2001".getBytes());
20             put2.add(cf.getBytes(), "c2".getBytes(), "v2002".getBytes());
21             lists.add(put2);
22 
23 
24             table.put(lists);
25             table.close();
26 
27         } catch (IOException e) {
28             e.printStackTrace();
29         }
30 
31 
32     }

複製代碼

 

  使用Mapreduceapp

  以上兩種方式通常用來處理小批量的數據,那麼在面對數據量多的時候應該如何處理呢,常見的作法使用多線程來並行向hbase中寫入,不過這須要咱們本身來控制任務的劃分,比較麻煩,另外值得注意的時HTable對象是線程不安全的,所以在多線程寫入時須要格外注意。而更加常見的作法是使用Mapreduce。HBase自己就是運行在hdfs上的數據庫,所以和Mapreduce有很好的融合。分佈式

  使用mapreduce來向hbase中寫入數據時,將輸入文件拆分紅一個個的塊,而後交給集羣,分佈式的去讀取塊,而後數據寫入到hbase中,而根據具體業務狀況的不一樣,在使用Mapreduce中也有略微的不一樣,先介紹一下最多見的處理過程,使用hbase官方提供的hbase和mapreduce整合的工具類TableMapReduceUtil,具體使用細節能夠參考HBase官方手冊 這裏只貼一下在map端讀入數據,而後直接寫hbase的情景,這種方式通常用於hive或者文件數據入hbase,不須要業務邏輯處理,保持原有的數據入庫,rowkey通常時某個字段或者若干個字段拼接而成,好比卡號信息入庫,使用卡號做爲rowkey(須要對卡號作散列處理,卡號通常爲62或者40開頭,會形成數據熱點問題)

複製代碼

1 package hbase.demo.mapreduce;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.conf.Configured;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.hbase.HBaseConfiguration;
 7 import org.apache.hadoop.hbase.client.Put;
 8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.io.Writable;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.util.Tool;
19 
20 /**
21  * Created by BB on 2017/2/26.
22  */
23 public class InsertMR extends Configured implements Tool {
24 
25 
26     public static void main(String[] args) throws Exception {
27         InsertMR im = new InsertMR();
28         im.run(args);
29     }
30 
31     public int run(String[] strings) throws Exception {
32         String jobName = "insert data into hbase";
33         String outputTable = "OutTable";
34         String inputPath = "/usr/mapreduce/input";
35         String outputPath = "usr/maprduce/output";
36         Configuration conf = HBaseConfiguration.create();
37         Job job = Job.getInstance(conf, jobName);
38 
39         job.setJarByClass(InsertMR.class);
40 
41         job.setMapperClass(InsertMap.class);
42         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
43         job.setMapOutputValueClass(Put.class);
44 
45         job.setInputFormatClass(TextInputFormat.class);//hadoop 默認使用TextInputFormat
46 
47         //設置輸入輸出路徑
48         FileInputFormat.setInputPaths(job,new Path(inputPath));
49         FileOutputFormat.setOutputPath(job,new Path(outputPath));
50 
51 
52         TableMapReduceUtil.initTableReducerJob(
53                 outputTable,
54                 null,
55                 job);
56         job.setNumReduceTasks(0);
57         return job.waitForCompletion(true) ? 0 : 1;
58     }
59 
60 
61     public class InsertMap extends Mapper<Writable, Text, ImmutableBytesWritable, Put> {
62         @Override
63         protected void map(Writable key, Text value, Context context) {
64             try {
65 
66                 String line = value.toString();
67                 String[] items = line.split(",", -1);
68                 ImmutableBytesWritable outkey = new ImmutableBytesWritable(items[0].getBytes());
69                 String rk = items[0];//rowkey字段
70                 Put put = new Put(rk.getBytes());
71                 put.add("f1".getBytes(), "c1".getBytes(), items[0].getBytes());
72                 put.add("f1".getBytes(), "c2".getBytes(), items[1].getBytes());
73                 context.write(outkey, put);
74             } catch (Exception e) {
75 
76 
77             }
78         }
79 
80     }
81 
82 
83 }

複製代碼

  這種方式最終會調用Tableoutputformat類,核心的原理仍是使用htable的put方法,不過因爲使用了mapreduce分佈式提交到hbase,速度比單線程效率高出許多,可是這種方式也不是萬能的,put提交的熟讀太快時會給hbase形成比較大的壓力,容易發生gc形成節點掛掉,尤爲是初始化表到hbase時,通常都會有不少的歷史數據須要入庫,容易形成比較大的壓力,這種狀況下建議使用下面的方式bulkload方式入庫,減小給hbase壓力。上面這種方式是直接在map中生成put而後交給TableOutputformat去提交的,由於這裏幾乎不須要邏輯處理,若是須要作邏輯處理,那麼通常會在reduce端去生成put對象,在map端作業務邏輯處理,好比數據關聯,彙總之類的

 

  bulkload

  若是在寫入hbase的上述的方式仍是不能知足需求的話,就能夠考慮使用bulkload的方式了。上述幾種方式雖然實現的方式涉及到的東西不一樣,可是本質是同樣的,都是使用HTable對象調用put方法,而後HTable經過rpc提交到reginserver上,而後經過LSM過程以後最終寫入到磁盤上。HBase的數據最終會變成hfile文件落到磁盤上,那麼有沒有一種方式能夠繞過前面的這些過程,直接生成最終的hfile文件呢。確定是有的,bulkload寫入hbase的原理正是基於此。使用mapreduce來生成hbase的hfile文件,而後將文件塞到hbase存儲數據的目錄下,這樣作能夠減小了海量的數據請求時間,也徹底避免了regionserver的處理數據的壓力。因爲涉及到hbase存儲架構的原理,只大概講一下過程,在map端生成put對象,reduce使用hbase提供的KeyValueSortReducer便可,reduce端會將數據按照rowkey作排序,生成hfile文件,而後按照region的分佈對hfile作分割,將分割的hfile文件放到相應的region目錄下,這裏就不詳細贅述,直接上代碼

driver

複製代碼

1 package com.hbase.mapreudce.driver;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.hbase.HBaseConfiguration;
 9 import org.apache.hadoop.hbase.KeyValue;
10 import org.apache.hadoop.hbase.TableNotFoundException;
11 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
13 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
14 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
15 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
16 import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
17 import org.apache.hadoop.io.Text;
18 import org.apache.hadoop.mapreduce.Job;
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
21 import org.apache.hadoop.util.GenericOptionsParser;
22 import org.apache.hadoop.util.Tool;
23 import org.apache.hadoop.util.ToolRunner;
24 import org.apache.log4j.Logger;
25 
26 import com.hbase.mapreudce.map.BaseBulkLoadBaseMapper;
27 import com.spdbccc.mapreduce.plus.util.ConnectUtil;
28 import com.spdbccc.mapreduce.plus.util.Util;
29 
30 public class BulkLoadHFileDriver extends Configured implements Tool {
31 
32     private static Logger logger = Logger.getLogger(BulkLoadHFileDriver.class);
33 
34     private String jobname;
35 
36     private Configuration conf;
37 
38     public static void main(String[] args) throws Exception {
39         BulkLoadHFileDriver bld = new BulkLoadHFileDriver();
40         bld.excute(args);
41     }
42 
43     public void excute(String[] args) throws Exception {
44         int rtn = ToolRunner.run(new BulkLoadHFileDriver(), args);
45         this.dobulkLoadFile(conf);
46 
47     }
48 
49     public int run(String[] args) throws Exception {
50         this.conf = HBaseConfiguration.create();
51         String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
52 
53         // conf.get("", "");
54         String tablename = conf.get("", "");
55         String inputPathstr = conf.get("", "");
56         String outputPathstr = conf.get("", "");
57 
58         Path outputPath = Util.getTempPath(conf, outputPathstr, true);
59 
60         Job job = Job.getInstance(conf, "HFile bulk load test");
61         job.setJarByClass(BulkLoadHFileDriver.class);
62 
63         job.setMapperClass(BaseBulkLoadBaseMapper.class);
64         job.setReducerClass(KeyValueSortReducer.class);
65 
66         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
67         job.setMapOutputValueClass(KeyValue.class);
68 
69         job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
70 
71         FileInputFormat.addInputPath(job, new Path(inputPathstr));
72         FileOutputFormat.setOutputPath(job, outputPath);
73 
74         HFileOutputFormat2.configureIncrementalLoad(job, ConnectUtil.getHTable(conf, tablename));
75 
76         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
77         loader.doBulkLoad(new Path(dfsArgs[0]), ConnectUtil.getHTable(conf, tablename));
78 
79         return job.waitForCompletion(true) ? 0 : 1;
80     }
81 
82     private void dobulkLoadFile(Configuration conf) throws Exception {
83         String tablename = conf.get("", "");
84         String hfiledirpathstr = conf.get("", "");
85 
86         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
87         loader.doBulkLoad(new Path(hfiledirpathstr), ConnectUtil.getHTable(conf, tablename));
88 
89     }
90 
91 }

複製代碼

map

複製代碼

1 package com.hbase.mapreudce.map;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.hbase.KeyValue;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.hbase.util.Bytes;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.io.Writable;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.log4j.Logger;
13 
14 public class BaseBulkLoadBaseMapper extends
15 Mapper<Writable, Text, ImmutableBytesWritable, KeyValue> {
16 
17      
18     private static Logger logger = Logger.getLogger(BaseBulkLoadBaseMapper.class);
19     
20      @Override
21      protected void map(Writable key, Text value, Context context)
22              throws IOException, InterruptedException {
23             String line = value.toString();
24             String[] items = line.split(",", -1);
25             ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
26                     items[0].getBytes());
27 
28             KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
29                     Bytes.toBytes(items[1]), Bytes.toBytes(items[2]),
30                     System.currentTimeMillis(), Bytes.toBytes(items[3]));
31             if (null != kv) {
32                 context.write(rowkey, kv);
33             }
34          
35          
36          
37      }
  
47 }
相關文章
相關標籤/搜索