1、概念html
1.定義:當多個線程訪問某個類時,無論運行時環境採用何種調度方式或者這些進程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同,這個類都能表現出正確的行爲,那麼就稱這個類是線程安全的。java
2.線程安全性:算法
原子性:提供了互斥訪問,同一時刻只能有一個線程來對它進行操做。 安全
可見性:一個線程對主內存的修改能夠及時的被其餘線程觀察到。 數據結構
有序性:一個線程觀察其餘線程中的指令執行順序,因爲指令重排排序的存在,該觀察結果通常雜亂無序。多線程
2、原子性-Atomic併發
1.原子性--Atomic包ide
原子英文單詞爲:atomic,剛恰好Java下定義了這樣的類,好比:AtomicXXX:CAS;AtomicLong、LongAdder。高併發
2.爲何要使用這個呢?或者說在什麼場景下使用呢?工具
在併發場景中,當多線程須要對同一份資源作操做時,就會產生線程安全問題。以最簡單的int i++爲例,i++並非原子操做,編譯出來後分爲三步:1,獲取值;2,修改值;3,設置值。若是有多線程執行i++,則一般不會獲得正確的結果。舉例以下:
/*** @author 繁榮Aaron*/public class ActiomTest {static Logger logger = LoggerFactory.getLogger(ActiomTest.class);private static int n = 0;public static void main(String[] args) throws Exception {Thread t1 = new Thread() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) {n++;try { Thread.currentThread().sleep(10); } catch (InterruptedException e) { }}}};Thread t2 = new Thread() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) {n++;try { Thread.currentThread().sleep(10); } catch (InterruptedException e) { }}}};t1.start();t2.start();t1.join();t2.join();logger.info("n = {}", n);}}
結果以下:
並非咱們所須要的結果:2000。因此必須用方法進行解決。
解決方式,以下:
1.使用synchronized關鍵字,具體使用參考先前的文章
(https://my.oschina.net/u/2380961/blog/1594040)。
2.JDK併發包裏提供了不少線程安全的類。如:int對應線程安全的AtomicInteger。相似的還有:AtomicBoolean,AtomicLong,AtomicReference。
3.如何使用?
舉例,以下:
public class AtomicExample1 {// 請求總數 public static int clientTotal = 5000;// 同時併發執行的線程數 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {//semaphore.acquire();//add();count.incrementAndGet();//semaphore.release();} catch (Exception e) {e.printStackTrace();}//countDownLatch.countDown();});}//countDownLatch.await();executorService.shutdown();System.out.println(count.get()); }private static void add() {int i = count.incrementAndGet();// count.getAndIncrement();//System.out.println(i);}}
一開始,去掉Semaphore 和CountDownLatch 兩個工具類。總共執行5000次,那麼輸出的結果也應該是5000。可是真實結果卻不是,若是多執行幾回機會出現以下的錯誤,結果倒是4997:
因此上面若是缺乏CountDownLatch 這個工具類,是沒法達到線程安全的,就算是AtomicInteger類。具體緣由,我沒有弄清楚,就算是加上volatile關鍵字也不行的:
只要打開了CountDownLatch 關鍵字才能夠,下面的程序是線程安全的:
@ThreadSafepublic class AtomicExample1 {// 請求總數public static int clientTotal = 5000;// 同時併發執行的線程數public static int threadTotal = 200;public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {//semaphore.acquire();//add();count.incrementAndGet();//semaphore.release();} catch (Exception e) {e.printStackTrace();}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();System.out.println(count.get());}private static void add() {int i = count.incrementAndGet();// count.getAndIncrement();//System.out.println(i);}}
#API
public final int get() //獲取當前的值public final int getAndSet(int newValue)//獲取當前的值,並設置新的值public final int getAndIncrement()//獲取當前的值,並自增public final int getAndDecrement() //獲取當前的值,並自減public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值integer.incrementAndGet(); //先+1,而後在返回值,至關於++iinteger.decrementAndGet();//先-1,而後在返回值,至關於--iinteger.addAndGet(1);//先+n,而後在返回值,
總結:
1.使用的是線程池技術,特別須要注意的是就算是AtomicInteger類若是是單獨的使用,也是線程不安全的。關於上面的緣由 爲何會致使線程不安全後面講了線程池在敘說。
2.CountDownLatch 關鍵字只是保證了線程的執行,並不線程的原子性,那麼究竟是什麼緣由使AtomicInteger保持原子性呢?
4.atomic原理之CAS
CAS,Compare and Swap即比較並交換,設計併發算法時經常使用到的一種技術,java.util.concurrent包徹底創建在CAS之上,沒有CAS也就沒有此包,可見CAS的重要性。
當前的處理器基本都支持CAS,只不過不一樣的廠家的實現不同罷了。CAS有三個操做數:內存值V、舊的預期值A、要修改的值B,當且僅當預期值A和內存值V相同時,將內存值修改成B並返回true,不然什麼都不作並返回false。固然更加底層的,就是Unsafe實現的,看下Unsafe下的三個方法:
public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);#該方法爲本地方法,有四個參數,分別表明:對象、對象的地址、預期值、修改值public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);
Java內部原理代碼,以下:
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}private volatile int value;
這裏, unsafe是java提供的得到對對象內存地址訪問的類,註釋已經清楚的寫出了,它的做用就是在更新操做時提供「比較並替換」的做用。實際上就是AtomicInteger中的一個工具。
valueOffset是用來記錄value自己在內存的便宜地址的,這個記錄,也主要是爲了在更新操做在內存中找到value的位置,方便比較。
注意:value是用來存儲整數的時間變量,這裏被聲明爲volatile,就是爲了保證在更新操做時,當前線程能夠拿到value最新的值(併發環境下,value可能已經被其餘線程更新了)。
下面找一個方法getAndIncrement來研究一下AtomicInteger是如何實現的,好比咱們經常使用的addAndGet方法:
public final int addAndGet(int delta) {for (;;) {int current = get();int next = current + delta;if (compareAndSet(current, next))return next;}}
這段代碼如何在不加鎖的狀況下經過CAS實現線程安全,咱們不妨考慮一下方法的執行:
一、AtomicInteger裏面的value原始值爲3,即主內存中AtomicInteger的value爲3,根據Java內存模型,線程1和線程2各自持有一份value的副本,值爲3
二、線程1運行到第三行獲取到當前的value爲3,線程切換
三、線程2開始運行,獲取到value爲3,利用CAS對比內存中的值也爲3,比較成功,修改內存,此時內存中的value改變比方說是4,線程切換
四、線程1恢復運行,利用CAS比較發現本身的value爲3,內存中的value爲4,獲得一個重要的結論-->此時value正在被另一個線程修改,因此我不能去修改它
五、線程1的compareAndSet失敗,循環判斷,由於value是volatile修飾的,因此它具有可見性的特性,線程2對於value的改變能被線程1看到,只要線程1發現當前獲取的value是4,內存中的value也是4,說明線程2對於value的修改已經完畢而且線程1能夠嘗試去修改它
六、最後說一點,好比說此時線程3也準備修改value了,不要緊,由於比較-交換是一個原子操做不可被打斷,線程3修改了value,線程1進行compareAndSet的時候必然返回的false,這樣線程1會繼續循環去獲取最新的value並進行compareAndSet,直至獲取的value和內存中的value一致爲止
整個過程當中,利用CAS機制保證了對於value的修改的線程安全性。
CAS的缺陷
CAS雖然高效地解決了原子操做,可是仍是存在一些缺陷的,主要表如今三個方法:循環時間太長、只能保證一個共享變量原子操做、ABA問題。
循環時間太長
若是CAS一直不成功呢?這種狀況絕對有可能發生,若是自旋CAS長時間地不成功,則會給CPU帶來很是大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。
只能保證一個共享變量原子操做
看了CAS的實現就知道這隻能針對一個共享變量,若是是多個共享變量就只能使用鎖了,固然若是你有辦法把多個變量整成一個變量,利用CAS也不錯。例如讀寫鎖中state的高地位
ABA問題
CAS須要檢查操做值有沒有發生改變,若是沒有發生改變則更新。可是存在這樣一種狀況:若是一個值原來是A,變成了B,而後又變成了A,那麼在CAS檢查的時候會發現沒有改變,可是實質上它已經發生了改變,這就是所謂的ABA問題。對於ABA問題其解決方案是加上版本號,即在每一個變量都加上一個版本號,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。
缺陷的解方式:CAS的ABA隱患問題,解決方案則是版本號,Java提供了AtomicStampedReference來解決。AtomicStampedReference經過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題。對於上面的案例應該線程1會失敗。
#四個參數,分別表示:預期引用、更新後的引用、預期標誌、更新後的標誌public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedStamp == current.stamp &&((newReference == current.reference &&newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));}
代碼案例:
Thread tsf1 = new Thread(new Runnable() {@Overridepublic void run() {try {//讓 tsf2先獲取stamp,致使預期時間戳不一致TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}// 預期引用:100,更新後的引用:110,預期標識getStamp() 更新後的標識getStamp() + 1atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);}});Thread tsf2 = new Thread(new Runnable() {@Overridepublic void run() {int stamp = atomicStampedReference.getStamp();try {TimeUnit.SECONDS.sleep(2); //線程tsf1執行完} catch (InterruptedException e) {e.printStackTrace();}System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));}});tsf1.start();tsf2.start();
3、原子性-鎖
鎖,主要講兩個關鍵字:synchronized(依賴JVM);Lock(依賴特殊的CPU指令,代碼實現 ,ReentrantLock)。
1.synchronized
synchronized能夠保證方法或者代碼塊在運行時,同一時刻只有一個方法能夠進入到臨界區,同時它還能夠保證共享變量的內存可見性。具體的使用參考博客地址(https://my.oschina.net/u/2380961/blog/1594040)。
使用須要主要的地方:
修飾代碼塊:大括號括起來的代碼,做用於調用的對象。
修飾方法:整個方法,做用於調用的對象。
修飾靜態方法:整個靜態方法,做用於全部對象。
修飾類:括號括起來的部分,做用於全部對象。
當一個線程訪問同步代碼塊時,它首先是須要獲得鎖才能執行同步代碼,當退出或者拋出異常時必需要釋放鎖,那麼它是如何來實現這個機制的呢?咱們先看一段簡單的代碼:
public class SynchronizedTest {public synchronized void test1(){}public void test2(){synchronized (this){}}}
利用javap工具查看生成的class文件信息來分析Synchronize的實現:
從上面能夠看出,同步代碼塊是使用monitorenter和monitorexit指令實現的,同步方法(在這看不出來須要看JVM底層實現)依靠的是方法修飾符上的ACC_SYNCHRONIZED實現。具體體現,以下:
進入,獲取鎖:
每一個對象有一個監視器鎖(monitor)。當monitor被佔用時就會處於鎖定狀態,線程執行monitorenter指令時嘗試獲取monitor的全部權,過程以下:
一、若是monitor的進入數爲0,則該線程進入monitor,而後將進入數設置爲1,該線程即爲monitor的全部者。
二、若是線程已經佔有該monitor,只是從新進入,則進入monitor的進入數加1.
3.若是其餘線程已經佔用了monitor,則該線程進入阻塞狀態,直到monitor的進入數爲0,再從新嘗試獲取monitor的全部權。
釋放鎖:
執行monitorexit的線程必須是objectref所對應的monitor的全部者。
指令執行時,monitor的進入數減1,若是減1後進入數爲0,那線程退出monitor,再也不是這個monitor的全部者。其餘被這個monitor阻塞的線程能夠嘗試去獲取這個 monitor 的全部權。
經過這兩段描述,咱們應該能很清楚的看出Synchronized的實現原理,Synchronized的語義底層是經過一個monitor的對象來完成,其實wait/notify等方法也依賴於monitor對象,這就是爲何只有在同步的塊或者方法中才能調用wait/notify等方法,不然會拋出java.lang.IllegalMonitorStateException的異常的緣由。
同步代碼塊:monitorenter指令插入到同步代碼塊的開始位置,monitorexit指令插入到同步代碼塊的結束位置,JVM須要保證每個monitorenter都有一個monitorexit與之相對應。任何對象都有一個monitor與之相關聯,當且一個monitor被持有以後,他將處於鎖定狀態。線程執行到monitorenter指令時,將會嘗試獲取對象所對應的monitor全部權,即嘗試獲取對象的鎖;
同步方法:synchronized方法則會被翻譯成普通的方法調用和返回指令如:invokevirtual、areturn指令,在VM字節碼層面並無任何特別的指令來實現被synchronized修飾的方法,而是在Class文件的方法表中將該方法的access_flags字段中的synchronized標誌位置1,表示該方法是同步方法並使用調用該方法的對象或該方法所屬的Class在JVM的內部對象表示Klass作爲鎖對象。
(摘自:http://www.cnblogs.com/javaminer/p/3889023.html)
2.Lock
首先,這篇將不介紹Lock的使用,具體的APi使用參考這邊博客地址:Java多線程知識點整理(Lock鎖):
https://my.oschina.net/u/2380961/blog/1595357
@Slf4j@ThreadSafepublic class LockExample2 {// 請求總數 public static int clientTotal = 5000;// 同時併發執行的線程數 public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock();public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {semaphore.acquire();add();semaphore.release();} catch (Exception e) {//log.error("exception", e);}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();//log.info("count:{}", count);}private static void add() {lock.lock();try {count++;} finally {lock.unlock();}}}
2.1 爲何要使用Lock鎖?
Java的內置鎖一直都是備受爭議的,在JDK 1.6以前,synchronized這個重量級鎖其性能一直都是較爲低下,雖然在1.6後,進行大量的鎖優化策略,可是與Lock相比synchronized仍是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基於JVM機制),可是它卻缺乏了獲取鎖與釋放鎖的可操做性,可中斷、超時獲取鎖,且它爲獨佔式在高併發場景下性能大打折扣。
AbstractQueuedSynchronizer,簡稱AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一個內部類是這個抽象類的子類。因爲AQS是基於FIFO隊列的實現,所以必然存在一個個節點,Node就是一個節點,Node裏面有不少方法。
整個AQS是典型的模板模式的應用,設計得十分精巧,對於FIFO隊列的各類操做在AQS中已經實現了,AQS的子類通常只須要重寫tryAcquire(int arg)和tryRelease(int arg)兩個方法便可。
AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。
AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
2.二、AbstactQueuedSynchronizer的基本數據結構
1.AbstractQueuedSynchronizer的等待隊列是CLH隊列的變種,CLH隊列一般用於自旋鎖,AbstractQueuedSynchronizer的等待隊列用於阻塞同步器。
2.每一個節點中持有一個名爲"status"的字段用因而否一條線程應當阻塞的追蹤,可是status字段並不保證加鎖。
3.一條線程若是它處於隊列頭的下一個節點,那麼它會嘗試去acquire,可是acquire並不保證成功,它只是有權利去競爭。
4.要進入隊列,你只須要自動將它拼接在隊列尾部便可;要從隊列中移除,你只須要設置header字段。
2.三、AbstractQueuedSynchronizer供子類實現的方法
AbstractQueuedSynchzonizer是基於模板模式的實現,不過它的模板模式寫法有點特別,整個類中沒有任何一個abstract的抽象方法,取而代之的是,須要子類去實現的那些方法經過一個方法體拋出UnsupportedOperationException(集合的使用也會拋出這個異常)異常來讓子類知道。
這個類的acquire很差翻譯,因此就直接原詞放上來了,由於acquire是一個動詞,後面並無帶賓語,所以不知道具體acquire的是什麼。按照我我的理解,acquire的意思應當是根據狀態字段state去獲取一個執行當前動做的資格。
好比ReentrantLock的lock()方法最終會調用acquire方法,那麼:
1.線程1去lock(),執行acquire,發現state=0,所以有資格執行lock()的動做,將state設置爲1,返回true。
2.線程2去lock(),執行acquire,發現state=1,所以沒有資格執行lock()的動做,返回false。
2.四、獨佔模式acquire實現流程
看一下AbstractQuueuedSynchronizer的acquire方法實現流程,acquire方法是用於獨佔模式下進行操做的:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire方法前面說過了,是子類實現的一個方法,若是tryAcquire返回的是true(成功),即代表當前線程得到了一個執行當前動做的資格,天然也就不須要構建數據結構進行阻塞等待。
若是tryAcquire方法返回的是false,那麼當前線程沒有得到執行當前動做的資格,接着執行"acquireQueued(addWaiter(Node.EXCLUSIVE), arg))"這句代碼,這句話很明顯,它是由兩步構成的:
addWaiter,添加一個等待者。
acquireQueued,嘗試從等待隊列中去獲取執行一次acquire動做。
利用LockSupport(這個使用到了sun.misc.Unsafe UNSAFE;)的park方法讓當前線程阻塞。
總結:這個方法的主要目的是爲了構建成一個數據結構,同時獲取鎖的狀態。
2.五、獨佔模式release流程
上面整理了獨佔模式的acquire流程,看到了等待的Node是如何構建成一個數據結構的,下面看一下釋放的時候作了什麼,release方法的實現爲:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
tryRelease一樣是子類去實現的,表示當前動做我執行完了,要釋放我執行當前動做的資格,講這個資格讓給其它線程,而後tryRelease釋放成功,獲取到head節點,若是head節點的waitStatus不爲0的話,執行unparkSuccessor方法,顧名思義unparkSuccessor意爲unpark頭結點的繼承者。
流程:
1.頭節點的waitStatus<0,將頭節點的waitStatus設置爲0 。
2.拿到頭節點的下一個節點s,若是s==null或者s的waitStatus>0(被取消了),那麼從隊列尾巴開始向前尋找一個waitStatus<=0的節點做爲後繼要喚醒的節點。
最後,若是拿到了一個不等於null的節點s,就利用LockSupport的unpark方法讓它取消阻塞。
總結
1、對比
synchronized:不可中斷鎖,適合競爭不激烈,可讀性好。
Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態。
Atomic:競爭激烈時能維持常態,比Lock性能好,只能同步一個值。