這裏的SparkSQL是指整合了Hive的spark-sql cli(關於SparkSQL和Hive的整合,見文章後面的參考閱讀).java
本質上就是經過Hive訪問HBase表,具體就是經過hive-hbase-handler .node
環境篇sql
hadoop-2.3.0-cdh5.0.0apache
apache-hive-0.13.1-bin緩存
spark-1.4.0-bin-hadoop2.3app
hbase-0.96.1.1-cdh5.0.0ide
部署狀況以下圖:oop
測試集羣,將Spark Worker部署在每臺DataNode上,是爲了最大程度的任務本地化,Spark集羣爲Standalone模式部署。性能
其中有三臺機器上也部署了RegionServer。學習
這個部署狀況對理解後面提到的任務本地化調度有幫助。
1. 拷貝如下HBase的相關jar包到Spark Master和每一個Spark Worker節點上的$SPARK_HOME/lib目錄下.
(我嘗試用–jars的方式添加以後,不work,因此採用這種土辦法)
$HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar
$HBASE_HOME/lib/htrace-core-2.01.jar
$HBASE_HOME/lib/protobuf-java-2.5.0.jar
$HBASE_HOME/lib/guava-12.0.1.jar
$HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar
2.配置每一個節點上的$SPARK_HOME/conf/spark-env.sh,將上面的jar包添加到SPARK_CLASSPATH
export SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:
$SPARK_HOME/lib/htrace-core-2.01.jar:
$SPARK_HOME/lib/protobuf-java-2.5.0.jar:
$SPARK_HOME/lib/guava-12.0.1.jar:
$SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:
${SPARK_CLASSPATH}
3.將hbase-site.xml拷貝至${HADOOP_CONF_DIR},因爲spark-env.sh中配置了Hadoop配置文件目錄${HADOOP_CONF_DIR},所以會將hbase-site.xml加載。
hbase-site.xml中主要是如下幾個參數的配置:
hbase.zookeeper.quorum
zkNode1:2181,zkNode2:2181,zkNode3:2181
HBase使用的zookeeper節點
hbase.client.scanner.caching
5000
HBase客戶端掃描緩存,對查詢性能有很大幫助
另外還有一個參數:zookeeper.znode.parent=/hbase
是HBase在zk中的根目錄,默認爲/hbase,視實際狀況進行配置。
4.重啓Spark集羣。
大數據學習交流羣:724693112 歡迎想學習大數據和須要大數據學習資料的同窗來一塊兒學習。
hbase中有表lxw1234,數據以下:
hbase(main):025:0* scan 'lxw1234'
ROW COLUMN+CELL
lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1
lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2
lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1
lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2
lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1
lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2
lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3
1 row(s) in 0.0350 seconds
進入spark-sql,使用以下語句建表:
CREATE EXTERNAL TABLE lxw1234 (
rowkey string,
f1 map,
f2 map,
f3 map
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")
TBLPROPERTIES ("hbase.table.name" = "lxw1234");
建好以後,就能夠查詢了:
spark-sql> select * from lxw1234;
lxw1234.com {"c1":"name1","c2":"name2"} {"c1":"age1","c2":"age2"} {"c1":"job1","c2":"job2","c3":"job3"}
Time taken: 4.726 seconds, Fetched 1 row(s)
spark-sql> select count(1) from lxw1234;
1
Time taken: 2.46 seconds, Fetched 1 row(s)
spark-sql>
大表查詢,消耗的時間和經過Hive用MapReduce查詢差很少。
spark-sql> select count(1) from lxw1234_hbase;
53609638
Time taken: 335.474 seconds, Fetched 1 row(s)
在spark-sql中經過insert插入數據到HBase表時候報錯:
INSERT INTO TABLE lxw1234
SELECT 'row1' AS rowkey,
map('c3','name3') AS f1,
map('c3','age3') AS f2,
map('c4','job3') AS f3
FROM lxw1234_a
limit 1;
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 10.0 (TID 23, slave013.uniclick.cloud):
java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
這個還有待分析。
先看這張圖,該圖爲運行select * from lxw1234_hbase;這張大表查詢時候的任務運行圖。
Spark和Hadoop MapReduce同樣,在任務調度時候都會考慮數據本地化,即」任務向數據靠攏」,儘可能將任務分配到數據所在的節點上運行。
基於這點,lxw1234_hbase爲HBase中的外部表,Spark在解析時候,經過org.apache.hadoop.hive.hbase.HBaseStorageHandler獲取到表lxw1234_hbase在HBase中的region所在的RegionServer,即:slave00四、slave00五、slave006 (上面的部署圖中提到了,總共只有三臺RegionServer,就是這三臺),因此,在調度任務時候,首先考慮要往這三臺節點上分配任務。
表lxw1234_hbase共有10個region,所以須要10個map task來運行。
再看一張圖,這是spark-sql cli指定的Executor配置:
每臺機器上Worker的實例爲2個,每一個Worker實例中運行的Executor爲1個,所以,每臺機器上運行兩個Executor.
那麼salve00四、slave00五、slave006上各運行2個Executor,總共6個,很好,Spark已經第一時間將這6個Task交給這6個Executor去執行了(NODE_LOCAL Tasks)。
剩下4個Task,沒辦法,想NODE_LOCAL運行,但那三臺機器上沒有剩餘的Executor了,只能分配給其餘Worker上的Executor,這4個Task爲ANY Tasks。
正如那張任務運行圖中所示。
經過Hive和spark-sql去訪問HBase表,只是爲統計分析提供了必定的便捷性,我的以爲性能上的優點並不明顯。
可能Spark經過API去讀取HBase數據,性能更好些吧,之後再試。
另外,spark-sql有一點好處,就是能夠先把HBase中的數據cache到一張內存表中,而後在這張內存表中,
經過SQL去統計分析,那就爽多了。