HBase and MapReduce Integration

That is, HBase serves as the data source for MapReduce, MapReduce performs the analysis, and the output is stored back into an HBase table.

CLASSPATH

HBase, MapReduce, and the CLASSPATH
By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
The official documentation goes on at length, but the gist is: MapReduce jobs do not pick up the HBase dependencies by default. You can copy hbase-site.xml into the Hadoop configuration directory, but then it has to be copied across the whole cluster; or you can edit Hadoop's CLASSPATH directly, but that pollutes the Hadoop environment and requires a Hadoop cluster restart to take effect.
Therefore, the best approach is to let HBase add its own dependency jars to Hadoop's CLASSPATH at launch time, and then run the program.

1. Print the HBase dependency jars needed when integrating MapReduce with HBase

bin/hbase mapredcp
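The command prints the jars as one colon-separated classpath string. A minimal sketch of inspecting it, assuming HBASE_HOME points at your HBase installation (the jar names in the comment are only illustrative and depend on your version):

# print one jar per line instead of one long colon-separated string
$ $HBASE_HOME/bin/hbase mapredcp | tr ':' '\n'
# typically lists hbase-client, hbase-common, hbase-server, hbase-protocol,
# zookeeper, metrics-core, htrace-core, ... from $HBASE_HOME/lib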

2.因而咱們能夠,經過如下方法執行程序

# First put the HBase dependency jars on the classpath (the environment assignment is separated from the command by a space), then run the MapReduce program
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar

Running the jar without any arguments prints the available tools. The hbase-server-VERSION.jar tool bundle includes the following (extremely useful):

# Count the number of cells in a table
CellCounter: Count cells in HBase table.

# Replay write-ahead log (WAL) files
WALPlayer: Replay WAL files.

# ****** Bulk data loading ****** The most important one: write TSV/CSV files directly to HFiles (HBase's block-format storage files) via MapReduce, then load (move) them into the table, instead of inserting row by row through the normal write path (see the example after this list)
completebulkload: Complete a bulk data load.

# Copy a table from one cluster to another
copytable: Export a table from local cluster to peer cluster.

# Export data from HBase to HDFS, and import it back
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
# TSV is tab-separated; CSV is comma-separated
importtsv: Import data in TSV format.

# Count the number of rows in a table
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.
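As an illustration of the bulk-load path flagged above, the sketch below chains importtsv and completebulkload. The table name (user), the column mapping, and the HDFS paths are hypothetical; adjust them to your own data.

# 1. Turn a TSV file on HDFS into HFiles (nothing goes through the normal write path yet)
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age \
    -Dimporttsv.bulk.output=/tmp/user_hfiles \
    user /input/user.tsv

# 2. Load (move) the generated HFiles into the table, bypassing row-by-row inserts
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar completebulkload \
    /tmp/user_hfiles user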

Writing an example program that reads from and writes to HBase with MapReduce (based on the official documentation)

package com.gci.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Requirement: read the info:name column from the source table "user" and write it to
 * the info:name column of the new table "basic".
 */

// extends Configured implements Tool: implement the Tool interface's run() method, which is the real entry point
public class Table_user2basic extends Configured implements Tool {

    public static final String sourceTable = "user";
    public static final String targetTable = "basic";

    // 1. Mapper class: extends TableMapper<KEYOUT, VALUEOUT> (the output key and value types).
    // A plain Mapper takes four type parameters (input KV and output KV); TableMapper already fixes
    // the input types, see its source: extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
    // Put is an HBase type, chosen here so it can serve directly as the Reducer's input value type.
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowKey
            String rowKey = Bytes.toString(key.get());

            // set outputRowKey
            mapOutputKey.set(rowKey);

            // create the Put object from the row key
            Put put = new Put(key.get());

            // iterate over the cells of this row
            for (Cell cell : value.rawCells()) {
                // check the column family first (testing "info" first avoids a NullPointerException); see the HBase API usage notes
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column:name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }

            // emit the row key together with the Put built above; without this
            // write the reducer would never receive any data
            context.write(mapOutputKey, put);
        }
    }

    // 2. Reducer class: extends TableReducer<KEYIN, VALUEIN, KEYOUT>
    // The output key type is ImmutableBytesWritable, a byte array implementing WritableComparable.
    // The output value type is Mutation, the parent class of Delete, Put, Increment and Append.
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // for each Put received from the mapper
            for (Put put : values) {
                // write the Put out to the target table
                context.write(null, put);
            }
        }

    }

    // 3. Driver
    public int run(String[] arg0) throws Exception {

        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        // set job
        Scan scan = new Scan();
        scan.setCaching(500); // rows fetched per scanner RPC; 1 is the default in Scan, which would be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
                scan, // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class, // mapper output key
                Put.class, // mapper output value
                job);

        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                WriteBasicReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1); // number of reduce tasks; at least one, adjust as required

        // submit the job and wait for it to complete
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();

        // submit the job through ToolRunner
        int status = ToolRunner.run(configuration, new Table_user2basic(), args);

        // exit the program with the job status
        System.exit(status);

    }
}
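To run the example, package the class into a jar and submit it with the HBase dependencies on the classpath, exactly as in section 2 above. A minimal sketch, where the jar name hbase-mr-example.jar is made up for illustration:

# build the jar with your build tool of choice, then:
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    hbase-mr-example.jar com.gci.hadoop.hbase.Table_user2basic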