Package java.util.concurrent
---> AtomicInteger
Lock
ReadWriteLock
java
保證可見性、不保證原子性、禁止指令重排程序員
保證可見性算法
當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看到修改的值小程序
當不添加volatile關鍵字時示例:api
package com.jian8.juc;
import java.util.concurrent.TimeUnit;
/** * 1驗證volatile的可見性 * 1.1 若是int num = 0,number變量沒有添加volatile關鍵字修飾 * 1.2 添加了volatile,能夠解決可見性 */
public class VolatileDemo {
public static void main(String[] args) {
visibilityByVolatile();//驗證volatile的可見性
}
/** * volatile能夠保證可見性,及時通知其餘線程,主物理內存的值已經被修改 */
public static void visibilityByVolatile() {
MyData myData = new MyData();
//第一個線程
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
try {
//線程暫停3s
TimeUnit.SECONDS.sleep(3);
myData.addToSixty();
System.out.println(Thread.currentThread().getName() + "\t update value:" + myData.num);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, "thread1").start();
//第二個線程是main線程
while (myData.num == 0) {
//若是myData的num一直爲零,main線程一直在這裏循環
}
System.out.println(Thread.currentThread().getName() + "\t mission is over, num value is " + myData.num);
}
}
class MyData {
// int num = 0;
volatile int num = 0;
public void addToSixty() {
this.num = 60;
}
}
複製代碼
輸出結果:數組
thread1 come in
thread1 update value:60
//線程進入死循環
複製代碼
當咱們加上volatile
關鍵字後,volatile int num = 0;
輸出結果爲:緩存
thread1 come in
thread1 update value:60
main mission is over, num value is 60
//程序沒有死循環,結束執行
複製代碼
不保證原子性安全
原子性:不可分割、完整性,即某個線程正在作某個具體業務時,中間不能夠被加塞或者被分割,須要總體完整,要麼同時成功,要麼同時失敗bash
驗證示例(變量添加volatile關鍵字,方法不添加synchronized):服務器
package com.jian8.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** * 1驗證volatile的可見性 * 1.1 若是int num = 0,number變量沒有添加volatile關鍵字修飾 * 1.2 添加了volatile,能夠解決可見性 * * 2.驗證volatile不保證原子性 * 2.1 原子性指的是什麼 * 不可分割、完整性,即某個線程正在作某個具體業務時,中間不能夠被加塞或者被分割,須要總體完整,要麼同時成功,要麼同時失敗 */
public class VolatileDemo {
public static void main(String[] args) {
// visibilityByVolatile();//驗證volatile的可見性
atomicByVolatile();//驗證volatile不保證原子性
}
/** * volatile能夠保證可見性,及時通知其餘線程,主物理內存的值已經被修改 */
//public static void visibilityByVolatile(){}
/** * volatile不保證原子性 * 以及使用Atomic保證原子性 */
public static void atomicByVolatile(){
MyData myData = new MyData();
for(int i = 1; i <= 20; i++){
new Thread(() ->{
for(int j = 1; j <= 1000; j++){
myData.addSelf();
myData.atomicAddSelf();
}
},"Thread "+i).start();
}
//等待上面的線程都計算完成後,再用main線程取得最終結果值
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"\t finally num value is "+myData.num);
System.out.println(Thread.currentThread().getName()+"\t finally atomicnum value is "+myData.atomicInteger);
}
}
class MyData {
// int num = 0;
volatile int num = 0;
public void addToSixty() {
this.num = 60;
}
public void addSelf(){
num++;
}
AtomicInteger atomicInteger = new AtomicInteger();
public void atomicAddSelf(){
atomicInteger.getAndIncrement();
}
}
複製代碼
執行三次結果爲:
//1.
main finally num value is 19580
main finally atomicnum value is 20000
//2.
main finally num value is 19999
main finally atomicnum value is 20000
//3.
main finally num value is 18375
main finally atomicnum value is 20000
//num並無達到20000
複製代碼
禁止指令重排
有序性:在計算機執行程序時,爲了提升性能,編譯器和處理器經常會對指令作重排,通常分如下三種
graph LR
源代碼 --> id1["編譯器優化的重排"]
id1 --> id2[指令並行的重排]
id2 --> id3[內存系統的重排]
id3 --> 最終執行的指令
style id1 fill:#ff8000;
style id2 fill:#fab400;
style id3 fill:#ffd557;
複製代碼
單線程環境裏面確保程序最終執行結果和代碼順序執行的結果一致。
處理器在進行重排順序是必需要考慮指令之間的數據依賴性
多線程環境中線程交替執行,因爲編譯器優化重排的存在,兩個線程中使用的變量可否保證一致性時沒法肯定的,結果沒法預測
重排代碼實例:
聲明變量:int a,b,x,y=0
線程1 | 線程2 |
---|---|
x = a; | y = b; |
b = 1; | a = 2; |
結 果 | x = 0 y=0 |
若是編譯器對這段程序代碼執行重排優化後,可能出現以下狀況:
線程1 | 線程2 |
---|---|
b = 1; | a = 2; |
x= a; | y = b; |
結 果 | x = 2 y=1 |
這個結果說明在多線程環境下,因爲編譯器優化重排的存在,兩個線程中使用的變量可否保證一致性是沒法肯定的
volatile實現禁止指令重排,從而避免了多線程環境下程序出現亂序執行的現象
內存屏障(Memory Barrier)又稱內存柵欄,是一個CPU指令,他的做用有兩個:
因爲編譯器和處理器都能執行指令重排優化。若是在指令前插入一條Memory Barrier,則會告訴編譯器和CPU,無論什麼指令都不能和這條Memory Barrier指令重排順序,也就是說經過插入內存屏障禁止在內存屏障先後的指令執行重排序優化。內存屏障另一個做用是強制刷出各類CPU的緩存數據,所以任何CPU上的線程都能讀取到這些數據的最新版本。
graph TB
subgraph
bbbb["對Volatile變量進行讀操做時,<br>回在讀操做以前加入一條load屏障指令,<br>從內存中讀取共享變量"]
ids6[Volatile]-->red3[LoadLoad屏障]
red3-->id7["禁止下邊全部普通讀操做<br>和上面的volatile讀重排序"]
red3-->red4[LoadStore屏障]
red4-->id9["禁止下邊全部普通寫操做<br>和上面的volatile讀重排序"]
red4-->id8[普通讀]
id8-->普通寫
end
subgraph
aaaa["對Volatile變量進行寫操做時,<br>回在寫操做後加入一條store屏障指令,<br>將工做內存中的共享變量值刷新回到主內存"]
id1[普通讀]-->id2[普通寫]
id2-->red1[StoreStore屏障]
red1-->id3["禁止上面的普通寫和<br>下面的volatile寫重排序"]
red1-->id4["Volatile寫"]
id4-->red2[StoreLoad屏障]
red2-->id5["防止上面的volatile寫和<br>下面可能有的volatile讀寫重排序"]
end
style red1 fill:#ff0000;
style red2 fill:#ff0000;
style red4 fill:#ff0000;
style red3 fill:#ff0000;
style aaaa fill:#ffff00;
style bbbb fill:#ffff00;
複製代碼
JMM(Java Memory Model)自己是一種抽象的概念,並不真實存在,他描述的時一組規則或規範,經過這組規範定義了程序中各個變量(包括實例字段,靜態字段和構成數組對象的元素)的訪問方式。
JMM關於同步的規定:
因爲JVM運行程序的實體是線程,而每一個線程建立時JVM都會爲其建立一個工做內存(有的成爲棧空間),工做內存是每一個線程的私有數據區域,而java內存模型中規定全部變量都存儲在主內存,主內存是共享內存區域,全部線程均可以訪問,但線程對變量的操做(讀取賦值等)必須在工做內存中進行,首先概要將變量從主內存拷貝到本身的工做內存空間,而後對變量進行操做,操做完成後再將變量寫回主內存,不能直接操做主內存中的變量,各個線程中的工做內存中存儲着主內存的變量副本拷貝,所以不一樣的線程件沒法訪問對方的工做內存,線程間的通訊(傳值)必須經過主內存來完成,期間要訪問過程以下圖:
當普通單例模式在多線程狀況下:
public class SingletonDemo {
private static SingletonDemo instance = null;
private SingletonDemo() {
System.out.println(Thread.currentThread().getName() + "\t 構造方法SingletonDemo()");
}
public static SingletonDemo getInstance() {
if (instance == null) {
instance = new SingletonDemo();
}
return instance;
}
public static void main(String[] args) {
//構造方法只會被執行一次
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
//併發多線程後,構造方法會在一些狀況下執行屢次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
SingletonDemo.getInstance();
}, "Thread " + i).start();
}
}
}
複製代碼
其構造方法在一些狀況下會被執行屢次
解決方式:
單例模式DCL代碼
DCL (Double Check Lock雙端檢鎖機制)在加鎖前和加鎖後都進行一次判斷
public static SingletonDemo getInstance() {
if (instance == null) {
synchronized (SingletonDemo.class) {
if (instance == null) {
instance = new SingletonDemo();
}
}
}
return instance;
}
複製代碼
大部分運行結果構造方法只會被執行一次,但指令重排機制會讓程序很小的概率出現構造方法被執行屢次
DCL(雙端檢鎖)機制不必定線程安全,緣由時有指令重排的存在,加入volatile能夠禁止指令重排
緣由是在某一個線程執行到第一次檢測,讀取到instance不爲null時,instance的引用對象可能沒有完成初始化。instance=new SingleDemo();能夠被分爲一下三步(僞代碼):
memory = allocate();//1.分配對象內存空間
instance(memory); //2.初始化對象
instance = memory; //3.設置instance執行剛分配的內存地址,此時instance!=null
複製代碼
步驟2和步驟3不存在數據依賴關係,並且不管重排前仍是重排後程序的執行結果在單線程中並無改變,所以這種重排優化時容許的,若是3步驟提早於步驟2,可是instance尚未初始化完成
可是指令重排只會保證串行語義的執行的一致性(單線程),但並不關心多線程間的語義一致性。
因此當一條線程訪問instance不爲null時,因爲instance示例未必已初始化完成,也就形成了線程安全問題。
單例模式volatile代碼
爲解決以上問題,能夠將SingletongDemo實例上加上volatile
private static volatile SingletonDemo instance = null;
複製代碼
AtomicInteger.conpareAndSet(int expect, indt update)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
複製代碼
第一個參數爲拿到的指望值,若是指望值和內存中變量的值一致,進行update賦值,若是指望值和內存中變量的值不一致,證實數據被修改過,返回fasle,取消賦值
例子:
package com.jian8.juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
/** * 1.CAS是什麼? * 1.1比較並交換 */
public class CASDemo {
public static void main(String[] args) {
checkCAS();
}
public static void checkCAS(){
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t current data is " + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 2014) + "\t current data is " + atomicInteger.get());
}
}
複製代碼
輸出結果爲:
true current data is 2019
false current data is 2019
複製代碼
比較當前工做內存中的值和主內存中的值,若是相同則執行規定操做,不然繼續比較知道主內存和工做內存中的值一直爲止
atomicInteger.getAndIncrement();
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
複製代碼
Unsafe
是CAS核心類,因爲Java方法沒法直接訪問地層系統,須要經過本地(native)方法來訪問,Unsafe至關於一個後門,基於該類能夠直接操做特定內存數據。Unsafe類存在於sun.misc
包中,其內部方法操做能夠像C的指針同樣直接操做內存,由於Java中CAS操做的執行依賴於Unsafe類的方法。
Unsafe類中的全部方法都是native修飾的,也就是說Unsafe類中的方法都直接調用操做系統底層資源執行相應任務
變量valueOffset,表示該變量值在內存中的偏移地址,由於Unsafe就是根據內存偏移地址獲取數據的
變量value用volatile修飾,保證多線程之間的可見性
CAS是什麼
CAS全稱呼Compare-And-Swap,它是一條CPU併發原語
他的功能是判斷內存某個位置的值是否爲預期值,若是是則更改成新的值,這個過程是原子的。
CAS併發原語體如今JAVA語言中就是sun.misc.Unsafe類中各個方法。調用Unsafe類中的CAS方法,JVM會幫咱們實現CAS彙編指令。這是一種徹底依賴於硬件的功能,經過他實現了原子操做。因爲CAS是一種系統原語,原語屬於操做系統用語範疇,是由若干條指令組成的,用於完成某個功能的一個過程,而且原語的執行必須是連續的,在執行過程當中不容許被中斷,也就是說CAS是一條CPU的原子指令,不會形成數據不一致問題。
//unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
var1 AtomicInteger對象自己
var2 該對象的引用地址
var4 須要變更的數據
var5 經過var1 var2找出的主內存中真實的值
用該對象前的值與var5比較;
若是相同,更新var5+var4而且返回true,
若是不一樣,繼續去之而後再比較,直到更新完成
循環時間長,開銷大
例如getAndAddInt方法執行,有個do while循環,若是CAS失敗,一直會進行嘗試,若是CAS長時間不成功,可能會給CPU帶來很大的開銷
只能保證一個共享變量的原子操做
對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖來保證原子性
ABA問題
CAS算法實現一個重要前提須要去取內存中某個時刻的數據並在當下時刻比較並替換,那麼在這個時間差類會致使數據的變化。
好比線程1從內存位置V取出A,線程2同時也從內存取出A,而且線程2進行一些操做將值改成B,而後線程2又將V位置數據改爲A,這時候線程1進行CAS操做發現內存中的值依然時A,而後線程1操做成功。
儘管線程1的CAS操做成功,可是不表明這個過程沒有問題
示例代碼:
package juc.cas;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicRefrenceDemo {
public static void main(String[] args) {
User z3 = new User("張三", 22);
User l4 = new User("李四", 23);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
}
}
@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}
複製代碼
輸出結果
true User(userName=李四, age=23) false User(userName=李四, age=23) 複製代碼
新增機制,修改版本號
package com.jian8.juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/** * ABA問題解決 * AtomicStampedReference */
public class ABADemo {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
System.out.println("=====如下時ABA問題的產生=====");
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "Thread 1").start();
new Thread(() -> {
try {
//保證線程1完成一次ABA操做
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
}, "Thread 2").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=====如下時ABA問題的解決=====");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本號" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第2次版本號" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第3次版本號" + atomicStampedReference.getStamp());
}, "Thread 3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本號" + stamp);
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t修改是否成功" + result + "\t當前最新實際版本號:" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t當前最新實際值:" + atomicStampedReference.getReference());
}, "Thread 4").start();
}
}
複製代碼
輸出結果:
=====如下時ABA問題的產生=====
true 2019
=====如下時ABA問題的解決=====
Thread 3 第1次版本號1
Thread 4 第1次版本號1
Thread 3 第2次版本號2
Thread 3 第3次版本號3
Thread 4 修改是否成功false 當前最新實際版本號:3
Thread 4 當前最新實際值:100
複製代碼
HashSet與ArrayList一致 HashMap
HashSet底層是一個HashMap,存儲的值放在HashMap的key裏,value存儲了一個PRESENT的靜態Object對象
package com.jian8.juc.collection;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/** * 集合類不安全問題 * ArrayList */
public class ContainerNotSafeDemo {
public static void main(String[] args) {
notSafe();
}
/** * 故障現象 * java.util.ConcurrentModificationException */
public static void notSafe() {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, "Thread " + i).start();
}
}
}
複製代碼
報錯:
Exception in thread "Thread 10" java.util.ConcurrentModificationException
複製代碼
併發正常修改致使
一我的正在寫入,另外一個同窗來搶奪,致使數據不一致,併發修改異常
List<String> list = new Vector<>();//Vector線程安全
List<String> list = Collections.synchronizedList(new ArrayList<>());//使用輔助類
List<String> list = new CopyOnWriteArrayList<>();//寫時複製,讀寫分離
Map<String, String> map = new ConcurrentHashMap<>();
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
複製代碼
CopyOnWriteArrayList.add方法:
CopyOnWrite容器即寫時複製,往一個元素添加容器的時候,不直接往當前容器Object[]添加,而是先將當前容器Object[]進行copy,複製出一個新的容器Object[] newElements,而後向新的容器添加元素,添加完元素以後,再將原容器的引用指向新的容器setArray(newElements),這樣作能夠對CopyOnWrite容器進行併發的讀,而不須要加鎖,由於當前容器不會添加任何元素,因此CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不一樣的容器
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
複製代碼
是什麼
公平鎖就是先來後到、非公平鎖就是容許加塞,Lock lock = new ReentrantLock(Boolean fair);
默認非公平。
公平鎖是指多個線程按照申請鎖的順序來獲取鎖,相似排隊打飯。
非公平鎖是指多個線程獲取鎖的順序並非按照申請鎖的順序,有可能後申請的線程優先獲取鎖,在高併發的狀況下,有可能會形成優先級反轉或者節現象。
二者區別
公平鎖:Threads acquire a fair lock in the order in which they requested it
公平鎖,就是很公平,在併發環境中,每一個線程在獲取鎖時,會先查看此鎖維護的等待隊列,若是爲空,或者當前線程就是等待隊列的第一個,就佔有鎖,不然就會加入到等待隊列中,之後會按照FIFO的規則從隊列中取到本身。
非公平鎖:a nonfair lock permits barging: threads requesting a lock can jump ahead of the queue of waiting threads if the lock happens to be available when it is requested.
非公平鎖比較粗魯,上來就直接嘗試佔有額,若是嘗試失敗,就再採用相似公平鎖那種方式。
other
對Java ReentrantLock而言,經過構造函數指定該鎖是否公平,默認是非公平鎖,非公平鎖的優勢在於吞吐量比公平鎖大
對Synchronized而言,是一種非公平鎖
遞歸鎖是什麼
指的時同一線程外層函數得到鎖以後,內層遞歸函數仍然能獲取該鎖的代碼,在同一個線程在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖,也就是說,線程能夠進入任何一個它已經擁有的鎖所同步着的代碼塊
ReentrantLock/Synchronized 就是一個典型的可重入鎖
可重入鎖最大的做用是避免死鎖
代碼示例
package com.jian8.juc.lock;
####
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread 1").start();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread 2").start();
}
}
class Phone{
public synchronized void sendSMS()throws Exception{
System.out.println(Thread.currentThread().getName()+"\t -----invoked sendSMS()");
Thread.sleep(3000);
sendEmail();
}
public synchronized void sendEmail() throws Exception{
System.out.println(Thread.currentThread().getName()+"\t +++++invoked sendEmail()");
}
}
複製代碼
package com.jian8.juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static void main(String[] args) {
Mobile mobile = new Mobile();
new Thread(mobile).start();
new Thread(mobile).start();
}
}
class Mobile implements Runnable{
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
public void get() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t invoked get()");
set();
}finally {
lock.unlock();
}
}
public void set(){
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"\t invoked set()");
}finally {
lock.unlock();
}
}
}
複製代碼
概念
獨佔鎖:指該鎖一次只能被一個線程所持有,對ReentrantLock和Synchronized而言都是獨佔鎖
共享鎖:只該鎖可被多個線程所持有
ReentrantReadWriteLock其讀鎖是共享鎖,寫鎖是獨佔鎖
互斥鎖:讀鎖的共享鎖能夠保證併發讀是很是高效的,讀寫、寫讀、寫寫的過程是互斥的
代碼示例
package com.jian8.juc.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** * 多個線程同時讀一個資源類沒有任何問題,因此爲了知足併發量,讀取共享資源應該能夠同時進行。 * 可是 * 若是有一個線程象取寫共享資源來,就不該該自由其餘線程能夠對資源進行讀或寫 * 總結 * 讀讀能共存 * 讀寫不能共存 * 寫寫不能共存 */
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, "Thread " + i).start();
}
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, "Thread " + i).start();
}
}
}
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/** * 寫操做:原子+獨佔 * 整個過程必須是一個完整的統一體,中間不準被分割,不準被打斷 * * @param key * @param value */
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在寫入:" + key);
TimeUnit.MILLISECONDS.sleep(300);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t寫入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在讀取:" + key);
TimeUnit.MILLISECONDS.sleep(300);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t讀取完成: " + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
public void clear() {
map.clear();
}
}
複製代碼
spinlock
是指嘗試獲取鎖的線程不會當即阻塞,而是採用循環的方式去嘗試獲取鎖,這樣的好處是減小線程上下文切換的消耗,缺點是循環會消耗CPU
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
手寫自旋鎖:
package com.jian8.juc.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** * 實現自旋鎖 * 自旋鎖好處,循環比較獲取直到成功爲止,沒有相似wait的阻塞 * * 經過CAS操做完成自旋鎖,A線程先進來調用mylock方法本身持有鎖5秒鐘,B隨後進來發現當前有線程持有鎖,不是null,因此只能經過自旋等待,知道A釋放鎖後B隨後搶到 */
public class SpinLockDemo {
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.mylock();
try {
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
e.printStackTrace();
}
spinLockDemo.myUnlock();
}, "Thread 1").start();
try {
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){
e.printStackTrace();
}
new Thread(() -> {
spinLockDemo.mylock();
spinLockDemo.myUnlock();
}, "Thread 2").start();
}
//原子引用線程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void mylock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName()+"\t invoked myunlock()");
}
}
複製代碼
它容許一個或多個線程一直等待,知道其餘線程的操做執行完後再執行。例如,應用程序的主線程但願在負責啓動框架服務的線程已經啓動全部的框架服務以後再執行
CountDownLatch主要有兩個方法,當一個或多個線程調用await()方法時,調用線程會被阻塞。其餘線程調用countDown()方法會將計數器減1,當計數器的值變爲0時,因調用await()方法被阻塞的線程纔會被喚醒,繼續執行
代碼示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// general();
countDownLatchTest();
}
public static void general(){
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t上完自習,離開教室");
}, "Thread-->"+i).start();
}
while (Thread.activeCount()>2){
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
System.out.println(Thread.currentThread().getName()+"\t=====班長最後關門走人");
}
public static void countDownLatchTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t被滅");
countDownLatch.countDown();
}, CountryEnum.forEach_CountryEnum(i).getRetMessage()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t=====秦統一");
}
}
複製代碼
CycliBarrier
可循環(Cyclic)使用的屏障。讓一組線程到達一個屏障(也可叫同步點)時被阻塞,知道最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活,線程進入屏障經過CycliBarrier的await()方法
代碼示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
cyclicBarrierTest();
}
public static void cyclicBarrierTest() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("====召喚神龍=====");
});
for (int i = 1; i <= 7; i++) {
final int tempInt = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t收集到第" + tempInt + "顆龍珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "" + i).start();
}
}
}
複製代碼
能夠代替Synchronize和Lock
信號量主要用於兩個目的,一個是用於多個共享資源的互斥做用,另外一個用於併發線程數的控制
代碼示例:
搶車位示例:
package com.jian8.juc.conditionThread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模擬三個停車位
for (int i = 1; i <= 6; i++) {//模擬6部汽車
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t搶到車位");
try {
TimeUnit.SECONDS.sleep(3);//停車3s
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t停車3s後離開車位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "Car " + i).start();
}
}
}
複製代碼
首先是一個隊列,而一個阻塞隊列在數據結構中所起的做用大體以下圖
graph LR
Thread1-- put -->id1["阻塞隊列"]
subgraph BlockingQueue
id1
end
id1-- Take -->Thread2
蛋糕師父--"放(櫃滿阻塞)"-->id2[蛋糕展現櫃]
subgraph 櫃
id2
end
id2--"取(櫃空阻塞)"-->顧客
複製代碼
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素
當阻塞隊列是空時,從隊列中獲取元素的操做會被阻塞
當阻塞隊列是滿時,從隊列中添加元素的操做會被阻塞
試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。
試圖往已滿的阻塞隊列中添加新元素的線程一樣會被阻塞,直到其餘的線程從列中移除一個或者多個元素或者徹底清空隊列後使隊列從新變得空閒起來並後續新增
在多線程領域:所謂阻塞,在某些狀況下會掛起線程,一旦知足條件,被掛起的線程又會自動被喚醒
爲何須要BlockingQueue
好處時咱們不須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都給你一手包辦了
在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須本身控制這些細節,尤爲還要兼顧效率和線程安全,而這回給咱們程序帶來不小的複雜度
方法類型 | 拋出異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take | poll(time,unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
方法類型 | status |
---|---|
拋出異常 | 當阻塞隊列滿時,再往隊列中add會拋IllegalStateException: Queue full 當阻塞隊列空時,在網隊列裏remove會拋 NoSuchElementException |
特殊值 | 插入方法,成功true失敗false 移除方法,成功返回出隊列的元素,隊列裏沒有就返回null |
一直阻塞 | 當阻塞隊列滿時,生產者線程繼續往隊列裏put元素,隊列會一直阻塞線程知道put數據或響應中斷退出 當阻塞隊列空時,消費者線程試圖從隊列take元素,隊列會一直阻塞消費者線程知道隊列可用。 |
超時退出 | 當阻塞隊列滿時,隊列會阻塞生產者線程必定時間,超過限時後生產者線程會退出 |
種類分析
Integer.MAX_VALUE
)阻塞隊列。SychronousQueue
理論:SynchronousQueue沒有容量,與其餘BlockingQueue不一樣,SychronousQueue是一個不存儲元素的BlockingQueue,每個put操做必需要等待一個take操做,不然不能繼續添加元素,反之亦然。
代碼示例
package com.jian8.juc.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/** * ArrayBlockingQueue是一個基於數組結構的有界阻塞隊列,此隊列按FIFO原則對元素進行排序 * LinkedBlockingQueue是一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue * SynchronousQueue是一個不存儲元素的阻塞隊列,滅個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於 * 1.隊列 * 2.阻塞隊列 * 2.1 阻塞隊列有沒有好的一面 * 2.2 不得不阻塞,你如何管理 */
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}
複製代碼
生產者消費者模式
傳統版
package com.jian8.juc.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * 一個初始值爲零的變量,兩個線程對其交替操做,一個加1一個減1,來5輪 * 1. 線程 操做 資源類 * 2. 判斷 幹活 通知 * 3. 防止虛假喚起機制 */
public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorB " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerB " + i).start();
}
}
}
class ShareData {//資源類
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
//1.判斷
while (number != 0) {
//等待不能生產
condition.await();
}
//2.幹活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
//1.判斷
while (number == 0) {
//等待不能消費
condition.await();
}
//2.消費
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
複製代碼
阻塞隊列版
package com.jian8.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生產線程啓動");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消費線程啓動");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("5s後main叫停,線程結束");
try {
myResource.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyResource {
private volatile boolean flag = true;//默認開啓,進行生產+消費
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入隊列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入隊列" + data + "失敗");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t大老闆叫停了,flag=false,生產結束");
}
public void myConsumer() throws Exception {
String result = null;
while (flag) {
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t超過2s沒有取到蛋糕,消費退出");
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t消費隊列" + result + "成功");
}
}
public void stop() throws Exception {
flag = false;
}
}
複製代碼
線程池
消息中間件
區別
原始構成
synchronized時關鍵字屬於jvm
monitorenter,底層是經過monitor對象來完成,其實wait/notify等方法也依賴於monitor對象只有在同步或方法中才能調wait/notify等方法
monitorexit
Lock是具體類,是api層面的鎖(java.util.)
使用方法
等待是否可中斷
加鎖是否公平
鎖綁定多個條件Condition
package com.jian8.juc.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * synchronized和lock區別 * <p===lock可綁定多個條件=== * 對線程之間按順序調用,實現A>B>C三個線程啓動,要求以下: * AA打印5次,BB打印10次,CC打印15次 * 緊接着 * AA打印5次,BB打印10次,CC打印15次 * 。。。。 * 來十輪 */
public class SyncAndReentrantLockDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
shareData.print15();
}
}, "C").start();
}
}
class ShareData {
private int number = 1;//A:1 B:2 C:3
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
//判斷
while (number != 1) {
condition1.await();
}
//幹活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
//判斷
while (number != 2) {
condition2.await();
}
//幹活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
//判斷
while (number != 3) {
condition3.await();
}
//幹活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
複製代碼
package com.jian8.juc.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/** * 多線程中,第三種得到多線程的方式 */
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//FutureTask(Callable<V> callable)
FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyThread2());
new Thread(futureTask, "AAA").start();
// new Thread(futureTask, "BBB").start();//複用,直接取值,不要重啓兩個線程
int a = 100;
int b = 0;
//b = futureTask.get();//要求得到Callable線程的計算結果,若是沒有計算完成就要去強求,會致使堵塞,直到計算完成
while (!futureTask.isDone()) {//當futureTask完成後取值
b = futureTask.get();
}
System.out.println("*******Result" + (a + b));
}
}
class MyThread implements Runnable {
@Override
public void run() {
}
}
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Callable come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
}
}
複製代碼
線程池作的工做主要是控制運行的線程的數量,處理過程當中將任務放入隊列,而後在線程建立後啓動給這些任務,若是線程數量超過了最大數量,超出數量的線程排隊等候,等其餘線程執行完畢,再從隊列中取出任務來執行
主要特色
線程複用、控制最大併發數、管理線程
架構說明
Java中的線程池是經過Executor框架實現的,該框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor
graph BT
類-Executors
類-ScheduledThreadPoolExecutor-->類-ThreadPoolExecutor
類-ThreadPoolExecutor-->類-AbstractExecutorService
類-AbstractExecutorService-.->接口-ExecutorService
類-ScheduledThreadPoolExecutor-.->接口-ScheduledExecutorService
接口-ScheduledExecutorService-->接口-ExecutorService
接口-ExecutorService-->接口-Executor
複製代碼
編碼實現
實現有五種,Executors.newScheduledThreadPool()是帶時間調度的,java8新推出Executors.newWorkStealingPool(int),使用目前機器上可用的處理器做爲他的並行級別
重點有三種
Executors.newFixedThreadPool(int)
執行長期的任務,性能好不少
建立一個定長線程池,可控制線程最大併發數,超出的線程回在隊列中等待。
newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是想到等的,他使用的是LinkedBlockingQueue
Executors.newSingleThreadExecutor()
一個任務一個任務執行的場景
建立一個單線程話的線程池,他只會用惟一的工做線程來執行任務,保證全部任務按照指定順序執行
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,使用LinkedBlockingQueue
Executors.newCachedThreadPool()
執行不少短時間異步的小程序或負載較輕的服務器
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒縣城,若無可回收,則新建線程。
newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當縣城空閒超過60s,就銷燬線程
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
複製代碼
graph LR
subgraph 使用者
main(提交任務)
end
main-->core{核心線程是否已滿}
subgraph 線程池
core--是-->queue{隊列是否已滿}
queue--是-->pool{線程池是否已滿}
pool--是-->reject["按照拒絕策略處理<br>沒法執行的任務"]
core--否-->id[建立線程執行任務]
queue--否-->任務入隊列等待
pool--否-->建立線程執行任務
end
複製代碼
流程
在建立了線程池以後,等待提交過來的任務請求。
當調用execute()方法添加一個請求任務時,線程池會作出以下判斷
2.1 若是正在運行的線程數量小於corePoolSize,那麼立刻建立線程運行這個任務;
2.2 若是正在運行的線程數量大於或等於corePoolSize,那麼將這個任務放入隊列;
2.3若是此時隊列滿了且運行的線程數小於maximumPoolSize,那麼仍是要建立非核心線程馬上運行此任務
2.4若是隊列滿了且正在運行的線程數量大於或等於maxmumPoolSize,那麼啓動飽和拒絕策略來執行
當一個線程完成任務時,他會從隊列中卻下一個任務來執行
當一個線程無事可作超過必定的時間(keepAliveTime)時,線程池會判斷:
若是當前運行的線程數大於corePoolSize,那麼這個線程會被停掉;因此線程池的全部任務完成後他最大會收縮到corePoolSize的大小
什麼是線程策略
等待隊列也已經排滿了,再也塞不下新任務了,同時線程池中的max線程也達到了,沒法繼續爲新任務服務。這時咱們就須要拒絕策略機制合理的處理這個問題。
JDK內置的拒絕策略
AbortPolicy(默認)
直接拋出RejectedExecutionException異常阻止系統正常運行
CallerRunsPolicy
」調用者運行「一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而下降新任務的流量
DiscardOldestPolicy
拋棄隊列中等待最久的任務,而後把當前任務加入隊列中嘗試再次提交當前任務
DiscardPolicy
直接丟棄任務,不予任何處理也不拋異常。若是容許任務丟失,這是最好的一種方案
均實現了RejectedExecutionHandler接口
一個都不用,咱們生產上只能使用自定義的!!!!
爲何?
線程池不容許使用Executors建立,試試經過ThreadPoolExecutor的方式,指定任務隊列的大小,規避資源耗盡風險
FixedThreadPool和SingleThreadPool容許請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量請求;;CachedThreadPool和ScheduledThreadPool容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量線程,致使OOM
package com.jian8.juc.thread;
import java.util.concurrent.*;
/** * 第四種得到java多線程的方式--線程池 */
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(3, 5, 1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
//new ThreadPoolExecutor.AbortPolicy();
//new ThreadPoolExecutor.CallerRunsPolicy();
//new ThreadPoolExecutor.DiscardOldestPolicy();
//new ThreadPoolExecutor.DiscardPolicy();
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t辦理業務");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
複製代碼
CPU密集型
CPU密集的意思是該任務須要大量的運算,而沒有阻塞,CPU一直全速運行
CPU密集任務只有在真正多核CPU上纔可能獲得加速(經過多線程)
而在單核CPU上,不管你開幾個模擬的多線程該任務都不可能獲得加速,由於CPU總的運算能力就那些
CPU密集型任務配置儘量少的線程數量:
通常公式:CPU核數+1個線程的線程池
IO密集型
因爲IO密集型任務線程並非一直在執行任務,則應配置經可能多的線程,如CPU核數 * 2
IO密集型,即該任務須要大量的IO,即大量的阻塞。
在單線程上運行IO密集型的任務會致使浪費大量的 CPU運算能力浪費在等待。
因此在IO密集型任務中使用多線程能夠大大的加速程序運行,即便在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間。
IO密集型時,大部分線程都阻塞,故須要多配置線程數:
參考公式:CPU核數/(1-阻塞係數) 阻塞係數在0.8~0.9之間
八核CPU:8/(1-0,9)=80
死鎖是什麼
死鎖是指兩個或兩個以上的進程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力干涉那他們都將沒法推動下去,若是系統資源充足,進程的資源請求都可以獲得知足,死鎖出現的可能性就很低,不然就會因爭奪有限的資源而陷入死鎖。
graph TD
threadA(線程A)
threadB(線程B)
lockA((鎖A))
lockB((鎖B))
threadA--持有-->lockA
threadB--試圖獲取-->lockA
threadB--持有-->lockB
threadA--試圖獲取-->lockB
複製代碼
產生死鎖的主要緣由
死鎖示例
package com.jian8.juc.thread;
import java.util.concurrent.TimeUnit;
/** * 死鎖是指兩個或兩個以上的進程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力干涉那他們都將沒法推動下去, */
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldThread(lockA,lockB),"Thread-AAA").start();
new Thread(new HoldThread(lockB,lockA),"Thread-BBB").start();
}
}
class HoldThread implements Runnable {
private String lockA;
private String lockB;
public HoldThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "\t本身持有:" + lockA + "\t嘗試得到:" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "\t本身持有:" + lockB + "\t嘗試得到:" + lockA);
}
}
}
}
複製代碼
解決
jps -l
定位進程號jstack 進程號
找到死鎖查看jconsole
工具