That is, HBase acts as the data source for MapReduce: MapReduce does the analysis and the output data is stored back into an HBase table.
HBase, MapReduce, and the CLASSPATH
By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
The official docs say a lot here, but the gist is: MapReduce jobs do not have HBase's dependencies on their classpath by default. You can copy the hbase-site.xml configuration file into the Hadoop configuration directory, but then it has to be distributed to every node in the cluster; or you can edit Hadoop's CLASSPATH directly, but that pollutes the Hadoop environment and requires restarting the Hadoop cluster to take effect.
Therefore, the best approach is to let HBase itself add its own dependencies to Hadoop's CLASSPATH, and then run the program.
# Print the HBase jars that a MapReduce job needs on its classpath
bin/hbase mapredcp
# First put HBase's dependencies on HADOOP_CLASSPATH (note the space before the hadoop command), then run the MapReduce program
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar
# Count cells
CellCounter: Count cells in HBase table.
WALPlayer: Replay WAL files.
# ****** Bulk loading of large data sets ****** the most important one: use MapReduce to turn TSV/CSV files directly
# into HFiles (HBase's block-based storage files) and then load (move) them into the table, instead of inserting
# row by row through the normal write path (a sketch follows after this list)
completebulkload: Complete a bulk data load.
# Copy a table from one cluster to another
copytable: Export a table from local cluster to peer cluster.
# Export / import data between HBase and HDFS
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
# TSV is tab-separated, CSV is comma-separated
importtsv: Import data in TSV format.
# Count rows
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.
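To make the bulk-load path concrete, here is a minimal sketch of the importtsv / completebulkload workflow. The HDFS paths and the input file user.tsv are made up for illustration; the table name user matches the source table used in the code below.

# 1. Turn a TSV file into HFiles with MapReduce instead of inserting row by row
#    (HBASE_ROW_KEY marks the row-key column; the second column goes to info:name)
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name \
    -Dimporttsv.bulk.output=/user/demo/hfile_output \
    user /user/demo/input/user.tsv

# 2. Load (move) the generated HFiles into the table
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar completebulkload \
    /user/demo/hfile_output user

# 3. Verify the row count
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar rowcounter user

Note that without -Dimporttsv.bulk.output, importtsv writes through the normal Put path instead of producing HFiles.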
package com.gci.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Requirement: read info:name from the source table "user" and write it to info:name in the new table "basic".
 */
// extends Configured implements Tool: the Tool interface's run() method is the real entry point
public class Table_user2basic extends Configured implements Tool {

    public static final String sourceTable = "user";
    public static final String targetTable = "basic";

    // 1. Mapper: extends TableMapper<KEYOUT, VALUEOUT>
    // The plain Mapper takes four type parameters (input KV and output KV); TableMapper already fixes the
    // input types, see its source: extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
    // Put is an HBase type, convenient as the Reducer's input value type (see the reducer's type parameters)
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowKey
            String rowKey = Bytes.toString(key.get());

            // set output rowKey
            mapOutputKey.set(rowKey);

            // create a Put from the rowKey
            Put put = new Put(key.get());

            // iterate over the cells
            for (Cell cell : value.rawCells()) {
                // check the family first (see the HBase API notes; putting the literal "info" first avoids a NullPointerException)
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column:name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }

            // only emit rows that actually contain info:name (an empty Put cannot be written to the output table)
            if (!put.isEmpty()) {
                context.write(mapOutputKey, put);
            }
        }
    }

    // 2. Reducer: extends TableReducer<KEYIN, VALUEIN, KEYOUT>
    // The output key type is ImmutableBytesWritable, a byte array implementing WritableComparable
    // The output value type is Mutation, the parent class of Delete, Put, Increment and Append
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // read the Puts produced by the mapper
            for (Put put : values) {
                // write them to the output table
                context.write(null, put);
            }
        }
    }

    // 3. Driver
    public int run(String[] arg0) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        // set job
        Scan scan = new Scan();
        scan.setCaching(500); // rows fetched per RPC; 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
                scan, // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class, // mapper output key
                Put.class, // mapper output value
                job);

        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                WriteBasicReducer.class, // reducer class
                job);

        job.setNumReduceTasks(1); // number of reduce tasks: at least one, adjust as required

        // submit job
        Boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();

        // submit job
        int status = ToolRunner.run(configuration, new Table_user2basic(), args);

        // exit program
        System.exit(status);
    }
}
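To actually run the job above, the target table has to exist first and HBase's dependencies have to be on the classpath, just like for the built-in tools. A minimal sketch, assuming the class has been packaged into a jar named hbase-mr-demo.jar (that name and the single column family "info" are assumptions for illustration):

# Create the target table with the column family "info" (e.g. via the HBase shell)
$ echo "create 'basic', 'info'" | $HBASE_HOME/bin/hbase shell

# Run the driver with HBase's dependencies on HADOOP_CLASSPATH
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    hbase-mr-demo.jar com.gci.hadoop.hbase.Table_user2basic

After the job finishes, a quick scan 'basic' in the HBase shell should show the copied info:name values.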