spark sql 在mysql的應用實踐

前言

目前spark sql 主要應用在structure streaming、etl 和 machine learning 的場景上, 它能對結構化的數據進行存儲和操做,結構化的數據能夠來自HIve、JSON、Parquet、JDBC/ODBC等數據源。因爲部門對數據的準確性和一致性要求等業務特色,咱們選擇mysql使用jdbc的方式做爲咱們的數據源,spark 集羣用yarn作爲資源管理,本文主要分享咱們在使用spark sql 過程當中遇到的問題和一些經常使用的開發實踐總結。html

運行環境:spark :2.1.0,hadoop: hadoop-2.5.0-cdh5.3.2 (yarn 資源管理,hdfs),mysql:5.7 ,scala: 2.11, java:1.8java


spark on yarn 

spark on yarn 運行機制

咱們先來了解一下spark on yarn 任務的運行機制。yarn 的基本思想是將JobTracker的兩個主要功能(資源管理和任務調度/監控)分離成單獨的組件:RM 和 AM;新的資源管理器**ResourceManager(RM)實現全局的全部應用的計算資源分配,應用控制器ApplicationMaster(AM)實現應用的調度和資源的協調;節點管理器NodeManager(NM)**則是每臺機器的代理,處理來自AM的命令,實現節點的監控與報告;容器 Container 封裝了內存、CPU、磁盤、網絡等資源,是資源隔離的基礎,當AM向RM申請資源時,RM爲AM返回的資源即是以Container表示,如上圖,spark master分配的 executor 的執行環境即是containner。目前咱們使用yarn 隊列的方式,能夠進一步的對應用執行進行管理,讓咱們的應用分組和任務分配更加清晰和方便管理。mysql

yarn 隊列


開發實踐

1. 讀取mysql表數據

import com.test.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;

public class SparkSimple01 {

    public static void main(String[] args) {

        // 建立spark會話,實質上是SQLContext和HiveContext的組合
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();

        // 設置日誌級別,默認會打印DAG,TASK執行日誌,設置爲WARN以後能夠只關注應用相關日誌
        sparkSession.sparkContext().setLogLevel("WARN");

        // 分區方式讀取mysql表數據
        Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people",
                (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties());

        predicateSet.show();

    }
}

爲了確認該查詢對mysql發出的具體sql,咱們先查看一下mysql執行sql日誌,sql

#mysql 命令窗口執行如下命令打開日誌記錄
SHOW VARIABLES LIKE "general_log%";
SET GLOBAL general_log = 'ON';

mysql log.png

打開Lenovo.log獲得以上代碼在mysql上的執行狀況: 分區執行sqlapache

經過分區查詢獲取表數據的方式有如下幾個優勢:api

  • 利用表索引查詢提升查詢效率
  • 自定義sql條件使分區數據更加均勻,方便後面的並行計算
  • 分區併發讀取能夠經過控制併發控制對mysql的查詢壓力
  • 能夠讀取大數據量的mysql表

spark jdbc 讀取msyql表還有直接讀取(沒法讀取大數據量表),指定字段分區讀取(分區不夠均勻)等方式,經過項目實踐總結,以上的分區讀取方式是咱們目前認爲對mysql最友好的方式。 分庫分表的系統也能夠利用這種方式讀取各個表在內存中union全部spark view獲得一張統一的內存表,在業務操做中將分庫分表透明化。若是線上數據表數據量較大的時候,在union以前就須要將spark view經過指定字段的方式查詢,避免on line ddl 在作變動時union表報錯,由於可能存在部分表已經添加新字段,部分表還未加上新字段,而union要求全部表的表結構一致,致使報錯。緩存

2. Dataset 分區數據查看

咱們都知道 Dataset 的分區是否均勻,對於結果集的並行處理效果有很重要的做用,spark Java版暫時沒法查看partition分區中的數據分佈,這裏用java調用scala 版api方式查看,線上不推薦使用,由於這裏的分區查看使用foreachPartition,多了一次action操做,而且打印出所有數據。網絡

import org.apache.spark.sql.{Dataset, Row}

/**
  * Created by lesly.lai on 2017/12/25.
  */
class SparkRddTaskInfo {
  def getTask(dataSet: Dataset[Row]) {
    val size = dataSet.rdd.partitions.length
    println(s"==> partition size: $size " )
    import scala.collection.Iterator
    val showElements = (it: Iterator[Row]) => {
      val ns = it.toSeq
      import org.apache.spark.TaskContext
      val pid = TaskContext.get.partitionId
      println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
    }
    dataSet.foreachPartition(showElements)
  }
}

仍是用上面讀取mysql數據的例子來演示調用,將predicateSet做爲參數傳入併發

new SparkRddTaskInfo().getTask(predicateSet);

控制檯打印結果app

分區結果.png

經過分區數據,咱們能夠看到以前的predicate 方式獲得的分區數就是predicate size 大小,而且按照咱們想要的數據分區方式分佈數據,這對於業務數據的批處理,executor的local cache,spark job執行參數調優都頗有幫助,例如調整spark.executor.cores,spark.executor.memory,GC方式等等。 這裏涉及java和Scala容器轉換的問題,Scala和Java容器庫有不少類似點,例如,他們都包含迭代器、可迭代結構、集合、 映射和序列。可是他們有一個重要的區別。Scala的容器庫特別強調不可變性,所以提供了大量的新方法將一個容器變換成一個新的容器。 在Scala內部,這些轉換是經過一系列「包裝」對象完成的,這些對象會將相應的方法調用轉發至底層的容器對象。因此容器不會在Java和Scala之間拷貝來拷貝去。一個值得注意的特性是,若是你將一個Java容器轉換成其對應的Scala容器,而後再將其轉換回一樣的Java容器,最終獲得的是一個和一開始徹底相同的容器對象(這裏的相贊成味着這兩個對象其實是指向同一片內存區域的引用,容器轉換過程當中沒有任何的拷貝發生)。

3. sql 自定義函數

自定義函數,能夠簡單方便的實現業務邏輯。

import com.tes.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class SparkSimple02 {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());
        originSet.cache().createOrReplaceTempView("people");

        // action操做 打印原始結果集
        originSet.show();

        // 註冊自定義函數
        sparkSession.sqlContext().udf().register("genderUdf", gender -> {
            if("M".equals(gender)){
                return  "男";
            }else if("F".equals(gender)){
                return  "女";
            }
            return "未知";
        }, DataTypes.StringType);

        // 查詢結果
        Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people ");

        // action操做 打印函數處理後結果集
        peopleDs.show();
    }
}

執行結果:

image.png

在sql中用使用java代碼實現邏輯操做,這爲sql的處理邏輯能力提高了好幾個層次,將函數抽取成接口實現類能夠方便的管理和維護這類自定義函數類。此外,spark也支持自定義內聚函數,窗口函數等等方式,相比傳統開發實現的功能方式,使用spark sql開發效率能夠明顯提升。

4. mysql 查詢鏈接複用

最近線上任務遇到一個獲取mysql connection blocked的問題,從spark ui的executor thread dump 能夠看到blocked的棧信息,如圖:

connection blocked.png

查看代碼發現DBConnectionManager 調用了 spark driver註冊mysql driver 使用同步方式的代碼

driverRegister.png

看到這裏咱們很容易以爲是註冊driver 致使的blocked,其實再仔細看回報錯棧信息,咱們會發現,這裏的getConnection是在dataset 的foreachpartition 中調用,而且是在每次db 操做時獲取一次getConnection 操做,這意味着在該分區下有屢次重複的在同步方法中註冊driver獲取鏈接的操做,看到這裏線程blocked的緣由就很明顯了,這裏咱們的解決方式是: a. 在同個partition中的connection 複用進行db操做 b. 爲了不partition數據分佈不均致使鏈接active時間過長,加上定時釋放鏈接再從鏈接池從新獲取鏈接操做 經過以上的鏈接處理,解決了blocked問題,tps也達到了4w左右。

5. executor 併發控制

咱們都知道,利用spark 集羣分區並行能力,能夠很容易實現較高的併發處理能力,若是是併發的批處理,那並行處理的能力能夠更好,可是,mysql 在面對這麼高的併發的時候,是有點吃不消的,所以咱們須要適當下降spark 應用的併發和上下游系統和平相處。控制spark job併發能夠經過不少參數配置組合、集羣資源、yarn隊列限制等方式實現,通過實踐,咱們選擇如下參數實現:

#須要關閉動態內存分配,其餘配置才生效
spark.dynamicAllocation.enabled = false
spark.executor.instances = 2
spark.executor.cores = 2

image.png

這裏發現除了設置executor配置以外,還須要關閉spark的動態executor分配機制,spark 的ExecutorAllocationManager 是 一個根據工做負載動態分配和刪除 executors 的管家, ExecutorAllocationManager 維持一個動態調整的目標executors數目, 而且按期同步到資源管理者,也就是 yarn ,啓動的時候根據配置設置一個目標executors數目, spark 運行過程當中會根據等待(pending)和正在運行(running)的tasks數目動態調整目標executors數目,所以須要關閉動態配置資源才能達到控制併發的效果。

除了executor是動態分配以外,Spark 1.6 以後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,能夠動態佔用對方的空閒區域,咱們先看看worker中的內存規劃是怎樣的:

worker memory schedule.png

worker 能夠根據實例配置,內存配置,cores配置動態生成executor數量,每個executor爲一個jvm進程,所以executor 的內存管理是創建在jvm的內存管理之上的。從本文第一張spark on yarn圖片能夠看到,yarn模式的 executor 是在yarn container 中運行,所以container的內存分配大小一樣能夠控制executor的數量。 RDD 的每一個 Partition 通過處理後惟一對應一個 Block(BlockId 的格式爲 rdd_RDD-ID_PARTITION-ID ),從上圖能夠看出,開發過程當中經常使用的分區(partition)數據是以block的方式存儲在堆內的storage內存區域的,還有爲了減小網絡io而作的broadcast數據也存儲在storage區域;堆內的另外一個區域內存則主要用於緩存rdd shuffle產生的中間數據;此外,worker 中的多個executor還共享同一個節點上的堆外內存,這部份內存主要存儲經序列化後的二進制數據,使用的是系統的內存,能夠減小沒必要要的開銷以及頻繁的GC掃描和回收。

爲了更好的理解executor的內存分配,咱們再來看一下executor各個內存塊的參數設置: executor jvm off-heap.png

瞭解spark 內存管理的機制後,就能夠根據mysql的處理能力來設置executor的併發處理能力,讓咱們的spark 應用處理能力收放自如。調整executor數量還有另一個好處,就是集羣資源規劃,目前咱們的集羣隊列是yarn fair 模式, yarn fair 集羣模式.png

先看看yarn fair模式,舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啓動一個job而B沒有任務時,A會得到所有集羣資源;當B啓動一個job後,A的job會繼續運行,當A的job執行完釋放資源後,不過一下子以後兩個任務會各自得到一半的集羣資源。若是此時B再啓動第二個job而且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用於四分之一的集羣資源,而A的job仍然用於集羣一半的資源,結果就是資源最終在兩個用戶之間平等的共享。

在這種狀況下,即便有多個隊列執行任務,fair模式容易在資源空閒時佔用其餘隊列資源,一旦佔用時間過長,就會致使其餘任務都卡住,這也是咱們遇到的實際問題。若是咱們在一開始能評估任務所用的資源,就能夠在yarn隊列的基礎上指定應用的資源,例如executor的內存,cup,實例個數,並行task數量等等參數來管理集羣資源,這有點相似於yarn Capacity Scheduler 隊列模式,但又比它有優點,由於spark 應用能夠經過spark context的配置來動態的設置,不用設置yarn 隊列後重啓集羣,稍微靈活了一點。

除了以上提到的幾點總結,咱們還遇到不少其餘的疑問和實踐,例如,何時出現shuffle;如何比較好避開或者利用shuffle;Dataset 的cache操做會不會有性能問題,如何從spark ui中分析定位問題;spark 任務異常處理等等,暫時到這裏,待續...

vip.fcs

參考資料: http://www.cnblogs.com/yangsy0915/p/5118100.html https://mp.weixin.qq.com/s/KhHy1mURJBiPMGqkl4-JEw https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral https://docs.scala-lang.org/zh-cn/overviews/collections/conversions-between-java-and-scala-collections.html https://www.jianshu.com/p/e7db5970e68c

相關文章
相關標籤/搜索