l 主機操做系統:Windows 64位,雙核4線程,主頻2.2G,10G內存html
l 虛擬軟件:VMware® Workstation 9.0.0 build-812388web
l 虛擬機操做系統:CentOS6.5 64位,單核算法
l 虛擬機運行環境:sql
Ø JDK:1.7.0_55 64位shell
Ø Hadoop:2.2.0(須要編譯爲64位)apache
Ø Scala:2.10.4json
Ø Spark:1.1.0(須要編譯)緩存
Ø Hive:0.13.1(源代碼編譯,參見1.2)性能優化
本次實驗環境只須要hadoop1一臺機器便可,網絡環境配置以下:服務器
序號 |
機器名 |
類型 |
用戶名 |
目錄 |
|
1 |
192.168.0.61 |
hadoop1 |
NN/DN |
hadoop |
/app 程序所在路徑 /app/scala-... /app/hadoop /app/complied |
這裏選擇下載的版本爲hive-0.13.1,這個版本須要到apache的歸檔服務器下載,下載地址:http://archive.apache.org/dist/hive/hive-0.13.1/,選擇apache-hive-0.13.1-src.tar.gz文件進行下載:
把下載的hive-0.13.0.tar.gz安裝包,使用SSH Secure File Transfer工具(參見第2課《Spark編譯與部署(上)--基礎環境搭建》1.3.1介紹)上傳到/home/hadoop/upload 目錄下。
到上傳目錄下,用以下命令解壓縮hive安裝文件:
$cd /home/hadoop/upload
$tar -zxf apache-hive-0.13.1-src.tar.gz
更名並移動到/app/complied目錄下:
$sudo mv apache-hive-0.13.1-src /app/complied/hive-0.13.1-src
$ll /app/complied
編譯Hive源代碼的時候,須要從網上下載依賴包,因此整個編譯過程機器必須保證在聯網狀態。編譯執行以下腳本:
$cd /app/complied/hive-0.13.1-src/
$export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
$mvn -Phadoop-2,dist -Dmaven.test.skip=true clean package
在編譯過程當中可能出現速度慢或者中斷,能夠再次啓動編譯,編譯程序會在上次的編譯中斷處繼續進行編譯,整個編譯過程耗時與網速緊密相關,網速較快的狀況須要1個小時左右(上圖的時間是屢次編譯後最後成功的界面)。最終編譯的結果爲$HIVE_HOME/packaging/target/apache-hive-0.13.1-bin.tar.gz
經過以下命令查看最終編譯完成整個目錄大小,能夠看到大小爲353.6M左右
$du -s /app/complied/hive-0.13.1-src
【注】已經編譯好的Hive包在本系列配套資源/install/6.hive-0.13.1-src.tar.gz,讀者直接使用
因爲首次運行hive-console須要在Spark源代碼進行編譯,關於Spark源代碼的獲取能夠參考第二課《Spark編譯與部署(下)--Spark編譯安裝》方式進行獲取,鏈接地址爲 http://spark.apache.org/downloads.html,獲取源代碼後把Spark源代碼移動到/app/complied目錄,並命名爲spark-1.1.0-hive
第一步 使用以下命令打開/etc/profile文件:
$sudo vi /etc/profile
第二步 設置以下參數:
export HADOOP_HOME=/app/hadoop/hadoop-2.2.0
export HIVE_HOME=/app/complied/hive-0.13.1-src
export HIVE_DEV_HOME=/app/complied/hive-0.13.1-src
第三步 生效配置並驗證
$sudo vi /etc/profile
$echo $HIVE_DEV_HOME
運行hive/console不須要啓動Spark,須要進入到Spark根目錄下使用sbt/sbt hive/console進行首次運行編譯,編譯之後下次能夠直接啓動。編譯Spark源代碼的時候,須要從網上下載依賴包,因此整個編譯過程機器必須保證在聯網狀態。編譯命令以下:
$cd /app/complied/spark-1.1.0-hive
$sbt/sbt hive/console
編譯時間會很長,在編譯過程當中可能出現速度慢或者中斷,能夠再次啓動編譯,編譯程序會在上次的編譯中斷處繼續進行編譯,整個編譯過程耗時與網速緊密相關。
經過以下命令查看最終編譯完成整個目錄大小,能夠看到大小爲267.9M左右
$du -s /app/complied/spark-1.1.0-hive
【注】已經編譯好的Spark for hive-console包在本系列配套資源/install/6.spark-1.1.0-hive.tar.gz,可直接使用
進入到spark根目錄下,使用以下命令啓動hive-console
$cd /app/complied/spark-1.1.0-hive
$sbt/sbt hive/console
可使用:help查看幫助內容
scala>:help
可使用tab鍵查看全部可以使用命令、函數
首先定義Person類,在該類中定義name、age和state三個列,而後把該類註冊爲people表並裝載數據,最後經過查詢到數據存放到query中
scala>case class Person(name:String, age:Int, state:String)
scala>sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")
scala>val query= sql("select * from people")
scala>query.printSchema
scala>query.collect()
scala>query.queryExecution
scala>query.queryExecution.logical
scala>query.queryExecution.analyzed
scala>query.queryExecution.optimizedPlan
scala>query.queryExecution.sparkPlan
scala>query.toDebugString
上面經常使用操做裏介紹了源自RDD的數據, SparkSQL也能夠源自多個數據源:jsonFile、parquetFile和Hive等。
第一步 Json測試數據
Json文件支持嵌套表,SparkSQL也能夠讀入嵌套表,以下面形式的Json數據,可使用jsonFile讀入SparkSQL。該文件能夠在配套資源/data/class6中找到,在如下測試中把文件放到 /home/hadoop/upload/class6 路徑中
{
"fullname": "Sean Kelly",
"org": "SK Consulting",
"emailaddrs": [
{"type": "work", "value": "kelly@seankelly.biz"},
{"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}
],
"telephones": [
{"type": "work", "pref": 1, "value": "+1 214 555 1212"},
{"type": "fax", "value": "+1 214 555 1213"},
{"type": "mobile", "value": "+1 214 555 1214"}
],
"addresses": [
{"type": "work", "format": "us",
"value": "1234 Main StnSpringfield, TX 78080-1216"},
{"type": "home", "format": "us",
"value": "5678 Main StnSpringfield, TX 78080-1316"}
],
"urls": [
{"type": "work", "value": "http://seankelly.biz/"},
{"type": "home", "value": "http://seankelly.tv/"}
]
}
第二步 讀入Json數據
使用jsonFile讀入數據並註冊成表jsonPerson,而後定義一個查詢jsonQuery
scala>jsonFile("/home/hadoop/upload/class6/nestjson.json").registerTempTable("jsonPerson")
scala>val jsonQuery = sql("select * from jsonPerson")
第三步 查看jsonQuery的schema
scala>jsonQuery.printSchema
第四步 查看jsonQuery的整個運行計劃
scala>jsonQuery.queryExecution
Parquet數據放在配套資源/data/class6/wiki_parquet中,在如下測試中把文件放到 /home/hadoop/upload/class6 路徑下
第一步 讀入Parquet數據
parquet文件讀入並註冊成表parquetWiki,而後定義一個查詢parquetQuery
scala>parquetFile("/home/hadoop/upload/class6/wiki_parquet").registerTempTable("parquetWiki")
scala>val parquetQuery = sql("select * from parquetWiki")
有報錯但不影響使用
第二步 查詢parquetQuery的schema
scala>parquetQuery.printSchema
第三步 查詢parquetQuery的整個運行計劃
scala>parquetQuery.queryExecution
第四步 查詢取樣
scala>parquetQuery.takeSample(false,10,2)
在TestHive類中已經定義了大量的hive0.13的測試數據的表格式,如src、sales等等,在hive-console中能夠直接使用;第一次使用的時候,hive-console會裝載一次。下面咱們使用sales表看看其schema和整個運行計劃。
第一步 讀入測試數據並定義一個查詢hiveQuery
scala>val hiveQuery = sql("select * from sales")
第二步 查看hiveQuery的schema
scala>hiveQuery.printSchema
第三步 查看hiveQuery的整個運行計劃
scala>hiveQuery.queryExecution
第四步 其餘SQL語句的運行計劃
scala>val hiveQuery = sql("select * from (select * from src limit 5) a limit 3")
scala>val hiveQuery = sql("select * FROM (select * FROM src) a")
scala>hiveQuery.where('key === 100).queryExecution.toRdd.collect
scala>sql("select name, age,state as location from people").queryExecution
scala>sql("select name from (select name,state as location from people) a where location='CA'").queryExecution
scala>sql("select sum(age) from people").queryExecution
scala>sql("select sum(age) from people").toDebugString
scala>sql("select state,avg(age) from people group by state").queryExecution
scala>sql("select state,avg(age) from people group by state").toDebugString
scala>sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution
scala>sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString
scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution
scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString
CombineFilters就是合併Filter,在含有多個Filter時發生,以下查詢:
sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution
上面的查詢,在Optimized的過程當中,將age>=19和age<30這兩個Filter合併了,合併成((age>=19) && (age<30))。其實上面還作了一個其餘的優化,就是project的下推,子查詢使用了表的全部列,而主查詢使用了列name,在查詢數據的時候子查詢優化成只查列name。
PushPredicateThroughProject就是project下推,和上面例子中的project同樣
sql("select name from (select name,state as location from people) a where location='CA'").queryExecution
ConstantFolding是常量疊加,用於表達式。以下面的例子:
sql("select name,1+2 from people").queryExecution
Spark是一個快速的內存計算框架,同時是一個並行運算的框架,在計算性能調優的時候,除了要考慮廣爲人知的木桶原理外,還要考慮平行運算的Amdahl定理。
木桶原理又稱短板理論,其核心思想是:一隻木桶盛水的多少,並不取決於桶壁上最高的那塊木塊,而是取決於桶壁上最短的那塊。將這個理論應用到系統性能優化上,系統的最終性能取決於系統中性能表現最差的組件。例如,即便系統擁有充足的內存資源和CPU資源,可是若是磁盤I/O性能低下,那麼系統的整體性能是取決於當前最慢的磁盤I/O速度,而不是當前最優越的CPU或者內存。在這種狀況下,若是須要進一步提高系統性能,優化內存或者CPU資源是毫無用處的。只有提升磁盤I/O性能才能對系統的總體性能進行優化。
Amdahl定理,一個計算機科學界的經驗法則,因吉恩·阿姆達爾而得名。它表明了處理器平行運算以後效率提高的能力。並行計算中的加速比是用並行前的執行速度和並行後的執行速度之比來表示的,它表示了在並行化以後的效率提高狀況。阿姆達爾定律是固定負載(計算總量不變時)時的量化標準。可用公式:來表示。式中
分別表示問題規模的串行份量(問題中不能並行化的那一部分)和並行份量,p表示處理器數量。當
時,上式的極限是
,其中
。這意味着不管咱們如何增大處理器數目,加速比是沒法高於這個數的。
SparkSQL做爲Spark的一個組件,在調優的時候,也要充分考慮到上面的兩個原理,既要考慮如何充分的利用硬件資源,又要考慮如何利用好分佈式系統的並行計算。因爲測試環境條件有限,本篇不能作出更詳盡的實驗數據來講明,只能在理論上加以說明。
SparkSQL在集羣中運行,將一個查詢任務分解成大量的Task分配給集羣中的各個節點來運行。一般狀況下,Task的數量是大於集羣的並行度。好比前面第六章和第七章查詢數據時,shuffle的時候使用了缺省的spark.sql.shuffle.partitions,即200個partition,也就是200個Task:
而實驗的集羣環境卻只能並行3個Task,也就是說同時只能有3個Task保持Running:
這時你們就應該明白了,要跑完這200個Task就要跑200/3=67批次。如何減小運行的批次呢?那就要儘可能提升查詢任務的並行度。查詢任務的並行度由兩方面決定:集羣的處理能力和集羣的有效處理能力。
l對於Spark Standalone集羣來講,集羣的處理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES參數、SPARK_WORKER_CORES參數決定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超過物理機器的實際CPU core;
l集羣的有效處理能力是指集羣中空閒的集羣資源,通常是指使用spark-submit或spark-shell時指定的--total-executor-cores,通常狀況下,咱們不須要指定,這時候,Spark Standalone集羣會將全部空閒的core分配給查詢,而且在Task輪詢運行過程當中,Standalone集羣會將其餘spark應用程序運行完後空閒出來的core也分配給正在運行中的查詢。
綜上所述,SparkSQL的查詢並行度主要和集羣的core數量相關,合理配置每一個節點的core能夠提升集羣的並行度,提升查詢的效率。
高效的數據格式,一方面是加快了數據的讀入速度,另外一方面能夠減小內存的消耗。高效的數據格式包括多個方面:
分佈式計算系統的精粹在於移動計算而非移動數據,可是在實際的計算過程當中,總存在着移動數據的狀況,除非是在集羣的全部節點上都保存數據的副本。移動數據,將數據從一個節點移動到另外一個節點進行計算,不但消耗了網絡IO,也消耗了磁盤IO,下降了整個計算的效率。爲了提升數據的本地性,除了優化算法(也就是修改spark內存,難度有點高),就是合理設置數據的副本。設置數據的副本,這須要經過配置參數並長期觀察運行狀態才能獲取的一個經驗值。
下面是Spark webUI監控Stage的一個圖:
lPROCESS_LOCAL是指讀取緩存在本地節點的數據
lNODE_LOCAL是指讀取本地節點硬盤數據
lANY是指讀取非本地節點數據
l一般讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,儘可能使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關。
對於要查詢的數據,定義合適的數據類型也是很是有必要。對於一個tinyint可使用的數據列,不須要爲了方便定義成int類型,一個tinyint的數據佔用了1個byte,而int佔用了4個byte。也就是說,一旦將這數據進行緩存的話,內存的消耗將增長數倍。在SparkSQL裏,定義合適的數據類型能夠節省有限的內存資源。
對於要查詢的數據,在寫SQL語句的時候,儘可能寫出要查詢的列名,如Select a,b from tbl,而不是使用Select * from tbl;這樣不但能夠減小磁盤IO,也減小緩存時消耗的內存。
在查詢的時候,最終仍是要讀取存儲在文件系統中的文件。採用更優的數據存儲格式,將有利於數據的讀取速度。查看SparkSQL的Stage,能夠發現,不少時候,數據讀取消耗佔有很大的比重。對於sqlContext來講,支持 textFiile、SequenceFile、ParquetFile、jsonFile;對於hiveContext來講,支持AvroFile、ORCFile、Parquet File,以及各類壓縮。根據本身的業務需求,測試並選擇合適的數據存儲格式將有利於提升SparkSQL的查詢效率。
spark應用程序最糾結的地方就是內存的使用了,也是最能體現「細節是魔鬼」的地方。Spark的內存配置項有很多,其中比較重要的幾個是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,能夠充分的利用節點的內存資源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超過節點自己具有的內存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark應用程序時申請使用的內存數量;不要超過節點的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark應用程序在所申請的內存資源中可用於cache的比例
lspark.shuffle.memoryFraction spark應用程序在所申請的內存資源中可用於shuffle的比例
在實際使用上,對於後兩個參數,能夠根據經常使用查詢的內存消耗狀況作適當的變動。另外,在SparkSQL使用上,有幾點建議:
l對於頻繁使用的表或查詢才進行緩存,對於只使用一次的表不須要緩存;
l對於join操做,優先緩存較小的表;
l要多注意Stage的監控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的監控,多思考如何才能Fraction cached的比例更多
對於SparkSQL,還有一個比較重要的參數,就是shuffle時候的Task數量,經過spark.sql.shuffle.partitions來調節。調節的基礎是spark集羣的處理能力和要處理的數據量,spark的默認值是200。Task過多,會產生不少的任務啓動開銷,Task多少,每一個Task的處理時間過長,容易straggle。
優化的方面的內容不少,但大部分都是細節性的內容,下面就簡單地提提:
l 想要獲取更好的表達式查詢速度,能夠將spark.sql.codegen設置爲Ture;
l 對於大數據集的計算結果,不要使用collect() ,collect()就結果返回給driver,很容易撐爆driver的內存;通常直接輸出到分佈式文件系統中;
l 對於Worker傾斜,設置spark.speculation=true 將持續不給力的節點去掉;
l 對於數據傾斜,採用加入部分中間步驟,如聚合後cache,具體狀況具體分析;
l 適當的使用序化方案以及壓縮方案;
l 善於利用集羣監控系統,將集羣的運行情況維持在一個合理的、平穩的狀態;
l 善於解決重點矛盾,多觀察Stage中的Task,查看最耗時的Task,查找緣由並改善;