死磕Spark事件總線

Spark中大量採用事件監聽方式,實現driver端的組件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的java

觀察者模式和監聽器

在設計模式中有一個觀察者模式,該模式創建一種對象與對象之間的依賴關係,一個對象狀態發生改變時當即通知其餘對象,其餘對象就據此做出相應的反應。其中發生改變的對象稱之爲觀察目標(也有叫主題的),被通知的對象稱之爲觀察者,能夠有多個觀察者註冊到一個觀察目標中,這些觀察者之間沒有聯繫,其數量能夠根據須要增減。編程

image-20210710162453712

事件驅動的異步化編程

Spark-Core內部的事件框架實現了基於事件的異步化編程模式。它的最大好處是能夠提高應用程序對物理資源的充分利用,能最大限度的壓榨物理資源,提高應用程序的處理效率。缺點比較明顯,下降了應用程序的可讀性。Spark的基於事件的異步化編程框架由事件框架和異步執行線程池組成,應用程序產生的Event發送給ListenerBus,ListenerBus再把消息廣播給全部的Listener,每一個Listener收到Event判斷是否本身感興趣的Event,如果,會在Listener獨享的線程池中執行Event所對應的邏輯程序塊。下圖展現Event、ListenerBus、Listener、Executor的關係,從事件生成、事件傳播、事件解釋三個方面的視角來看。 img設計模式

咱們從線程的視角來看,看異步化處理。異步化處理體如今事件傳播、事件解釋兩個階段,其中事件解釋的異步化實現了咱們的基於事件的異步化編程。app

在這裏插入圖片描述

Spark的實現

Spark-Core、Spark-Streaming採用了分類的思路(分而治之)進行管理,每一大類事件都有獨自的Event、ListenerBus框架

在這裏插入圖片描述

Event

Spark-Core的核心事件trait是SparkListenerEvent,Spark-Straming的核心事件trait是StreamingListenerEvent異步

下圖是各類事件實體類:ide

image-20210709232059314

image-20210709232140439

咱們在定義事件須要注意哪些方面呢?咱們以SparkListenerTaskStart爲例,分析一個事件擁有哪些特徵。post

  1. 見名知義,SparkListenerTaskStart,一看名字咱們就能猜到是SparkListener的一個任務啓動事件。
  2. 觸發條件,一個事件的觸發條件必須清晰,可以清晰的描述一個行爲,且行爲宿主最好是惟一的。SparkListenerTaskStart事件生成的宿主是DAGScheduler,在DAGScheduler產生BeginEvent事件後生成SparkListenerTaskStart。
  3. 事件傳播,事件傳播可選擇Point-Point或者BroadCast,這個可根據業務上的須要權衡、選擇。Spark-Core、Spark-Streaming的事件框架採用BroadCast模式。
  4. 事件解釋,一個事件能夠有一個或者多個解釋。Spark-Core、Spark-Streaming因爲採用BroadCast模式,因此支持Listener對事件解釋,原則一個Listener對一個事件只有一種解釋。AppStatusListener、EventLoggingListener、ExecutorAllocationManager等分別對SparkListenerTaskStart作了解釋。 咱們在設計事件框架上可根據實際須要借鑑以上四點,設計一個最恰當的事件框架。

Listner

Spark-Core的核心監聽triat是SparkListener,Spark-Streaming的核心監聽triat StreamingListener,二者都表明了一類監聽的抽象ui

下圖是一些監聽實體類:spa

image-20210709232805895

image-20210709232901521

ListenerBus

監聽器總線對象,Spark程序在運行的過程當中,Driver端的不少功能都依賴於事件的傳遞和處理,而事件總線在這中間發揮着相當重要的紐帶做用。事件總線經過異步線程,提升了Driver執行的效率。Listener註冊到ListenerBus對象中,而後經過ListenerBus對象來實現事件監聽(相似於計算機與周邊設備之間的關係)

其start方法直接啓動一個dispatchThread,其核心邏輯就是不停地在一個事件隊列eventQueue裏取出事件,若是事件合法且LiverListenerBus沒有被關停,就將事件通知給全部註冊的listener中

其dispatch方法就是向事件隊列裏添加相應的事件。

ListenerBus用於管理全部的Listener,Spark-Core和Spark-Streaming公用相同的trait ListenerBus, 最終都是使用AsyncEventQueue類對Listener進行管理。

image-20210709233404950

LiveListenerBus:

管理全部註冊的Listener,爲一類Listener建立一個惟一的AsyncEventQueue,廣播Event到全部的Listener。默承認提供四類AsyncEventQueue分別爲‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。目前Spark-Core並無放開類別設置,意謂着最多隻能有上述四類,從設計的嚴謹上來說分類並非越多越好,每多一個類別,就會多一個AsyncEventQueue實例,每一個實例中會包含一個事件傳播的線程,對系統的資源佔用仍是比較多的。

異步事件處理線程listenerThread

  private val listenerThread = new Thread(name) {
    setDaemon(true) //線程自己設爲守護線程 
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire()//不斷獲取信號量,信號量減一,能獲取到說明還有事件未處理
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll  //獲取事件, remove() 和 poll() 方法都是從隊列中刪除第一個元素(head)。
            if (event == null) {
              // 此時說明沒有事件,但仍是拿到信號量了,這說明stop方法被調用了
              // 跳出while循環,關閉守護進程線程
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            // 調用ListenerBus的postxToAll(event: E)方法
            postxToAll(event)
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

核心屬性

private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)
//存放事件
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
// 表示隊列中產生和使用的事件數量的計數器,這個信號量是爲了不消費者線程空跑
private val eventLock = new Semaphore(0)

核心方法

start

LiveListenerBus在SparkContext的setupAndStartListenerBus中被初始化,並調用start方法啓動LiveListenerBus。

  def start(): Unit = {
    if (started.compareAndSet(false, true)) { 
      listenerThread.start() //啓動消費者線程
    } else {
      throw new IllegalStateException(s"$name already started!")
    }

stop

中止LiveListenerBus,它將等待隊列事件被處理,但在中止後丟掉全部新的事件。須要注意stop可能會致使長時間的阻塞,執行stop方法的線程會被掛起,直到全部的AsyncEventQueue(默認四個)中的dispatch線程都退出後執行stop主法的線程纔會被喚醒。

  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    if (stopped.compareAndSet(false, true)) {
      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
      // `stop` is called.
      // 釋放一個信號量,但此時是沒有事件的,從而listenerThread會拿到一個空事件,從而知道該中止了
      eventLock.release()
      //而後等待消費者線程自動關閉
      listenerThread.join()
    } else {
      // Keep quiet
    }
  }

post

採用廣播的方式事件傳播,這個過程很快,主線程只須要把事件傳播給AsyncEventQueue便可,最後由AsyncEventQueue再廣播給相應的Listener

def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    // 在事件隊列隊尾添加事件
    // add()和offer()區別:二者都是往隊列尾部插入元素,不一樣的時候,當超出隊列界限的時候,add()方法是拋出異常讓你處理,而offer()方法是直接返回false
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      //若是成功加入隊列,則在信號量中加一
      eventLock.release()
    } else {
      // 若是事件隊列超過其容量,則將刪除新的事件,這些子類將被通知到刪除事件。
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }

    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      // Don't log too frequently   日誌不要太頻繁
      // 若是上一次,隊列滿了EVENT_QUEUE_CAPACITY=1000設置的值,就丟掉,而後記錄一個時間,若是一直持續丟掉,那麼每過60秒記錄一第二天志,否則日誌會爆滿的
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          // 記錄一個warn日誌,表示這個事件,被丟棄了
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }

完整流程

image-20210710220515744

  1. 圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是經過調用LiveListenerBus的post方法將消息提交給事件隊列,每post一個事件,信號量就加一。

  2. listenerThread不停的獲取信號量,而後從事件隊列中取出事件,取到事件,則調用postForAll把事件分發給已註冊的監聽器,不然,就是取到空事件,它明白這是事件總線搞的鬼,它調用了stop可是每post事件,從而中止事件總線線程。

相關文章
相關標籤/搜索