經過前幾篇的學習,相信你們對Akka應該有所瞭解了,都說解決併發哪家強,JVM上面找Akka,那麼Akka到底在解決併發問題上幫咱們作了什麼呢?html
衆所周知,在處理併發問題上面,最核心的一部分就是如何處理共享內存,不少時候咱們都須要花費不少時間和精力在共享內存上,那麼在學習Akka對共享內存是如何管理以前,咱們先來看看Java中是怎麼處理這個問題的。java
相信對Java併發有所瞭解的同窗都應該知道在Java5推出JSR 133後,Java對內存管理有了更高標準的規範了,這使咱們開發併發程序也有更好的標準了,不會有一些模糊的定義致使的沒法肯定的錯誤。緩存
首先來看看一下Java內存模型的簡單構圖:安全
從圖中咱們能夠看到咱們線程都有本身的一個工做內存,這就比如高速緩存,它是對主內存部分數據的拷貝,線程對本身工做內存的操做速度遠遠快於對主內存的操做,但這也每每會引發共享變量不一致的問題,好比如下一個場景:多線程
int a = 0;
public void setA() {
a = a + 1;
}複製代碼
上面是一個很簡單的例子,a是一個全局變量,而後咱們有一個方法去修改這個值,每次增長一,假如咱們用100個線程去運行這段代碼,那a最終的結果會是多少呢?
100?顯然不必定,它多是80,90,或者其餘數,這就形成共享變量不一致的問題,那麼爲何會致使這個問題呢,就是咱們上面所說的,線程去修改a的時候可能就只是修改了本身工做內存中a的副本,但並無將a的值及時的刷新到主內存中,這便會致使其餘線程可能讀到未被修改a的值,最終出現變量不一致問題。併發
那麼Java中是怎麼處理這種問題,如何保證共享變量的一致性的呢?框架
大致上Java中有3類同步機制,但它們所解決的問題並不相同,咱們先來看一看這三種機制:ide
寫過Java程序的同窗對這個關鍵詞應該再熟悉不過了,其基本含義就是不可變,不可變變量,好比:學習
final int a = 10;
final String b = "hello";複製代碼
不可變的含義在於當你對這些變量或者對象賦初值後,不能再從新去賦值,但對於對象來講,咱們不能修改的是它的引用,可是對象內的內容仍是能夠修改的。下面是一個簡單的例子:優化
final User u = new User(1,"a");
u.id = 2; //能夠修改
u = new User(2,"b"); //不可修改複製代碼
因此在利用final關鍵詞用來保證共享變量的一致性時必定要了解清楚本身的需求,選擇合適的方法,另外final變量必須在定義或者構建對象的時候進行初始化,否則會報錯。
不少同窗在遇到共享變量不一致的問題後,都會說我在聲明變量前加一個volatile就行了,但事實真是這樣嘛?答案顯然不是。那咱們來看看volatile到底爲咱們作了什麼。
前面咱們說過每一個線程都有本身的工做內存,不少時候線程去修改一個變量的值只是修改了本身工做內存中副本的值,這便會致使主內存的值並非最新的,其餘線程讀取到的變量便會出現問題。volatile幫咱們解決了這個問題,它有兩個特色:
舉個例子:
volatile int a = 0;
public void setA() {
a = a + 1;
}複製代碼
如今線程在執行這段代碼時,都會強制去主內存中讀取變量的值,修改後也會立刻更新到主內存中去,可是這真的能解決共享變量不一致的問題嘛,其實否則,好比咱們有這麼一個場景:兩個線程同時讀取了主內存中變量最新的值,這是咱們兩個線程都去執行修改操做,最後結果會是什麼呢?這裏就留給你們本身去思考了,其實也很簡單的。
那麼volatile在什麼場景下能保證線程安全,按照官方來講,有如下兩個條件:
多的方面這裏我就不展開了,推薦兩篇我以爲寫的還不錯的文章:volatile的使用及其原理volatile的適用場景
不少同窗在學習Java併發過程當中最早接觸的就是synchronized關鍵詞了,它確實能解決咱們上述的併發問題,那它到時如何幫咱們保證共享變量的一致性的呢?
簡而言之的說,線程在訪問請求用synchronized關鍵詞修飾的方法,代碼塊都會要求得到一個監視器鎖,當線程得到了監視器鎖後,它纔有權限去執行相應的方法或代碼塊,並在執行結束後釋放監視器鎖,這便能保證共享內存的一致性了,由於本文主要是講Akka的共享內存,過多的篇幅就不展開了,這裏推薦一篇解析synchronized原理很不錯的文章,有興趣的同窗能夠去看看:Synchronized及其實現原理
Akka中的共享內存是基於Actor模型的,Actor模型提倡的是:經過通信來實現共享內存,而不是用共享內存來實現通信,這點是跟Java解決共享內存最大的區別,舉個例子:
在Java中咱們要去操做共享內存中數據時,每一個線程都須要不斷的獲取共享內存的監視器鎖,而後將操做後的數據暴露給其餘線程訪問使用,用共享內存來實現各個線程之間的通信,而在Akka中咱們能夠將共享可變的變量做爲一個Actor內部的狀態,利用Actor模型自己串行處理消息的機制來保證變量的一致性。
固然要使用Akka中的機制也必須知足一下兩條原則:
第二個原則很好理解,就是上面咱們說的Actor內部是串行處理消息,那咱們來看看第一個原則,爲何要保證消息的發送先於消息的接收,是爲了防止咱們在建立消息的時候發生了不肯定的錯誤,接收者將可能接收到不正確的消息,致使發生奇怪的異常,主要表現爲消息對象未初始化完整時,若沒有這條規則保證,Actor收到的消息便會不完整。
經過前面的學習咱們知道Actor是一種比線程更輕量級,抽象程度更高的一種結構,它幫咱們規避了咱們本身去操做線程,那麼Akka底層究竟是怎麼幫咱們去保證共享內存的一致性的呢?
一個Actor它可能會有不少線程同時向它發送消息,以前咱們也說到Actor自己是串行處理的消息的,那它是如何保障這種機制的呢?
Mailbox在Actor模型是一個很重要的概念,咱們都知道向一個Actor發送的消息首先都會被存儲到它所對應的Mailbox中,那麼咱們先來看看MailBox的定義結構(本文所引用的代碼都在akka.dispatch.Mailbox.scala中,有興趣的同窗也能夠去研究一下):
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}複製代碼
很清晰Mailbox內部維護了一個messageQueue這樣的消息隊列,並繼承了Scala自身定義的ForkJoinTask任務執行類和咱們很熟悉的Runnable接口,由此能夠看出,Mailbox底層仍是利用Java中的線程進行處理的。那麼咱們先來看看它的run方法:
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}複製代碼
爲了配合理解,咱們這裏先來看一下定義:
@inline
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def isClosed: Boolean = currentStatus == Closed複製代碼
這裏咱們能夠看出Mailbox自己會維護一個狀態Mailbox.Status,是一個Int變量,並且是可變的,而且用到volatile來保證了它的可見性:
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default複製代碼
如今咱們在回去看上面的代碼,run方法的執行過程,首先它會去讀取MailBox此時的狀態,由於是一個Volatile read,因此能保證讀取到的是最新的值,而後它會先處理任何的系統消息,這部分不須要咱們太過關心,以後即是執行咱們發送的消息,這裏咱們須要詳細看一下processMailbox()的實現:
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue() //去出下一條消息
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs) //遞歸處理下一條消息
}
}複製代碼
從上述代碼中咱們能夠清晰的看到,當知足消息處理的狀況下就會進行消息處理,從消息隊列列取出下一條消息就是上面的dequeue()
,而後將消息發給具體的Actor進行處理,接下去又是處理系統消息,而後判斷是否還有知足狀況須要下一條消息,如有則再次進行處理,能夠當作一個遞歸操做,@tailrec
也說明了這一點,它表示的是讓編譯器進行尾遞歸優化。
如今咱們來看一下一條消息從發送到最終處理在Akka中究竟是怎麼執行的,下面的內容是我經過閱讀Akka源碼加自身理解得出的,這裏先畫了一張流程圖:
消息的大體流程我都在圖中給出,還有一些細節,必須序列化消息,獲取狀態等就沒有具體說明了,有興趣的同窗能夠本身去閱讀如下Akka的源碼,我的以爲Akka的源碼閱讀性仍是很好的,好比:
固然也有一些困擾,咱們在不瞭解各個類,接口之間的關係時,閱讀體驗就會變得很糟糕,固然我用IDEA很快就解決了這個問題。
咱們這裏來看看關鍵的部分:Actor是如何保證串行處理消息的?
上圖中有一根斷定,是否已有線程在執行任務?咱們來看看這個斷定的具體邏輯:
@tailrec
final def setAsScheduled(): Boolean = { //是否有線程正在調度執行該MailBox的任務
val s = currentStatus
/* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. */
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}複製代碼
從註釋和代碼的邏輯上咱們能夠看出當已有線程在執行返回false,若沒有則去更改狀態爲以調度,直到被其餘線程搶佔或者更改爲功,其中updateStatus()是線程安全的,咱們能夠看一下它的實現,是一個CAS操做:
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)複製代碼
到這裏咱們應該能夠大體清楚Actor內部是如何保證共享內存的一致性了,Actor接收消息是多線程的,但處理消息是單線程的,利用MailBox中的Status來保障這一機制。
經過上面的內容咱們能夠總結出如下幾點: