一個設計優秀的工具或框架,應該都有一個易用、強大的插件或擴展體系,akka也不例外。html
akka的擴展方法很是簡單,由於只涉及到兩個組件:Extension、 ExtensionId。其中Extension在每一個ActorSystem中只會加載一次,而後被akka管理。你能夠在ActorSystem啓動的時候以編程的方式加載,也能夠經過配置的方式自動加載。因爲Extension是在ActorSystem層面的擴展,因此須要開發者本身處理線程安全的問題。ExtensionId能夠理解爲Extension的一個惟一標誌,ActorSystem會根據它來判斷Extension是否被加載過,以確保Extension只能加載一次。java
/** * The basic ActorSystem covers all that is needed for locally running actors, * using futures and so on. In addition, more features can hook into it and * thus become visible to actors et al by registering themselves as extensions. * This is accomplished by providing an extension—which is an object * implementing this trait—to `ActorSystem.registerExtension(...)` or by * specifying the corresponding option in the configuration passed to * ActorSystem, which will then instantiate (without arguments) each FQCN and * register the result. * * The extension itself can be created in any way desired and has full access * to the ActorSystem implementation. * * This trait is only a marker interface to signify an Akka Extension. */ trait Extension
上面是Extension的定義,能夠看出它很是簡單,簡單到就是一個trait,沒有任何字段和方法。也就是說咱們實現的對akka的擴展能夠是任意形式的類,並且會被保證加載一次,那麼是如何保證只會加載一次的呢?ExtensionId也許能夠回答這個問題。編程
/** * Identifies an Extension * Lookup of Extensions is done by object identity, so the Id must be the same wherever it's used, * otherwise you'll get the same extension loaded multiple times. */ trait ExtensionId[T <: Extension] { /** * Returns an instance of the extension identified by this ExtensionId instance. */ def apply(system: ActorSystem): T = { java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this) } /** * Returns an instance of the extension identified by this ExtensionId instance. * Java API * For extensions written in Scala that are to be used from Java also, * this method should be overridden to get correct return type. * {{{ * override def get(system: ActorSystem): TheExtension = super.get(system) * }}} * */ def get(system: ActorSystem): T = apply(system) /** * Is used by Akka to instantiate the Extension identified by this ExtensionId, * internal use only. */ def createExtension(system: ExtendedActorSystem): T override final def hashCode: Int = System.identityHashCode(this) override final def equals(other: Any): Boolean = this eq other.asInstanceOf[AnyRef] }
ExtensionId也很簡單,首先這是一個trait,且有一個類型變量T,要求T是Extension的子類。其中有一個apply,經過system返回一個T的實例。createExtension沒有實現。那須要繼續深刻registerExtension的代碼。安全
/** * Registers the provided extension and creates its payload, if this extension isn't already registered * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization * of the payload, if is in the process of registration from another Thread of execution */ def registerExtension[T <: Extension](ext: ExtensionId[T]): T
經過registerExtension的定義來看,官方註釋寫的也很清楚,它就是在註冊一個extension,而且建立一個實例。若是這個extension已經註冊過,就再也不註冊。app
@tailrec final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = { findExtension(ext) match { case null ⇒ //Doesn't already exist, commence registration val inProcessOfRegistration = new CountDownLatch(1) extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process case null ⇒ try { // Signal was successfully sent ext.createExtension(this) match { // Create and initialize the extension case null ⇒ throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]") case instance ⇒ extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension instance //Profit! } } catch { case t: Throwable ⇒ extensions.replace(ext, inProcessOfRegistration, t) //In case shit hits the fan, remove the inProcess signal throw t //Escalate to caller } finally { inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal } case other ⇒ registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry } case existing ⇒ existing.asInstanceOf[T] } }
咱們來看看registerExtension的具體實現,它首先經過findExtension查找對應的ExtensionId是否已經註冊,若是已經註冊,則直接返回找到的結果,不然就進行建立。在case null分支中,有一個CountDownLatch。咱們有必要簡要介紹一下這個類的做用和使用方法。框架
「CountDownLatch典型用法1:某一線程在開始運行前等待n個線程執行完畢。將CountDownLatch的計數器初始化爲n new CountDownLatch(n)
,每當一個任務線程執行完畢,就將計數器減1 countdownlatch.countDown()
,當計數器的值變爲0時,在CountDownLatch上 await()
的線程就會被喚醒」ide
也就是說registerExtension是會保證線程安全的,以保證Extension只被加載一次。extensions會經過putIfAbsent方法插入ExtensionId與inProcessOfRegistration的鍵值對,固然了extensions是一個ConcurrentHashMap。若是key不存在,即第一次註冊的時候,則把鍵值對插入並返回null。因此第一次註冊會命中case null,而後把當前ActorSystem傳給createExtension方法建立Extension實例。若是建立成功,就會替換extensions中ExtensionId對應的value爲新建立的Extension實例(替換以前是inProcessOfRegistration這個CountDownLatch),最後執行countDown,計數器變成0。若是建立失敗呢?會拋出一個IllegalStateException異常或其餘異常,收到異常後,會把ExtensionId對應的value變成對應的Throwable信息。那麼若是putIfAbsent插入失敗呢,也就是ExtensionId已經有對應的value了,會遞歸執行registerExtension從新註冊,既然有值了爲啥還要從新註冊呢?由於對應的值有三種狀況:Extension實例、Throwable、CountDownLatch。因此須要從新註冊。工具
另外CountDownLatch必定會有await,那麼啥時候await呢。別急,還有findExtension沒有分析呢。ui
/** * Returns any extension registered to the specified Extension or returns null if not registered */ @tailrec private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { case c: CountDownLatch ⇒ c.await(); findExtension(ext) //Registration in process, await completion and retry case t: Throwable ⇒ throw t //Initialization failed, throw same again case other ⇒ other.asInstanceOf[T] //could be a T or null, in which case we return the null as T }
很顯然,findExtension會對查詢到的結果進行判斷,若是是CountDownLatch就調用await進行等待,等待其餘線程的registerExtension執行完畢,而後遞歸調用findExtension;若是其餘線程註冊完了返回異常,則此處也簡單的拋出異常;若是返回其餘類型的數據,則把它轉化成T的一個實例,也就是咱們自定義的Extension,那若是返回null呢?那就返回null嘍。this
至此registerExtension分析完畢,它以線程安全的方式保證Extension被加載一次,也就是createExtension方法只被調用一次。那麼如何根據ActorSystem建立咱們自定義的Extension就很是靈活了。
咱們來看一下官網的例子。
class CountExtensionImpl extends Extension { //Since this Extension is a shared instance // per ActorSystem we need to be threadsafe private val counter = new AtomicLong(0) //This is the operation this Extension provides def increment() = counter.incrementAndGet() }
上面是咱們自定義的一個Extension,它很是簡單,就是一個計數器,且increment()保證線程安全。
object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider { //The lookup method is required by ExtensionIdProvider, // so we return ourselves here, this allows us // to configure our extension to be loaded when // the ActorSystem starts up override def lookup = CountExtension //This method will be called by Akka // to instantiate our Extension override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl /** * Java API: retrieve the Count extension for the given system. */ override def get(system: ActorSystem): CountExtensionImpl = super.get(system) }
上面是一個ExtensionId,還繼承了ExtensionIdProvider,ExtensionIdProvider源碼以下,其實就是用來查找ExtensionId的,這樣就可以經過配置文件自動加載了。
/** * To be able to load an ExtensionId from the configuration, * a class that implements ExtensionIdProvider must be specified. * The lookup method should return the canonical reference to the extension. */ trait ExtensionIdProvider { /** * Returns the canonical ExtensionId for this Extension */ def lookup(): ExtensionId[_ <: Extension] }
能夠看出createExtension就是new了一個CountExtensionImpl,沒有把ExtendedActorSystem傳給CountExtensionImpl。其實在稍微複雜點的Extension裏面是能夠接收ExtendedActorSystem參數的,有了對ExtendedActorSystem的引用,咱們就能夠調用ExtendedActorSystem的全部公開的方法。若是你要問我ExtendedActorSystem都有哪些公開的方法或者說,有了ExtendedActorSystem能夠作什麼,我是拒絕回答的。有了ExtendedActorSystem你還不是想幹啥就幹啥?哈哈。