HBase的原型是Google的BigTable論文,受到了該論文思想的啓發,目前做爲Hadoop的子項目來開發維護,用於支持結構化的數據存儲。 html
官方網站:http://hbase.apache.org java
-- 2006年Google發表BigTable白皮書 node
-- 2006年開始開發HBase linux
-- 2008年北京成功開奧運會,程序員默默地將HBase弄成了Hadoop的子項目 程序員
-- 2010年HBase成爲Apache頂級項目 算法
-- 如今不少公司二次開發出了不少發行版本,你也開始使用了。 sql
HBase是一個高可靠性、高性能、面向列、可伸縮的分佈式存儲系統,利用HBASE技術可在廉價PC Server上搭建起大規模結構化存儲集羣。 shell
HBase的目標是存儲並處理大型的數據,更具體來講是僅需使用普通的硬件配置,就可以處理由成千上萬的行和列所組成的大型數據。 數據庫
HBase是Google Bigtable的開源實現,可是也有不少不一樣之處。好比:Google Bigtable利用GFS做爲其文件存儲系統,HBase利用Hadoop HDFS做爲其文件存儲系統;Google運行MAPREDUCE來處理Bigtable中的海量數據,HBase一樣利用Hadoop MapReduce來處理HBase中的海量數據;Google Bigtable利用Chubby做爲協同服務,HBase利用Zookeeper做爲對應。 apache
1)海量存儲
Hbase適合存儲PB級別的海量數據,在PB級別的數據以及採用廉價PC存儲的狀況下,能在幾十到百毫秒內返回數據。這與Hbase的極易擴展性息息相關。正式由於Hbase良好的擴展性,才爲海量數據的存儲提供了便利。
2)列式存儲
這裏的列式存儲其實說的是列族存儲,Hbase是根據列族來存儲數據的。列族下面能夠有很是多的列,列族在建立表的時候就必須指定。
3)極易擴展
Hbase的擴展性主要體如今兩個方面,一個是基於上層處理能力(RegionServer)的擴展,一個是基於存儲的擴展(HDFS)。
經過橫向添加RegionSever的機器,進行水平擴展,提高Hbase上層的處理能力,提高Hbsae服務更多Region的能力。
備註:RegionServer的做用是管理region、承接業務的訪問,這個後面會詳細的介紹經過橫向添加Datanode的機器,進行存儲層擴容,提高Hbase的數據存儲能力和提高後端存儲的讀寫能力。
4)高併發
因爲目前大部分使用Hbase的架構,都是採用的廉價PC,所以單個IO的延遲其實並不小,通常在幾十到上百ms之間。這裏說的高併發,主要是在併發的狀況下,Hbase的單個IO延遲降低並很少。能得到高併發、低延遲的服務。
5)稀疏
稀疏主要是針對Hbase列的靈活性,在列族中,你能夠指定任意多的列,在列數據爲空的狀況下,是不會佔用存儲空間的。
Hbase架構如圖1所示:
圖1 HBase架構圖
從圖中能夠看出Hbase是由Client、Zookeeper、Master、HRegionServer、HDFS等幾個組件組成,下面來介紹一下幾個組件的相關功能:
1)Client
Client包含了訪問Hbase的接口,另外Client還維護了對應的cache來加速Hbase的訪問,好比cache的.META.元數據的信息。
2)Zookeeper
HBase經過Zookeeper來作master的高可用、RegionServer的監控、元數據的入口以及集羣配置的維護等工做。具體工做以下:
經過Zoopkeeper來保證集羣中只有1個master在運行,若是master異常,會經過競爭機制產生新的master提供服務
經過Zoopkeeper來監控RegionServer的狀態,當RegionSevrer有異常的時候,經過回調的形式通知Master RegionServer上下線的信息
經過Zoopkeeper存儲元數據的統一入口地址
3)Hmaster
master節點的主要職責以下:
爲RegionServer分配Region
維護整個集羣的負載均衡
維護集羣的元數據信息
發現失效的Region,並將失效的Region分配到正常的RegionServer上
當RegionSever失效的時候,協調對應Hlog的拆分
4)HregionServer
HregionServer直接對接用戶的讀寫請求,是真正的"幹活"的節點。它的功能歸納以下:
管理master爲其分配的Region
處理來自客戶端的讀寫請求
負責和底層HDFS的交互,存儲數據到HDFS
負責Region變大之後的拆分
負責Storefile的合併工做
5)HDFS
HDFS爲Hbase提供最終的底層數據存儲服務,同時爲HBase提供高可用(Hlog存儲在HDFS)的支持,具體功能歸納以下:
提供元數據和表數據的底層分佈式存儲服務
數據多副本,保證的高可靠和高可用性
功能
1.監控RegionServer
2.處理RegionServer故障轉移
3.處理元數據的變動
4.處理region的分配或轉移
5.在空閒時間進行數據的負載均衡
6.經過Zookeeper發佈本身的位置給客戶端
功能
1.負責存儲HBase的實際數據
2.處理分配給它的Region
3.刷新緩存到HDFS
4.維護Hlog
5.執行壓縮
6.負責處理Region分片
1.Write-Ahead logs
HBase的修改記錄,當對HBase讀寫數據的時候,數據不是直接寫進磁盤,它會在內存中保留一段時間(時間以及數據量閾值能夠設定)。但把數據保存在內存中可能有更高的機率引發數據丟失,爲了解決這個問題,數據會先寫在一個叫作Write-Ahead logfile的文件中,而後再寫入內存中。因此在系統出現故障的時候,數據能夠經過這個日誌文件重建。
2.Region
Hbase表的分片,HBase表會根據RowKey值被切分紅不一樣的region存儲在RegionServer中,在一個RegionServer中能夠有多個不一樣的region。
3.Store
HFile存儲在Store中,一個Store對應HBase表中的一個列族。
4.MemStore
顧名思義,就是內存存儲,位於內存中,用來保存當前的數據操做,因此當數據保存在WAL中以後,RegsionServer會在內存中存儲鍵值對。
5.HFile
這是在磁盤上保存原始數據的實際的物理文件,是實際的存儲文件。StoreFile是以Hfile的形式存儲在HDFS的。
首先保證Zookeeper集羣的正常部署,並啓動之:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
Hadoop集羣的正常部署並啓動:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
解壓HBase到指定目錄:
[atguigu@hadoop102 software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
修改HBase對應的配置文件。
1)hbase-env.sh修改內容:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
2)hbase-site.xml修改內容:
<configuration> <property> <name>hbase.rootdir</name> <value>hdfs://hadoop102:9000/hbase</value> </property>
<property> <name>hbase.cluster.distributed</name> <value>true</value> </property>
<!-- 0.98後的新變更,以前版本沒有.port,默認端口爲60000 --> <property> <name>hbase.master.port</name> <value>16000</value> </property>
<property> <name>hbase.zookeeper.quorum</name> <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value> </property>
<property> <name>hbase.zookeeper.property.dataDir</name> <value>/opt/module/zookeeper-3.4.10/zkData</value> </property> </configuration> |
3)regionservers:
hadoop102 hadoop103 hadoop104 |
4)軟鏈接hadoop配置文件到hbase:
[atguigu@hadoop102 module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml
/opt/module/hbase/conf/core-site.xml
[atguigu@hadoop102 module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
/opt/module/hbase/conf/hdfs-site.xml
[atguigu@hadoop102 module]$ xsync hbase/
1.啓動方式1
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start master
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start regionserver
提示:若是集羣之間的節點時間不一樣步,會致使regionserver沒法啓動,拋出ClockOutOfSyncException異常。
修復提示:
a、同步時間服務
請參看幫助文檔:《尚硅谷大數據技術之Hadoop入門》
b、屬性:hbase.master.maxclockskew設置更大的值
<property> <name>hbase.master.maxclockskew</name> <value>180000</value> <description>Time difference of regionserver from master</description> </property> |
2.啓動方式2
[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
對應的中止服務:
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
啓動成功後,能夠經過"host:port"的方式來訪問HBase管理頁面,例如:
1.進入HBase客戶端命令行
[atguigu@hadoop102 hbase]$ bin/hbase shell
2.查看幫助命令
hbase(main):001:0> help
3.查看當前數據庫中有哪些表
hbase(main):002:0> list
1.建立表
hbase(main):002:0> create 'student','info'
2.插入數據到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3.掃描查看錶數據
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4.查看錶結構
hbase(main):011:0> describe 'student'
5.更新指定字段的數據
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6.查看"指定行"或"指定列族:列"的數據
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7.統計表數據行數
hbase(main):021:0> count 'student'
8.刪除數據
刪除某rowkey的所有數據:
hbase(main):016:0> deleteall 'student','1001'
刪除某rowkey的某一列數據:
hbase(main):017:0> delete 'student','1002','info:sex'
9.清空表數據
hbase(main):018:0> truncate 'student'
提示:清空表的操做順序爲先disable,而後再truncate。
10.刪除表
首先須要先讓該表爲disable狀態:
hbase(main):019:0> disable 'student'
而後才能drop這個表:
hbase(main):020:0> drop 'student'
提示:若是直接drop表,會報錯:ERROR: Table student is enabled. Disable it first.
11.變動表信息
將info列族中的數據存放3個版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
與nosql數據庫們同樣,RowKey是用來檢索記錄的主鍵。訪問HBASE table中的行,只有三種方式:
1.經過單個RowKey訪問
2.經過RowKey的range(正則)
3.全表掃描
RowKey行鍵 (RowKey)能夠是任意字符串(最大長度是64KB,實際應用中長度通常爲 10-100bytes),在HBASE內部,RowKey保存爲字節數組。存儲時,數據按照RowKey的字典序(byte order)排序存儲。設計RowKey時,要充分排序存儲這個特性,將常常一塊兒讀取的行存儲放到一塊兒。(位置相關性)
列族:HBASE表中的每一個列,都歸屬於某個列族。列族是表的schema的一部 分(而列不是),必須在使用表以前定義。列名都以列族做爲前綴。例如 courses:history,courses:math都屬於courses 這個列族。
由{rowkey, column Family:columu, version} 惟一肯定的單元。cell中的數據是沒有類型的,所有是字節碼形式存貯。
關鍵字:無類型、字節碼
HBASE 中經過rowkey和columns肯定的爲一個存貯單元稱爲cell。每一個 cell都保存 着同一份數據的多個版本。版本經過時間戳來索引。時間戳的類型是 64位整型。時間戳能夠由HBASE(在數據寫入時自動 )賦值,此時時間戳是精確到毫秒 的當前系統時間。時間戳也能夠由客戶顯式賦值。若是應用程序要避免數據版 本衝突,就必須本身生成具備惟一性的時間戳。每一個 cell中,不一樣版本的數據按照時間倒序排序,即最新的數據排在最前面。
爲了不數據存在過多版本形成的的管理 (包括存貯和索引)負擔,HBASE提供 了兩種數據版本回收方式。一是保存數據的最後n個版本,二是保存最近一段 時間內的版本(好比最近七天)。用戶能夠針對每一個列族進行設置。
命名空間的結構:
1) Table:表,全部的表都是命名空間的成員,即表必屬於某個命名空間,若是沒有指定,則在default默認的命名空間中。
2) RegionServer group:一個命名空間包含了默認的RegionServer Group。
3) Permission:權限,命名空間可以讓咱們來定義訪問控制列表ACL(Access Control List)。例如,建立表,讀取表,刪除,更新等等操做。
4) Quota:限額,能夠強制一個命名空間可包含的region的數量。
HBase讀數據流程如圖3所示
圖3所示 HBase讀數據流程
1)Client先訪問zookeeper,從meta表讀取region的位置,而後讀取meta表中的數據。meta中又存儲了用戶表的region信息;
2)根據namespace、表名和rowkey在meta表中找到對應的region信息;
3)找到這個region對應的regionserver;
4)查找對應的region;
5)先從MemStore找數據,若是沒有,再到BlockCache裏面讀;
6)BlockCache尚未,再到StoreFile上讀(爲了讀取的效率);
7)若是是從StoreFile裏面讀取的數據,不是直接返回給客戶端,而是先寫入BlockCache,再返回給客戶端。
Hbase寫流程如圖2所示
圖2 HBase寫數據流程
1)Client向HregionServer發送寫請求;
2)HregionServer將數據寫到HLog(write ahead log)。爲了數據的持久化和恢復;
3)HregionServer將數據寫到內存(MemStore);
4)反饋Client寫成功。
1)當MemStore數據達到閾值(默認是128M,老版本是64M),將數據刷到硬盤,將內存中的數據刪除,同時刪除HLog中的歷史數據;
2)並將數據存儲到HDFS中;
3)在HLog中作標記點。
1)當數據塊達到4塊,Hmaster觸發合併操做,Region將數據塊加載到本地,進行合併;
2)當合並的數據超過256M,進行拆分,將拆分後的Region分配給不一樣的HregionServer管理;
3)當HregionServer宕機後,將HregionServer上的hlog拆分,而後分配給不一樣的HregionServer加載,修改.META.;
4)注意:HLog會同步到HDFS。
新建項目後在pom.xml中添加依賴:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency>
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency>
<dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> |
public static Configuration conf; static{ //使用HBaseConfiguration的單例方法實例化 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.9.102"); conf.set("hbase.zookeeper.property.clientPort", "2181"); } |
public static boolean isTableExist(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //在HBase中管理、訪問表須要先建立HBaseAdmin對象 //Connection connection = ConnectionFactory.createConnection(conf); //HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); HBaseAdmin admin = new HBaseAdmin(conf); return admin.tableExists(tableName); } |
public static void createTable(String tableName, String... columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ HBaseAdmin admin = new HBaseAdmin(conf); //判斷表是否存在 if(isTableExist(tableName)){ System.out.println("表" + tableName + "已存在"); //System.exit(0); }else{ //建立表屬性對象,表名須要轉字節 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); //建立多個列族 for(String cf : columnFamily){ descriptor.addFamily(new HColumnDescriptor(cf)); } //根據對錶的配置,建立表 admin.createTable(descriptor); System.out.println("表" + tableName + "建立成功!"); } } |
public static void dropTable(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ HBaseAdmin admin = new HBaseAdmin(conf); if(isTableExist(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println("表" + tableName + "刪除成功!"); }else{ System.out.println("表" + tableName + "不存在!"); } } |
public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException{ //建立HTable對象 HTable hTable = new HTable(conf, tableName); //向表中插入數據 Put put = new Put(Bytes.toBytes(rowKey)); //向Put對象中組裝數據 put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); hTable.put(put); hTable.close(); System.out.println("插入數據成功"); } |
public static void deleteMultiRow(String tableName, String... rows) throws IOException{ HTable hTable = new HTable(conf, tableName); List<Delete> deleteList = new ArrayList<Delete>(); for(String row : rows){ Delete delete = new Delete(Bytes.toBytes(row)); deleteList.add(delete); } hTable.delete(deleteList); hTable.close(); } |
public static void getAllRows(String tableName) throws IOException{ HTable hTable = new HTable(conf, tableName); //獲得用於掃描region的對象 Scan scan = new Scan(); //使用HTable獲得resultcanner實現類的對象 ResultScanner resultScanner = hTable.getScanner(scan); for(Result result : resultScanner){ Cell[] cells = result.rawCells(); for(Cell cell : cells){ //獲得rowkey System.out.println("行鍵:" + Bytes.toString(CellUtil.cloneRow(cell))); //獲得列族 System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } } } |
public static void getRow(String tableName, String rowKey) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); //get.setMaxVersions();顯示全部版本 //get.setTimeStamp();顯示指定時間戳的版本 Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行鍵:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("時間戳:" + cell.getTimestamp()); } } |
public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行鍵:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } } |
經過HBase的相關JavaAPI,咱們能夠實現伴隨HBase操做的MapReduce過程,好比使用MapReduce將數據從本地文件系統導入到HBase的表中,好比咱們從HBase中讀取一些原始數據後使用MapReduce作數據分析。
1.查看HBase的MapReduce任務的執行
$ bin/hbase mapredcp
2.環境變量的導入
(1)執行環境變量的導入(臨時生效,在命令行執行下述操做)
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效:在/etc/profile配置
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
並在hadoop-env.sh中配置:(注意:在for循環以後配)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
3.運行官方的MapReduce任務
-- 案例一:統計Student表中有多少行數據
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
-- 案例二:使用MapReduce將本地數據導入到HBase
1)在本地建立一個tsv格式的文件:fruit.tsv
1001 Apple Red 1002 Pear Yellow 1003 Pineapple Yellow |
2)建立HBase表
hbase(main):001:0> create 'fruit','info'
3)在HDFS中建立input_fruit文件夾並上傳fruit.tsv文件
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
hbase(main):001:0> scan 'fruit'
目標:將fruit表中的一部分數據,經過MR遷入到fruit_mr表中。
分步實現:
1.構建ReadFruitMapper類,用於讀取fruit表中的數據
package com.atguigu;
import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //將fruit的name和color提取出來,至關於將每一行數據讀取出來放入到Put對象中。 Put put = new Put(key.get()); //遍歷添加column行 for(Cell cell: value.rawCells()){ //添加/克隆列族:info if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ //添加/克隆列:name if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //將該列cell加入到put對象中 put.add(cell); //添加/克隆列:color }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //向該列cell加入到put對象中 put.add(cell); } } } //將從fruit讀取到的每行數據寫入到context中做爲map的輸出 context.write(key, put); } } |
2. 構建WriteFruitMRReducer類,用於將讀取到的fruit表中的數據寫入到fruit_mr表中
package com.atguigu.hbase_mr;
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //讀出來的每一行數據寫入到fruit_mr表中 for(Put put: values){ context.write(NullWritable.get(), put); } } } |
3.構建Fruit2FruitMRRunner extends Configured implements Tool用於組裝運行Job任務
//組裝Job public int run(String[] args) throws Exception { //獲得Configuration Configuration conf = this.getConf(); //建立Job任務 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Fruit2FruitMRRunner.class);
//配置Job Scan scan = new Scan(); scan.setCacheBlocks(false); scan.setCaching(500);
//設置Mapper,注意導入的是mapreduce包下的,不是mapred包下的,後者是老版本 TableMapReduceUtil.initTableMapperJob( "fruit", //數據源的表名 scan, //scan掃描控制器 ReadFruitMapper.class,//設置Mapper類 ImmutableBytesWritable.class,//設置Mapper輸出key類型 Put.class,//設置Mapper輸出value值類型 job//設置給哪一個JOB ); //設置Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job); //設置Reduce數量,最少1個 job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); } return isSuccess ? 0 : 1; } |
4.主函數中調用運行該Job任務
public static void main( String[] args ) throws Exception{ Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args); System.exit(status); } |
5.打包運行任務
$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar
com.z.hbase.mr1.Fruit2FruitMRRunner
提示:運行任務前,若是待數據導入的表不存在,則須要提早建立。
提示:maven打包命令:-P local clean package或-P dev clean package install(將第三方jar包一同打包,須要插件:maven-shade-plugin)
目標:實現將HDFS中的數據寫入到HBase表中。
分步實現:
1.構建ReadFruitFromHDFSMapper於讀取HDFS中的文件數據
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //從HDFS中讀取的數據 String lineValue = value.toString(); //讀取出來的每行數據使用\t進行分割,存於String數組 String[] values = lineValue.split("\t");
//根據數據中值的含義取值 String rowKey = values[0]; String name = values[1]; String color = values[2];
//初始化rowKey ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
//初始化put對象 Put put = new Put(Bytes.toBytes(rowKey));
//參數分別:列族、列、值 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)); put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
context.write(rowKeyWritable, put); } } |
2.構建WriteFruitMRFromTxtReducer類
package com.z.hbase.mr2;
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //讀出來的每一行數據寫入到fruit_hdfs表中 for(Put put: values){ context.write(NullWritable.get(), put); } } } |
3.建立Txt2FruitRunner組裝Job
public int run(String[] args) throws Exception { //獲得Configuration Configuration conf = this.getConf();
//建立Job任務 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Txt2FruitRunner.class); Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv"); FileInputFormat.addInputPath(job, inPath);
//設置Mapper job.setMapperClass(ReadFruitFromHDFSMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class);
//設置Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);
//設置Reduce數量,最少1個 job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); }
return isSuccess ? 0 : 1; } |
4.調用執行Job
public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Txt2FruitRunner(), args); System.exit(status); } |
5.打包運行
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner
提示:運行任務前,若是待數據導入的表不存在,則須要提早建立之。
提示:maven打包命令:-P local clean package或-P dev clean package install(將第三方jar包一同打包,須要插件:maven-shade-plugin)
1.Hive
(1) 數據倉庫
Hive的本質其實就至關於將HDFS中已經存儲的文件在Mysql中作了一個雙射關係,以方便使用HQL去管理查詢。
(2) 用於數據分析、清洗
Hive適用於離線的數據分析和清洗,延遲較高。
(3) 基於HDFS、MapReduce
Hive存儲的數據依舊在DataNode上,編寫的HQL語句終將是轉換爲MapReduce代碼執行。
2.HBase
(1) 數據庫
是一種面向列存儲的非關係型數據庫。
(2) 用於存儲結構化和非結構化的數據
適用於單表非關係型數據的存儲,不適合作關聯查詢,相似JOIN等操做。
(3) 基於HDFS
數據持久化存儲的體現形式是Hfile,存放於DataNode中,被ResionServer以region的形式進行管理。
(4) 延遲較低,接入在線業務使用
面對大量的企業數據,HBase能夠直線單表大量數據的存儲,同時提供了高效的數據訪問速度。
尖叫提示:HBase與Hive的集成在最新的兩個版本中沒法兼容。因此,咱們只能含着淚勇敢的從新編譯:hive-hbase-handler-1.2.2.jar!!好氣!!
環境準備
由於咱們後續可能會在操做Hive的同時對HBase也會產生影響,因此Hive須要持有操做HBase的Jar,那麼接下來拷貝Hive所依賴的Jar包(或者使用軟鏈接的形式)。
export HBASE_HOME=/opt/module/hbase export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar |
同時在hive-site.xml中修改zookeeper的屬性,以下:
<property> <name>hive.zookeeper.quorum</name> <value>hadoop102,hadoop103,hadoop104</value> <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property> <property> <name>hive.zookeeper.client.port</name> <value>2181</value> <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property> |
1.案例一
目標:創建Hive表,關聯HBase表,插入數據到Hive表的同時可以影響HBase表。
分步實現:
(1) 在Hive中建立表同時關聯HBase
CREATE TABLE hive_hbase_emp_table( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table"); |
提示:完成以後,能夠分別進入Hive和HBase查看,都生成了對應的表
(2) 在Hive中建立臨時中間表,用於load文件中的數據
提示:不能將數據直接load進Hive所關聯HBase的那張表中
CREATE TABLE emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) row format delimited fields terminated by '\t'; |
(3) 向Hive中間表中load數據
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp; |
(4) 經過insert命令將中間表中的數據導入到Hive關聯HBase的那張表中
hive> insert into table hive_hbase_emp_table select * from emp; |
(5) 查看Hive以及關聯的HBase表中是否已經成功的同步插入了數據
Hive:
hive> select * from hive_hbase_emp_table; |
HBase:
hbase> scan 'hbase_emp_table' |
2.案例二
目標:在HBase中已經存儲了某一張表hbase_emp_table,而後在Hive中建立一個外部表來關聯HBase中的hbase_emp_table這張表,使之能夠藉助Hive來分析HBase這張表中的數據。
注:該案例2緊跟案例1的腳步,因此完成此案例前,請先完成案例1。
分步實現:
(1) 在Hive中建立外部表
CREATE EXTERNAL TABLE relevance_hbase_emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table"); |
(2) 關聯後就可使用Hive函數進行一些分析操做了
hive (default)> select * from relevance_hbase_emp; |
在HBase中Hmaster負責監控RegionServer的生命週期,均衡RegionServer的負載,若是Hmaster掛掉了,那麼整個HBase集羣將陷入不健康的狀態,而且此時的工做狀態並不會維持過久。因此HBase支持對Hmaster的高可用配置。
1.關閉HBase集羣(若是沒有開啓則跳過此步)
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
2.在conf目錄下建立backup-masters文件
[atguigu@hadoop102 hbase]$ touch conf/backup-masters
3.在backup-masters文件中配置高可用HMaster節點
[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters
4.將整個conf目錄scp到其餘節點
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/
5.打開頁面測試查看
每個region維護着startRow與endRowKey,若是加入的數據符合某個region維護的rowKey範圍,則該數據交給這個region維護。那麼依照這個原則,咱們能夠將數據所要投放的分區提早大體的規劃好,以提升HBase性能。
1.手動設定預分區
hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']
2.生成16進制序列預分區
create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
3.按照文件中設置的規則預分區
建立splits.txt文件內容以下:
aaaa bbbb cccc dddd |
而後執行:
create 'staff3','partition3',SPLITS_FILE => 'splits.txt'
4.使用JavaAPI建立預分區
//自定義算法,產生一系列Hash散列值存儲在二維數組中 byte[][] splitKeys = 某個散列值函數 //建立HBaseAdmin實例 HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create()); //建立HTableDescriptor實例 HTableDescriptor tableDesc = new HTableDescriptor(tableName); //經過HTableDescriptor實例和散列值二維數組建立帶有預分區的HBase表 hAdmin.createTable(tableDesc, splitKeys); |
一條數據的惟一標識就是rowkey,那麼這條數據存儲於哪一個分區,取決於rowkey處於哪一個一個預分區的區間內,設計rowkey的主要目的 ,就是讓數據均勻的分佈於全部的region中,在必定程度上防止數據傾斜。接下來咱們就談一談rowkey經常使用的設計方案。
1.生成隨機數、hash、散列值
好比: 本來rowKey爲1001的,SHA1後變成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7 本來rowKey爲3001的,SHA1後變成:49042c54de64a1e9bf0b33e00245660ef92dc7bd 本來rowKey爲5001的,SHA1後變成:7b61dec07e02c188790670af43e717f0f46e8913 在作此操做以前,通常咱們會選擇從數據集中抽取樣本,來決定什麼樣的rowKey來Hash後做爲每一個分區的臨界值。 |
2.字符串反轉
20170524000001轉成10000042507102 20170524000002轉成20000042507102 |
這樣也能夠在必定程度上散列逐步put進來的數據。
3.字符串拼接
20170524000001_a12e 20170524000001_93i7 |
HBase操做過程當中須要大量的內存開銷,畢竟Table是能夠緩存在內存中的,通常會分配整個可用內存的70%給HBase的Java堆。可是不建議分配很是大的堆內存,由於GC過程持續過久會致使RegionServer處於長期不可用狀態,通常16~48G內存就能夠了,若是由於框架佔用內存太高致使系統內存不足,框架同樣會被系統服務拖死。
1.容許在HDFS的文件中追加內容
hdfs-site.xml、hbase-site.xml
屬性:dfs.support.append 解釋:開啓HDFS追加同步,能夠優秀的配合HBase的數據同步和持久化。默認值爲true。 |
2.優化DataNode容許的最大文件打開數
hdfs-site.xml
屬性:dfs.datanode.max.transfer.threads 解釋:HBase通常都會同一時間操做大量的文件,根據集羣的數量和規模以及數據動做,設置爲4096或者更高。默認值:4096 |
3.優化延遲高的數據操做的等待時間
hdfs-site.xml
屬性:dfs.image.transfer.timeout 解釋:若是對於某一次數據操做來說,延遲很是高,socket須要等待更長的時間,建議把該值設置爲更大的值(默認60000毫秒),以確保socket不會被timeout掉。 |
4.優化數據的寫入效率
mapred-site.xml
屬性: mapreduce.map.output.compress mapreduce.map.output.compress.codec 解釋:開啓這兩個數據能夠大大提升文件的寫入效率,減小寫入時間。第一個屬性值修改成true,第二個屬性值修改成:org.apache.hadoop.io.compress.GzipCodec或者其餘壓縮方式。 |
5.設置RPC監聽數量
hbase-site.xml
屬性:hbase.regionserver.handler.count 解釋:默認值爲30,用於指定RPC監聽的數量,能夠根據客戶端的請求數進行調整,讀寫請求較多時,增長此值。 |
6.優化HStore文件大小
hbase-site.xml
屬性:hbase.hregion.max.filesize 解釋:默認值10737418240(10GB),若是須要運行HBase的MR任務,能夠減少此值,由於一個region對應一個map任務,若是單個region過大,會致使map任務執行時間過長。該值的意思就是,若是HFile的大小達到這個數值,則這個region會被切分爲兩個Hfile。 |
7.優化hbase客戶端緩存
hbase-site.xml
屬性:hbase.client.write.buffer 解釋:用於指定HBase客戶端緩存,增大該值能夠減小RPC調用次數,可是會消耗更多內存,反之則反之。通常咱們須要設定必定的緩存大小,以達到減小RPC次數的目的。 |
8.指定scan.next掃描HBase所獲取的行數
hbase-site.xml
屬性:hbase.client.scanner.caching 解釋:用於指定scan.next方法獲取的默認行數,值越大,消耗內存越大。 |
9.flush、compact、split機制
當MemStore達到閾值,將Memstore中的數據Flush進Storefile;compact機制則是把flush出來的小文件合併成大的Storefile文件。split則是當Region達到閾值,會把過大的Region一分爲二。
涉及屬性:
即:128M就是Memstore的默認閾值
hbase.hregion.memstore.flush.size:134217728 |
即:這個參數的做用是當單個HRegion內全部的Memstore大小總和超過指定值時,flush該HRegion的全部memstore。RegionServer的flush是經過將請求添加一個隊列,模擬生產消費模型來異步處理的。那這裏就有一個問題,當隊列來不及消費,產生大量積壓請求時,可能會致使內存陡增,最壞的狀況是觸發OOM。
hbase.regionserver.global.memstore.upperLimit:0.4 hbase.regionserver.global.memstore.lowerLimit:0.38 |
即:當MemStore使用內存總量達到hbase.regionserver.global.memstore.upperLimit指定值時,將會有多個MemStores flush到文件中,MemStore flush 順序是按照大小降序執行的,直到刷新到MemStore使用內存略小於lowerLimit
1) 微博內容的瀏覽,數據庫表設計
2) 用戶社交體現:關注用戶,取關用戶
3) 拉取關注的人的微博內容
1) 建立命名空間以及表名的定義
2) 建立微博內容表
3) 建立用戶關係表
4) 建立用戶微博內容接收郵件表
5) 發佈微博內容
6) 添加關注用戶
7) 移除(取關)用戶
8) 獲取關注的人的微博內容
9) 測試
//獲取配置conf
private Configuration conf = HBaseConfiguration.create();
//微博內容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//用戶關係表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微博收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//命名空間相似於關係型數據庫中的schema,能夠想象成文件夾
NamespaceDescriptor weibo = NamespaceDescriptor
.create("weibo")
.addConfiguration("creator", "Jinji")
.addConfiguration("create_time", System.currentTimeMillis() + "")
.build();
admin.createNamespace(weibo);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
表結構:
方法名 |
creatTableeContent |
Table Name |
weibo:content |
RowKey |
用戶ID_時間戳 |
ColumnFamily |
info |
ColumnLabel |
標題,內容,圖片 |
Version |
1個版本 |
代碼:
/**
* 建立微博內容表
* Table Name:weibo:content
* RowKey:用戶ID_時間戳
* ColumnFamily:info
* ColumnLabel:標題 內容 圖片URL
* Version:1個版本
*/
public void createTableContent(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//建立表表述
HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
//建立列族描述
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
//設置塊緩存
info.setBlockCacheEnabled(true);
//設置塊緩存大小
info.setBlocksize(2097152);
//設置壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設置版本確界
info.setMaxVersions(1);
info.setMinVersions(1);
content.addFamily(info);
admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
表結構:
方法名 |
createTableRelations |
Table Name |
weibo:relations |
RowKey |
用戶ID |
ColumnFamily |
attends、fans |
ColumnLabel |
關注用戶ID,粉絲用戶ID |
ColumnValue |
用戶ID |
Version |
1個版本 |
代碼:
/**
* 用戶關係表
* Table Name:weibo:relations
* RowKey:用戶ID
* ColumnFamily:attends,fans
* ColumnLabel:關注用戶ID,粉絲用戶ID
* ColumnValue:用戶ID
* Version:1個版本
*/
public void createTableRelations(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
//關注的人的列族
HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
//設置塊緩存
attends.setBlockCacheEnabled(true);
//設置塊緩存大小
attends.setBlocksize(2097152);
//設置壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設置版本確界
attends.setMaxVersions(1);
attends.setMinVersions(1);
//粉絲列族
HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
表結構:
方法名 |
createTableReceiveContentEmails |
Table Name |
weibo:receive_content_email |
RowKey |
用戶ID |
ColumnFamily |
info |
ColumnLabel |
用戶ID |
ColumnValue |
取微博內容的RowKey |
Version |
1000 |
代碼:
/**
* 建立微博收件箱表
* Table Name: weibo:receive_content_email
* RowKey:用戶ID
* ColumnFamily:info
* ColumnLabel:用戶ID-發佈微博的人的用戶ID
* ColumnValue:關注的人的微博的RowKey
* Version:1000
*/
public void createTableReceiveContentEmail(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
info.setBlockCacheEnabled(true);
info.setBlocksize(2097152);
info.setMaxVersions(1000);
info.setMinVersions(1000);
receive_content_email.addFamily(info);;
admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
a、微博內容表中添加1條數據
b、微博收件箱表對全部粉絲用戶添加數據
代碼:Message.java
package com.atguigu.weibo;
public class Message {
private String uid;
private String timestamp;
private String content;
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
}
}
代碼:public void publishContent(String uid, String content)
/**
* 發佈微博
* a、微博內容表中數據+1
* b、向微博收件箱表中加入微博的Rowkey
*/
public void publishContent(String uid, String content){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、微博內容表中添加1條數據,首先獲取微博內容表描述
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
//組裝Rowkey
long timestamp = System.currentTimeMillis();
String rowKey = uid + "_" + timestamp;
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
contentTBL.put(put);
//b、向微博收件箱表中加入發佈的Rowkey
//b.1、查詢用戶關係表,獲得當前用戶有哪些粉絲
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//b.2、取出目標數據
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTBL.get(get);
List<byte[]> fans = new ArrayList<byte[]>();
//遍歷取出當前發佈微博的用戶的全部粉絲數據
for(Cell cell : result.rawCells()){
fans.add(CellUtil.cloneQualifier(cell));
}
//若是該用戶沒有粉絲,則直接return
if(fans.size() <= 0) return;
//開始操做收件箱表
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
List<Put> puts = new ArrayList<Put>();
for(byte[] fan : fans){
Put fanPut = new Put(fan);
fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
puts.add(fanPut);
}
recTBL.put(puts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
a、在微博用戶關係表中,對當前主動操做的用戶添加新關注的好友
b、在微博用戶關係表中,對被關注的用戶添加新的粉絲
c、微博收件箱表中添加所關注的用戶發佈的微博
代碼實現:public void addAttends(String uid, String... attends)
/**
* 關注用戶邏輯
* a、在微博用戶關係表中,對當前主動操做的用戶添加新的關注的好友
* b、在微博用戶關係表中,對被關注的用戶添加粉絲(當前操做的用戶)
* c、當前操做用戶的微博收件箱添加所關注的用戶發佈的微博rowkey
*/
public void addAttends(String uid, String... attends){
//參數過濾
if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
return;
}
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//用戶關係表操做對象(鏈接到用戶關係表)
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
List<Put> puts = new ArrayList<Put>();
//a、在微博用戶關係表中,添加新關注的好友
Put attendPut = new Put(Bytes.toBytes(uid));
for(String attend : attends){
//爲當前用戶添加關注的人
attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b、爲被關注的人,添加粉絲
Put fansPut = new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
//將全部關注的人一個一個的添加到puts(List)集合中
puts.add(fansPut);
}
puts.add(attendPut);
relationsTBL.put(puts);
//c.1、微博收件箱添加關注的用戶發佈的微博內容(content)的rowkey
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
Scan scan = new Scan();
//用於存放取出來的關注的人所發佈的微博的rowkey
List<byte[]> rowkeys = new ArrayList<byte[]>();
for(String attend : attends){
//過濾掃描rowkey,即:前置位匹配被關注的人的uid_
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
//爲掃描對象指定過濾規則
scan.setFilter(filter);
//經過掃描對象獲得scanner
ResultScanner result = contentTBL.getScanner(scan);
//迭代器遍歷掃描出來的結果集
Iterator<Result> iterator = result.iterator();
while(iterator.hasNext()){
//取出每個符合掃描結果的那一行數據
Result r = iterator.next();
for(Cell cell : r.rawCells()){
//將獲得的rowkey放置於集合容器中
rowkeys.add(CellUtil.cloneRow(cell));
}
}
}
//c.二、將取出的微博rowkey放置於當前操做用戶的收件箱中
if(rowkeys.size() <= 0) return;
//獲得微博收件箱表的操做對象
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//用於存放多個關注的用戶的發佈的多條微博rowkey信息
List<Put> recPuts = new ArrayList<Put>();
for(byte[] rk : rowkeys){
Put put = new Put(Bytes.toBytes(uid));
//uid_timestamp
String rowKey = Bytes.toString(rk);
//借取uid
String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
//將微博rowkey添加到指定單元格中
put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
recPuts.add(put);
}
recTBL.put(recPuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
a、在微博用戶關係表中,對當前主動操做的用戶移除取關的好友(attends)
b、在微博用戶關係表中,對被取關的用戶移除粉絲
c、微博收件箱中刪除取關的用戶發佈的微博
代碼:public void removeAttends(String uid, String... attends)
/**
* 取消關注(remove)
* a、在微博用戶關係表中,對當前主動操做的用戶刪除對應取關的好友
* b、在微博用戶關係表中,對被取消關注的人刪除粉絲(當前操做人)
* c、從收件箱中,刪除取關的人的微博的rowkey
*/
public void removeAttends(String uid, String... attends){
//過濾數據
if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、在微博用戶關係表中,刪除已關注的好友
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//待刪除的用戶關係表中的全部數據
List<Delete> deletes = new ArrayList<Delete>();
//當前取關操做者的uid對應的Delete對象
Delete attendDelete = new Delete(Bytes.toBytes(uid));
//遍歷取關,同時每次取關都要將被取關的人的粉絲-1
for(String attend : attends){
attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
//b
Delete fansDelete = new Delete(Bytes.toBytes(attend));
fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deletes.add(fansDelete);
}
deletes.add(attendDelete);
relationsTBL.delete(deletes);
//c、刪除取關的人的微博rowkey 從 收件箱表中
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
Delete recDelete = new Delete(Bytes.toBytes(uid));
for(String attend : attends){
recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
}
recTBL.delete(recDelete);
} catch (IOException e) {
e.printStackTrace();
}
}
a、從微博收件箱中獲取所關注的用戶的微博RowKey
b、根據獲取的RowKey,獲得微博內容
代碼實現:public List<Message> getAttendsContent(String uid)
/**
* 獲取微博實際內容
* a、從微博收件箱中獲取全部關注的人的發佈的微博的rowkey
* b、根據獲得的rowkey去微博內容表中獲得數據
* c、將獲得的數據封裝到Message對象中
*/
public List<Message> getAttendsContent(String uid){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//a、從收件箱中取得微博rowKey
Get get = new Get(Bytes.toBytes(uid));
//設置最大版本號
get.setMaxVersions(5);
List<byte[]> rowkeys = new ArrayList<byte[]>();
Result result = recTBL.get(get);
for(Cell cell : result.rawCells()){
rowkeys.add(CellUtil.cloneValue(cell));
}
//b、根據取出的全部rowkey去微博內容表中檢索數據
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
List<Get> gets = new ArrayList<Get>();
//根據rowkey取出對應微博的具體內容
for(byte[] rk : rowkeys){
Get g = new Get(rk);
gets.add(g);
}
//獲得全部的微博內容的result對象
Result[] results = contentTBL.get(gets);
List<Message> messages = new ArrayList<Message>();
for(Result res : results){
for(Cell cell : res.rawCells()){
Message message = new Message();
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String userid = rowKey.substring(0, rowKey.indexOf("_"));
String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
String content = Bytes.toString(CellUtil.cloneValue(cell));
message.setContent(content);
message.setTimestamp(timestamp);
message.setUid(userid);
messages.add(message);
}
}
return messages;
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
-- 測試發佈微博內容
public void testPublishContent(WeiBo wb)
-- 測試添加關注
public void testAddAttend(WeiBo wb)
-- 測試取消關注
public void testRemoveAttend(WeiBo wb)
-- 測試展現內容
public void testShowMessage(WeiBo wb)
代碼:
/**
* 發佈微博內容
* 添加關注
* 取消關注
* 展現內容
*/
public void testPublishContent(WeiBo wb){
wb.publishContent("0001", "今天買了一包空氣,送了點薯片,很是開心!!");
wb.publishContent("0001", "今每天氣不錯。");
}
public void testAddAttend(WeiBo wb){
wb.publishContent("0008", "準備下課!");
wb.publishContent("0009", "準備關機!");
wb.addAttends("0001", "0008", "0009");
}
public void testRemoveAttend(WeiBo wb){
wb.removeAttends("0001", "0008");
}
public void testShowMessage(WeiBo wb){
List<Message> messages = wb.getAttendsContent("0001");
for(Message message : messages){
System.out.println(message);
}
}
public static void main(String[] args) {
WeiBo weibo = new WeiBo();
weibo.initTable();
weibo.testPublishContent(weibo);
weibo.testAddAttend(weibo);
weibo.testShowMessage(weibo);
weibo.testRemoveAttend(weibo);
weibo.testShowMessage(weibo);
}
天天:
1) 消息量:發送和接收的消息數超過60億
2) 將近1000億條數據的讀寫
3) 高峯期每秒150萬左右操做
4) 總體讀取數據佔有約55%,寫入佔有45%
5) 超過2PB的數據,涉及冗餘共6PB數據
6) 數據每個月大概增加300千兆字節。
在平常生活中,包括在設計計算機軟件時,咱們常常要判斷一個元素是否在一個集合中。好比在字處理軟件中,須要檢查一個英語單詞是否拼寫正確(也就是要判斷它是否在已知的字典中);在 FBI,一個嫌疑人的名字是否已經在嫌疑名單上;在網絡爬蟲裏,一個網址是否被訪問過等等。最直接的方法就是將集合中所有的元素存在計算機中,遇到一個新元素時,將它和集合中的元素直接比較便可。通常來說,計算機中的集合是用哈希表(hash table)來存儲的。它的好處是快速準確,缺點是費存儲空間。當集合比較小時,這個問題不顯著,可是當集合巨大時,哈希表存儲效率低的問題就顯現出來了。好比說,一個像 Yahoo,Hotmail 和 Gmai 那樣的公衆電子郵件(email)提供商,老是須要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的 email 地址。因爲那些發送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則須要大量的網絡服務器。若是用哈希表,每存儲一億個 email 地址, 就須要 1.6GB 的內存(用哈希表實現的具體辦法是將每個 email 地址對應成一個八字節的信息指紋googlechinablog.com/2006/08/blog-post.html,而後將這些信息指紋存入哈希表,因爲哈希表的存儲效率通常只有 50%,所以一個 email 地址須要佔用十六個字節。一億個地址大約要 1.6GB, 即十六億字節的內存)。所以存貯幾十億個郵件地址可能須要上百 GB 的內存。除非是超級計算機,通常服務器是沒法存儲的。
布隆過濾器只須要哈希表 1/8 到 1/4 的大小就能解決一樣的問題。
Bloom Filter是一種空間效率很高的隨機數據結構,它利用位數組很簡潔地表示一個集合,並能判斷一個元素是否屬於這個集合。Bloom Filter的這種高效是有必定代價的:在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認爲屬於這個集合(false positive)。所以,Bloom Filter不適合那些"零錯誤"的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter經過極少的錯誤換取了存儲空間的極大節省。
下面咱們具體來看Bloom Filter是如何用位數組表示集合的。初始狀態時,Bloom Filter是一個包含m位的位數組,每一位都置爲0,如圖9-5所示。
圖9-5
爲了表達S={x1, x2,…,xn}這樣一個n個元素的集合,Bloom Filter使用k個相互獨立的哈希函數(Hash Function),它們分別將集合中的每一個元素映射到{1,…,m}的範圍中。對任意一個元素x,第i個哈希函數映射的位置hi(x)就會被置爲1(1≤i≤k)。注意,若是一個位置屢次被置爲1,那麼只有第一次會起做用,後面幾回將沒有任何效果。如圖9-6所示,k=3,且有兩個哈希函數選中同一個位置(從左邊數第五位)。
圖9-6
在判斷y是否屬於這個集合時,咱們對y應用k次哈希函數,若是全部hi(y)的位置都是1(1≤i≤k),那麼咱們就認爲y是集合中的元素,不然就認爲y不是集合中的元素。如圖9-7所示y1就不是集合中的元素。y2或者屬於這個集合,或者恰好是一個false positive。
圖9-7
· 爲了add一個元素,用k個hash function將它hash獲得bloom filter中k個bit位,將這k個bit位置1。
· 爲了query一個元素,即判斷它是否在集合中,用k個hash function將它hash獲得k個bit位。若這k bits全爲1,則此元素在集合中;若其中任一位不爲1,則此元素比不在集合中(由於若是在,則在add時已經把對應的k個bits位置爲1)。
· 不容許remove元素,由於那樣的話會把相應的k個bits位置爲0,而其中頗有可能有其餘元素對應的位。所以remove會引入false negative,這是絕對不被容許的。
布隆過濾器決不會漏掉任何一個在黑名單中的可疑地址。可是,它有一條不足之處,也就是它有極小的可能將一個不在黑名單中的電子郵件地址斷定爲在黑名單中,由於有可能某個好的郵件地址正巧對應一個八個都被設置成一的二進制位。好在這種可能性很小,咱們把它稱爲誤識機率。
布隆過濾器的好處在於快速,省空間,可是有必定的誤識別率,常見的補救辦法是在創建一個小的白名單,存儲那些可能個別誤判的郵件地址。
布隆過濾器具體算法高級內容,如錯誤率估計,最優哈希函數個數計算,位數組大小計算,請參見http://blog.csdn.net/jiaomeng/article/details/1495500。
2017年8月22日凌晨2點左右,HBase發佈了2.0.0 alpha-2,相比於上一個版本,修復了500個補丁,咱們來了解一下2.0版本的HBase新特性。
最新文檔:
http://hbase.apache.org/book.html#ttl
官方發佈主頁:
舉例:
1) region進行了多份冗餘
主region負責讀寫,從region維護在其餘HregionServer中,負責讀以及同步主region中的信息,若是同步不及時,是有可能出現client在從region中讀到了髒數據(主region還沒來得及把memstore中的變更的內容flush)。
2) 更多變更