複雜業務下,咱們爲什麼選擇 Akka 做爲異步通訊框架?

Akka 是 Scala 語言實現的一套基於 Actor 模型的異步通訊框架,可用於構建高併發、分佈式、可容錯、事件驅動的基於 JVM 的應用,在 Spark 中曾被用於實現進程、節點間通訊,在實際項目中協助咱們成功搭建了知足業務需求的模型部署平臺。算法

項目背景json

某國內大型連鎖餐飲企業旗下擁有大量門店。餐廳門店的每日生產、定貨、排班都依賴於每日客單量預估的合理性,其內部數據團隊實現了一套預估模型,須要 TalkingData 幫助構建一個工程化平臺以支撐模型的訓練和部署,從而將模型真正地應用到實際生產環節中。bash

通過交流,咱們發如今實際生產環境中,在各方面存在一些問題:網絡

  • 異步:全部門店的前日銷售、業務等數據均由各自門店的店長負責整合上傳。上傳的開始時間、結束時間、數據的完整性等均不肯定。而模型訓練和預測均依賴這部分數據,這就意味這沒法爲模型訓練和預測設置統一的開始入口。
    併發

  • 高併發:除了一些特殊類型的門店,絕大多數門店的營業時間相對固定,從店長決定整理上傳銷售數據,到準備物料、排班準備第二天營業,留給模型訓練和模型預測回吐預測結果的時間大概爲 3 小時。若是每一個門店的預測指標有 2 至 3 項,那麼須要有足夠的調度能力在規定時間內完成大概 2 萬次模型訓練加預測流程。app

  • 容錯:因爲門店數量衆多且狀況各不相同,仍然有不少潛在的因素可能致使流程出錯或失敗。原則上,某次流程的失敗不該該對其餘流程形成任何影響,每一個流程在平臺層面應該成爲互相獨立的任務。
    框架

所以,咱們須要一套輕量化的分佈式服務框架,來實現知足上述需求的模型訓練預測平臺,並在必定程度上保證平臺的可拓展性。結合此前團隊內的技術積累,最終選擇了 Akka 框架用於實現平臺的內部通訊。
異步

選型過程tcp

消息驅動方式——流程異步化分佈式

一次完整的預測任務包括:訓練數據準備→模型訓練→模型結果導出→預測數據準備→預測結果導出,其中數據準備步驟在時間上不肯定,模型相關步驟在執行結果上不肯定,若是採用同步模型,將會產生大量的等待線程,佔用浪費大量資源。在 Actor 模型中,每一個 Actor 做爲一個基本計算單元,迴應接收到的消息,同時並行的:

  • 發送有限數量的消息給其餘 Actor

  • 建立有限數量的新 Actor

  • 指定接受到下一個消息時的行爲

上述操做沒有順序執行的假設,所以能夠並行進行。發送者與已經發送的消息解耦,能夠進行無需等待的異步通訊。

Actor 模型通訊方式

Akka 中的 Actor 本質上就是接收消息並採起行動處理消息的對象,是封裝狀態和行爲的對象,它們惟一的通訊方式是交換消息——把消息存放在接收方的郵箱裏。Actor 天然造成樹形結構,這種結構的精髓在於任務被拆開、委託,直到任務小到能夠被完整地處理。所以,咱們將預測任務的各個步驟拆分抽象,並建立類型消息與步驟對應,將每一個步驟交給線程級別的 Actor 執行處理,經過發送不一樣類型的消息來觸發建立不一樣操做的 Actor,讓整個預測流程無需等待。

結構——應對高併發

因爲絕大多數門店的營業時間大體相同,平臺在流量上會有明顯的峯值和低谷,在低谷期間平臺須要儘量減小資源佔有量,而在流量峯值來臨時平臺要可以及時響應,保證足夠的可用性。

通過討論,咱們肯定了採用 Master-Worker 模式的平臺結構,Master 負責接收與分配任務,Worker 負責處理執行具體的模型任務。

Master 和 Worker 均爲獨立的 ActorSystem,管理內部不不同操做邏輯的 Actor,在空閒狀態下佔有資源很小。Actor 爲線程級別,一樣僅佔用極少許資源,生命週期由 ActorSystem 統一管理。少許請求時,Actor 線程具備很高的複用率,請求併發高時,ActorSystem 會建立大量的 Actor 線程用來承接請求,保證可用性。

Akka 中 Actor 的生命週期

子 Actor——模塊化提升容錯

每一個預測任務的模型相關步驟均存在失敗的可能性,此外,數據準備過程當中的網絡波動、內容校驗出錯等狀況,都會致使當前預測任務的失敗。對於失敗的任務,咱們但願可以儘量記錄錯誤信息,爲重跑提供先決條件。

在 Akka 中,構建了父子 Actor 的樹形監督結構,提供 Actor 的監督機制以保證容錯性,把處理響應錯誤的責任交給出錯對象之外的實體。父 Actor 建立子 Actor 來委託處理子任務,同時便會自動地監管它們。子 Actor 列表維護在父 Actor 的上下文中,父 Actor 能夠訪問它。

Akka 中的 Actor 結構

經過更進一步的拆分細化,咱們將 Worker 端的 Actor 分爲 Prepare 和 Executor 兩種,Prepare 爲主要負責數據準備步驟,Executor 負責模型相關步驟,統一由 Worker 端的父 Actor 管理,錯誤和異常均向上層拋出,由 Worker 端的父 Actor 記錄併發送給的錯誤收集模塊統一處理。

實踐應用

ActorSystem

建立 ActorSystem 時,默認將在 classpath 中尋找 application.conf、 application.json 和 application.properties,並自動加載:

val system=ActorSystem("RsModelActorSystem")
val system=ActorSystem("RsModelActorSystem", ConfigFactory.load()) // 同上複製代碼

若是想要使用本身的配置文件,能夠經過 ConfigFactory 來配置加載:

val system = ActorSystem("UniversityMessageSystem", 
           ConfigFactory.load("own-application.conf")) 


        val config = ConfigFactory.parseString(
              s""" |akka.remote.netty.tcp.hostname = $host |akka.actor.provider = akka.remote.RemoteActorRefProvider |akka.remote.enabled-transport = akka.remote.netty.tcp |akka.remote.netty.tcp.port = 2445 """.stripMargin)
        val system = ActorSystem("RsModelActorSystem",
            config.withFallback(ConfigFactory.load())) // 同上
複製代碼


ActorSystem 的配置參數中有大量參數能夠自定義,須要根據實際須要修改,例如在該項目中,後期單個算法任務對象大小超過了 Akka remote 默認包大小 128000 bytes,須要修改參數 akka.remote.netty.tcp.maximum-frame-size

Actor

一個 Actor 包含了狀態、行爲、一個郵箱、子 Actor 和一個監管策略,全部這些封裝在一個 Actor 引用裏。Actor 對象一般包含一些變量來反映其所處的可能狀態,Akka-actor 自身的輕量線程與系統的其餘部分徹底隔離,所以無須擔憂併發問題。每當一個消息被處理,它會與 Actor 的當前行爲進行匹配。行爲是一個函數,它定義了在某個時間點處理當前消息所要採起的動做,須要結合實際需求編寫具體邏輯。Actor 的郵箱是鏈接發送者與接收者的紐帶,每一個 Actor 有且僅有一個郵箱,全部的發來的消息都在郵箱裏排隊。能夠有不一樣策略的郵箱實現供選擇,缺省時爲 FIFO。

編寫邏輯

在 Actor 類中,主要邏輯均在 receive 方法中實現,經過偏函數方法,執行並返回對應的邏輯:

//ActorLogging 提供 Actor 內部的日誌輸出
        class RsActor extends Actor with ActorLogging {
              override def receive: Receive = {
                case MapMessage(parameters) =>
                      println(parameters.get("code"))

                case MapKeyMessage(parameters, key) =>
                      println(parameters.get(key))

                case StringMessage(msg) =>
                      println(msg.getBytes().length)

                case o: Object =>
                      println(o.getClass)

                case _: AnyRef =>
                      println("233")
              }
        }複製代碼

生成引用

生成一個能夠接收消息的 Actor 實例主要有兩個方法:

// 生成一個基於本地類的 Actor 實例
        val rsActor = system.actorOf(Props[RsActor], "rsActor")
        // 生成一個基於遠程地址的 Actor 實例
        val rmActor = 
            system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor")

        //  使用! 向對應的 Actor 實例發送消息
        rsActor ! StringMessage("test")
        rmActor ! MapMessage(Map("code"->"233"))複製代碼

Message

Akka 中對傳遞的消息內容並無太嚴格要求,能夠是基本數據類型,也能夠是支持序列化的對象:

//scala 的 case class 便於簡潔地建立消息類
        case class StringMessage(msg: String) extends Serializable
        case class MapMessage(parameters: Map[String, String]) extends Serializable
        case class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable複製代碼

其餘

Akka 做爲一款被普遍使用的開源工具,在實際項目中體現出了不少的優點,異步的消息驅動方式也給咱們提供了一套新的思路和實現方法。


做者介紹:李天燁,TalkingData 數據科學家。畢業於東北大學,任職於 TalkingData 數據科學團隊,從事數據科學自動化相關工做。

本文轉自:InfoQ

相關文章
相關標籤/搜索