https://github.com/Wasabi1234...java
同時擁有兩個或者多個線程,若是程序在單核處理器上運行多個線程將交替地換入或者換出內存,這些線程是同時「存在"的,每一個線程都處於執行過程當中的某個狀態,若是運行在多核處理器上,此時,程序中的每一個線程都將分配到一個處理器核上,所以能夠同時運行.git
互聯網分佈式系統架構設計中必須考慮的因素之一,一般是指,經過設計保證系統可以同時並行處理不少請求.github
CPU的頻率太快了,快到主存跟不上
如此,在處理器時鐘週期內,CPU經常須要等待主存,浪費資源。因此cache的出現,是爲了緩解CPU和內存之間速度的不匹配問題(結構:cpu-> cache-> memory ).緩存
若是某個數據被訪問,那麼與它相鄰的數據很快也可能被訪問安全
用於保證多個 CPU cache 之間緩存共享數據的一致數據結構
該緩存行只被緩存在該 CPU 的緩存中,而且是被修改過的,與主存中數據是不一致的,需在將來某個時間點寫回主存,該時間是容許在其餘CPU 讀取主存中相應的內存以前,當這裏的值被寫入主存以後,該緩存行狀態變爲 E多線程
緩存行只被緩存在該 CPU 的緩存中,未被修改過,與主存中數據一致
可在任什麼時候刻當被其餘 CPU讀取該內存時變成 S 態,被修改時變爲 M態架構
該緩存行可被多個 CPU 緩存,與主存中數據一致併發
處理器爲提升運算速度而作出違背代碼原有順序的優化app
以上兩者一般和線程池搭配
下面開始作併發模擬
package com.mmall.concurrency; import com.mmall.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng * @date 18/4/1 */ @Slf4j @NotThreadSafe public class ConcurrencyTest { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時併發執行的線程數 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量,給出容許併發的線程數目 final Semaphore semaphore = new Semaphore(threadTotal); //統計計數結果 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); //將請求放入線程池 for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { //信號量的獲取 semaphore.acquire(); add(); //釋放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); //關閉線程池 executorService.shutdown(); log.info("count:{}", count); } /** * 統計方法 */ private static void add() { count++; } }
運行發現結果隨機,因此非線程安全
當多個線程訪問某個類時,無論運行時環境採用何種調度方式
或者這些進程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同
,這個類都能表現出正確的行爲
,那麼就稱這個類是線程安全的
提供了互斥訪問,同一時刻只能有一個線程來對它進行操做
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; /** * @author shishusheng */ @Slf4j @ThreadSafe public class AtomicExample2 { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時併發執行的線程數 */ public static int threadTotal = 200; /** * 工做內存 */ public static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { System.out.println(); semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); //主內存 log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /** * @author shishusheng * @date 18/4/3 */ @Slf4j @ThreadSafe public class AtomicExample4 { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { // 2 count.compareAndSet(0, 2); // no count.compareAndSet(0, 1); // no count.compareAndSet(1, 3); // 4 count.compareAndSet(2, 4); // no count.compareAndSet(3, 5); log.info("count:{}", count.get()); } }
synchronized:依賴 JVM
package com.mmall.concurrency.example.count; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j @ThreadSafe public class CountExample3 { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時併發執行的線程數 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private synchronized static void add() { count++; } }
synchronized 修正計數類方法
子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!
Lock: 依賴特殊的 CPU 指令,代碼實現
個值
一個線程對主內存的修改能夠及時的被其餘線程觀察到
JMM關於synchronized的規定
用共享變量時須要從主內存中從新讀取最新的值(加鎖與解鎖是同一把鎖
)
經過加入內存屏障和禁止重排序優化來實現
屏障指令,將本地內存中的共享變量值刷新到主內存
屏障指令,從主內存中讀取共享變量
volatile boolean inited = false; //線程1: context = loadContext(); inited= true; // 線程2: while( !inited ){ sleep(); } doSomethingWithConfig(context)
一個線程觀察其餘線程中的指令執行順序,因爲指令重排序的存在,該觀察結果通常雜亂無序
JMM容許編譯器和處理器對指令進行重排序,可是重排序過程不會影響到單線程程序的執行,卻會影響到多線程併發執行的正確性
package com.mmall.concurrency.example.singleton; import com.mmall.concurrency.annoations.NotThreadSafe; /** * 懶漢模式 -》 雙重同步鎖單例模式 * 單例實例在第一次使用時進行建立 * @author shishusheng */ @NotThreadSafe public class SingletonExample4 { /** * 私有構造函數 */ private SingletonExample4() { } // 一、memory = allocate() 分配對象的內存空間 // 二、ctorInstance() 初始化對象 // 三、instance = memory 設置instance指向剛分配的內存 // JVM和cpu優化,發生了指令重排 // 一、memory = allocate() 分配對象的內存空間 // 三、instance = memory 設置instance指向剛分配的內存 // 二、ctorInstance() 初始化對象 /** * 單例對象 */ private static SingletonExample4 instance = null; /** * 靜態的工廠方法 * * @return */ public static SingletonExample4 getInstance() { // 雙重檢測機制 // B if (instance == null) { // 同步鎖 synchronized (SingletonExample4.class) { if (instance == null) { // A - 3 instance = new SingletonExample4(); } } } return instance; } }
同步組件
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 指定時間內處理任務 * * @author shishusheng * */ @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { // 嘗試獲取一個許可 if (semaphore.tryAcquire()) { test(threadNum); // 釋放一個許可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
看出是順序執行的