java多線程我我的以爲是javaSe中最難的一部分,我之前也是感受學會了,可是真正有多線程的需求殊不知道怎麼下手,實際上仍是對多線程這塊知識瞭解不深入,不知道多線程api的應用場景,不知道多線程的運行流程等等,本篇文章將使用實例+圖解+源碼的方式來解析java多線程。java
文章篇幅較長,你們也能夠有選擇的看具體章節,建議多線程的代碼所有手敲,永遠不要相信你看到的結論,本身編碼後運行出來的,纔是本身的。數據庫
進程api
線程緩存
併發:單核cpu運行多線程時,時間片進行很快的切換。線程輪流執行cputomcat
並行:多核cpu運行 多線程時,真正的在同一時刻運行安全
java提供了豐富的api來支持多線程。服務器
多線程能實現的均可以用單線程來完成,那單線程運行的好好的,爲何java要引入多線程的概念呢?網絡
多線程的好處:多線程
單線程只有一條執行線,過程容易理解,能夠在大腦中清晰的勾勒出代碼的執行流程併發
多線程倒是多條線,並且通常多條線之間有交互,多條線之間須要通訊,通常難點有如下幾點
有時候但願本身變成一個字節穿梭於服務器中,搞清楚前因後果,就像無敵破壞王同樣(沒看過這部電影的能夠看下,腦洞大開)。
任務: 線程的執行體。也就是咱們的核心代碼邏輯
定義任務
Thread實現任務的侷限性
Runnable和Callable解決了Thread的侷限性
可是Runbale相比Callable有如下的侷限性
以下代碼 幾種定義線程的方式
@Slf4j class T extends Thread { @Override public void run() { log.info("我是繼承Thread的任務"); } } @Slf4j class R implements Runnable { @Override public void run() { log.info("我是實現Runnable的任務"); } } @Slf4j class C implements Callable<String> { @Override public String call() throws Exception { log.info("我是實現Callable的任務"); return "success"; } }
建立線程的方式
啓動線程的方式
// 啓動繼承Thread類的任務 new T().start(); // 啓動繼承Thread匿名內部類的任務 可用lambda優化 Thread t = new Thread(){ @Override public void run() { log.info("我是Thread匿名內部類的任務"); } }; // 啓動實現Runnable接口的任務 new Thread(new R()).start(); // 啓動實現Runnable匿名實現類的任務 new Thread(new Runnable() { @Override public void run() { log.info("我是Runnable匿名內部類的任務"); } }).start(); // 啓動實現Runnable的lambda簡化後的任務 new Thread(() -> log.info("我是Runnable的lambda簡化後的任務")).start(); // 啓動實現了Callable接口的任務 結合FutureTask 能夠獲取線程執行的結果 FutureTask<String> target = new FutureTask<>(new C()); new Thread(target).start(); log.info(target.get());
以上各個線程相關的類的類圖以下
多核cpu下,多線程是並行工做的,若是線程數多,單個核又會併發的調度線程,運行時會有上下文切換的概念
cpu執行線程的任務時,會爲線程分配時間片,如下幾種狀況會發生上下文切換。
當發生上下文切換時,操做系統會保存當前線程的狀態,並恢復另外一個線程的狀態,jvm中有塊內存地址叫程序計數器,用於記錄線程執行到哪一行代碼,是線程私有的。
idea打斷點的時候能夠設置爲Thread模式,idea的debug模式能夠看出棧幀的變化
yield()方法會讓運行中的線程切換到就緒狀態,從新爭搶cpu的時間片,爭搶時是否獲取到時間片看cpu的分配。
代碼以下
// 方法的定義 public static native void yield(); Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ Thread.yield(); log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.start(); t2.start(); // 運行結果 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512 11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
如上述結果所示,t2線程每次執行時進行了yield(),線程1執行的機會明顯比線程2要多。
線程的優先級
線程內部用1~10的數來調整線程的優先級,默認的線程優先級爲NORM_PRIORITY:5
cpu比較忙時,優先級高的線程獲取更多的時間片
cpu比較閒時,優先級設置基本沒用
public final static int MIN_PRIORITY = 1; public final static int NORM_PRIORITY = 5; public final static int MAX_PRIORITY = 10; // 方法的定義 public final void setPriority(int newPriority) { }
cpu比較忙時
Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.NORM_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的運行結果 11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
cpu比較閒時
Runnable r1 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.MIN_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的運行結果 線程1優先級低 卻先運行完 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
默認狀況下,java進程須要等待全部線程都運行結束,纔會結束,有一種特殊線程叫守護線程,當全部的非守護線程都結束後,即便它沒有執行完,也會強制結束。
默認的線程都是非守護線程。
垃圾回收線程就是典型的守護線程
// 方法的定義 public final void setDaemon(boolean on) { } Thread thread = new Thread(() -> { while (true) { } }); // 具體的api。設爲true表示未守護線程,當主線程結束後,守護線程也結束。 // 默認是false,當主線程結束後,thread繼續運行,程序不中止 thread.setDaemon(true); thread.start(); log.info("結束");
線程的阻塞能夠分爲好多種,從操做系統層面和java層面阻塞的定義可能不一樣,可是廣義上使得線程阻塞的方式有下面幾種
使線程休眠,會將運行中的線程進入阻塞狀態。當休眠時間結束後,從新爭搶cpu的時間片繼續運行
// 方法的定義 native方法 public static native void sleep(long millis) throws InterruptedException; try { // 休眠2秒 // 該方法會拋出 InterruptedException異常 即休眠過程當中可被中斷,被中斷後拋出異常 Thread.sleep(2000); } catch (InterruptedException異常 e) { } try { // 使用TimeUnit的api可替代 Thread.sleep TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { }
join是指調用該方法的線程進入阻塞狀態,等待某線程執行完成後恢復運行
// 方法的定義 有重載 // 等待線程執行完才恢復運行 public final void join() throws InterruptedException { } // 指定join的時間。指定時間內 線程還未執行完 調用方線程不繼續等待就恢復運行 public final synchronized void join(long millis) throws InterruptedException{}
Thread t = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } r = 10; }); t.start(); // 讓主線程阻塞 等待t線程執行完才繼續執行 // 去除該行,執行結果爲0,加上該行 執行結果爲10 t.join(); log.info("r:{}", r); // 運行結果 13:09:13.892 [main] INFO thread.TestJoin - r:10
// 相關方法的定義 public void interrupt() { } public boolean isInterrupted() { } public static boolean interrupted() { }
打斷標記:線程是否被打斷,true表示被打斷了,false表示沒有
isInterrupted() 獲取線程的打斷標記 ,調用後不會修改線程的打斷標記
interrupt()方法用於中斷線程
interrupted() 獲取線程的打斷標記,調用後清空打斷標記 即若是獲取爲true 調用後打斷標記爲false (不經常使用)
interrupt實例: 有個後臺監控線程不停的監控,當外界打斷它時,就結束運行。代碼以下
@Slf4j class TwoPhaseTerminal{ // 監控線程 private Thread monitor; public void start(){ monitor = new Thread(() ->{ // 不停的監控 while (true){ Thread thread = Thread.currentThread(); // 判斷當前線程是否被打斷 if (thread.isInterrupted()){ log.info("當前線程被打斷,結束運行"); break; } try { Thread.sleep(1000); // 監控邏輯中被打斷後,打斷標記爲true log.info("監控"); } catch (InterruptedException e) { // 睡眠時被打斷時拋出異常 在該處捕獲到 此時打斷標記仍是false // 在調用一次中斷 使得中斷標記爲true thread.interrupt(); } } }); monitor.start(); } public void stop(){ monitor.interrupt(); } }
上面說了一些基本的api的使用,調用上面的方法後都會使得線程有對應的狀態。
線程的狀態可從 操做系統層面分爲五種狀態 從java api層面分爲六種狀態。
Thread類中的內部枚舉State
public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
Runnable 線程調用了start()方法後進入該狀態,該狀態包含了三種狀況
六種線程狀態和方法的對應關係
主要總結Thread類中的核心方法
方法名稱 | 是否static | 方法說明 |
---|---|---|
start() | 否 | 讓線程啓動,進入就緒狀態,等待cpu分配時間片 |
run() | 否 | 重寫Runnable接口的方法,線程獲取到cpu時間片時執行的具體邏輯 |
yield() | 是 | 線程的禮讓,使得獲取到cpu時間片的線程進入就緒狀態,從新爭搶時間片 |
sleep(time) | 是 | 線程休眠固定時間,進入阻塞狀態,休眠時間完成後從新爭搶時間片,休眠可被打斷 |
join()/join(time) | 否 | 調用線程對象的join方法,調用者線程進入阻塞,等待線程對象執行完或者到達指定時間才恢復,從新爭搶時間片 |
isInterrupted() | 否 | 獲取線程的打斷標記,true:被打斷,false:沒有被打斷。調用後不會修改打斷標記 |
interrupt() | 否 | 打斷線程,拋出InterruptedException異常的方法都可被打斷,可是打斷後不會修改打斷標記,正常執行的線程被打斷後會修改打斷標記 |
interrupted() | 否 | 獲取線程的打斷標記。調用後會清空打斷標記 |
stop() | 否 | 中止線程運行 不推薦 |
suspend() | 否 | 掛起線程 不推薦 |
resume() | 否 | 恢復線程運行 不推薦 |
currentThread() | 是 | 獲取當前線程 |
Object中與線程相關方法
方法名稱 | 方法說明 |
---|---|
wait()/wait(long timeout) | 獲取到鎖的線程進入阻塞狀態 |
notify() | 隨機喚醒被wait()的一個線程 |
notifyAll(); | 喚醒被wait()的全部線程,從新爭搶時間片 |
問題有可能出如今多個線程訪問共享資源
臨界區: 一段代碼若是對共享資源的多線程讀寫操做,這段代碼就被稱爲臨界區。
注意的是 指令交錯指的是 java代碼在解析成字節碼文件時,java代碼的一行代碼在字節碼中可能有多行,在線程上下文切換時就有可能交錯。
線程安全指的是多線程調用同一個對象的臨界區的方法時,對象的屬性值必定不會發生錯誤,這就是保證了線程安全。
以下面不安全的代碼
// 對象的成員變量 private static int count = 0; public static void main(String[] args) throws InterruptedException { // t1線程對變量+5000次 Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count++; } }); // t2線程對變量-5000次 Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count--; } }); t1.start(); t2.start(); // 讓t1 t2都執行完 t1.join(); t2.join(); System.out.println(count); } // 運行結果 -1399
上面的代碼 兩個線程,一個+5000次,一個-5000次,若是線程安全,count的值應該仍是0。
可是運行不少次,每次的結果不一樣,且都不是0,因此是線程不安全的。
線程安全的類必定全部的操做都線程安全嗎?
開發中常常會說到一些線程安全的類,如ConcurrentHashMap,線程安全指的是類裏每個獨立的方法是線程安全的,可是方法的組合就不必定是線程安全的。
成員變量和靜態變量是否線程安全?
若是存在多線程共享
局部變量是否線程安全?
局部變量引用的對象未必是線程安全的
同步鎖也叫對象鎖,是鎖在對象上的,不一樣的對象就是不一樣的鎖。
該關鍵字是用於保證線程安全的,是阻塞式的解決方案。
讓同一個時刻最多隻有一個線程能持有對象鎖,其餘線程在想獲取這個對象鎖就會被阻塞,不用擔憂上下文切換的問題。
注意: 不要理解爲一個線程加了鎖 ,進入 synchronized代碼塊中就會一直執行下去。若是時間片切換了,也會執行其餘線程,再切換回來會緊接着執行,只是不會執行到有競爭鎖的資源,由於當前線程還未釋放鎖。
當一個線程執行完synchronized的代碼塊後 會喚醒正在等待的線程
synchronized實際上使用對象鎖保證臨界區的原子性 臨界區的代碼是不可分割的 不會由於線程切換所打斷
基本使用
// 加在方法上 實際是對this對象加鎖 private synchronized void a() { } // 同步代碼塊,鎖對象能夠是任意的,加在this上 和a()方法做用相同 private void b(){ synchronized (this){ } } // 加在靜態方法上 實際是對類對象加鎖 private synchronized static void c() { } // 同步代碼塊 實際是對類對象加鎖 和c()方法做用相同 private void d(){ synchronized (TestSynchronized.class){ } } // 上述b方法對應的字節碼源碼 其中monitorenter就是加鎖的地方 0 aload_0 1 dup 2 astore_1 3 monitorenter 4 aload_1 5 monitorexit 6 goto 14 (+8) 9 astore_2 10 aload_1 11 monitorexit 12 aload_2 13 athrow 14 return
線程安全的代碼
private static int count = 0; private static Object lock = new Object(); private static Object lock2 = new Object(); // t1線程和t2對象都是對同一對象加鎖。保證了線程安全。此段代碼不管執行多少次,結果都是0 public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count++; } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count--; } } }); t1.start(); t2.start(); // 讓t1 t2都執行完 t1.join(); t2.join(); System.out.println(count); }
重點:加鎖是加在對象上,必定要保證是同一對象,加鎖才能生效
線程間通訊能夠經過共享變量+wait()¬ify()來實現
wait()將線程進入阻塞狀態,notify()將線程喚醒
當多線程競爭訪問對象的同步方法時,鎖對象會關聯一個底層的Monitor對象(重量級鎖的實現)
以下圖所示 Thread0,1先競爭到鎖執行了代碼後,2,3,4,5線程同時來執行臨界區的代碼,開始競爭鎖
注意:
static final Object lock = new Object(); new Thread(() -> { synchronized (lock) { log.info("開始執行"); try { // 同步代碼內部才能調用 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("繼續執行核心邏輯"); } }, "t1").start(); new Thread(() -> { synchronized (lock) { log.info("開始執行"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("繼續執行核心邏輯"); } }, "t2").start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("開始喚醒"); synchronized (lock) { // 同步代碼內部才能調用 lock.notifyAll(); } // 執行結果 14:29:47.138 [t1] INFO TestWaitNotify - 開始執行 14:29:47.141 [t2] INFO TestWaitNotify - 開始執行 14:29:49.136 [main] INFO TestWaitNotify - 開始喚醒 14:29:49.136 [t2] INFO TestWaitNotify - 繼續執行核心邏輯 14:29:49.136 [t1] INFO TestWaitNotify - 繼續執行核心邏輯
wait 和 sleep的區別?
兩者都會讓線程進入阻塞狀態,有如下區別
LockSupport是juc下的工具類,提供了park和unpark方法,能夠實現線程通訊
與wait和notity相比的不一樣點
指的是有生產者來生產數據,消費者來消費數據,生產者生產滿了就不生產了,通知消費者取,等消費了再進行生產。
消費者消費不到了就不消費了,通知生產者生產,生產到了再繼續消費。
public static void main(String[] args) throws InterruptedException { MessageQueue queue = new MessageQueue(2); // 三個生產者向隊列裏存值 for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, "值" + id)); }, "生產者" + i).start(); } Thread.sleep(1000); // 一個消費者不停的從隊列裏取值 new Thread(() -> { while (true) { queue.take(); } }, "消費者").start(); } } // 消息隊列被生產者和消費者持有 class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); // 容量 private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; } /** * 生產 */ public void put(Message message) { synchronized (list) { while (list.size() == capacity) { log.info("隊列已滿,生產者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); log.info("生產消息:{}", message); // 生產後通知消費者 list.notifyAll(); } } public Message take() { synchronized (list) { while (list.isEmpty()) { log.info("隊列已空,消費者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.info("消費消息:{}", message); // 消費後通知生產者 list.notifyAll(); return message; } } } // 消息 class Message { private int id; private Object value; }
爲了更形象的表達加同步鎖的概念,這裏舉一個生活中的例子,儘可能把以上的概念具體化出來。
這裏舉一個每一個人很是感興趣的一件東西。 錢!!!(馬老師除外)。
現實中,咱們去銀行門口的自動取款機取錢,取款機的錢就是共享變量,爲了保障安全,不可能兩個陌生人同時進入同一個取款機內取錢,因此只能一我的進入取錢,而後鎖上取款機的門,其餘人只能在取款機門口等待。
取款機有多個,裏面的錢互不影響,鎖也有多個(多個對象鎖),取錢人在多個取款機裏同時取錢也沒有安全問題。
假如每一個取錢的陌生人都是線程,當取錢人進入取款機鎖了門後(線程得到鎖),取到錢後出門(線程釋放鎖),下一我的競爭到鎖來取錢。
假設工做人員也是一個線程,若是取錢人進入後發現取款機錢不足了,這時通知工做人員來向取款機里加錢(調用notifyAll方法),取錢人暫停取錢,進入銀行大堂阻塞等待(調用wait方法)。
銀行大堂裏的工做人員和取錢人都被喚醒,從新競爭鎖,進入後若是是取錢人,因爲取款機沒錢,還得進入銀行大堂等待。
當工做人員得到取款機的鎖進入後,加了錢後會通知大廳裏的人來取錢(調用notifyAll方法)。本身暫停加錢,進入銀行大堂等待喚醒加錢(調用wait方法)。
這時大堂裏等待的人都來競爭鎖,誰獲取到誰進入繼續取錢。
和現實中不一樣的就是這裏沒有排隊的概念,誰搶到鎖誰進去取。
可重入鎖 : 一個線程獲取到對象的鎖後,執行方法內部在須要獲取鎖的時候是能夠獲取到的。如如下代碼
private static final ReentrantLock LOCK = new ReentrantLock(); private static void m() { LOCK.lock(); try { log.info("begin"); // 調用m1() m1(); } finally { // 注意鎖的釋放 LOCK.unlock(); } } public static void m1() { LOCK.lock(); try { log.info("m1"); m2(); } finally { // 注意鎖的釋放 LOCK.unlock(); } }
synchronized 也是可重入鎖,ReentrantLock有如下優勢
api
// 默認非公平鎖,參數傳true 表示未公平鎖 ReentrantLock lock = new ReentrantLock(false); // 嘗試獲取鎖 lock() // 釋放鎖 應放在finally塊中 必須執行到 unlock() try { // 獲取鎖時可被打斷,阻塞中的線程可被打斷 LOCK.lockInterruptibly(); } catch (InterruptedException e) { return; } // 嘗試獲取鎖 獲取不到就返回false LOCK.tryLock() // 支持超時時間 一段時間沒獲取到就返回false tryLock(long timeout, TimeUnit unit) // 指定條件變量 休息室 一個鎖能夠建立多個休息室 Condition waitSet = ROOM.newCondition(); // 釋放鎖 進入waitSet等待 釋放後其餘線程能夠搶鎖 yanWaitSet.await() // 喚醒具體休息室的線程 喚醒後 重寫競爭鎖 yanWaitSet.signal()
實例:一個線程輸出a,一個線程輸出b,一個線程輸出c,abc按照順序輸出,連續輸出5次
這個考的就是線程的通訊,利用 wait()/notify()和控制變量能夠實現,此處使用ReentrantLock便可實現該功能。
public static void main(String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5); // 構建三個條件變量 Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); // 開啓三個線程 new Thread(() -> { awaitSignal.print("a", a, b); }).start(); new Thread(() -> { awaitSignal.print("b", b, c); }).start(); new Thread(() -> { awaitSignal.print("c", c, a); }).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } awaitSignal.lock(); try { // 先喚醒a a.signal(); } finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { // 循環次數 private int loopNumber; public AwaitSignal(int loopNumber) { this.loopNumber = loopNumber; } /** * @param print 輸出的字符 * @param current 當前條件變量 * @param next 下一個條件變量 */ public void print(String print, Condition current, Condition next) { for (int i = 0; i < loopNumber; i++) { lock(); try { try { // 獲取鎖以後等待 current.await(); System.out.print(print); } catch (InterruptedException e) { } next.signal(); } finally { unlock(); } } }
說到死鎖,先舉個例子,
下面是代碼實現
static Beer beer = new Beer(); static Story story = new Story(); public static void main(String[] args) { new Thread(() ->{ synchronized (beer){ log.info("我有酒,給我故事"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (story){ log.info("小王開始喝酒講故事"); } } },"小王").start(); new Thread(() ->{ synchronized (story){ log.info("我有故事,給我酒"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (beer){ log.info("老王開始喝酒講故事"); } } },"老王").start(); } class Beer { } class Story{ }
死鎖致使程序沒法正常運行下去
檢測工具能夠檢查到死鎖信息
jmm 體如今如下三個方面
停不下來的程序
static boolean run = true; public static void main(String[] args) throws InterruptedException { Thread t = new Thread(() -> { while (run) { // .... } }); t.start(); Thread.sleep(1000); // 線程t不會如預想的停下來 run = false; }
如上圖所示,線程有本身的工做緩存,當主線程修改了變量並同步到主內存時,t線程沒有讀取到,因此程序停不下來
JVM在不影響程序正確性的狀況下可能會調整語句的執行順序,該狀況也稱爲 指令重排序
static int i; static int j; // 在某個線程內執行以下賦值操做 i = ...; j = ...; 有可能將j先賦值
原子性你們應該比較熟悉,上述同步鎖的synchronized代碼塊就是保證了原子性,就是一段代碼是一個總體,原子性保證了線程安全,不會受到上下文切換的影響。
該關鍵字解決了可見性和有序性,volatile經過內存屏障來實現的
會在對象寫操做以後加寫屏障,會對寫屏障的以前的數據都同步到主存,而且保證寫屏障的執行順序在寫屏障以前
會在對象讀操做以前加讀屏障,會在讀屏障以後的語句都從主存讀,並保證讀屏障以後的代碼執行在讀屏障以後
注意: volatile不能解決原子性,即不能經過該關鍵字實現線程安全。
volatile應用場景:一個線程讀取變量,另外的線程操做變量,加了該關鍵字後保證寫變量後,讀變量的線程能夠及時感知。
cas (compare and swap) 比較並交換
爲變量賦值時,從內存中讀取到的值v,獲取到要交換的新值n,執行 compareAndSwap()方法時,比較v和當前內存中的值是否一致,若是一致則將n和v交換,若是不一致,則自旋重試。
cas底層是cpu層面的,即不使用同步鎖也能夠保證操做的原子性。
private AtomicInteger balance; // 模擬cas的具體操做 @Override public void withdraw(Integer amount) { while (true) { // 獲取當前值 int pre = balance.get(); // 進行操做後獲得新值 int next = pre - amount; // 比較並設置成功 則中斷 不然自旋重試 if (balance.compareAndSet(pre, next)) { break; } } }
無鎖的效率是要高於以前的鎖的,因爲無鎖不會涉及線程的上下文切換
cas是樂觀鎖的思想,sychronized是悲觀鎖的思想
cas適合不多有線程競爭的場景,若是競爭很強,重試常常發生,反而下降效率
juc併發包下包含了實現了cas的原子類
經常使用api
new AtomicInteger(balance) get() compareAndSet(pre, next) // i.incrementAndGet() ++i // i.decrementAndGet() --i // i.getAndIncrement() i++ // i.getAndDecrement() ++i i.addAndGet() // 傳入函數式接口 修改i int getAndUpdate(IntUnaryOperator updateFunction) // cas 的核心方法 compareAndSet(int expect, int update)
cas存在ABA問題,即比較並交換時,若是原值爲A,有其餘線程將其修改成B,在有其餘線程將其修改成A。
此時實際發生過交換,可是比較和交換因爲值沒改變能夠交換成功
解決方式
AtomicStampedReference/AtomicMarkableReference
上面兩個類解決ABA問題,原理就是爲對象增長版本號,每次修改時增長版本號,就能夠避免ABA問題
或者增長個布爾變量標識,修改後調整布爾變量值,也能夠避免ABA問題
線程池是java併發最重要的一個知識點,也是難點,是實際應用最普遍的。
線程的資源很寶貴,不可能無限的建立,必需要有管理線程的工具,線程池就是一種管理線程的工具,java開發中常常有池化的思想,如 數據庫鏈接池、Redis鏈接池等。
預先建立好一些線程,任務提交時直接執行,既能夠節約建立線程的時間,又能夠控制線程的數量。
線程池的好處
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
構造器參數的意義
參數名 | 參數意義 |
---|---|
corePoolSize | 核心線程數 |
maximumPoolSize | 最大線程數 |
keepAliveTime | 救急線程的空閒時間 |
unit | 救急線程的空閒時間單位 |
workQueue | 阻塞隊列 |
threadFactory | 建立線程的工廠,主要定義線程名 |
handler | 拒絕策略 |
下面 咱們經過一個實例來理解線程池的參數以及線程池的接收任務的過程
如上圖 銀行辦理業務。
線程池經過一個int變量的高3位來表示線程池的狀態,低29位來存儲線程池的數量
狀態名稱 | 高三位 | 接收新任務 | 處理阻塞隊列任務 | 說明 |
---|---|---|---|---|
Running | 111 | Y | Y | 正常接收任務,正常處理任務 |
Shutdown | 000 | N | Y | 不會接收任務,會執行完正在執行的任務,也會處理阻塞隊列裏的任務 |
stop | 001 | N | N | 不會接收任務,會中斷正在執行的任務,會放棄處理阻塞隊列裏的任務 |
Tidying | 010 | N | N | 任務所有執行完畢,當前活動線程是0,即將進入終結 |
Termitted | 011 | N | N | 終結狀態 |
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
線程池建立、接收任務、執行任務、回收線程的步驟
注意: 不是剛建立的線程是核心線程,後面建立的線程是非核心線程,線程是沒有核心非核心的概念的,這是我長期以來的誤解。
拒絕策略
提交任務的方法
// 執行Runnable public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } // 提交Callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 內部構建FutureTask RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 內部構建FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 內部構建FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
注意: 下面幾種方式都不推薦使用
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
shutdown()
會讓線程池狀態爲shutdown,不能接收任務,可是會將工做線程和阻塞隊列裏的任務執行完 至關於優雅關閉
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow()
會讓線程池狀態爲stop, 不能接收任務,會當即中斷執行中的工做線程,而且不會執行阻塞隊列裏的任務, 會返回阻塞隊列的任務列表
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
線程池難就難在參數的配置,有一套理論配置參數
cpu密集型 : 指的是程序主要發生cpu的運算
核心線程數: CPU核心數+1
IO密集型: 遠程調用RPC,操做數據庫等,不須要使用cpu進行大量的運算。 大多數應用的場景
核心線程數=核數cpu指望利用率 總時間/cpu運算時間
可是基於以上理論仍是很難去配置,由於cpu運算時間很差估算
實際配置大小可參考下表
cpu密集型 | io密集型 | |
---|---|---|
線程數數量 | 核數<=x<=核數*2 | 核心數50<=x<=核心數 100 |
隊列長度 | y>=100 | 1<=y<=10 |
1.線程池參數經過分佈式配置,修改配置無需重啓應用
線程池參數是根據線上的請求數變化而變化的,最好的方式是 核心線程數、最大線程數 隊列大小都是可配置的
主要配置 corePoolSize maxPoolSize queueSize
java提供了可方法覆蓋參數,線程池內部會處理好參數 進行平滑的修改
public void setCorePoolSize(int corePoolSize) { }
2.增長線程池的監控
3.io密集型可調整爲先新增任務到最大線程數後再將任務放到阻塞隊列
代碼 主要可重寫阻塞隊列 加入任務的方法
public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } final ReentrantLock lock = this.lock; lock.lock(); try { int currentPoolThreadSize = executor.getPoolSize(); // 若是提交任務數小於當前建立的線程數, 說明還有空閒線程, if (executor.getTaskCount() < currentPoolThreadSize) { // 將任務放入隊列中,讓線程去處理任務 return super.offer(runnable); } // 核心改動 // 若是當前線程數小於最大線程數,則返回 false ,讓線程池去建立新的線程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 不然,就將任務放入隊列中 return super.offer(runnable); } finally { lock.unlock(); } }
3.拒絕策略 建議使用tomcat的拒絕策略(給一次機會)
// tomcat的源碼 @Override public void execute(Runnable command) { if ( executor != null ) { try { executor.execute(command); } catch (RejectedExecutionException rx) { // 捕獲到異常後 在從隊列獲取,至關於重試1取不到任務 在執行拒絕任務 if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full."); } } else throw new IllegalStateException("StandardThreadPool not started."); }
建議修改從隊列取任務的方式: 增長超時時間,超時1分鐘取不到在進行返回
public boolean offer(E e, long timeout, TimeUnit unit){}
文章篇幅較長,給看到這裏的小夥伴點個大大的贊!因爲做者水平有限,加之第一次寫博客,文章中不免會有錯誤之處,歡迎小夥伴們反饋指正。
若是以爲文章對你有幫助,麻煩 點贊、評論、轉發、在看 走起