在進行數據傳輸中,批量加載數據到HBase集羣有多種方式,好比經過HBase API進行批量寫入數據、使用Sqoop工具批量導數到HBase集羣、使用MapReduce批量導入等。這些方式,在導入數據的過程當中,若是數據量過大,可能耗時會比較嚴重或者佔用HBase集羣資源較多(如磁盤IO、HBase Handler數等)。今天這篇博客筆者將爲你們分享使用HBase BulkLoad的方式來進行海量數據批量寫入到HBase集羣。apache
在使用BulkLoad以前,咱們先來了解一下HBase的存儲機制。HBase存儲數據其底層使用的是HDFS來做爲存儲介質,HBase的每一張表對應的HDFS目錄上的一個文件夾,文件夾名以HBase表進行命名(若是沒有使用命名空間,則默認在default目錄下),在表文件夾下存放在若干個Region命名的文件夾,Region文件夾中的每一個列簇也是用文件夾進行存儲的,每一個列簇中存儲就是實際的數據,以HFile的形式存在。路徑格式以下:app
/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>
按照HBase存儲數據按照HFile格式存儲在HDFS的原理,使用MapReduce直接生成HFile格式的數據文件,而後在經過RegionServer將HFile數據文件移動到相應的Region上去。流程以下圖所示:dom
HFile文件的生成,可使用MapReduce來進行實現,將數據源準備好,上傳到HDFS進行存儲,而後在程序中讀取HDFS上的數據源,進行自定義封裝,組裝RowKey,而後將封裝後的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。實現代碼以下:tcp
/** * Read DataSource from hdfs & Gemerator hfile. * * @author smartloli. * * Created by Aug 19, 2018 */ public class GemeratorHFile2 { static class HFileImportMapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { protected final String CF_KQ = "cf"; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); System.out.println("line : " + line); String[] datas = line.split(" "); String row = new Date().getTime() + "_" + datas[1]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row)); KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(), datas[1].getBytes(), datas[2].getBytes()); context.write(rowkey, kv); } } public static void main(String[] args) { if (args.length != 1) { System.out.println("<Usage>Please input hbase-site.xml path.</Usage>"); return; } Configuration conf = new Configuration(); conf.addResource(new Path(args[0])); conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID()); String tableName = "person"; String input = "hdfs://nna:9000/tmp/person.txt"; String output = "hdfs://nna:9000/tmp/pres"; System.out.println("table : " + tableName); HTable table; try { try { FileSystem fs = FileSystem.get(URI.create(output), conf); fs.delete(new Path(output), true); fs.close(); } catch (IOException e1) { e1.printStackTrace(); } Connection conn = ConnectionFactory.createConnection(conf); table = (HTable) conn.getTable(TableName.valueOf(tableName)); Job job = Job.getInstance(conf); job.setJobName("Generate HFile"); job.setJarByClass(GemeratorHFile2.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(HFileImportMapper2.class); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, new Path(output)); HFileOutputFormat2.configureIncrementalLoad(job, table); try { job.waitForCompletion(true); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } }
在HDFS目錄/tmp/person.txt中,準備數據源以下:ide
1 smartloli 100 2 smartloli2 101 3 smartloli3 102
而後,將上述代碼編譯打包成jar,上傳到Hadoop集羣進行執行,執行命令以下:工具
hadoop jar GemeratorHFile2.jar /data/soft/new/apps/hbaseapp/hbase-site.xml
若是在執行命令的過程當中,出現找不到類的異常信息,多是本地沒有加載HBase依賴JAR包,在當前用戶中配置以下環境變量信息:oop
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:classpath
而後,執行source命令使配置的內容當即生生效。學習
在成功提交任務後,Linux控制檯會打印執行任務進度,也能夠到YARN的資源監控界面查看執行進度,結果以下所示:大數據
等待任務的執行,執行完成後,在對應HDFS路徑上會生成相應的HFile數據文件,以下圖所示:this
而後,在使用BulkLoad的方式將生成的HFile文件導入到HBase集羣中,這裏有2種方式。一種是寫代碼實現導入,另外一種是使用HBase命令進行導入。
經過LoadIncrementalHFiles類來實現導入,具體代碼以下:
/** * Use BulkLoad inport hfile from hdfs to hbase. * * @author smartloli. * * Created by Aug 19, 2018 */ public class BulkLoad2HBase { public static void main(String[] args) throws Exception { if (args.length != 1) { System.out.println("<Usage>Please input hbase-site.xml path.</Usage>"); return; } String output = "hdfs://cluster1/tmp/pres"; Configuration conf = new Configuration(); conf.addResource(new Path(args[0])); HTable table = new HTable(conf, "person"); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); loader.doBulkLoad(new Path(output), table); } }
執行上述代碼,運行結果以下:
先將生成好的HFile文件遷移到目標集羣(即HBase集羣所在的HDFS上),而後在使用HBase命令進行導入,執行命令以下:
# 先使用distcp遷移hfile hadoop distcp -Dmapreduce.job.queuename=queue_1024_01 -update -skipcrccheck -m 10 /tmp/pres hdfs://nns:9000/tmp/pres # 使用bulkload方式導入數據 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/pres person
最後,咱們能夠到指定的RegionServer節點上查看導入的日誌信息,以下所示爲導入成功的日誌信息:
2018-08-19 16:30:34,969 INFO [B.defaultRpcServer.handler=7,queue=1,port=16020] regionserver.HStore: Successfully loaded store file hdfs://cluster1/tmp/pres/cf/7b455535f660444695589edf509935e9 into store cf (new location: hdfs://cluster1/hbase/data/default/person/2d7483d4abd6d20acdf16533a3fdf18f/cf/d72c8846327d42e2a00780ac2facf95b_SeqId_4_)
使用BulkLoad方式導入數據後,能夠進入到HBase集羣,使用HBase Shell來查看數據是否導入成功,預覽結果以下:
本篇博客爲了演示實戰效果,將生成HFile文件和使用BulkLoad方式導入HFile到HBase集羣的步驟進行了分解,實際狀況中,能夠將這兩個步驟合併爲一個,實現自動化生成與HFile自動導入。若是在執行的過程當中出現RpcRetryingCaller的異常,能夠到對應RegionServer節點查看日誌信息,這裏面記錄了出現這種異常的詳細緣由。
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!
另外,博主出書了《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。