HBase bulk load code

      Apache HBase is a distributed, column-oriented open-source database that gives us random, real-time access to big data. But how do we get data into HBase efficiently? HBase offers several ways to import data; the most direct is to use TableOutputFormat as the output of a MapReduce job, or to use the standard client API, but neither of these is particularly efficient.

    A bulk load uses a MapReduce job to write out table data in HBase's internal data format, and then imports the generated StoreFiles directly into the cluster. Compared with going through the HBase API, a bulk load uses far less CPU and network bandwidth.

    The bulk load process consists of three main steps:

    1. Extract the data from the source (usually text files or another database) and upload it to HDFS

    This step is outside HBase's scope: whatever the data source is, you only need to get the data onto HDFS before moving on to the next step.
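
    For instance, if the raw records already sit in a local file, a plain HDFS upload is enough (the paths here are purely illustrative): hadoop fs -put /local/temperature-data /user/input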

    2. Prepare the data with a MapReduce job

    This step requires a MapReduce job, and in most cases we have to write the Map function ourselves; the Reduce function is provided by HBase and does not need our attention. The job must emit the rowkey as the output key and a KeyValue, Put, or Delete as the output value, and it must use HFileOutputFormat2 to generate HBase's data files. For the import to be efficient, HFileOutputFormat2 has to be configured so that each output file falls within a single region; to achieve this, the job uses Hadoop's TotalOrderPartitioner to partition the output by the table's keys. The HFileOutputFormat2 method configureIncrementalLoad() takes care of all of this automatically.

    3. Tell the RegionServers where the data is and load it

    This step is the simplest: you usually just run LoadIncrementalHFiles (better known as the completebulkload tool), pass it the HDFS location of the files, and it has the RegionServers load the data into the appropriate regions.

    The diagram below gives a simple, clear picture of the whole process.

       Note: before running the bulk load, create an empty table in HBase with the same name and the same structure (column families) as the ones used in the program.
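
       As a minimal sketch of that step (the class name CreateTemperatureTable is made up here; the admin calls use the same HBase 1.x-era client API as the code below, and the table and column family names are the ones the driver uses), the empty table can be created programmatically, or equivalently with create 'temperature', 'date', 'tempPerHour' in the HBase shell:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTemperatureTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("temperature"));
            descriptor.addFamily(new HColumnDescriptor("date"));        // column family 1
            descriptor.addFamily(new HColumnDescriptor("tempPerHour")); // column family 2
            if (!admin.tableExists(descriptor.getTableName())) {
                admin.createTable(descriptor);                           // create the empty table before bulk loading
            }
        }
    }
}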

       The Java implementation is as follows:

BulkLoadDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadDriver extends Configured implements Tool {
    private static final String DATA_SEPERATOR = "\\s+";
    private static final String TABLE_NAME = "temperature";      // table name
    private static final String COLUMN_FAMILY_1 = "date";        // column family 1
    private static final String COLUMN_FAMILY_2 = "tempPerHour"; // column family 2

    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
            if (response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        String outputPath = args[1];
        // Set the job parameters that the mapper reads back in setup()
        Configuration configuration = getConf();
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name", TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
        Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
        job.setJarByClass(BulkLoadDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);  // map output key class
        job.setMapOutputValueClass(Put.class);                    // map output value class
        job.setMapperClass(BulkLoadMapper.class);                 // the Map function
        FileInputFormat.addInputPaths(job, args[0]);              // input path
        FileSystem fs = FileSystem.get(configuration);
        Path output = new Path(outputPath);
        if (fs.exists(output)) {
            fs.delete(output, true);                              // delete the output path if it already exists
        }
        FileOutputFormat.setOutputPath(job, output);              // output path
        Connection connection = ConnectionFactory.createConnection(configuration);
        TableName tableName = TableName.valueOf(TABLE_NAME);
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            HFileLoader.doBulkLoad(outputPath, TABLE_NAME);       // load the generated HFiles into HBase
            return 0;
        } else {
            return 1;
        }
    }
}

 

BulkLoadMapper.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;

    public void setup(Context context) {
        Configuration configuration = context.getConfiguration(); // read the job parameters set by the driver
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
    }

    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(values[0].getBytes());
            Put put = new Put(Bytes.toBytes(values[0]));          // first field is the row key
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("month"), Bytes.toBytes(values[1]));
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("day"), Bytes.toBytes(values[2]));
            for (int i = 3; i < values.length; ++i) {
                put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("hour : " + i), Bytes.toBytes(values[i]));
            }
            context.write(rowKey, put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
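
The mapper above assumes whitespace-separated input records whose first field is a station identifier used as the row key, followed by the month, the day, and then one temperature reading per hour. A purely illustrative input line (only station IDs such as AQW00061705 appear in the job log below; the numeric values here are invented) would look like:

AQW00061705 06 09 25.3 25.1 24.8 24.6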
HFileLoader.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/**
 * Created by shaobo on 15-6-9.
 */
public class HFileLoader {
    public static void doBulkLoad(String pathToHFile, String tableName) {
        try {
            Configuration configuration = new Configuration();
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName); // the target table
            loadFiles.doBulkLoad(new Path(pathToHFile), hTable);  // hand the HFiles over to the RegionServers
            System.out.println("Bulk Load Completed..");
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}

Compile and package the program, then submit it to Hadoop to run:

HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar BulkLoad.jar inputpath outputpath

For the usage of the command above, see "44. HBase, MapReduce, and the CLASSPATH" in the HBase reference guide.

The job run output:

12/10/16 14:31:07 INFO mapreduce.HFileOutputFormat2: Looking up current regions for table temperature (the table name)
12/10/16 14:31:07 INFO mapreduce.HFileOutputFormat2: Configuring 1 reduce partitions to match current region count
12/10/16 14:31:07 INFO mapreduce.HFileOutputFormat2: Writing partition information to /home/shaobo/hadoop/tmp/partitions_5d464f1e-d412-4dbe-bb98-367f8431bdc9
12/10/16 14:31:07 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
12/10/16 14:31:07 INFO compress.CodecPool: Got brand-new compressor [.deflate]
12/10/16 14:31:08 INFO mapreduce.HFileOutputFormat2: Incremental table temperature (the table name) output configured.
12/10/16 14:31:08 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
12/10/16 14:31:15 INFO input.FileInputFormat: Total input paths to process : 2
12/10/16 14:31:15 INFO mapreduce.JobSubmitter: number of splits:2
12/10/16 14:31:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1434262360688_0002
12/10/16 14:31:17 INFO impl.YarnClientImpl: Submitted application application_1434262360688_0002
12/10/16 14:31:17 INFO mapreduce.Job: The url to track the job: http://shaobo-ThinkPad-E420:8088/proxy/application_1434262360688_0002/
12/10/16 14:31:17 INFO mapreduce.Job: Running job: job_1434262360688_0002
12/10/16 14:31:28 INFO mapreduce.Job: Job job_1434262360688_0002 running in uber mode : false
12/10/16 14:31:28 INFO mapreduce.Job: map 0% reduce 0%
12/10/16 14:32:24 INFO mapreduce.Job: map 49% reduce 0%
12/10/16 14:32:37 INFO mapreduce.Job: map 67% reduce 0%
12/10/16 14:32:43 INFO mapreduce.Job: map 100% reduce 0%
12/10/16 14:33:39 INFO mapreduce.Job: map 100% reduce 67%
12/10/16 14:33:42 INFO mapreduce.Job: map 100% reduce 70%
12/10/16 14:33:45 INFO mapreduce.Job: map 100% reduce 88%
12/10/16 14:33:48 INFO mapreduce.Job: map 100% reduce 100%
12/10/16 14:33:52 INFO mapreduce.Job: Job job_1434262360688_0002 completed successfully
...
...
...
12/10/16 14:34:02 WARN mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://localhost:9000/user/output/_SUCCESS
12/10/16 14:34:03 INFO hfile.CacheConfig: CacheConfig:disabled
12/10/16 14:34:03 INFO hfile.CacheConfig: CacheConfig:disabled
12/10/16 14:34:07 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://localhost:9000/user/output/date/c64cd2524fba48738bab26630d550b61 first=AQW00061705 last=USW00094910
12/10/16 14:34:07 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://localhost:9000/user/output/tempPerHour/43af29456913444795a820544691eb3d first=AQW00061705 last=USW00094910
Bulk Load Completed..
Job is successfully completed...

The third step of the bulk load process can also be run from the command line after the MapReduce job has generated the HBase data files; it does not have to be combined with the MapReduce program.

$ hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] outputpath tablename

If the following exception is thrown when the job is submitted:

12/10/16 11:41:06 INFO mapreduce.Job: Job job_1434420992867_0003 failed with state FAILED due to: Application application_1434420992867_0003 failed 2 times due to AM Container for appattempt_1434420992867_0003_000002 exited with exitCode: -1000
For more detailed output, check application tracking page:http://cdh1:8088/proxy/application_1434420992867_0003/Then, click on links to logs of each attempt.
Diagnostics: Rename cannot overwrite non empty destination directory /data/yarn/nm/usercache/hdfs/filecache/16
java.io.IOException: Rename cannot overwrite non empty destination directory /data/yarn/nm/usercache/hdfs/filecache/16
at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:716)
at org.apache.hadoop.fs.FilterFs.renameInternal(FilterFs.java:228)
at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:659)
at org.apache.hadoop.fs.FileContext.rename(FileContext.java:909)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:364)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
12/10/16 11:41:06 INFO mapreduce.Job: Counters: 0
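
A workaround that is commonly reported for this particular YARN failure (verify it against your own cluster before applying it) is to remove the stale NodeManager local cache directory named in the message, for example /data/yarn/nm/usercache/hdfs/filecache on the affected nodes, and then resubmit the job.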
