目前spark sql 主要應用在structure streaming、etl 和 machine learning 的場景上, 它能對結構化的數據進行存儲和操做,結構化的數據能夠來自HIve、JSON、Parquet、JDBC/ODBC等數據源。因爲部門對數據的準確性和一致性要求等業務特色,咱們選擇mysql使用jdbc的方式做爲咱們的數據源,spark 集羣用yarn作爲資源管理,本文主要分享咱們在使用spark sql 過程當中遇到的問題和一些經常使用的開發實踐總結。html
運行環境:spark :2.1.0,hadoop: hadoop-2.5.0-cdh5.3.2 (yarn 資源管理,hdfs),mysql:5.7 ,scala: 2.11, java:1.8java
咱們先來了解一下spark on yarn 任務的運行機制。yarn 的基本思想是將JobTracker的兩個主要功能(資源管理和任務調度/監控)分離成單獨的組件:RM 和 AM;新的資源管理器**ResourceManager(RM)實現全局的全部應用的計算資源分配,應用控制器ApplicationMaster(AM)實現應用的調度和資源的協調;節點管理器NodeManager(NM)**則是每臺機器的代理,處理來自AM的命令,實現節點的監控與報告;容器 Container 封裝了內存、CPU、磁盤、網絡等資源,是資源隔離的基礎,當AM向RM申請資源時,RM爲AM返回的資源即是以Container表示,如上圖,spark master分配的 executor 的執行環境即是containner。目前咱們使用yarn 隊列的方式,能夠進一步的對應用執行進行管理,讓咱們的應用分組和任務分配更加清晰和方便管理。mysql
import com.test.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class SparkSimple01 { public static void main(String[] args) { // 建立spark會話,實質上是SQLContext和HiveContext的組合 SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); // 設置日誌級別,默認會打印DAG,TASK執行日誌,設置爲WARN以後能夠只關注應用相關日誌 sparkSession.sparkContext().setLogLevel("WARN"); // 分區方式讀取mysql表數據 Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties()); predicateSet.show(); } }
爲了確認該查詢對mysql發出的具體sql,咱們先查看一下mysql執行sql日誌,sql
#mysql 命令窗口執行如下命令打開日誌記錄 SHOW VARIABLES LIKE "general_log%"; SET GLOBAL general_log = 'ON';
打開Lenovo.log獲得以上代碼在mysql上的執行狀況: apache
經過分區查詢獲取表數據的方式有如下幾個優勢:api
spark jdbc 讀取msyql表還有直接讀取(沒法讀取大數據量表),指定字段分區讀取(分區不夠均勻)等方式,經過項目實踐總結,以上的分區讀取方式是咱們目前認爲對mysql最友好的方式。 分庫分表的系統也能夠利用這種方式讀取各個表在內存中union全部spark view獲得一張統一的內存表,在業務操做中將分庫分表透明化。若是線上數據表數據量較大的時候,在union以前就須要將spark view經過指定字段的方式查詢,避免on line ddl 在作變動時union表報錯,由於可能存在部分表已經添加新字段,部分表還未加上新字段,而union要求全部表的表結構一致,致使報錯。緩存
咱們都知道 Dataset 的分區是否均勻,對於結果集的並行處理效果有很重要的做用,spark Java版暫時沒法查看partition分區中的數據分佈,這裏用java調用scala 版api方式查看,線上不推薦使用,由於這裏的分區查看使用foreachPartition,多了一次action操做,而且打印出所有數據。網絡
import org.apache.spark.sql.{Dataset, Row} /** * Created by lesly.lai on 2017/12/25. */ class SparkRddTaskInfo { def getTask(dataSet: Dataset[Row]) { val size = dataSet.rdd.partitions.length println(s"==> partition size: $size " ) import scala.collection.Iterator val showElements = (it: Iterator[Row]) => { val ns = it.toSeq import org.apache.spark.TaskContext val pid = TaskContext.get.partitionId println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}") } dataSet.foreachPartition(showElements) } }
仍是用上面讀取mysql數據的例子來演示調用,將predicateSet做爲參數傳入併發
new SparkRddTaskInfo().getTask(predicateSet);
控制檯打印結果app
經過分區數據,咱們能夠看到以前的predicate 方式獲得的分區數就是predicate size 大小,而且按照咱們想要的數據分區方式分佈數據,這對於業務數據的批處理,executor的local cache,spark job執行參數調優都頗有幫助,例如調整spark.executor.cores,spark.executor.memory,GC方式等等。 這裏涉及java和Scala容器轉換的問題,Scala和Java容器庫有不少類似點,例如,他們都包含迭代器、可迭代結構、集合、 映射和序列。可是他們有一個重要的區別。Scala的容器庫特別強調不可變性,所以提供了大量的新方法將一個容器變換成一個新的容器。 在Scala內部,這些轉換是經過一系列「包裝」對象完成的,這些對象會將相應的方法調用轉發至底層的容器對象。因此容器不會在Java和Scala之間拷貝來拷貝去。一個值得注意的特性是,若是你將一個Java容器轉換成其對應的Scala容器,而後再將其轉換回一樣的Java容器,最終獲得的是一個和一開始徹底相同的容器對象(這裏的相贊成味着這兩個對象其實是指向同一片內存區域的引用,容器轉換過程當中沒有任何的拷貝發生)。
自定義函數,能夠簡單方便的實現業務邏輯。
import com.tes.spark.db.ConnectionInfos; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; public class SparkSimple02 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate(); sparkSession.sparkContext().setLogLevel("WARN"); Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties()); originSet.cache().createOrReplaceTempView("people"); // action操做 打印原始結果集 originSet.show(); // 註冊自定義函數 sparkSession.sqlContext().udf().register("genderUdf", gender -> { if("M".equals(gender)){ return "男"; }else if("F".equals(gender)){ return "女"; } return "未知"; }, DataTypes.StringType); // 查詢結果 Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people "); // action操做 打印函數處理後結果集 peopleDs.show(); } }
執行結果:
在sql中用使用java代碼實現邏輯操做,這爲sql的處理邏輯能力提高了好幾個層次,將函數抽取成接口實現類能夠方便的管理和維護這類自定義函數類。此外,spark也支持自定義內聚函數,窗口函數等等方式,相比傳統開發實現的功能方式,使用spark sql開發效率能夠明顯提升。
最近線上任務遇到一個獲取mysql connection blocked的問題,從spark ui的executor thread dump 能夠看到blocked的棧信息,如圖:
查看代碼發現DBConnectionManager 調用了 spark driver註冊mysql driver 使用同步方式的代碼
看到這裏咱們很容易以爲是註冊driver 致使的blocked,其實再仔細看回報錯棧信息,咱們會發現,這裏的getConnection是在dataset 的foreachpartition 中調用,而且是在每次db 操做時獲取一次getConnection 操做,這意味着在該分區下有屢次重複的在同步方法中註冊driver獲取鏈接的操做,看到這裏線程blocked的緣由就很明顯了,這裏咱們的解決方式是: a. 在同個partition中的connection 複用進行db操做 b. 爲了不partition數據分佈不均致使鏈接active時間過長,加上定時釋放鏈接再從鏈接池從新獲取鏈接操做 經過以上的鏈接處理,解決了blocked問題,tps也達到了4w左右。
咱們都知道,利用spark 集羣分區並行能力,能夠很容易實現較高的併發處理能力,若是是併發的批處理,那並行處理的能力能夠更好,可是,mysql 在面對這麼高的併發的時候,是有點吃不消的,所以咱們須要適當下降spark 應用的併發和上下游系統和平相處。控制spark job併發能夠經過不少參數配置組合、集羣資源、yarn隊列限制等方式實現,通過實踐,咱們選擇如下參數實現:
#須要關閉動態內存分配,其餘配置才生效 spark.dynamicAllocation.enabled = false spark.executor.instances = 2 spark.executor.cores = 2
這裏發現除了設置executor配置以外,還須要關閉spark的動態executor分配機制,spark 的ExecutorAllocationManager 是 一個根據工做負載動態分配和刪除 executors 的管家, ExecutorAllocationManager 維持一個動態調整的目標executors數目, 而且按期同步到資源管理者,也就是 yarn ,啓動的時候根據配置設置一個目標executors數目, spark 運行過程當中會根據等待(pending)和正在運行(running)的tasks數目動態調整目標executors數目,所以須要關閉動態配置資源才能達到控制併發的效果。
除了executor是動態分配以外,Spark 1.6 以後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,能夠動態佔用對方的空閒區域,咱們先看看worker中的內存規劃是怎樣的:
worker 能夠根據實例配置,內存配置,cores配置動態生成executor數量,每個executor爲一個jvm進程,所以executor 的內存管理是創建在jvm的內存管理之上的。從本文第一張spark on yarn圖片能夠看到,yarn模式的 executor 是在yarn container 中運行,所以container的內存分配大小一樣能夠控制executor的數量。 RDD 的每一個 Partition 通過處理後惟一對應一個 Block(BlockId 的格式爲 rdd_RDD-ID_PARTITION-ID ),從上圖能夠看出,開發過程當中經常使用的分區(partition)數據是以block的方式存儲在堆內的storage內存區域的,還有爲了減小網絡io而作的broadcast數據也存儲在storage區域;堆內的另外一個區域內存則主要用於緩存rdd shuffle產生的中間數據;此外,worker 中的多個executor還共享同一個節點上的堆外內存,這部份內存主要存儲經序列化後的二進制數據,使用的是系統的內存,能夠減小沒必要要的開銷以及頻繁的GC掃描和回收。
爲了更好的理解executor的內存分配,咱們再來看一下executor各個內存塊的參數設置:
瞭解spark 內存管理的機制後,就能夠根據mysql的處理能力來設置executor的併發處理能力,讓咱們的spark 應用處理能力收放自如。調整executor數量還有另一個好處,就是集羣資源規劃,目前咱們的集羣隊列是yarn fair 模式,
先看看yarn fair模式,舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啓動一個job而B沒有任務時,A會得到所有集羣資源;當B啓動一個job後,A的job會繼續運行,當A的job執行完釋放資源後,不過一下子以後兩個任務會各自得到一半的集羣資源。若是此時B再啓動第二個job而且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用於四分之一的集羣資源,而A的job仍然用於集羣一半的資源,結果就是資源最終在兩個用戶之間平等的共享。
在這種狀況下,即便有多個隊列執行任務,fair模式容易在資源空閒時佔用其餘隊列資源,一旦佔用時間過長,就會致使其餘任務都卡住,這也是咱們遇到的實際問題。若是咱們在一開始能評估任務所用的資源,就能夠在yarn隊列的基礎上指定應用的資源,例如executor的內存,cup,實例個數,並行task數量等等參數來管理集羣資源,這有點相似於yarn Capacity Scheduler 隊列模式,但又比它有優點,由於spark 應用能夠經過spark context的配置來動態的設置,不用設置yarn 隊列後重啓集羣,稍微靈活了一點。
除了以上提到的幾點總結,咱們還遇到不少其餘的疑問和實踐,例如,何時出現shuffle;如何比較好避開或者利用shuffle;Dataset 的cache操做會不會有性能問題,如何從spark ui中分析定位問題;spark 任務異常處理等等,暫時到這裏,待續...
vip.fcs
參考資料: http://www.cnblogs.com/yangsy0915/p/5118100.html https://mp.weixin.qq.com/s/KhHy1mURJBiPMGqkl4-JEw https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral https://docs.scala-lang.org/zh-cn/overviews/collections/conversions-between-java-and-scala-collections.html https://www.jianshu.com/p/e7db5970e68c