akka框架——異步非阻塞高併發處理框架

akka actor, akka cluster

akka是一系列框架,包括akka-actor, akka-remote, akka-cluster, akka-stream等,分別具備高併發處理模型——actor模型,遠程通訊,集羣管理,流處理等功能。html

akka支持scala和java等JVM編程語言。java

akka actor

akka actor是一個actor模型框架。actor模型是一種將行爲定義到actor,actor間經過消息通訊,消息發送異步進行,消息處理(在actor內)同步有序進行的一種高併發、非阻塞式編程模型。node

Actor模型優:git

  • event-driven model 事件驅動
  • strong isolation principles - 強隔離原則。 actor只應該處理消息,不應有其餘方法接口,不保存狀態、不共享狀態
  • location transparency acto的物理位置對用戶不可見,用戶看到actor視圖邏輯上是一致的,儘管物理位置不一樣
  • lightweight 輕量級

actor的層級結構;actor名字與路徑、地址;actor的消息收件箱;發送消息的異步性;actor消息處理的有序性;actor按序挨個處理消息(而非併發);github

case class MsgA(data:Type)
case class MsgB(x:X)

class SomeActor extends Actor {
    def receive()={
        case MsgA(d)=>
        case MsgB(x)=>
    }
}

val system=ActorSystem("sysname")
val act:ActorRef=system.actorOf(p:Props[], "act-name")
act ! MsgA("data")
act ! MsgB(xxx)

.tell() :Fire-Forgetspring

.ask() : Send-And-Receive-Futureapache

阻塞程序等待消息返回結果:編程

import akka.pattern._
import scala.concurrent._
implicit val akkaAskTimeout:Timeout = Timeout(5 seconds)
val awaitTimeout= 10 seconds
val res=Await.result(actor ? MessageXxx, awaitTimeout)
println(res)

akka programming general rules: (通常編碼規則,該建議來自官網)數組

  • messages with good names, rich semantic, domain specific
  • imuutable messages
  • put actor's associated messages in its companion object
  • a .props() : Props[?] method in actor companion object to construct the actor

.actorOf()建立Actor,返回ActorRef。
.actorSelect()查找actor,返ActorRef。網絡

akka-remote

配置鍵akka.remote.netty.tcp.hostname定義remoting模塊通訊的網口,akka.remote.netty.tcp.port定義通訊端口。通訊網口在未配置或配置爲空串時(不能配置爲null)默認監聽局域網網口(不是迴環網口127.0.0.1)。通訊端口在未配置默認監聽2552端口,在配置爲0時會監聽一個隨機端口。網口名與ActorSystem地址中的主機名嚴格對應,不能試圖以迴環地址鏈接本機上監聽局域網網口的actor。好比,本機上運行的單節點集羣,集羣即只有自身,其集羣種子節點配置爲akka.cluster.seed-nodes=["akka.tcp://xx@127.0.0.1:2551"],則其remoting通訊網口配置akka.remote.netty.tcp.hostname只能是127.0.0.1,不能是局域網網口。

……

actor的建立和部署不單只是在本地,還有可能涉及遠程部署(如集羣)。涉及遠程部署時牽涉到序列化問題。

若是能確信actor的建立只會涉及本地,則可經過配置關閉actor建立器的序列化行爲akka.actor.serialize-creators=off(默認關閉)。

akka-cluster

akka cluster是……集羣,做用……,特色……TODO

akka cluster由多個ActorSystem(節點)構成,actorsystem根據配置指定的種子節點組建集羣。

論及集羣時,所謂節點不必定指一臺物理機,通常指ActorSystem實例,不一樣的ActorSystem實例對應不一樣的<主機+端口>。一臺物理機可運行多個ActorSystem實例。論及集羣成員時,這裏的成員指的是節點,不是集羣內的actor。

集羣種子節點配置akka.cluster.seed-nodes中的節點沒必要都啓動,但節點列表中第一個必須啓動,不然其餘節點(不管是否在seed-nodes中)不能加入集羣。也就說,若是不啓動列表第一個節點,啓動多個其餘節點,不能組成集羣(也就不能產生/接收到集羣事件),直到啓動列表第一個節點纔將已啓動的多個節點組成集羣。種子列表第一個節點可組建一個只有本身的集羣。
任意節點都可加入集羣,並不是必定得是種子節點。節點加入的只能是集羣(不能是另外一個節點,即聯繫的其餘節點必須處於集羣中),也就說有個節點得先組建一個僅包含本身的集羣,以使得其餘節點有集羣可加入。
集羣內節點ActorSystem的名字要求一致,由組建過程可知,即種子列表第一個節點的名字。

若是集羣含超過2個節點,那麼列表第一個節點能夠不存活。若是種子節點全都同時不存活,那麼以相同配置再次啓動節點將組建不一樣於之前的新集羣,即不能進入之前的集羣。

經過接口Cluster.joinSeedNodes(List[Address])可動態添加種子節點,節點可經過接口cluster.join(Address)加入集羣。

配置項akka.cluster.seed-node-timeout指啓動中的節點試圖聯繫集羣(種子節點)的超時時間,若是超時,將在akka.cluster.retry-unsuccessful-join-after指定的時間後再次重試聯繫,默認無限次重試聯繫直到聯繫上,經過配置akka.cluster.shutdown-after-unsuccessful-join-seed-nodes指定一個超時時間使得本節點在聯繫不上集羣種子節點超過該時間後再也不繼續聯繫,終止聯繫後執行擴展程序CoordinatedShutdown以停掉本節點actor相關行爲(即關停ActorSystem),若是設置akka.coordinated-shutdown.terminate-actor-system = on(默認開啓)將致使擴展程序關停ActorSystem後退出JVM。

配置項akka.cluster.min-nr-of-members指定集羣要求的最小成員個數。

集羣成員狀態

集羣成員在加入集羣、存在於集羣、到退出集羣整個生命週期中的變化有對應狀態,狀態包括joining, up, leaving, exiting, remoed, down, unreachable。。TODO

集羣成員狀態轉移圖:
集羣成員狀態轉移圖
圖片來源:akka官網

圖中的框表示成員狀態,邊表示驅動狀態轉換的相關操做。"(leader action)"表示該操做由集羣首領驅使完成。「(fd*)」表示該操做由系統失敗檢測器(failure detector)驅使完成,失敗檢測器是一個監測集羣成員通訊狀態的後臺程序。

網絡中的節點隨時可能沒法通訊(通訊不可達),針對這個問題,集羣系統設有失敗檢測器(Failure Detector),發現成員異常不可達時將廣播UnreachableMember事件。須要在程序中顯式調用接口Cluster.down(Address)來改變成員狀態爲Down,集羣以後會廣播MemberDowned事件。節點正常退出時不會轉入unreachable狀態,而是進入leaving(事件MemberLeft)。

自動down:經過配置可以使成員自動從unreachable轉入down,經過配置akka.cluster.auto-down-unreachable-after開啓並指定成員自動從狀態unreachable轉入down的時長,官方提供此功能僅爲測試,並告知不要在生產環境使用。

集羣事件

集羣可訂閱的事件(cluster.subscribe(,Class[_]*)方法)要求事件類型實參是ClusterDomainEvent類型的(ClusterDomainEvent或其子類),ClusterDomainEvent是一個標記型接口。事件相關的類型定義在akka.cluster.ClusterEvent中,ClusterEvent只是object,沒有伴生對象。

伴隨集羣成員生命週期的事件:

  • MemberJoined - 新節點加入,其狀態被改成了 Joining。
  • MemberUp - 新節點成功加入,成爲了集羣成員,其狀態被改成了 Up。
  • MemberExited - 集羣成員正要離開集羣,其狀態被改成了 Exiting。注意當其餘節點收到此事時,事件主體節點可能已經關停。
  • MemberDowned - 集羣成員狀態爲down。
  • MemberRemoved - 節點已從集羣移除。
  • UnreachableMember - 成員被認爲不可達(失敗檢測器詢問完其餘全部成員均認爲該節點不可達)。
  • ReachableMember - 以前的不可達成員從新變得可達。事件的發生要求全部以前認爲該節點不可達的成員如今均可達該節點。

WeaklyUp事件:當部分節點不可達時gossip不收斂,沒有集羣首領(leader),首領行爲沒法實施,但此時咱們仍但願新節點可加入集羣,這時須要WeaklyUp狀態特性。特性使得,若是集羣不能達到gossip收斂,Joining成員將被提高爲WeaklyUp,成爲集羣的一部分,當gossip收斂,WeaklyUp成員轉爲Up,可經過配置akka.cluster.allow-weakly-up-members = off關閉這種特性。

CurrentClusterState:新節點加入集羣后,在其收到任何集羣成員事件前,集羣會向其推送一個CurrentClusterState消息(此class並不是ClusterDomainEvent子類,即不能成爲cluster.subscribe()中事件類型的參數),表示新成員加入時(訂閱集羣事件時)集羣中的成員狀態快照信息,特別地,MemberUp事件伴隨的集羣狀態快照可能沒有任何集羣成員,避免此狀況下CurrentClusterState事件被髮送的方法是在cluster.registerOnMemberUp(...)參數中提交集羣事件訂閱行爲。接收CurrentClusterState消息是基於訂閱時參數cluster.subscribe(,initialStateMode=InitialStateAsSnapshot,)的條件下,CurrentClusterState將成爲actor接收到的第一條集羣消息,可不接收快照,而是將現有成員的存在視爲相關事件,這時應使用訂閱參數(initialStateMode=InitialStateAsEvents)。

val cluster = Cluster(context.system)
cluster.registerOnMemberUp{
  cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
}

Terminated消息是被監控(.watch())節點在被下線(down)或移除(remove)後發送的消息,類繼承結構上不是集羣事件(ClusterDomainEvent)。

actor需訂閱事件才能接收到(cluster.subscribe(self,,Class[EvenetType]*)),接收到的消息包括訂閱時提供的事件類型的子類。


actor接收到的事件有時是基於本身看到的集羣狀態的,不是全部actor收到的事件都相同,尤爲注意,當節點本身停機時,會收到集羣內全部節點(包括本身以及其餘沒停機節點)的MemberRemoved事件,也就至關於在actor本身看來這個集羣停機了。而其餘存活的節點只會收到停機節點的MemberRemoved事件。有時本身退羣又只收到本身的MemeberLeft事件,並且沒有MemberRemoved事件,沒有其餘成員的離羣事件。(意外出現過,需重現以及分析緣由)


節點獲取自身地址的方式:

val cluster=Cluster(actorSystem)
cluster.selfAddress     // 節點自身地址,信息包括協議+system名+主機+端口
cluster.selfUniqueAdress    //節點自身惟一性地址,信息包括自身地址+自身UID

節點角色 node roles

集羣節點可標記若干自定義角色類型,經過配置項akka.cluster.roles指定。

每種角色羣有一個首領節點,以執行角色相關操做,通常無需感知。

配置項akka.cluster.role.<role-name>.min-nr-of-members定義集羣中角色爲 的節點的最小個數。

集羣內單例 Cluster Singleton:保證某種Actor類在集羣內或集羣內某種角色羣中只有一個實例。活躍的單例是集羣的成員,是成員便可被移出集羣,被移出的單例其類型在被管理器建立補充以前活躍單例前存在一小段實例缺失時間,這期間單例的集羣成員不可達狀態會被失敗檢測器檢測到。

單例模式須要注意的問題:

  • 性能瓶頸
  • 單點故障
  • 不能假設單例是不間斷可用的,由於在活躍單例失效後,新的活躍單例工做前的一小段時間內單例不可用。

單例工具依賴庫「com.typesafe.akka:akka-cluster-tools」。
建立單例依賴類ClusterSingletonManager,是一個actor,需在集羣內全部節點上(儘早)啓動,訪問單例依賴類ClusterSingletonProxy

給單例actor發送消息時,查找的ActorRef應是單例代理,而不是單例管理器。

集羣節點分身 Cluster Sharding:分發actor到多個節點,邏輯標識符是同一個,不用關心其實際位置。

分佈式訂閱發佈 Distributed Publish Subscribe:僅經過邏輯路徑實現集羣內actor間的訂閱發佈通訊、點對點通訊,沒必要關心物理位置。

集羣客戶端 Cluster Client:與集羣通訊的外部系統稱爲集羣的客戶端。

藉助akka工具包com.typesafe.akka:akka-cluster-tools,可實現集羣客戶端與集羣間的通訊。在集羣中的節點上調用ClusterClientReceptionist(actorSystem).registerService(actorRef)將actorRef實例
註冊爲負責鏈接外部系統通訊請求的接待員,在外部系統中利用 val c = system.actorOf(ClusterClient.props(ClusterClientSettings(system).withInitialContacts(<集羣接待員地址列表,可配到配置文件>)))得到能與集羣接待員通訊的ActorRef,再經過c ! ClusterClient.Send("/user/some-actor-path", msg)發送消息到集羣。

集羣客戶端配置集羣接待員經過指定akka.cluster.client.initial-contacts數組完成,一個接待員地址包含集羣成員地址(集羣中任意一個可通訊成員)和接待員在集羣成員ActorSystem中的路徑(系統生成的接待員的服務地址在/system/receptionist),如["akka.tcp://my-cluster@127.0.0.1:2552/system/receptionist"]

集羣節點協議不能是akka,需是akka.tcp,配置項akka.remote下需配置屬性netty.tcp,而不是artery(對應akka協議),不然集羣客戶端鏈接時會因Connection reset by peer而失敗(固然,集羣節點akka.actor.provider顯然仍是cluster)。

集羣節點路由 Cluster Aware Routers:容許路由器對集羣內的節點進行路由,自動管理路由對象表(routees),涉及相關成員的進羣和退羣行爲。

兩種路對象(routee)管理策略的路由類型,Group和Pool:

  • Goup 羣組 共用集羣成員節點做爲路由對象(routee)。
    router轉發策略有多種,對於消息一致性轉發的router來講轉的消息必須可一致性散列(ConsistentHashable),或者用ConsitentHashEnvolope包裝消息使其變得可一致散列,不然routee收不到消息,而定義routee的接收方法receive時,獲得的數據對象是拆包了的,也就說若是路由器轉發的消息通過一致性散列信封包裝,routee獲得的消息已被自動去掉信封。

  • Pool 池 每一個router各自自動建立、管理本身的的routees。
    池中的routee actor消亡不會引起路由器自動建立一個來替補,路由器將在全部routee actor消亡會隨即消亡,動態路由器,如使用了數量調整策略的路由器,會改變這種行爲,動態調整routee。

    akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node定義每一個節點上routee的個數上限(默認1個),因爲所有routee在節點啓動時即被啓動(而不是按需延遲啓動),所以該上限同時定義了節點上的routee個數。
    router在集羣中能夠有多個,同一邏輯路徑下也容許有多個router(即非單例router,就像actor)。單節點routee個數配置max-nr-of-instances-per-node是對每一個router而言,也就說,若是集羣router個數是m,其類型、配置相同,單節點routee個數爲n,集羣節點數爲c,集羣中該類型router管理的routee類型實例數爲m*n*c

集羣管理 Management:經過HTTP、JMX查看管理集羣。





集羣相關例子(包含集羣、集羣事件、集羣客戶端、集羣節點路由、集羣內單例、角色):

/*集羣有一個單例Master,有路由功能,管理多個WorkerActor,負責接收集羣外部消息以及轉發給worker。集羣外部系統(集羣客戶端)給集羣內master發送消息。
*/
//對應.conf配置文件內容在代碼後面
//↓↓↓↓↓↓↓↓↓↓集羣節點程序代碼↓↓↓↓↓↓↓↓↓(對應配置文件node.conf)
//程序入口、建立actor system、建立&啓動節點worker、建立
object ClusterNodeMain {
  def main(args: Array[String]): Unit = {
    val conf=ConfigFactory.load("node")
    val system=ActorSystem("mycluster", conf)
    val log = Logging.getLogger(system, this)
    val manager = system.actorOf(
      ClusterSingletonManager.props(Props[Master],
        PoisonPill,
        ClusterSingletonManagerSettings(system).withRole("compute")),
      "masterManager")  //這裏的名字要和配置中部署路徑中的保持一致
    log.info(s"created singleton manager, at path: {}", manager.path)
    val proxy = system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/masterManager",
      settings = ClusterSingletonProxySettings(system).withRole("compute")),
      name = "masterProxy")  //單例代理類,準備讓單例接收的消息都應發到代理
    ClusterClientReceptionist(system).registerService(proxy)
    log.info("created singleton proxy, at path: {}", proxy.path)
  }
}
class Master extends Actor with ActorLogging{
  //下述FromConf.xxx從配置文件中讀取部署配置,本Master actor咱們以單例模式建立,其路徑爲/user/masterManager/singleton,所以部署的配置鍵爲"/masterManager/singleton/workerRouter"(配置鍵需省掉/user)
  private val router = context.actorOf(FromConfig.props(Props[WorkerActor]), name = "workerRouter")
  override def receive: Receive = {
    case x =>
      log.info(s"got message on master, msg: $x, actor me: ${self.path}")
      router forward ConsistentHashableEnvelope(x, x) //轉發給路由器,路由器將選擇一個worker讓其接收消息
  }
}
class WorkerActor extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)
  override def preStart(): Unit = {
//    actor成員需向集羣訂閱事件才能接收到集羣事件消息
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[ReachabilityEvent])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)
  def receive:Receive = {
    case MemberWeaklyUp(member)=>
      log.info(s"Member is WeaklyUp: $member, actor me: $self")
    case MemberUp(member) =>  //集羣成員狀態剛設爲了Up
      log.info(s"Member is Up: $member, actor me: $self")
    case MemberJoined(member)=>  //集羣成員狀態剛設爲了Joining
      log.info(s"Member is Joined: $member, actor me: $self")
    case MemberLeft(member)=>  // 狀態剛設爲了leaving
      log.info(s"Member is Left: $member, actor me: $self")
    case MemberExited(member)=>  // 成員本身正常退出
      log.info(s"Member is Exited: $member, actor me: $self")
    // 狀態成了Down,down狀態通常由unreachable狀態以後轉移過來,
    // 由編程者本身顯式設置cluster.down(member)
    // (auto-down特性可自動轉移unreachable成員到down,官方不建議在生成環境中啓用)
    case MemberDowned(member)=>
      log.info(s"Member is Downed: $member, actor me: $self")
    case MemberRemoved(member, previousStatus) => //成員被移出集羣
      log.info(s"Member is Removed: $member after $previousStatus, actor me: $self")
    case UnreachableMember(member) => //failure-detector檢測器發現了一個通訊不可達的成員
      log.info(s"Member detected as unreachable: $member, actor me: $self")
      cluster.down(member.address)
//      context.actorSelection(RootActorPath(member.address) / "user" / "otherActor") ! SomeMessage
    //共7種MemberEvent
    case ReachableMember(member) => //不可達成員從新變得可達
      log.info(s"Member is reachable again: $member, actor me: $self")

    case n:Int =>
      log.info(s"got an int: $n, actor me: ${self.path}")
    case s:String =>
      log.info(s"sender said: $s, actor me: ${self.path}")
    case any=>
      log.info(s"what is it? :$any, actor me: ${self.path}")
  }
}
//↑↑↑↑↑↑↑↑↑↑↑集羣節點程序代碼↑↑↑↑↑↑↑↑↑↑在將部署的節點機器上運行(注意修改對應hostname&port配置)

//↓↓↓↓↓↓↓↓↓↓集羣客戶端點程序代碼(另外一個項目)↓↓↓↓↓↓↓↓↓↓(對應配置文件client.conf)
object ClusterClientMain {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load("client")
    val system = ActorSystem("myclient", conf) //actor system名字隨意,和集羣名無關
    val clusterClient = system.actorOf(ClusterClient.props(ClusterClientSettings(system)), "clusterClient")

    // 向集羣某個路徑下的actor發送消息,對應路徑的actor需在集羣端提早向集羣接待員註冊好服務
    clusterClient ! ClusterClient.Send("/user/masterProxy", 100, localAffinity = true)
    // 注意,不是簡單的:   clusterClient ! 100
    clusterClient ! ClusterClient.Send("/user/masterProxy", 1, localAffinity = true)
    clusterClient ! ClusterClient.Send("/user/masterProxy", "Hi", localAffinity = true)

    println("main done")
  }
}

節點程序和客戶端程序使用的配置文件分別以下:

//↓↓↓↓↓↓↓↓↓↓  node.conf
akka {
  actor {
    provider = "cluster"  //有3種provider:local, remote, cluster,集羣成員actor用cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {   //使用artery便可組建集羣系統(對應協議akka://),但只有使用netty.tcp(對應協議akka.tcp://)才能和集羣外部通訊
      hostname = "127.0.0.1"  //節點主機
      port = 2551             //節點actor system的端口。主機和端口根據部部署的機器及想要對外開放的端口而變
    }
  }
  cluster {
    seed-nodes = [  //種子節點必須啓動第一個節點,其餘種子節點不要求必定啓動,加入集羣的節點不限於必須在種子列表中(但必須設置正確的種子節點以便能鏈接進集羣)
    //節點部署徹底沒必要在同個主機
      "akka.tcp://mycluster@127.0.0.1:2551",
    // "akka.tcp://mycluster@127.0.0.1:2552"
    ]
    roles=["compute"]  //爲集羣中這一類actor打上一種自定義標籤
    // 官方不建議生產環境中啓用auto-down特性。
    //auto downing is NOT safe for production deployments.
    // you may want to use it during development, read more about it in the docs.
    // auto-down-unreachable-after = 10s
  }
}

akka.extensions=[
  "akka.cluster.client.ClusterClientReceptionist"  //集羣客戶端接待員擴展
]

akka.actor.deployment {  //想要部署的actor的路徑做爲配置鍵
  "/masterManager/singleton/workerRouter" {  //路由器actor路徑做爲配置鍵,配置路由器相關屬性
    router = consistent-hashing-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node= 2
      allow-local-routees = on
      use-role = "compute"
    }
  }
}

//↓↓↓↓↓↓↓↓↓↓  client.conf
akka {
  actor {
    provider = remote // 咱們的客戶端是獨自成普通actor,沒有建新集羣,設爲remote以提供對外遠程通訊
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      //      hostname = ""  // 集羣客戶端部署的主機地址(默認本機局域網地址)
      //      port = 0    //集羣客戶端actor system的端口,能夠不配置(akka系統自動分配端口)
    }
  }
}
akka.cluster.client {
    initial-contacts = [  //接待員地址。 集羣成員中建立了集羣客戶端接待員的任意節點,地址中actor system名字是集羣名,後面的/system/receptionist是固定的(系統自動建立的接待員)
    "akka.tcp://mycluster@127.0.0.1:2551/system/receptionist", #不要求非得是第一個成員或接待員註冊的ActorRef服務所在的節點
    //    "akka.tcp://mycluster@127.0.0.1:2552/system/receptionist"
  ]
}

akka extension

An extension is a singleton instance created per actor system.

集成spring/spring-boot

被spring容器管理的Actor類上必須標註@Scope("prototype")。(@Scope("prototype")表示spring容器中每次須要時生成一個實例,是否影響集羣內單例? <== 不影響)

cluster中涉及actor遠程部署,可能會因SpringApplicationContext不能序列化而失敗,可將其設爲靜態變量,在spring啓動後手動初始化,以提供spring管理器。

示例代碼:TODO


常見問題

  1. 運行時程序異常終止,提示akka.version屬性沒有配置問題
    異發生在初始化ActorSystem過程當中,初始化時會經過加載reference.conf配置文件讀取該屬性,在運行時沒能讀取到該屬性致使異常。akka-actor jar中有reference.conf、version.conf文件,reference.conf在文件首經過「incluce version"語法導入version.conf,version.conf中定義了akka.version屬性,ActorSystem本可讀到,但因爲akka系列其餘jar包中也有reference.conf,內容不一樣akka-actor中的,也沒有導入version.conf,而打包時akka-actor的reference.conf(有可能)被其餘jar包中的reference.conf覆蓋,從而致使reference.conf中沒有akka.version屬性,進而致使程序異終止。

    解決方案:定義打包時資源文件處理行爲,不覆蓋reference.conf,合併全部jar包中reference.conf文件內容到一個文件。以下定義pom.xml中shade插件的行爲:

    <plugin>
     <artifactId>maven-shade-plugin</artifactId>
     <version>3.2.0</version>
     <executions>
         <execution>
             <phase>package</phase>
             <goals><goal>shade</goal></goals>
             <configuration>
                 <transformers>
                     <!--合併資源文件,而不是默認的覆蓋-->
                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                         <resource>reference.conf</resource>
                     </transformer>
  2. Spring集成akka-cluster,部署多個節點時主節點打印序列化相關ERROR日誌。
ERROR   akka.remote.EndpointWriter               : Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. Transient association error (association remains live)

akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:62) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.EndpointWriter.$anonfun$serializeMessage$1(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) ~[scala-library-2.12.8.jar!/:na]
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:768) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.actor.ActorCell.invoke(ActorCell.scala:557) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
Caused by: java.io.NotSerializableException: No configured serialization-bindings for class [org.springframework.context.annotation.AnnotationConfigApplicationContext]
    at akka.serialization.Serialization.serializerFor(Serialization.scala:320) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.serialization.Serialization.findSerializerFor(Serialization.scala:295) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.serialization.DaemonMsgCreateSerializer.serialize(DaemonMsgCreateSerializer.scala:184) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.serialization.DaemonMsgCreateSerializer.$anonfun$toBinary$1(DaemonMsgCreateSerializer.scala:76) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at scala.collection.immutable.List.foreach(List.scala:392) ~[scala-library-2.12.8.jar!/:na]
    at akka.remote.serialization.DaemonMsgCreateSerializer.propsProto$1(DaemonMsgCreateSerializer.scala:75) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.serialization.DaemonMsgCreateSerializer.toBinary(DaemonMsgCreateSerializer.scala:86) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:52) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]

集成spring方式參考https://www.baeldung.com/akka-with-spring,序列化ERROR日誌問題參考https://github.com/akka/akka/issues/15938,其中「patriknw」提出設置akka.actor.serialize-creators=off並設置相關.props(...).withDeploy(Deploy.local),本例在類SpringExt中定義方法def props(beanClass: Class[_ <: Actor]): Props = Props.create(classOf[SpringActorProducer], beanClass)//.withDeploy(Deploy.local),也失敗了,仍然報一樣ERROR。

用靜態變量保存ApplicationContext的方式可暫時解決該問題(由於壓根兒不涉及到ApplicationContext的序列化了)。


(以上知識基於版本akka 2.5.19)

相關文章
相關標籤/搜索