Phoenix是一個HBase的開源SQL引擎。你可使用標準的JDBC API代替HBase客戶端API來建立表,插入數據,查詢你的HBase數據。
Phoenix是構建在HBase之上的SQL引擎。你也許會存在「Phoenix是否會下降HBase的效率?」或者「Phoenix效率是否很低?」這樣的疑慮,事實上並不會,Phoenix經過如下方式實現了比你本身手寫的方式相同或者多是更好的性能(更不用說能夠少寫了不少代碼): html
除此以外,Phoenix還作了一些有趣的加強功能來更多地優化性能: java
如下是Phoenix對(吊)比(打)Hive、Impala的測試:git
Phoenix VS Hive
Query: select count(1) from table over 10M and 100M rows. Data is 5 narrow columns. Number of Region Servers: 4 (HBase heap: 10GB, Processor: 6 cores @ 3.3GHz Xeon)github
Phoenix vs Impala
Query: select count(1) from table over 1M and 5M rows. Data is 3 narrow columns. Number of Region Server: 1 (Virtual Machine, HBase heap: 2GB, Processor: 2 cores @ 3.3GHz Xeon)算法
目前有哪些公司在使用Phoenix?sql
首先,經過命令hbase version
能夠查看到
咱們的HBase版本是1.2.0的。
接下來咱們到 http://archive.apache.org/dist/phoenix 下載對應版本的安裝包。這裏咱們選擇apache-phoenix-4.8.1-HBase-1.2-bin.tar.gz 這個版本。shell
咱們將下載的Phoenix壓縮包上傳到Master節點(任意一個都行)中,而後解壓
咱們能夠看到裏面包含了不少組件的jar包,咱們只須要將phoenix-core-4.8.1-HBase-1.2.jar
和phoenix-4.8.1-HBase-1.2-client.jar
拷貝到HBase的lib目錄下,而後將HBase的配置文件hbase-site.xml
文件拷貝到Phoenix解壓的目錄下的bin目錄。而後重啓HBase
輸入bin/sqlline.py bqdpm1,bqdpm2,bqdps1:2181
。結果報錯
數據庫
很明顯,這裏提示包衝突了。咱們回想一下,下載的phoenix的包本來是從apache的官方下載的,裏面打包的是apache的hadoop和hbase,也就是說並不支持cdh。那應該怎麼辦呢?apache
因爲Phoenix工程裏面使用的依賴都是Apache原版的jar包,所以咱們須要修改成CDH的依賴。能夠參考編譯phoenix用於CDH平臺數組
修改了依賴事後還須要修改部分代碼才行,這樣就比較麻煩了。好在咱們有萬能的github,已經有大神幫咱們作好了修改,能夠直接拿下來用。連接:phoenix-for-cloudera。雖然工程上寫着是CDH4.8,可是實際上4.7也能用。
將工程克隆下來或者直接批量下載下來,解壓後能夠看到以下目錄
很明顯,這是一個maven工程。在肯定電腦安裝了maven以後,使用命令mvn clean package -DskipTests -Dcdh.flume.version=1.4.0
。這裏的flume版本須要指定爲咱們須要的Flume版本,CDH4.7中使用的是1.4。接下來就是漫長的等待。。。
最終,編譯的jar包和工程的文件將會打包到phoenix-assembly/target
中
將打包好的phoenix-4.8.0-cdh5.8.0.tar.gz文件上傳到CDH環境中,而後解壓能夠看到以下文件:
而後將phoenix-4.8.0-cdh5.8.0-server.jar
文件拷貝到各個節點的HBase依賴路徑下,即/opt/cloudera/parcels/CDH/lib/hbase/lib/
再將hbase的配置文件hbase-site.xml
拷貝到bin目錄下便可。
而後進入bin目錄,執行./sqlline.py bqdpm1,bqdpm2,bqdps1:2181
看到以下信息說明成功
若是出現下面問題
則須要檢查hdfs的權限控制是否關閉了。而後執行hbase clean --cleanZk
最後重啓HBase便可
首先建立us_population.sql文件,裏面的建立一個名爲us_population的表
CREATE TABLE IF NOT EXISTS us_population ( state CHAR(2) NOT NULL, city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY (state, city));
接下來新建一個數據文件us_population.csv
NY,New York,8143197CA,Los Angeles,3844829IL,Chicago,2842518TX,Houston,2016582PA,Philadelphia,1463281AZ,Phoenix,1461575TX,San Antonio,1256509CA,San Diego,1255540TX,Dallas,1213825CA,San Jose,912332
最後建立一個查詢sql文件us_population_queries.sql
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"FROM us_populationGROUP BY stateORDER BY sum(population) DESC;
執行../bin/psql.py bqdpm1,bqdpm2,bqdps1:2181 us_population.sql us_population.csv us_population_queries.sql
這裏的命令中的us_population.sql和us_population.csv必須同名
其實,建立了表以後咱們單獨運行../bin/psql.py bqdpm1,bqdpm2,bqdps1:2181 us_population_queries.sql
也是能夠的
經過Phoenix建的表都會自動轉成大寫,若是須要使用小寫的表,請使用
create table "tablename"
安裝了Phoenix以後就會存在四張系統表
在Phoenix中建立的表同時會在HBase中建立一張表與之對應
使用./sqlline.py bqdpm1,bqdpm2,bqdps1:2181
登陸到Phoenix的shell中,可使用正常的SQL語句進行操做,
可使用!table
查看錶信息
使用!describe tablename
能夠查看錶字段信息
使用!history能夠查看執行的歷史SQL
使用!dbinfo
能夠查看Phoenix全部的屬性配置
除了上面這些之外以外還有不少其餘操做,能夠用過help查看
0: jdbc:phoenix:bqdpm1,bqdpm2,bqdps1:2181> help!all Execute the specified SQL against all the current connections!autocommit Set autocommit mode on or off!batch Start or execute a batch of statements!brief Set verbose mode off!call Execute a callable statement!close Close the current connection to the database!closeall Close all current open connections!columns List all the columns for the specified table!commit Commit the current transaction (if autocommit is off)!connect Open a new connection to the database.!dbinfo Give metadata information about the database!describe Describe a table!dropall Drop all tables in the current database!exportedkeys List all the exported keys for the specified table!go Select the current connection!help Print a summary of command usage!history Display the command history!importedkeys List all the imported keys for the specified table!indexes List all the indexes for the specified table!isolation Set the transaction isolation for this connection!list List the current connections!manual Display the SQLLine manual!metadata Obtain metadata information!nativesql Show the native SQL for the specified statement!outputformat Set the output format for displaying results (table,vertical,csv,tsv,xmlattrs,xmlelements)!primarykeys List all the primary keys for the specified table!procedures List all the procedures!properties Connect to the database specified in the properties file(s)!quit Exits the program!reconnect Reconnect to the database!record Record all output to the specified file!rehash Fetch table and column names for command completion!rollback Roll back the current transaction (if autocommit is off)!run Run a script from the specified file!save Save the current variabes and aliases!scan Scan for installed JDBC drivers!script Start saving a script to a file!set Set a sqlline variable!sql Execute a SQL command!tables List all the tables in the database!typeinfo Display the type map for the current connection!verbose Set verbose mode on
首先須要保證電腦安裝了SQuirrel.
而後點擊Driver的增長按鈕,填上名字和Phoenix的jdbc地址示例如jdbc:phoenix:zookeeper quorum server
,而後指定外部驅動爲本地的phoenix-4.8.0-cdh5.8.0-client.jar
的org.apache.phoenix.jdbc.PhoenixDriver
而後添加鏈接,選擇驅動爲剛纔咱們添加的Phoenix驅動,而後URL寫上具體的zookeeper集羣,好比jdbc: phoenix:bqdpm1,bqdpm2,bqdps1
點擊OK
鏈接好了事後就能夠執行SQL語句了
這裏咱們就直接使用以前寫好的一個工程來進行測試吧。
首先添加數據庫連接的conf設置
phoenix { jdbcClass = org.apache.phoenix.jdbc.PhoenixDriver url = "jdbc:phoenix:bqdpm1,bqdpm2,bqdps1" user = "" password = "" }
而後講驅動jar文件引入工程,因爲咱們用的CDH版本的client,因此maven的中央倉庫搜不到,所以咱們只好手動添加。固然,後續會傳到nexus私服
接下來寫一個查詢sql運行,達到結果
Phoenix不建議使用鏈接池進行操做,詳見:https://phoenix.apache.org/faq.html#Is_there_a_way_to_bulk_load_in_Phoenix
在頁面 http://phoenix.apache.org/language/index.html 中給出了全部的操做說明。
數據類型 | Java Map | 佔用大小 | 範圍 (byte) |
---|---|---|---|
INTEGER | java.lang.Integer | 4 | -2147483648 to 2147483647 |
UNSIGNED_INT | java.lang.Integer | 4 | 0 to 2147483647 |
BIGINT | java.lang.Long | 8 | -9223372036854775807 to 9223372036854775807 |
UNSIGNED_LONG | java.lang.Long | 8 | 0 to 9223372036854775807 |
TINYINT | java.lang.Byte | 1 | -128 to 127 |
UNSIGNED_TINYINT | java.lang.Byte | 1 | 0 to 127 |
SMALLINT | java.lang.Short | 2 | -32768 to 32767 |
UNSIGNED_SMALLINT | java.lang.Short | 2 | 0 to 32767 |
FLOAT | java.lang.Float | 4 | -3.402823466 E + 38 to 3.402823466 E + 38 |
UNSIGNED_FLOAT | java.lang.Float | 4 | 0 to 3.402823466 E + 38 |
DOUBLE | java.lang.Double | 8 | -1.7976931348623158 E + 308 to 1.7976931348623158 E + 308 |
UNSIGNED_DOUBLE | java.lang.Double | 8 | 0 to 1.7976931348623158 E + 308 |
DECIMAL | java.math.BigDecimal | DECIMAL(p,s) | |
BOOLEAN | java.lang.Boolean | 1 | TRUE and FALSE |
TIME | java.sql.Time | 8 | yyyy-MM-dd hh:mm:ss |
DATE | java.sql.Date | 8 | yyyy-MM-dd hh:mm:ss, |
TIMESTAMP | java.sql.Timestamp | 12 | yyyy-MM-dd hh:mm:ss[.nnnnnnnnn] |
UNSIGNED_TIME | java.sql.Time | 8 | yyyy-MM-dd hh:mm:ss |
UNSIGNED_DATE | java.sql.Date | 8 | yyyy-MM-dd hh:mm:ss |
UNSIGNED_TIMESTAMP | java.sql.Timestamp | 12 | |
VARCHAR | java.lang.String | VARCHAR(n) | |
CHAR | java.lang.String | CHAR (n) | |
BINARY | byte[] | BINARY(n) | |
VARBINARY | byte[] | VARBINARY | |
ARRAY | java.sql.Array | VARCHAR ARRAY |
請移步http://phoenix.apache.org/language/datatypes.html 測試
可是實際使用中,咱們若是使用INTEGER等數值類型,必須將4個字節補全,不能直接在HBase中建寫入直接的數值,所以我建議若是要關聯已有的HBase表,最好直接使用VARCHAR類型
在Phoenix中是沒有Insert語句的,取而代之的是Upsert語句。Upsert有兩種用法,分別是:UPSERT INTO
和UPSERT SELECT
UPSERT INTO US_POPULATION VALUES('AK','Juneau',30711);UPSERT INTO US_POPULATION (STATE,CITY,POPULATION) VALUES('AK','Anchorage',260283);
UPSERT INTO US_POPULATION (STATE,CITY,POPULATION) SELECT STATE,CITY,POPULATION FROM AK_POPULATION WHERE POPULATION < 40000;
能夠看到,Phoenix將源表中人口數少於4萬的兩個城市信息插入到了目標表中
上面的sql語句中寫明瞭插入的字段,若是自己這兩張表徹底相同,或者某些字段相同,能夠直接這樣寫
UPSERT INTO US_POPULATION SELECT STATE,CITY,POPULATION FROM AK_POPULATION WHERE POPULATION > 40000;--或者UPSERT INTO US_POPULATION SELECT * FROM AK_POPULATION WHERE POPULATION > 40000;
注意:在phoenix中插入語句並不會像傳統數據庫同樣存在重複數據。由於Phoenix是構建在HBase之上的,也就是必須存在一個主鍵。而US_POPULATION這張表的主鍵是由(state, city)共同決定的,所以只要這兩個值相同的數據插入進去都是覆蓋操做。下面這張圖片就是US_POPULATION對應的HBase的主鍵字段
刪除數據和其餘數據庫類似
DELETE FROM US_POPULATION;DELETE FROM US_POPULATION WHERE CITY = 'Kenai';
能夠看到 CITY = 'Kenai'的這條記錄已經被刪除了
因爲HBase的主鍵設計,相同rowkey的內容能夠直接覆蓋,這就變相的更新了數據。因此Phoenix的更新操做仍舊是 3.2.1 的那兩種
好比我想將US_POPULATION中CITY = 'Juneau'的人口數修改成40711
UPSERT INTO US_POPULATION (STATE,CITY,POPULATION) VALUES('AK','Juneau',40711);
這裏咱們將所有字段都寫出來了的,若是我只想操做一列,我能簡化嗎?
答案是能夠,不過有個條件,就是必須包含生成HBase的rowkey的全部字段。不然會報如下錯誤:
這裏提示state不能爲空,也就是組成rowkey的state和city字段缺一不可。其實很好理解,畢竟HBase全部的操做都是圍繞着rowkey進行的。
Phoenix做爲SQL On HBase引擎必不可少的就是SQL查詢語句了,他能兼容大部分的SQL查詢語句,好比UNION ALL
GROUP BY
ORDER BY
LIMIT
下面是一些簡單的sql例子
SELECT * FROM TEST LIMIT 1000;SELECT * FROM TEST LIMIT 1000 OFFSET 100;SELECT full_name FROM SALES_PERSON WHERE ranking >= 5.0UNION ALL SELECT reviewer_name FROM CUSTOMER_REVIEW WHERE score >= 8.0
在Phoenix中是沒有Database的概念的,全部的表都在同一個命名空間。固然,Phoenix4.8開始支持多個命名空間了,在http://phoenix.apache.org/namspace_mapping.html 這個網頁說明了如何將Schema映射到命名空間中。
<property> <name>phoenix.schema.isNamespaceMappingEnabled</name> <value>true</value></property>
這裏必定要注意:若是設置爲true,建立的帶有schema的表將映射到一個namespace,這個須要客戶端和服務端同時設置。一旦設置爲true,就不能回滾了。舊的客戶端將沒法再正常工做。因此建議你們都查看官方文檔,肯定後再進行設置。
在官網上咱們看到Phoenix建立表的定義是這樣的:
從這裏咱們能夠猜想Phoenix建表的時候能夠定義表的屬性(包括了HBase的一些屬性)以及預分區操做。
咱們進入到tableOption下面能夠看到SALT_BUCKETS, DISABLE_WAL, IMMUTABLE_ROWS, MULTI_TENANT, DEFAULT_COLUMN_FAMILY, STORE_NULLS, TRANSACTIONAL
, and UPDATE_CACHE_FREQUENCY
這些屬性,下面咱們就一一解答。
Salting可以經過預分區(pre-splitting)數據到多個region中來顯著提高讀寫性能。
Salting 翻譯成中文是加鹽的意思,本質是在hbase中,rowkey的byte數組的第一個字節位置設定一個系統生成的byte值,這個byte值是由主鍵生成rowkey的byte數組作一個哈希算法,計算得來的。Salting以後能夠把數據分佈到不一樣的region上,這樣有利於phoenix併發的讀寫操做。關於SaltedTable的說明在 http://phoenix.apache.org/salted.html
CREATE TABLE TEST (HOST VARCHAR NOT NULL PRIMARY KEY, DESCRIPTION VARCHAR) SALT_BUCKETS=16;
SALT_BUCKETS的值範圍在(1 ~ 256)
接下來就開始劃重點了:
salted table能夠自動在每個rowkey前面加上一個字節,這樣對於一段連續的rowkeys,它們在表中實際存儲時,就被自動地分佈到不一樣的region中去了。當指定要讀寫該段區間內的數據時,也就避免了讀寫操做都集中在同一個region上。
簡而言之,若是咱們用Phoenix建立了一個saltedtable,那麼向該表中寫入數據時,原始的rowkey的前面會被自動地加上一個byte(不一樣的rowkey會被分配不一樣的byte),使得連續的rowkeys也能被均勻地分佈到多個regions。
爲了印證這一說法,我往TEST中添加幾分數據看看
UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.1','S1');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.2','S2');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.3','S3');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.4','S4');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.5','S5');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.6','S6');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.7','S7');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.8','S8');UPSERT INTO TEST (HOST,DESCRIPTION) values ('192.168.0.9','S9');
此時的HBase中的數據爲
能夠看到,在每條rowkey前面加了一個Byte,這裏顯示爲了16進制。也正是由於添加了一個Byte,因此SALT_BUCKETS的值範圍在必須再1 ~ 256之間。而添加的這個Byte是根據什麼來分的我就不得而知了,因此最好不要使用HBase的API插入數據。
所以,在使用SALT_BUCKETS的時候須要注意如下兩點:
建立salted table後,應該使用Phoenix SQL來讀寫數據,而不要混合使用Phoenix SQL和HBase API
若是經過Phoenix建立了一個salted table,那麼只有經過Phoenix SQL插入數據才能使得被插入的原始rowkey前面被自動加上一個byte,經過HBase shell插入數據沒法prefix原始的rowkey
Salting可以自動的設置表預分區,可是你得去控制表是如何分區的,因此在建phoenix表時,能夠精確的指定要根據什麼值來作預分區,好比:
CREATE TABLE TEST (HOST VARCHAR NOT NULL PRIMARY KEY, DESCRIPTION VARCHAR) SPLIT ON ('CS','EU','NA');
列族包含相關的數據都在獨立的文件中,在Phoenix設置多個列族能夠提升查詢性能。例如:
CREATE TABLE TEST (MYKEY VARCHAR NOT NULL PRIMARY KEY, A.COL1 VARCHAR,A.COL2 VARCHAR, B.COL3 VARCHAR);
插入下面的數據
UPSERT INTO TEST values ('key1','A1','B1','C1');UPSERT INTO TEST values ('key2','A2','B2','C2');UPSERT INTO TEST values ('key3','A3','B3','C3');UPSERT INTO TEST values ('key4','A4','B4','C4');UPSERT INTO TEST values ('key5','A5','B5','C5');UPSERT INTO TEST values ('key6','A6','B6','C6');UPSERT INTO TEST values ('key7','A7','B7','C7');UPSERT INTO TEST values ('key8','A8','B8','C8');UPSERT INTO TEST values ('key9','A9','B9','C9');
這樣就有兩個列族了
在數據量大的表上使用壓縮算法來提升性能。
例如:
CREATE TABLE TEST (HOST VARCHAR NOT NULL PRIMARY KEY, DESCRIPTION VARCHAR) COMPRESSION='Snappy';
刪除表和其餘的數據庫相似。不一樣的是能夠加上CASCADE關鍵字,用於刪除表的同時刪除基於該表的全部視圖。
DROP TABLE my_schema.my_table;DROP TABLE IF EXISTS my_table;DROP TABLE my_schema.my_table CASCADE;
CREATE VIEW "my_hbase_table" ( k VARCHAR primary key, "v" UNSIGNED_LONG) default_column_family='a';CREATE VIEW my_view ( new_col SMALLINT ) AS SELECT * FROM my_table WHERE k = 100;CREATE VIEW my_view_on_view AS SELECT * FROM my_view WHERE new_col > 70;
例子:
CREATE VIEW TEST_VIEW AS SELECT * FROM TEST where DESCRIPTION in ('S1','S2','S3')
結果:
除此以外,咱們還能在視圖上建立視圖
Phoenix中的視圖實際上是不完美的
好比我執行
CREATE VIEW TEST_VIEW ( HOST VARCHAR) AS SELECT HOST FROM TEST where DESCRIPTION in ('S1','S2','S3');
就會報錯
提示我必須帶‘*’ ,因此它的視圖是沒辦法只獲取一部分數據的數據的,即便使用子查詢也不行。
DROP VIEW my_viewDROP VIEW IF EXISTS my_schema.my_viewDROP VIEW IF EXISTS my_schema.my_view CASCADE
參考:Phoenix二級索引
從Phoenix 2.1版本開始,Phoenix支持可變和不可變(數據插入後再也不更新)數據創建二級索引。Phoenix 2.0版本僅支持在不可變數據創建二級索引。
CREATE INDEX my_idx ON sales.opportunity(last_updated_date DESC)CREATE INDEX my_idx ON log.event(created_date DESC) INCLUDE (name, payload) SALT_BUCKETS=10CREATE INDEX IF NOT EXISTS my_comp_idx ON server_metrics ( gc_time DESC, created_date DESC ) DATA_BLOCK_ENCODING='NONE',VERSIONS=?,MAX_FILESIZE=2000000 split on (?, ?, ?)CREATE INDEX my_idx ON sales.opportunity(UPPER(contact_name))
假如我對TEST的HOST,DESCRIPTION建立索引,具體sql以下:
CREATE INDEX TEST_INDEX ON TEST (HOST) INCLUDE (DESCRIPTION);
結果報錯
咱們須要在每一個region的hbase-site.xml中添加
<property> <name>hbase.regionserver.wal.codec</name> <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value></property>
開啓可變索引須要在HMaster和RegionServer上加入特殊的選項,而且須要重啓集羣。配置方式以下:
<!-- Phoenix訂製的索引負載均衡器 --> <property> <name>hbase.master.loadbalancer.class</name> <value>org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer</value> </property> <!-- Phoenix訂製的索引觀察者 --> <property> <name>hbase.coprocessor.master.classes</name> <value>org.apache.phoenix.hbase.index.master.IndexMasterObserver</value> </property>
RegionServer hbase-site.xml
<!-- Enables custom WAL edits to be written, ensuring proper writing/replay of the index updates. This codec supports the usual host of WALEdit options, most notably WALEdit compression. --><property> <name>hbase.regionserver.wal.codec</name> <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value></property><!-- Prevent deadlocks from occurring during index maintenance for global indexes (HBase 0.98.4+ and Phoenix 4.3.1+ only) by ensuring index updates are processed with a higher priority than data updates. It also prevents deadlocks by ensuring metadata rpc calls are processed with a higher priority than data rpc calls --><property> <name>hbase.region.server.rpc.scheduler.factory.class</name> <value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value><description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description></property><property> <name>hbase.rpc.controllerfactory.class</name> <value>org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory</value><description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description></property>
你能夠在hbase-site.xml裏配置如下參數
index.builder.threads.max
o 爲主表更新操做創建索引的最大線程數
o Default: 10
index.builder.threads.keepalivetime
o 上面線程的超時時間
o Default: 60
index.writer.threads.max
o 將索引寫到索引表的最大線程數
o Default: 10
index.writer.threads.keepalivetime
o 上面線程的超時時間
o Default: 60
hbase.htable.threads.max
o 同時最多有這麼多線程往索引表寫入數據
o Default: 2,147,483,647
hbase.htable.threads.keepalivetime
o 上面線程的超時時間
o Default: 60
index.tablefactory.cache.size
o 緩存10個往索引表寫數據的線程
o Default: 10
CDH能夠直接在CM中配置
重啓HBase以後便可
DROP INDEX my_idx ON sales.opportunityDROP INDEX IF EXISTS my_idx ON server_metrics
例如:
DROP INDEX IF EXISTS XDGL_ACCT_FEE_INDEX ON XDGL_ACCT_FEE
前面咱們提到了建立表和建立視圖,可是咱們都沒有對現有的HBase表關聯進行舉例,由於這一場景實際上是用的最多的,因此提出來單獨講。
使用Phoenix和HBase關聯有兩種方式:建立關聯表和建立關聯視圖。
首先建立一張HBase表
#建立STUDENT,包含cf1和cf2兩個列族create 'STUDENT' ,'cf1','cf2'#往student裏面添加數據。cf1包含了name和age信息,cf2包含了成績信息put 'STUDENT','0001','cf1:name','Xiao Ming'put 'STUDENT','0001','cf1:age','18'put 'STUDENT','0001','cf2:score','90'put 'STUDENT','0002','cf1:name','Xiao Hua'put 'STUDENT','0002','cf1:age','17'put 'STUDENT','0002','cf2:score','95'put 'STUDENT','0003','cf1:name','Xiao Fang'put 'STUDENT','0003','cf1:age','18'put 'STUDENT','0003','cf2:score','95'put 'STUDENT','0004','cf1:name','Xiao Gang'put 'STUDENT','0004','cf1:age','18'put 'STUDENT','0004','cf2:score','85'
接下來對兩種模式進行介紹:
--建立的Phoenix表名必須和HBase表名一致CREATE TABLE STUDENT (--這句話直接寫就能夠了,這樣的話,HBase中的RowKey轉換成phoenix中的主鍵,列名就叫自取。--rowkey自動會和primary key進行對應。id VARCHAR NOT NULL PRIMARY KEY ,--將名爲cf1的列族下,字段名爲name的字段,寫在這裏。"cf1"."name" VARCHAR ,--下面就以此類推"cf1"."age" VARCHAR ,"cf2"."score" VARCHAR )
此時能夠在Phoenix上進行增刪改查
若是在Phoenix上刪除數據
對應的HBase也會刪除數據
同理,增長和修改也會修改HBase的數據
刪除數據會修改HBase的數據,若是我整個表刪除掉會怎麼樣呢?
在Phoenix上執行
DROP TABLE IF EXISTS STUDENT
此時在HBase中已經看不到咱們建立的Student表了
所以,Phoenix刪除了表,會將HBase的表也刪掉。這點很是重要。
建立關聯視圖和建立關聯表基本同樣,只是將CREATE TABLE
替換成了CREATE VIEW
--建立的Phoenix表名必須和HBase表名一致CREATE VIEW STUDENT (--這句話直接寫就能夠了,這樣的話,HBase中的RowKey轉換成phoenix中的主鍵,列名就叫自取。--rowkey自動會和primary key進行對應。id VARCHAR NOT NULL PRIMARY KEY ,--將名爲cf1的列族下,字段名爲name的字段,寫在這裏。"cf1"."name" VARCHAR ,--下面就以此類推"cf1"."age" VARCHAR ,"cf2"."score" VARCHAR )
此時咱們對STUDENT視圖進行插入、更新或者刪除數據操做會報表是隻讀的錯誤
不只如此,咱們刪除student這個view,對HBase的數據不會有任何影響
那麼如何往關聯的視圖中插入數據呢?
只能經過HBase來操做了。往HBase中插入一條新的數據
put 'STUDENT','0005','cf1:name','Xiao Qiang'put 'STUDENT','0005','cf1:age','18'put 'STUDENT','0005','cf2:score','100'
此時在Phoenix中的數據也會隨之增長
適用於常常在Phoenix上進行增刪改的操做
建立HBase的關聯視圖:
首先登陸nexus私服的帳戶,添加新的Jar文件。
最後咱們能夠在nexus上看到咱們的jar包了。咱們能夠根據maven的xml裏面添加依賴
而後在sbt工程中將其添加進去
經過Spark操做Phoenix必需要經過phoenix-spark-4.8.0-cdh5.8.0.jar
這個包,若是每次使用Phoenix都本身指定一次這個文件路徑會比較麻煩,所以最好將這個文件添加到HBase的lib下,而後在spark-env.sh中指定一下。若是是CDH的環境能夠直接在CM上的spark->配置->高級->spark-conf/spark-env.sh 的 Spark 客戶端高級配置代碼段(安全閥)
中添加以下代碼
#添加Phoenix依賴for file in $(find /opt/cloudera/parcels/CDH/lib/hbase/lib/ |grep phoenix)do SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$file"doneexport SPARK_DIST_CLASSPATH
這樣每次啓動spark任務都會將phoenix的jar包添加到classpath了
經過DataFr的DataSource API能夠加載Phoenix的數據
import org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.phoenix.spark._val sparkConf = new SparkConf()val sc = new SparkContext(sparkConf)val sqlContext = new SQLContext(sc)val df = sqlContext.read .options(Map("table" -> "STUDENT", "zkUrl" -> "bqdpm1,bqdpm2,bqdps1:2181")) .format("org.apache.phoenix.spark") .loaddf.show
結果輸出
除了上述的方法,Phoenix-spark爲咱們提供了一個專用的API,其定義以下:
def phoenixTableAsDataFrame(table: String, columns: Seq[String],predicate: Option[String] = None, zkUrl: Option[String] = None,conf: Configuration = new Configuration): DataFrame = { // Create the PhoenixRDD and convert it to a DataFrame new PhoenixRDD(sqlContext.sparkContext, table, columns, predicate, zkUrl, conf) .toDataFrame(sqlContext)}
下面是一個實際的例子
import org.apache.hadoop.conf.Configurationimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.phoenix.spark._val sparkConf = new SparkConf()val sc = new SparkContext(sparkConf)val sqlContext = new SQLContext(sc)val configuration = new Configuration()configuration.set("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps1")val df = sqlContext.phoenixTableAsDataFrame( "STUDENT",Array("ID","name","age"),conf = configuration)df.show
固然,咱們也能夠創建RDD
def phoenixTableAsRDD(table: String, columns: Seq[String], predicate: Option[String] = None,zkUrl: Option[String] = None, conf: Configuration = new Configuration()): RDD[Map[String, AnyRef]] = { // Create a PhoenixRDD, but only return the serializable 'result' map new PhoenixRDD(sc, table, columns, predicate, zkUrl, conf).map(_.result)}
實際例子以下
import org.apache.hadoop.conf.Configurationimport org.apache.spark.SparkContextimport org.apache.phoenix.spark._val sparkConf = new SparkConf()val sc = new SparkContext(sparkConf)val rdd = sc.phoenixTableAsRDD( "STUDENT",Array("ID","name","age"),zkUrl = Some("bqdpm1,bqdpm2,bqdps1:2181"))rdd.countrdd.collect()
能夠將Spark中的數據,不管是RDD仍是DataFrame均可以將數據保存到Phoenix中
import org.apache.hadoop.conf.Configurationimport org.apache.spark.SparkContextimport org.apache.phoenix.spark._val sparkConf = new SparkConf()val sc = new SparkContext(sparkConf)val dataSet = List(("0005","Zhang San","20","70"),("0006","Li Si","20","82"),("0005","Wang Wu","19","90"))sc.parallelize(dataSet) .saveToPhoenix( "STUDENT", Seq("ID","name","age","score"), zkUrl = Some("bqdpm1,bqdpm2,bqdps1:2181") )
結果輸出
假設咱們將
sc.parallelize(dataSet) .saveToPhoenix( "STUDENT", Seq("ID","name","age","score"), zkUrl = Some("bqdpm1,bqdpm2,bqdps1:2181") )
改爲
sc.parallelize(dataSet) .saveToPhoenix( "STUDENT", Seq("ID","NAME","AGE","SCORE"),//或者 Seq("ID","Name","Age","Score") zkUrl = Some("bqdpm1,bqdpm2,bqdps1:2181") )
結果會報錯
提示咱們列對應不上。因而可知 經過Spark操做Phoenix是須要區分大小寫的。這點很是重要,後面咱們還會提到。
case class Student(ID:String,Name:String,Age:String,Score:String)val dataSet = List(Student("0008","Ma Liu","18","95"),Student("0009","Zhao Qi","19","80"),Student("0010","Liu Ba","19","100"))val df = sqlContext.createDataFrame(dataSet)df.write .format("org.apache.phoenix.spark") .mode( SaveMode.Overwrite) .options(Map("table" -> "STUDENT", "zkUrl" -> "bqdpm1,bqdpm2,bqdps1:2181")).save()
運行接錯報錯,提示列對不上?難道不支持將數據寫到多個列族? (請看下節)
而後我從新使用官網的例子來驗證。
首先在Phoenix上建立一張OUTPUT_TABLE的表
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
而後將DataFrame保存到Phoenix中
case class Student(ID:Long,col1:String,col2:Int)val dataSet = List(Student(1,"Ma Liu",18),Student(2,"Zhao Qi",19),Student(3,"Liu Ba",19))val df = sqlContext.createDataFrame(dataSet)df.write .format("org.apache.phoenix.spark") .mode( SaveMode.Overwrite) .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "bqdpm1,bqdpm2,bqdps1:2181")).save()
結果成功了
咱們將case class Student(ID:Long,col1:String,col2:Int)
換成case class Student(ID:Long,Col1:String,Col2:Int)
再次執行,也是成功的
在前面咱們嘗試使用RDD和DataFrame保存數據到Phoenix,發現二者在列的支持上是有一些不一樣的。
我嘗試從源碼的角度來分析
sc.parallelize(dataSet) .saveToPhoenix( "STUDENT", Seq("ID","NAME","AGE","SCORE"),//或者 Seq("ID","Name","Age","Score") zkUrl = Some("bqdpm1,bqdpm2,bqdps1:2181") )
的時候,Phoenix的API將Column原本來本做爲輸出的Column名
因此使用RDD的saveToPhoenix函數時必須嚴格按照Phoenix的Column名的大小寫來輸入
df.write.format("org.apache.phoenix.spark").mode( SaveMode.Overwrite).options(Map("table" -> "STUDENT1", "zkUrl" -> "bqdpm1,bqdpm2,bqdps1:2181")).save()
也就是說藉助了org.apache.phoenix.spark裏面的隱式函數
implicit def toDataFrameFunctions(data: DataFrame): DataFrameFunctions = { new DataFrameFunctions(data)}
而這個SchemaUtil.normalizeIdentifier(x)
僅僅只是將字符串裏面的引號去掉,而後轉成大寫
public static String normalizeIdentifier(String name) { if (name == null) { return name; } if (isCaseSensitive(name)) { // Don't upper case if in quotes return name.substring(1, name.length()-1); } return name.toUpperCase();}
也就是說無論咱們的DataFrame的列是什麼格式,最終都會轉成大寫。
而後Phoenix裏面的列可能不是大寫的,因此就可能出現列名是對的,可是大小寫對應不上的尷尬局面,並不是Phoenix不能寫入大寫數據。
爲了驗證這一個猜測,咱們再次創建了一張經過Phoenix關聯HBase已有的表Student1
create 'STUDENT1' ,'cf1','cf2'put 'STUDENT1','0001','cf1:NAME','Xiao Ming'put 'STUDENT1','0001','cf1:AGE','18'put 'STUDENT1','0001','cf2:SCORE','90'
CREATE TABLE STUDENT1 (ID VARCHAR NOT NULL PRIMARY KEY ,"cf1"."NAME" VARCHAR ,"cf1"."AGE" VARCHAR ,"cf2"."SCORE" VARCHAR )
再次執行
case class Student(ID:Long,col1:String,col2:Int)val dataSet = List(Student(1,"Ma Liu",18),Student(2,"Zhao Qi",19),Student(3,"Liu Ba",19))val df = sqlContext.createDataFrame(dataSet)df.write .format("org.apache.phoenix.spark") .mode( SaveMode.Overwrite) .options(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "bqdpm1,bqdpm2,bqdps1:2181")).save()
此次也沒用報錯了
結論:
在使用RDD保存數據到Phoenix的時候,要嚴格按照Phoenix列名的大小寫來輸入
使用DataFrame保存的時候,對數據源的列名大小寫無要求。可是必須保證Phoenix的表列名必須是大寫的
HBase建表的時候,咱們建議您對錶名和列都使用大寫
使用Phoenix建立表的時候,除非是已經存在了HBase的表,不然無須要建表的時候對列帶引號,這樣sql中即便是小寫的列也會保存爲大寫,好比:
--建立的Phoenix表名必須和HBase表名一致CREATE TABLE STUDENT3 (--這句話直接寫就能夠了,這樣的話,HBase中的RowKey轉換成phoenix中的主鍵,列名就叫自取。--rowkey自動會和primary key進行對應。ID VARCHAR NOT NULL PRIMARY KEY ,--將名爲cf1的列族下,字段名爲name的字段,寫在這裏。cf1.name VARCHAR ,--下面就以此類推cf1.age VARCHAR ,cf2.score VARCHAR )
下圖是經過Phoenix取數據的每面請求條數
一樣的邏輯,最開始使用Scan的時候只有50000 requests/secend,二者性能差異很是大。
之前從HBase須要4個小時才能拿完的數據,如今只須要一個小時了.