Spark入門實戰系列--6.SparkSQL(上)--SparkSQL簡介

【注】該系列文章以及使用到安裝包/測試數據 能夠在《傾情大奉送--Spark入門實戰系列》獲取

1SparkSQL的發展歷程

1.1 Hive and Shark

SparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,它是當時惟一運行在Hadoop上的SQL-on-Hadoop工具。可是MapReduce計算過程當中大量的中間磁盤落地過程消耗了大量的I/O,下降的運行效率,爲了提升SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產生,其中表現較爲突出的是:html

l MapRDrilljava

l ClouderaImpalaweb

l Sharksql

其中Shark是伯克利實驗室Spark生態環境的組件之一,它修改了下圖所示的右下角的內存管理、物理計劃、執行三個模塊,並使之能運行在Spark引擎上,從而使得SQL查詢的速度獲得10-100倍的提高。數據庫

clip_image002

1.2 SharkSparkSQL 

可是,隨着Spark的發展,對於野心勃勃的Spark團隊來講,Shark對於Hive的太多依賴(如採用Hive的語法解析器、查詢優化器等等),制約了SparkOne Stack Rule Them All的既定方針,制約了Spark各個組件的相互集成,因此提出了SparkSQL項目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優勢,如內存列存儲(In-Memory Columnar Storage)、Hive兼容性等,從新開發了SparkSQL代碼;因爲擺脫了對Hive的依賴性,SparkSQL不管在數據兼容、性能優化、組件擴展方面都獲得了極大的方便,真可謂「退一步,海闊天空」。express

l數據兼容方面  不但兼容Hive,還能夠從RDDparquet文件、JSON文件中獲取數據,將來版本甚至支持獲取RDBMS數據以及cassandraNOSQL數據;apache

l性能優化方面  除了採起In-Memory Columnar Storagebyte-code generation等優化技術外、將會引進Cost Model對查詢進行動態評估、獲取最佳物理計劃等等;編程

l組件擴展方面  不管是SQL的語法解析器、分析器仍是優化器均可以從新定義,進行擴展。json

201461Shark項目和SparkSQL項目的主持人Reynold Xin宣佈:中止對Shark的開發,團隊將全部資源放SparkSQL項目上,至此,Shark的發展畫上了句話,但也所以發展出兩個直線:SparkSQLHive on Spark數組

clip_image004

其中SparkSQL做爲Spark生態的一員繼續發展,而再也不受限於Hive,只是兼容Hive;而Hive on Spark是一個Hive的發展計劃,該計劃將Spark做爲Hive的底層引擎之一,也就是說,Hive將再也不受限於一個引擎,能夠採用Map-ReduceTezSpark等引擎。

1.3 SparkSQL的性能

Shark的出現,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提升:

clip_image006

那麼,擺脫了Hive的限制,SparkSQL的性能又有怎麼樣的表現呢?雖然沒有Shark相對於Hive那樣矚目地性能提高,但也表現得很是優異:

clip_image008

爲何SparkSQL的性能會獲得怎麼大的提高呢?主要SparkSQL在下面幾點作了優化:

A:內存列存儲(In-Memory Columnar Storage

SparkSQL的表數據在內存中存儲不是採用原生態的JVM對象存儲方式,而是採用內存列存儲,以下圖所示。

clip_image010

該存儲方式不管在空間佔用量和讀取吞吐率上都佔有很大優點。

對於原生態的JVM對象存儲方式,每一個對象一般要增長12-16字節的額外開銷,對於一個270MBTPC-H lineitem table數據,使用這種方式讀入內存,要使用970MB左右的內存空間(一般是25倍於原生數據空間);另外,使用這種方式,每一個數據記錄產生一個JVM對象,若是是大小爲200B的數據記錄,32G的堆棧將產生1.6億個對象,這麼多的對象,對於GC來講,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關)。顯然這種內存存儲方式對於基於內存計算的Spark來講,很昂貴也負擔不起。

對於內存列存儲來講,將全部原生數據類型的列採用原生數組來存儲,將Hive支持的複雜數據類型(如arraymap等)先序化後並接成一個字節數組來存儲。這樣,每一個列建立一個JVM對象,從而致使能夠快速的GC和緊湊的數據存儲;額外的,還可使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)下降內存開銷;更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會獲得很大的提升,緣由就是這些列的數據放在一塊兒,更容易讀入內存進行計算。

B:字節碼生成技術(bytecode generation,即CG

在數據庫查詢中有一個昂貴的操做是查詢語句中的表達式,主要是因爲JVM的內存模型引發的。好比以下一個查詢:

SELECT a + b FROM table

在這個查詢裏,若是採用通用的SQL語法途徑去處理,會先生成一個表達式樹(有兩個節點的Add樹,參考後面章節),在物理處理這個表達式樹的時候,將會如圖所示的7個步驟:

1.  調用虛函數Add.eval(),須要確認Add兩邊的數據類型

2.  調用虛函數a.eval(),須要確認a的數據類型

3.  肯定a的數據類型是Int,裝箱

4.  調用虛函數b.eval(),須要確認b的數據類型

5.  肯定b的數據類型是Int,裝箱

6.  調用Int類型的Add

7.  返回裝箱後的計算結果

其中屢次涉及到虛函數的調用,虛函數的調用會打斷CPU的正常流水線處理,減緩執行。

Spark1.1.0catalyst模塊的expressions增長了codegen模塊,若是使用動態字節碼生成技術(配置spark.sql.codegen參數),SparkSQL在執行物理計劃的時候,對匹配的表達式採用特定的代碼,動態編譯,而後運行。如上例子,匹配到Add方法:

clip_image012

而後,經過調用,最終調用:

clip_image014

最終實現效果相似以下僞代碼:

val a: Int = inputRow.getInt(0)

val b: Int = inputRow.getInt(1)

val result: Int = a + b

resultRow.setInt(0, result)

對於Spark1.1.0,對SQL表達式都做了CG優化,具體能夠參看codegen模塊。CG優化的實現主要仍是依靠scala2.10的運行時放射機制(runtime reflection)。對於SQL查詢的CG優化,能夠簡單地用下圖來表示:

clip_image016

CScala代碼優化

另外,SparkSQL在使用Scala編寫代碼的時候,儘可能避免低效的、容易GC的代碼;儘管增長了編寫代碼的難度,但對於用戶來講,仍是使用統一的接口,沒受到使用上的困難。下圖是一個Scala代碼優化的示意圖:

clip_image018

2SparkSQL運行架構

相似於關係型數據庫,SparkSQL也是語句也是由Projectiona1a2a3)、Data SourcetableA)、Filtercondition)組成,分別對應sql查詢過程當中的ResultData SourceOperation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。

clip_image020

 

當執行SparkSQL語句的順序爲:

1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECTFROMWHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規範;

2.SQL語句和數據庫的數據字典(列、表、視圖等等)進行綁定(Bind),若是相關的ProjectionData Source等都是存在的話,就表示這個SQL語句是能夠執行的;

3.通常的數據庫會提供幾個執行計劃,這些計劃通常都有運行統計數據,數據庫會在這些計劃中選擇一個最優計劃(Optimize);

4.計劃執行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執行過程有時候甚至不須要讀取物理表就能夠返回結果,好比從新運行剛運行過的SQL語句,可能直接從數據庫的緩衝池中獲取返回結果。

2.1 TreeRule

SparkSQLSQL語句的處理和關係型數據庫對SQL語句的處理採用了相似的方法,首先會將SQL語句進行解析(Parse),而後造成一個Tree,在後續的如綁定、優化等處理過程都是對Tree的操做,而操做的方法是採用Rule,經過模式匹配,對不一樣類型的節點採用不一樣的操做。在整個sql語句的處理過程當中,TreeRule相互配合,完成了解析、綁定(在SparkSQL中稱爲Analysis)、優化、物理計劃等過程,最終生成能夠執行的物理計劃。

2.1.1 Tree

l  Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees

l  Logical PlansExpressionsPhysical Operators均可以使用Tree表示

l  Tree的具體操做是經過TreeNode來實現的

Ø  SparkSQL定義了catalyst.trees的日誌,經過這個日誌能夠形象的表示出樹的結構

Ø  TreeNode可使用scala的集合操做方法(如foreach, map, flatMap, collect等)進行操做

Ø  有了TreeNode,經過Tree中各個TreeNode之間的關係,能夠對Tree進行遍歷操做,如使用transformDowntransformUpRule應用到給定的樹段,而後用結果替代舊的樹段;也可使用transformChildrenDowntransformChildrenUp對一個給定的節點進行操做,經過迭代將Rule應用到該節點以及子節點。

l  TreeNode能夠細分紅三種類型的Node

Ø  UnaryNode 一元節點,即只有一個子節點。如LimitFilter操做

Ø  BinaryNode 二元節點,即有左右子節點的二叉節點。如JionUnion操做

Ø  LeafNode 葉子節點,沒有子節點的節點。主要用戶命令類操做,如SetCommand

 

2.1.2 Rule

l  Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules

l  RuleSparkSQLAnalyzerOptimizerSparkPlan等各個組件中都有應用到

l  Rule是一個抽象類,具體的Rule實現是經過RuleExecutor完成

l  Rule經過定義batchbatchs,能夠簡便的、模塊化地對Tree進行transform操做

l  Rule經過定義OnceFixedPoint,能夠對Tree進行一次操做或屢次操做(如對某些Tree進行屢次迭代操做的時候,達到FixedPoint次數迭代或達到先後兩次的樹結構沒變化才中止操做,具體參看RuleExecutor.apply

2.2 sqlContexthiveContext的運行過程

SparkSQL有兩個分支,sqlContexthiveContextsqlContext如今只支持SQL語法解析器(SQL-92語法);hiveContext如今支持SQL語法解析器和hivesql語法解析器,默認爲hiveSQL語法解析器,用戶能夠經過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法,

2.2.1 sqlContext的運行過程

sqlContext總的一個過程以下圖所示:

1.SQL語句通過SqlParse解析成UnresolvedLogicalPlan

2.使用analyzer結合數據數據字典(catalog)進行綁定,生成resolvedLogicalPlan

3.使用optimizerresolvedLogicalPlan進行優化,生成optimizedLogicalPlan

4.使用SparkPlanLogicalPlan轉換成PhysicalPlan

5.使用prepareForExecution()PhysicalPlan轉換成可執行物理計劃;

6.使用execute()執行可執行物理計劃;

7.生成SchemaRDD

在整個運行過程當中涉及到多個SparkSQL的組件,如SqlParseanalyzeroptimizerSparkPlan等等

clip_image022

2.2.2hiveContext的運行過程

hiveContext總的一個過程以下圖所示:

1.SQL語句通過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程當中對hiveql語句使用getAst()獲取AST樹,而後再進行解析;

2.使用analyzer結合數據hive源數據Metastore(新的catalog)進行綁定,生成resolved LogicalPlan

3.使用optimizerresolved LogicalPlan進行優化,生成optimized LogicalPlan,優化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;

4.使用hivePlannerLogicalPlan轉換成PhysicalPlan

5.使用prepareForExecution()PhysicalPlan轉換成可執行物理計劃;

6.使用execute()執行可執行物理計劃;

7.執行後,使用map(_.copy)將結果導入SchemaRDD

clip_image024

2.3 catalyst優化器

SparkSQL1.1整體上由四個模塊組成:corecatalysthivehive-Thriftserver

l  core處理數據的輸入輸出,從不一樣的數據源獲取數據(RDDParquetjson等),將查詢結果輸出成schemaRDD

l  catalyst處理查詢語句的整個處理過程,包括解析、綁定、優化、物理計劃等,說其是優化器,還不如說是查詢引擎;

l  hivehive數據的處理

l  hive-ThriftServer提供CLIJDBC/ODBC接口

在這四個模塊中,catalyst處於最核心的部分,其性能優劣將影響總體的性能。因爲發展時間尚短,還有不少不足的地方,但其插件式的設計,爲將來的發展留下了很大的空間。下面是catalyst的一個設計圖:

clip_image026

 

其中虛線部分是之後版本要實現的功能,實線部分是已經實現的功能。從上圖看,catalyst主要的實現組件有:

lsqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;

lAnalyzer,主要完成綁定工做,將不一樣來源的Unresolved LogicalPlan和數據元數據(如hive metastoreSchema catalog)進行綁定,生成resolved LogicalPlan

loptimizerresolved LogicalPlan進行優化,生成optimized LogicalPlan

l PlannerLogicalPlan轉換成PhysicalPlan

l CostModel,主要根據過去的性能統計數據,選擇最佳的物理執行計劃

這些組件的基本實現方法:

l 先將sql語句經過解析生成Tree,而後在不一樣階段使用不一樣的Rule應用到Tree上,經過轉換完成各個組件的功能。

l Analyzer使用Analysis Rules,配合數據元數據(如hive metastoreSchema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan

l optimizer使用Optimization Rules,對resolved LogicalPlan進行合併、列裁剪、過濾器下推等優化做業而轉換成optimized LogicalPlan

l Planner使用Planning Strategies,對optimized LogicalPlan

3SparkSQL CLI

CLICommand-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執行指令的界面,它一般不支持鼠標,用戶經過鍵盤輸入指令,計算機接收到指令後予以執行。Spark CLI指的是使用命令界面直接輸入SQL命令,而後發送到Spark集羣進行執行,在界面中顯示運行過程和最終的結果。

Spark1.1相較於Spark1.0最大的差異就在於Spark1.1增長了Spark SQL CLIThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數據庫管理員較容易地上手,真正意義上進入了SQL時代。

【注】Spark CLISpark Thrift Server實驗環境爲第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

3.1  運行環境說明

3.1.1 硬軟件環境

l  主機操做系統:Windows 64位,雙核4線程,主頻2.2G10G內存

l  虛擬軟件:VMware® Workstation 9.0.0 build-812388

l  虛擬機操做系統:CentOS 64位,單核

l  虛擬機運行環境:

Ø  JDK1.7.0_55 64

Ø  Hadoop2.2.0(須要編譯爲64位)

Ø  Scala2.11.4

Ø  Spark1.1.0(須要編譯)

Ø  Hive0.13.1

3.1.2 機器網絡環境

集羣包含三個節點,節點之間能夠免密碼SSH訪問,節點IP地址和主機名分佈以下:

序號

IP地址

機器名

類型

核數/內存

用戶名

目錄

1

192.168.0.61

hadoop1

NN/DN/RM

Master/Worker

1/3G

hadoop

/app 程序所在路徑

/app/scala-...

/app/hadoop

/app/complied

2

192.168.0.62

hadoop2

DN/NM/Worker

1/2G

hadoop

3

192.168.0.63

hadoop3

DN/NM/Worker

1/2G

hadoop

3.2 配置並啓動

3.2.1 建立並配置hive-site.xml

在運行Spark SQL CLI中須要使用到Hive Metastore,故須要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下建立hive-site.xml文件,而後在該配置文件中,添加hive.metastore.uris屬性,具體以下:

<configuration> 

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

</configuration>

clip_image028

3.2.2 啓動Hive

在使用Spark SQL CLI以前須要啓動Hive Metastore(若是數據存放在HDFS文件系統,還須要啓動HadoopHDFS),使用以下命令可使Hive Metastore啓動後運行在後臺,能夠經過jobs查詢:

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image030

3.2.3 啓動Spark集羣和Spark SQL CLI

經過以下命令啓動Spark集羣和Spark SQL CLI

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

在集羣監控頁面能夠看到啓動了SparkSQL應用程序:

clip_image032

這時就可使用HQL語句對Hive數據進行查詢,另外可使用COMMAND,如使用set進行設置參數:默認狀況下,SparkSQL Shuffle的時候是200partition,可使用以下命令修改該參數:

SET spark.sql.shuffle.partitions=20;

運行同一個查詢語句,參數改變後,Taskpartition)的數量就由200變成了20

clip_image034

3.2.4 命令參數

經過bin/spark-sql --help能夠查看CLI命令參數:

clip_image036

clip_image038

其中[options] CLI啓動一個SparkSQL應用程序的參數,若是不設置--master的話,將在啓動spark-sql的機器以local方式運行,只能經過http://機器名:4040進行監控;這部分參數,能夠參照Spark1.0.0 應用程序部署工具spark-submit 的參數。

[cli option]CLI的參數,經過這些參數CLI能夠直接運行SQL文件、進入命令行運行SQL命令等等,相似之前的Shark的用法。須要注意的是CLI不是使用JDBC鏈接,因此不能鏈接到ThriftServer;但能夠配置conf/hive-site.xml鏈接到HiveMetastore,而後對Hive數據進行查詢。

3.3 實戰Spark SQL CLI

3.3.1 獲取訂單每一年的銷售單數、銷售總額

第一步   設置任務個數,在這裏修改成20

spark-sql>SET spark.sql.shuffle.partitions=20;

clip_image040

第二步   運行SQL語句

spark-sql>use hive;

clip_image042

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image044

第三步   查看運行結果

clip_image046

clip_image048

3.3.2 計算全部訂單每一年的總金額

第一步   執行SQL語句

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image050

第二步   執行結果

使用CLI執行結果以下:

clip_image052

clip_image054

3.3.3 計算全部訂單每一年最大金額訂單的銷售額

第一步   執行SQL語句

spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

clip_image056

第二步   執行結果

使用CLI執行結果以下:

clip_image058

 

clip_image060

4Spark Thrift Server

ThriftServer是一個JDBC/ODBC接口,用戶能夠經過JDBC/ODBC鏈接ThriftServer來訪問SparkSQL的數據。ThriftServer在啓動的時候,會啓動了一個SparkSQL的應用程序,而經過JDBC/ODBC鏈接進來的客戶端共同分享這個SparkSQL應用程序的資源,也就是說不一樣的用戶之間能夠共享數據;ThriftServer啓動時還開啓一個偵聽器,等待JDBC客戶端的鏈接和提交查詢。因此,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,若是要使用Hive數據的話,還要提供Hive Metastoreuris

【注】Spark CLISpark Thrift Server實驗環境爲第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

4.1 配置並啓動

4.1.1 建立並配置hive-site.xml

第一步   建立hive-site.xml配置文件

$SPARK_HOME/conf目錄下修改hive-site.xml配置文件(若是在Spark SQL CLI中已經添加,能夠省略):

$cd /app/hadoop/spark-1.1.0/conf

$sudo vi hive-site.xml

clip_image062

第二步   修改配置文件

設置hadoop1Metastore服務器,hadoop2Thrift Server服務器,配置內容以下:

<configuration>

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.min.worker.threads</name>

    <value>5</value>

    <description>Minimum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.max.worker.threads</name>

    <value>500</value>

    <description>Maximum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.port</name>

    <value>10000</value>

    <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.bind.host</name>

    <value>hadoop2</value>

    <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>

  </property>

</configuration>

clip_image064

4.1.2 啓動Hive

hadoop1節點中,在後臺啓動Hive Metastore(若是數據存放在HDFS文件系統,還須要啓動HadoopHDFS):

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image066

4.1.3 啓動Spark集羣和Thrift Server

hadoop1節點啓動Spark集羣

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

hadoop2節點上進入SPARK_HOME/sbin目錄,使用以下命令啓動Thrift Server

$cd /app/hadoop/spark-1.1.0/sbin

$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g

clip_image068

注意Thrift Server須要按照配置在hadoop2啓動!

在集羣監控頁面能夠看到啓動了SparkSQL應用程序:

clip_image070

4.1.4 命令參數

使用sbin/start-thriftserver.sh --help能夠查看ThriftServer的命令參數:

$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]

        Thrift server options: Use value for given property

clip_image072

clip_image074

其中[options] Thrift Server啓動一個SparkSQL應用程序的參數,若是不設置--master的話,將在啓動Thrift Server的機器以local方式運行,只能經過http://機器名:4040進行監控;這部分參數,能夠參照Spark1.0.0 應用程序部署工具spark-submit 的參數。在集羣中提供Thrift Server的話,必定要配置masterexecutor-memory等參數。

[thrift server options]Thrift Server的參數,可使用-dproperty=value的格式來定義;在實際應用上,由於參數比較多,一般使用conf/hive-site.xml配置。

4.2 實戰Thrift Server

4.2.1 遠程客戶端鏈接

能夠在任意節點啓動bin/beeline,用!connect jdbc:hive2://hadoop2:10000鏈接ThriftServer,由於沒有采用權限管理,因此用戶名用運行bin/beeline的用戶hadoop,密碼爲空:

$cd /app/hadoop/spark-1.1.0/bin

$./beeline

beeline>!connect jdbc:hive2://hadoop2:10000

clip_image076

4.2.2 基本操做

第一步   顯示hive數據庫全部表

beeline>show database;

beeline>use hive;

beeline>show tables;

clip_image078

第二步   建立表testThrift

beeline>create table testThrift(field1 String , field2 Int);

beeline>show tables;

clip_image080

第三步   tbStockDetail表中金額大於3000插入到testThrift表中

beeline>insert into table testThrift select ordernumber,amount from tbStockDetail  where amount>3000;

beeline>select * from testThrift;

clip_image082

第四步   從新建立testThrift表中,把年度最大訂單插入該表中

beeline>drop table testThrift;

beeline>create table testThrift (field1 String , field2 Int);

beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

beeline>select * from testThrift;

clip_image084

4.2.3 計算全部訂單每一年的訂單數

第一步   執行SQL語句

spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第二步   執行結果

clip_image086

Stage監控頁面:

clip_image088

查看Details for Stage 28

clip_image090

4.2.4 計算全部訂單月銷售額前十名

第一步   執行SQL語句

spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;

第二步   執行結果

clip_image092

Stage監控頁面:

clip_image094

在其第一個Task中,從本地讀入數據

clip_image096

在後面的Task是從內存中獲取數據

clip_image098

4.2.5 緩存表數據

第一步   緩存數據

beeline>cache table tbStock;

beeline>select count(*) from tbStock;

clip_image100

第二步   運行4.2.4中的「計算全部訂單月銷售額前十名」

beeline>select count(*) from tbStock;

clip_image102

本次計算劃給11.233秒,查看webUI,數據已經緩存,緩存率爲100%

clip_image104

第三步   在另外節點再次運行

hadoop3節點啓動bin/beeline,用!connect jdbc:hive2://hadoop2:10000鏈接ThriftServer,而後直接運行對tbStock計數(注意沒有進行數據庫的切換):

clip_image106

用時0.343秒,再查看webUI中的stage

clip_image108

Locality LevelPROCESS,顯然是使用了緩存表。

從上能夠看出,ThriftServer能夠鏈接多個JDBC/ODBC客戶端,並相互之間能夠共享數據。順便提一句,ThriftServer啓動後處於監聽狀態,用戶可使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。

4.2.6 IDEAJDBC訪問

有了ThriftServer,開發人員能夠很是方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個scala代碼,查詢表tbStockDetail,返回amount>3000的單據號和交易金額:

第一步   IDEA建立class6包和類JDBCofSparkSQL

參見《Spark編程模型(下)--IDEA搭建及實戰》在IDEA中建立class6包並新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大於3000的訂單:

package class6

import java.sql.DriverManager

 

object JDBCofSparkSQL {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")

    try {

      val statement = conn.createStatement

val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail  where amount>3000")

      while (rs.next) {

        val ordernumber = rs.getString("ordernumber")

        val amount = rs.getString("amount")

        println("ordernumber = %s, amount = %s".format(ordernumber, amount))

      }

    } catch {

      case e: Exception => e.printStackTrace

    }

    conn.close

  }

}

第二步   查看運行結果

IDEA中能夠觀察到,在運行日誌窗口中沒有運行過程的日誌,只顯示查詢結果

clip_image110

第三步   查看監控結果

Spark監控界面中觀察到,該Job有一個編號爲6Stage,該Stage2Task,分別運行在hadoop1hadoop2節點,獲取數據爲NODE_LOCAL方式。

clip_image112

clip_image114

clip_image116

hadoop2中觀察Thrift Server運行日誌以下:

clip_image118

相關文章
相關標籤/搜索