Java
內存模型Java
內存模型的基礎在Java
中,全部實例域、靜態域和數組元素都存儲在堆內存中,堆內存在線程之間共享;局部變量、方法定義參數和異常處理器參數不會在線程之間共享。 從抽象角度來看,JMM
定義了線程和主內存之間的抽象關係:線程之間的共享變量存儲在主內存中,每一個線程都有一個私有的本地內存,本地內存涵蓋了緩存、寫緩衝區、寄存器以及其它的硬件和編譯器優化。 JMM
經過控制主內存與每一個線程的本地內存之間的交互,來爲Java
程序員提供內存可見性保證。node
指編譯器和處理器爲了優化程序性能而對指令序列進行從新排序的一種手段:程序員
JMM
的編譯器從新排序規則會禁止特定類型的編譯器重排序,對於處理器重排序,JMM
的處理器重排序規則會要求Java
編譯器在生成指令時,插入特定類型的內存屏障。 現代的處理器使用寫緩衝區臨時保存向內存寫入的數據,但每一個處理器上的寫緩衝區,僅僅對它所在的處理器可見。 因爲寫緩衝區僅對本身的處理器可見,它會致使處理器執行內存操做的順序可能會與內存實際的操做執行順序不一致,因爲現代的處理器都會使用寫緩衝區,所以現代的處理器都會容許對寫-讀操做進行重排序,但不容許對存在數據依賴的操做作重排序。算法
happens-before
簡介用來闡述操做之間的內存可見性,若是一個操做執行的結果須要對另外一個操做可見,那麼這兩個操做必需要存在happens-before
關係,這兩個操做既能夠在一個線程以內,也能夠在不一樣線程之間,但並不等於前一個操做必需要在後一個操做以前執行。編程
編譯器和處理器不會改變存在數據依賴關係的兩個操做的執行順序,可是僅針對單個處理器中執行的指令序列和單個線程中執行的操做。數組
as-if-serial
不管怎麼重排序,單線程程序的執行結果不能改變。緩存
在單線程中,對存在控制依賴的操做重排序,不會改變執行結果;但在多線程程序中,對存在控制依賴的操做重排序,可能會改變程序的執行結果。安全
順序一致性是一個理論參考模型,在設計的時候,處理器的內存模型和編程語言的內存模型都會以順序一致性內存做爲參照。 若是程序是正確同步的,程序的執行將具備順序一致性:即程序的執行結果與該程序在順序一致性內存模型中的執行結果相同。 若是程序是正確同步的,程序的執行將具備順序一致性:即程序的執行結果與該程序在順序一致性內存模型中的執行結果相同。 順序一致模型有兩大特性:bash
對於未同步或未正確同步的多線程程序,JMM
只提供最小安全性:線程執行時讀取到的值,要麼是以前某個線程寫入的值,要麼是默認值。 JMM
不保證未同步程序的執行結果與該程序在順序一致性模型中的執行結果一致。 未同步程序在兩個模型中的執行特徵有以下差別:數據結構
JMM
不保證單線程內的操做會按程序的順序執行。JMM
不保證全部線程能看到一致的操做執行順序。JMM
不保證對64位的long/double
型變量的寫操做具備原子性,而順序一致性模型保證對全部內存讀/寫操做都具備原子性。Java
併發編程基礎I/O
操做)的線程須要設置較高優先級,而偏重計算(須要較多CPU
時間或者偏運算)的線程則設置較低的優先級,確保處理器不會被獨佔。New
:初始狀態,線程被建立,可是沒有調用start()
方法。Runnable
:運行狀態,Java
線程將操做系統中的就緒和運行兩種狀態統稱爲「運行中」。Blocked
:阻塞狀態,表示線程阻塞於鎖。Waiting
:等待狀態,表示線程進入等待狀態,進入該狀態表示當前線程須要等待其它線程作出一些指定動做(通知或中斷)。Time_Waiting
:超時等待狀態,能夠在指定的時間自行返回。Terminated
:終止狀態,表示當前線程已經執行完畢。interrupt()
方法對其進行中斷操做。isInterrupt
來進行判斷是否被中斷,也能夠調用靜態方法Thread.interrupt
對當前線程的中斷標識位進行復位,若是該線程已經處於終止狀態,即便該線程被中斷過,在調用該線程對象的isInterrupt
時依舊返回false
。InterruptedException
異常以前,Java
虛擬機會先將該線程的中斷標識位清除。boolean
變量來控制是否須要中止任務並終止該線程。Java
支持多個線程同時訪問一個對象或者對象的成員變量,因爲每一個線程能夠擁有這個變量的拷貝,因此在程序的執行過程當中,一個線程看到的變量並不必定是最新的。volatile
能夠用來修飾字段,就是告知程序任何對該變量的訪問須要從共享內存中獲取,而對它的改變必須同步刷新回共享內存,它能保證全部線程對變量訪問的可見性。synchronized
能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。Object
(Object
由synchronized
保護)的訪問,首先要得到Object
的監視器,若是獲取失敗,線程進入同步隊列,線程狀態變爲Blocked
,當訪問Object
的前驅(得到了鎖的線程)釋放了鎖,則該釋放操做喚醒阻塞在同步隊列中的線程,使其從新嘗試對監視器的獲取。notify()
:通知一個在對象上等待的線程,使其從wait()
方法返回,而返回的前提是該線程獲取到了對象上的鎖。notifyAll()
:通知全部等待在該對象上的鎖。wait()
:調用該方法的線程進入Waiting
狀態,只有等待另外線程的通知或被中斷纔會返回,調用wait()
方法後,會釋放對象的鎖。wait(long)
:超時等待一段時間,若是沒有通知就返回。wait(long, int)
:對於超時時間更精細粒度的控制,能夠達到納秒。wait
和notify/notifyAll()
的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。wait()
方法,被通知後仍要檢查條件。 (3) 條件知足則執行對應的邏輯。synchronized(對象) {
while(條件不知足) {
對象.wait();
}
對應的處理邏輯;
}
複製代碼
synchronized(對象) {
改變條件;
對象.notifyAll();
}
複製代碼
PipedOutputStream、PipeInputStream、PipedReader、PipedWriter
,前兩種面向字節,後兩種面向字符。A
執行了Thread.join()
,其含義是:當前線程A
等待Thread
線程終止後,才從Thread.join
返回,線程Thread
除了提供join()
方法外,還提供了join(long millis)
和join(long millis, int nanos)
兩個具有超時特性的方法,若是在給定的超時時間內沒有終止,那麼將會從超時方法中返回。ThreadLocal
,即線程變量,是一個以ThreadLocal
對象爲鍵、任意對象爲值的存儲結構,這個結構被附帶在線程上,也就是說一個線程能夠根據一個ThreadLocal
對象查詢到綁定在這個線程上的一個值,能夠經過set(T)
方法來設置一個值,在當前線程下再經過get()
方法獲取到原先設置的值。Java
中的鎖Lock
接口鎖是用來控制多個線程訪問共享資源的方式,雖然它缺乏了隱式獲取釋放鎖的便捷性,可是卻擁有了鎖獲取與釋放的可操做性、可中斷地獲取鎖以及超時獲取鎖等多種synchronized
關鍵字不具有的同步特性。多線程
在finally
塊中釋放鎖,目的是保證在獲取到鎖以後,最終可以被釋放。
Lock
接口提供的synchronized
關鍵字不具有的主要特性
嘗試非阻塞地獲取鎖:當前線程嘗試獲取鎖,若是這一時刻沒有被其它線程獲取到,則成功獲取並持有鎖。
能被中斷地獲取鎖:與synchronized
不一樣,獲取到鎖的線程可以響應中斷,當獲取到鎖的線程被中斷時,中斷異常將會被拋出,同時鎖會被釋放。
在指定的截止時間以前獲取鎖:若是截止時間到了仍舊沒法獲取鎖,則返回。
Lock
的API
void lock()
:獲取鎖,調用該方法當前線程將會獲取鎖,當鎖得到後,從該方法返回。
void lockInterruptibly()
:可中斷地獲取鎖,該方法會響應中斷,即在鎖的獲取中能夠中斷當前線程。
boolean tryLock()
:嘗試非阻塞地獲取鎖,調用該方法後馬上返回,若是可以獲取則返回true
,不然返回false
。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException
:當前線程在超時時間內得到了鎖;當前線程在超時時間內被中斷;超時時間結束,返回false
。
void unlock()
:釋放鎖。
Condition newCondition()
:獲取等待/通知組件,該組件和當前的鎖綁定,當前線程只有得到了鎖,才能調用該組件的wait()
方法,而調用後,當前線程將釋放鎖。
AbstractQueuedSynchronizer
,是用來構建鎖或者其它同步組件的基礎框架,它使用了一個int
成員變量表示同步狀態,經過內置的FIFO
隊列來完成資源獲取線程的排隊工做。getState()
:獲取當前同步狀態。setState(int newState)
:設置當前同步狀態。compareAndSetState(int except, int update)
:使用CAS
設置當前狀態,該方法可以保證狀態設置的原始性。CAS
的設置尾節點的方法。FIFO
,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。acquire(int arg)
方法能夠獲取同步狀態,該方法對中斷不敏感,即因爲線程獲取同步狀態失敗而進入同步隊列後,後續對線程進行中斷操做時,線程不會從同步隊列中移除。public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
它的主要邏輯是:
tryAcquire
方法,該方法保證線程安全的獲取同步狀態,這個方法須要隊列同步器的實現者來重寫。Node.EXCLUSIVE
)並經過addWaiter(Node node)
方法將該節點加入到同步隊列的尾部。private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//1.確保節點可以線程安全地被添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.經過死循環來確保節點的正確添加,在"死循環"中只有經過`CAS`將節點設置爲尾節點以後,當前線程才能從該方法返回,不然當前線程不斷地進行嘗試。
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
acquireQueued(Node node, int arg)
方法,使得該節點以死循環的方式獲取同步狀態。final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1.獲得當前節點的前驅節點
final Node p = node.predecessor();
//2.若是當前節點的前驅節點是頭節點,只有在這種狀況下獲取同步狀態成功
if (p == head && tryAcquire(arg)) {
//3.將當前節點設爲頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
能夠看到,當前線程在「死循環」中嘗試獲取同步狀態,而只有前驅節點是頭節點纔可以嘗試獲取同步狀態,這是因爲:
FIFO
原則,經過簡單地判斷本身的前驅是否爲頭節點,這樣就使得節點的釋放規則符合FIFO
,而且也便於對過早通知的處理(過早通知是指前驅節點不是頭節點的線程因爲中斷而被喚醒)當同步狀態獲取成功以後,當前線程從acquire(int arg)
方法返回,若是對於鎖這種併發組件而言,表明着當前線程獲取了鎖。
經過調用同步器的release(int arg)
方法能夠釋放同步狀態,該方法執行時,會喚醒頭節點的後繼節點線程,unparkSuccessor(Node node)
方法使用LockSupport
來喚醒處於等待狀態的線程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
總結: 1.在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中進行自旋; 2.移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。 3.在釋放同步狀態時,同步器調用
tryRelease(int arg)
方法來釋放同步狀態,而後喚醒頭節點的後繼節點。
acquireShared(int arg)
方法能夠共享式地獲取同步狀態:public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
tryAcquireShared
返回int
類型,若是同步狀態獲取成功,那麼返回值大於等於0,不然進入自旋狀態;成功獲取到同步狀態並退出自旋狀態的條件是當前節點的前驅節點爲頭節點,而且返回值大於等於0.
releaseShared(int arg)
方法釋放同步狀態,tryReleaseShared
必需要確保同步狀態線程安全釋放,通常是經過循環或CAS
來保證的,由於釋放同步狀態的操做會同時來自多個線程。public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
doAcquireNanos(int arg, long nanosTimeout)
方法能夠超時獲取同步狀態,即在指定的時間段內獲取同步狀態。synchronized
以外,對該線程進行中斷操做,此時線程中斷的標誌位會被修改,但線程依舊會阻塞在synchronized
上;若是經過acquireInterruptibly(int arg)
方法獲取,若是在等待過程當中被中斷,會馬上返回,並拋出InterruptedException
異常。private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1.計算出截止時間.
final long deadline = System.nanoTime() + nanosTimeout;
//2.加入節點
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//3.取出前驅節點
final Node p = node.predecessor();
//4.若是獲取成功則直接返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//5.若是到了超時時間,則直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//6.若是在自旋過程當中被中斷,那麼拋出異常返回
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
經過上面的代碼能夠知道,它和獨佔式獲取的區別在於未獲取到同步狀態時的處理邏輯:獨佔式獲取在獲取不到是會一直自旋等待;而超時獲取則會使當前線程等待nanosTimeout
納秒,若是當前線程在這個時間內沒有獲取到同步狀態,將會從等待邏輯中自動返回。
TwinsLock
TwinsLock
只容許至多兩個線程同時訪問,超過兩個線程的訪問將會被阻塞。
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
//初始值爲2.
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for(;;) {
//1.得到當前的狀態.
int current = getState();
//2.newCount表示剩餘可獲取同步狀態的線程數
int newCount = current - arg;
//3.若是小於0,那麼返回獲取同步狀態失敗;不然經過CAS確保設置的正確性.
if (newCount < 0 || compareAndSetState(current, newCount)) {
//4.當返回值大於等於0表示獲取同步狀態成功.
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
//將可獲取同步狀態的線程數加1.
int newCount = current + current;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@NonNull
@Override
public Condition newCondition() {
return null;
}
}
複製代碼
測試用例:
public static void createTwinsLock() {
final Lock lock = new TwinsLock();
class TwinsLockThread extends Thread {
@Override
public void run() {
Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
while (true) {
lock.lock();
try {
Thread.sleep(1000);
Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
lock.unlock();
}
}
}
}
for (int i = 0; i < 10; i++) {
Thread thread = new TwinsLockThread();
thread.start();
}
}
複製代碼
ReentrantLock
表示該鎖可以支持一個線程對資源的重複加鎖。重進入須要解決兩個問題:
n
次獲取了鎖,隨後在第n
次釋放該鎖後,其它線程可以獲取到該鎖。FIFO
。true
,表示有線程比當前線程更早地請求獲取鎖,所以須要等待前驅線程獲取並釋放鎖以後才能繼續獲取鎖;而對於非公平鎖,只要CAS
設置同步狀態成功便可。ReentrantReadWrireLock
,它支持公平性選擇、重進入、鎖降級(寫鎖可以降級爲讀鎖)。ReadWriteLock
僅定義了獲取讀鎖和寫鎖的兩個方法,即readLock
和writeLock
,而其實現ReentrantReadWriteLock
:
getReadLockCount
:返回當前讀鎖被獲取的次數。getReadHoldCount
:返回當前線程獲取讀鎖的次數。isWriteLocked
:判斷寫鎖是否被獲取。getWriteHoldCount
:返回當前線程獲取寫鎖的次數。下面是一個讀寫鎖的簡單用例:
public class ReadWriteCache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
public static Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public static Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public static void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
複製代碼
Condition
接口Condition
定義了等待/通知兩種類型的方法,當前線程調用這些方法時,須要提早獲取到Condition
對象關聯的鎖,Condition
是依賴Lock
對象的。 當調用await()
方法後,當前線程會釋放鎖並在此等待,而其餘線程調用Condition
對象的signal
方法,通知當前線程後,當前線程才從await
方法返回,而且在返回前已經獲取了鎖。 獲取一個Condition
必須經過Lock
的newCondition
方法,下面是一個有界隊列的示例:
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) { //若是當前隊列內的個數等於最大長度,那麼釋放鎖.
notFull.await();
}
if (++addIndex == items.length) { //若是已經到了尾部,那麼從頭開始.
addIndex = 0;
}
++count;
notEmpty.signal(); //通知阻塞在"空"條件上的線程.
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); //若是當前隊列的個數等於0,那麼釋放鎖.
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal(); //通知阻塞在"滿"條件上的線程.
return (T) x;
} finally {
lock.unlock();
}
}
}
複製代碼
Condition
的方法:
await()
:當前線程進入等待狀態直到被通知signal
或中斷,當前線程進入運行狀態且從await
返回的狀況:
其餘線程調用該Condition
的signal
或signalAll
方法。
其它線程中斷當前線程(interrupt
)。
若是當前等待線程從await
方法返回,那麼代表當前線程已經獲取了Condition
對象所對應的鎖。
awaitUninerruptibly
:對中斷不敏感
long await Nanos(long)
:加入了超時的判斷,返回值是(nanosTimeout
- 實際耗時),若是返回值是0或者負數,那麼能夠認定爲超時。
boolean awaitUntil(Data)
:直到某個固定時間。
signal
:喚醒一個等待在Condition
上的線程。
signalAll
:喚醒全部等待在Condition
上的線程。
Condition
的實現ConditionObject
是AbstractQueuedSynchronizer
的內部類,每一個Condition
對象都包含着一個隊列。
在隊列中的每一個節點都包含了一個線程的引用,該線程就是在Condition
對象上等待的線程,同步隊列和等待隊列中節點的類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node
。 因爲Condition
的實現是同步器的內部類,所以每一個Condition
實例都可以訪問同步器提供的方法,至關於每一個Condition
都擁有所屬同步器的引用。 當調用await
方法時,將會以當前線程構造節點,並將節點從尾部加入到等待隊列,也就是將同步隊列移動到**Condition
**隊列當中。
調用該方法的前提是當前線程必須獲取了鎖,也就是同步隊列中的首節點,它不是直接加入到等待隊列當中,而是經過addConditionWaiter()
方法把當前線程構形成一個新的節點並將其加入到等待隊列當中。
調用該方法的前提是當前線程必須獲取了鎖,接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport
喚醒節點中的線程。 被喚醒的線程,將從await
方法中的while
中返回,進而調用同步器的acquireQueued
方法加入到獲取同步狀態的競爭中。 Condition
的signalAll
方法,至關於對等待隊列中的每一個節點均執行一次signal
方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點。
Java
併發容器和框架ConcurrentHashMap
ConcurrentHashMap
是線程安全而且高效的HashMap
,其它的相似容器有如下缺點:
HashMap
在併發執行put
操做時,會致使Entry
鏈表造成環形數據結構,就會產生死循環獲取Entry
。HashTable
使用synchronized
來保證線程安全,但在線程競爭激烈的狀況下HashTable
的效率很是低下。 ConcurrentHashMap
高效的緣由在於它採用鎖分段技術,首先將數據分紅一段一段地存儲,而後給每段數據配一把鎖,當一個線程佔用鎖而且訪問一段數據的時候,其餘段的數據也能被其餘線程訪問。ConcurrentHashMap
的結構ConcurrentHashMap
是由Segment
數組結構和HashEntry
數組結構組成:
Segment
是一種可重入鎖,在ConcurrentHashMap
裏面扮演鎖的角色;HashEntry
則用於存儲鍵值對數據。一個ConcurrentHashMap
裏包含一個Segment
數組,它的結構和HashMap
相似,是一種數組和鏈表結構。 一個Segment
裏包含一個HashEntry
數組,每一個HashEntry
是一個鏈表結構的元素,每一個Segment
守護着一個HashEntry
裏的元素,當對HashEntry
數組的數據進行修改時,必須首先得到與它對應的Segment
鎖。
ConcurrentHashMap
的操做get
get
的高效在於整個get
過程當中不須要加鎖,除非讀到的值是空纔會加鎖重讀。緣由是它的get
方法將要使用的共享變量都設爲volatile
,可以在線程間保持可見性,可以被多線程同時讀,而且不會讀到過時的值,例如用於統計當前Segment
大小的count
字段和用於存儲值的HashEntry
的value
。 put
put
方法裏須要對共享變量進行寫入操做,因此爲了線程安全,在操做共享變量以前必須加鎖,put
首先定位到Segment
,而後在Segment
裏進行插入操做。 size
先嚐試2次經過不鎖住Segment
的方式來統計各個Segment
的大小,若是統計的過程當中,容器的count
發生了變化,則再用加鎖的方式來統計全部Segment
的大小。
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,它採用CAS
算法來實現。
入隊主要作兩件事情:
tail
節點,若是tail
節點的next
節點不爲空,則將入隊節點設置成tail
節點;若是tail
節點的next
節點爲空,則將入隊節點設置成tail
的next
節點。在多線程狀況下,若是有一個線程正在入隊,那麼它必須先獲取尾節點,而後設置尾節點的下一個節點爲入隊節點,但這時可能有另一個線程插隊了,那麼隊列的尾節點就會發生變化,這時第一個線程要暫停入隊操做,而後從新獲取尾節點。 整個入隊操做主要作兩件事:
CAS
算法將入隊節點設置成尾節點的next
節點,如不成功則重試。阻塞隊列是一個支持兩個附加操做的隊列,這兩個附加的操做支持阻塞的插入和移除方法:
在阻塞隊列不可用時,附加操做提供了4種處理方式:拋出異常、返回特殊值、一直阻塞、超時退出。每種方式經過調用不一樣的方法來實現。 Java
裏面提供了7種阻塞隊列。
Fork/Join
框架用於並行執行任務的框架,是把一個大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大人物結果的框架。 Fork/Join
使用兩個類來完成事情:
ForkJoinTask
:它提供了fork()
和join()
操做的機制,一般狀況下,咱們繼承它的子類:有返回結果的RecursiveTask
和沒有返回結果的RecursiveAction
。ForkJoinPool
:ForkJoinTask
須要經過ForkJoinPool
來添加。 ForkJoinTask
在執行的時候可能會拋出異常,可是咱們沒有辦法在主線程裏直接捕獲異常,因此ForkJoinTask
提供了isCompletedAbnormally()
方法來檢查任務是否已經拋出異常或已經取消了。 ForkJoinPool
由ForkJoinTask
數組和ForkJoinWorkerThread
數組組成,ForkJoinTask
數組負責將存放程序提交給ForkJoinPool
的任務,而ForkJoinWorkerThread
數組負責執行這些任務。Java
中的13個原子操做類Atomic
包裏提供了:原子更新基本類型、原子更新數組、原子更新引用和原子更新屬性。
AtomicBoolean
AtomicInteger
AtomicLong
基本方法:
int addAndGet(int delta)
:以原子方式將輸入的值與當前的值相加,並返回結果。boolean compareAndSet(int expect, int update)
:若是當前的數值等於預期值,則以原子方式將該值設置爲輸入的值。int getAndIncrement()
:以原子方式加1,並返回自增前的值。void lazySet(int newValue)
:最終會設置成newValue
,可能會致使其餘線程在以後的一小段時間內仍是讀到舊值。int getAndSet(int newValue)
:以原子方式設置爲newValue
的值,並返回舊值。AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
基本方法:
int addAndGet(int i, int delta)
:以原子方式將輸入值和索引i
的元素相加。boolean compareAndSet(int i, int expect, int update)
:若是當前值等於預期值,則以原子方式將數組位置i
的元素設置成update
值。用於原子更新多個變量,提供了3種類型:
AtomicReference
:原子更新引用類型。AtomicReferenceFieldUpdater
:原子更新引用類型裏的字段。AtomicMarkableReference
:原子更新帶有標記位的引用類型。AtomicIntegerFieldUpdater
:原子更新整形的字段的更新器。AtomicLongFieldUpdater
:原子更新長整形字段的更新器。AtomicStampedReference
:原子更新帶有版本號的引用類型。原子地更新字段須要兩步:
newUpdater
建立一個更新器,而且須要設置想要更新的類和屬性。public volatile
來修飾。Java
中的併發工具類Java
中的線程池線程池的優勢:下降資源消耗,提升響應速度,提升線程的可管理性。
線程池的處理流程以下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //1.添加進入核心線程. if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //2.添加進入隊列. int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //3.添加進入非核心線程. reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 複製代碼
在以上的三步中,除了加入隊列不用獲取全局鎖之外,其它兩種狀況都須要獲取,爲了儘量地避免獲取全局鎖,在ThreadPoolExecutor
完成預熱以後(當前運行的線程數大於corePoolSize
),幾乎全部的execute
方法調用都是加入到隊列當中。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
corePoolSize
:當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其它空閒的基本線程可以執行新任務也會建立。runnableTaskQueue
:用於保存等待執行的任務的阻塞隊列,能夠選擇:ArrayBlockingQueue
:基於數組結構的有界阻塞隊列。LinkedBlockingQueue
:基於鏈表結構的阻塞隊列,吞吐量高於前者。SynchronousQueue
:不存儲元素的阻塞隊列,每一個插入操做必須等待另外一個線程調用了移除操做,靜態工廠方法Executors.newCachedThreadPool
使用了這個隊列。PriorityBlockingQueue
:一個具備優先級的無限阻塞隊列。maxPoolSize
:容許建立的最大線程數。ThreadFactory
:用於設置建立線程的工廠。RejectExecutionHandler
:飽和策略。keepAliveTime
:線程池的工做線程空閒後,保持存活的時間。TimeUnit
:線程保持活動的單位。execute(Runnable runnable)
:提交不須要返回值的任務。Future<Object> future = executor.submit(haveReturnValuetask)
:用於提交須要返回值的任務,線程池會返回一個future
類型任務,能夠用它來判斷任務是否執行成功,而且能夠經過get
方法來獲取返回值,get
方法會阻塞當前線程直到任務完成。shutdownNow
:首先將線程池的狀態設爲STOP
,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表。shutdown
:將線程池的狀態置爲SHUTDOWN
,而後中斷全部沒有正在執行任務的線程。Executor
框架(1)在上層,Java
多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor
框架)將這些任務映射爲固定數量的線程。 (2)在HotSpot VM
的線程模型中,Java
線程再被一對一映射爲本地操做系統線程,Java
線程啓動時會建立一個本地操做系統線程,當該線程終止時,這個操做系統線程也會被回收。 (3)操做系統會調度全部線程並將它們分配給可用的CPU
。
Executor
框架由三個部分組成:
Runnable
接口或Callable
接口。Executor
,以及繼承自Executor
的ExecutorService
,還有它的兩個關鍵類ThreadPoolExecutor
(用來執行任務)和ScheduledThreadPoolExecutor
(能夠在給定的延遲後運行命令,或者按期執行命令)。Future
和實現類FutureTask
。ThreadPoolExecutor
詳解經過工具類Executors
,能夠建立如下三種類型的ThreadPoolExecutor
,調用靜態建立方法以後,會返回ExecutorService
FixedThreadPool
可重用固定線程數的線程池;若是當前運行的線程數少於corePoolSize
,則建立新線程來執行任務;若是等於corePoolSize
,將任務加入到無界隊列LinkedBlockingQueue
當中;多餘的空閒線程將會被當即終止。SingleThreadPool
單個woker
線程的executor
;corePoolSize
和maximumPoolSize
爲1;採用無界隊列做爲工做隊列。CacheThreadPool
採用沒有容量的SynchronousQueue
做爲線程池的工做隊列,其corePoolSize
爲0,maximumPool
是無界的;其中的空閒線程最多等待60s。 若是主線程提交任務的速度高於maximumPool
中線程處理任務的速度時,CacheThreadPool
會不斷建立新線程,極端狀況下,CacheThreadPool
會由於建立過多線程而耗盡CPU
資源。ScheduledThreadPoolExecutor
詳解用來在給定的延遲以後執行任務,或者按期執行任務,而且能夠在指定的構造函數中指定多個對應的後臺線程數。 它採用DelayQueue
這個無界隊列做爲工做隊列,其執行分爲兩個部分:
ScheduledThreadPoolExecutor
的scheduleAtFixedRate()
或者scheduleWithFIxedDelay
,它會向DelayQueue
中添加ScheduledFutureTask
。DelayQueue
中獲取ScheduledFutureTask
。