java多線程那點事兒

前段時間應隔壁部門大佬的邀約,簡單地幫他們部門的童靴梳理了下多線程相關的內容,客串了一把講師【由於部門內有很多是c#轉java的童鞋,因此講的稍微淺顯了些】html

ok,按照我的習慣先來大綱java

知識點:mysql

1)進程 多線程的相關概念 涉及到CPU調度 稍微談下JVM內存模型 程序計數器
2)多線程的三種實現手段及其代碼實現 這邊能夠談下futurtask的底層源碼
3)經常使用鎖概念及實現說明 隱式鎖 顯式鎖 樂觀鎖 悲觀鎖 CAS 可重入鎖 不可重入鎖 讀寫鎖 線程安全 引伸的談下分佈式鎖 及分佈式鎖的原理,經常使用的三種實現手段 volatile關鍵字及其底層源碼
4)線程池的概念,線程池的使用 扒一扒線程池的源碼 緩存隊列 核心線程池 線程池建立任務的過程 線程池的生命週期等redis

多線程: 多線程是什麼? 多線程是一個程序(進程)運行時產生了不止一個線程。算法

進程和線程區別 一個正在執行的程序,進程是控制程序的執行順序。這個順序又被稱爲一個控制單元。sql

並行和併發的概念: 並行:多個CPU實例或者多臺機器同時執行一段處理邏輯 併發:經過CPU調度算法,讓用戶看上去是同時執行的,在CPU層面不是同時。數據庫

這邊衍生的能夠談下JVM中內存模型的程序計數器 就是記錄java執行字節碼的行號指示器。編程

jvm內存模型: 線程私有:
程序計數器:記錄程序執行過程當中的字節碼的行號指示器
java虛擬機棧: 主要是是被調用的java方法表明的是一個個棧幀 局部變量表 操做數棧 動態連接 方法出口等等 java.lang.StackOverflowError
本地方法棧: 主要是是被調用的native方法表明的是一個個棧幀c#

線程公有
堆 : 對象實例
方法區:最重要的就是運行時常量池 gc緩存

爲何要是用多線程:
1)充分的利用CPU資源,若是隻有一個線程的話,第二個任務必須等第一個任務完成以後才能進行。
2)進程之間沒法共享數據,可是線程能夠
3)建立進程須要爲這個進程分配系統資源,建立線程的代價小

多線程的實現手段【3種手段】
1)Thread

package com.Allen.test;

import java.util.concurrent.TimeUnit;

public class testThread extends Thread{
	public static void main(String[] args) {
		testThread t1=new testThread();
		testThread t2=new testThread();
		t1.start();
		t2.start();
	}
	
	
	 public void run(){
		 System.out.println("start");
		 try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		 System.out.println("end");
	}
}
複製代碼

2)runnable

package com.Allen.test;

import java.util.concurrent.TimeUnit;

public class testRunnable {
	public static void main(String[] args) {
		testAllen th1=new testAllen();
		for(int i=0;i<5;i++){
			Thread t1=new Thread(th1);
			t1.start();
		}
	}
}

class testAllen implements Runnable{

	@Override
	public void run() {
		 System.out.println("start");
		 try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		 System.out.println("end");
	}	
}
複製代碼

3)future callable

package com.Allen.test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class testFuture {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		futureTask task=new futureTask();
//		//線程池  單線程的線程池
//		ExecutorService service=Executors.newCachedThreadPool();
//		Future<Integer> future=service.submit(task);
//		//說到下面這個方法就要提及線程池的狀態  四個種狀態
//		service.shutdown();
		
		FutureTask<Integer>future=new FutureTask<>(task);
		Thread t1=new Thread(future);
		t1.start();
		System.out.println("run task ....");
		TimeUnit.SECONDS.sleep(1);
		System.out.println("result : "+future.get());
	}
}

class futureTask implements Callable<Integer>{

	@Override
	public Integer call() throws Exception {
		TimeUnit.SECONDS.sleep(2);
		int result=0;
		//模擬一個龐大的計算
		for(int i=0;i<100;i++){
			for(int j=0;j<i;j++){
				result+=j;
			}
		}
		return result;
	}
	
}
複製代碼

扒一扒futureTask的源碼
首先咱們看run方法

1)run方法中state不是new或者線程再次給他設置啓動出錯直接讓他return掉
2)完後方法後調用set方法 把結果賦值給outcome

這邊用了CAS方式來設置stat狀態

拿到棧頂的元素一個個去喚醒

報錯的話返回異常信息

接下來咱們談下多線程操做中比較核心的東西
多線程操做共享變量的問題
鎖機制
最經常使用的鎖機制 synchronized及lock
隱式鎖 synchronized 談下底層原理
顯示鎖 lock
重入鎖

我設計了一個實例來直觀的觀察多線程操做共享變量的線程安全問題

package com.Allen.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TestSynchronized {
	public static void main(String[] args) throws InterruptedException {
		for (int s = 0; s < 10; s++) {
			Person person = new Person();
			person.setAge(0);
			ReentrantLock lock = new ReentrantLock();
			for (int i = 0; i < 100; i++) {
				Thread t1 = new Thread(new test111(person,lock));
				t1.start();
			}
			TimeUnit.SECONDS.sleep(3);
			System.out.println(person.getAge());
		}
	}
}

class test111 implements Runnable {
	Person person;
	ReentrantLock lock;

	public test111(Person person,ReentrantLock lock) {
		this.person = person;
		this.lock=lock;
	}

	public void run() {
//		synchronized (person) {
//			person.setAge(person.getAge() + 1);
//		}
		
		lock.lock();
		person.setAge(person.getAge() + 1);
		lock.unlock();
		 //person.setAge(person.getAge()+1);
		 
	}
}
複製代碼

悲觀鎖 樂觀鎖 CAS
一 悲觀鎖
在關係型數據庫管理系統中,悲觀併發控制(悲觀鎖)是一種併發控制的方法。 簡單而言,就是它「悲觀」地默認每次拿數據都認爲別人會修改,因此在每次拿以前去上鎖,這樣別人想拿這個數據就會block直到它拿到鎖。傳統悲觀鎖實現機制大多利用數據庫提供的鎖機制(也只有數據庫層面提供的鎖機制才能真正保證了數據訪問的排他性)。

悲觀鎖流程以下:
1 對任意記錄進行修改以前,嘗試給它加排它鎖。
2 如果加鎖失敗,說明該記錄正在修改,那麼當前須要等待或者拋出異常。
3 若是成功加鎖,那麼就能夠對記錄進行修改,事務完成以後解鎖。
4 期間其餘人須要對該記錄進行修改或者加排查鎖操做,就不準等待咱們解鎖或者直接拋出異常。

以mysql爲例
使用悲觀鎖,先關閉mysql的自動提交屬性。
set autocommit=0

begin;
select status from t_goods where id=1 for update;
insert into t_orders(id,goods_id)values(null,1);
update t_goods set status=2;
commit;

發起事務 操做 提交事務

select for update 開啓排它鎖方式實現悲觀鎖,mysql InnoDB默認行級鎖【ps:注意一點,行級鎖都是基於索引,若是用不到索引會使用表級鎖吧整張表鎖住】

優勢與缺點:
悲觀併發控制實現上是「先取鎖再訪問」的保守策略,爲數據安全提供保障,可是犧牲了效率,處理加鎖會讓數據庫產生額外的開銷,還增長了死鎖的機會,下降了並行性,若是一個實物鎖定了某行數據,其餘事物必須等待改事務處理完才能處理那一行。適用於狀態修改很是高,衝突很是嚴重的系統。

二 樂觀鎖
假設了多用戶併發事務處理下不會彼此影響,各事務在不產生鎖的狀況下處理各自的那部分數據,
每次去拿數據都認爲別人不會修改,因此不會上鎖,只會在更新的時候判斷一下此期間別人有沒有更新這個數據。若是有其餘事務更新的話,正在提交的事務會回滾。
通常來講樂觀鎖不會使用數據庫提供的機制,咱們一般採用記錄數據版原本實現樂觀鎖 記錄的方式有版本號或者時間戳。

在數據初始化的時候指定一個版本號,每次對數據更新在對版本號作+1操做,並判斷當前的版本號是否是該數據的最新版本。

優勢與缺點;
樂觀併發控制事務之間數據競爭機率較小,能夠一直作下去知道提交纔會鎖定,有點相似於svn

當倆個事務都讀到某一行修改回傳數據庫就會遇到問題。

三 CAS 無鎖化編程
java.util.concurrent包中藉助CAS實現了區別於synchronouse同步鎖的一種樂觀鎖。 無鎖化編程

CAS有三個操做數,內存值V,舊的預期值A,修改的新值B。當且僅當預期值A與內存值V相同時,將內存值V修改成B,不然什麼都不作。

優勢與缺點: 能夠用CAS在無鎖的狀況下實現原子操做,但要明確應用場合,很是簡單的操做且又不想引入鎖能夠考慮使用CAS操做,當想要非阻塞地完成某一操做也能夠考慮CAS。 CAS雖然很高效的解決原子操做,可是CAS仍然存在三大問題。ABA問題,循環時間長開銷大和只能保證一個共享變量的原子操做

  1. ABA問題。由於CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。 從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。 關於ABA問題參考文檔: blog.hesey.net/2011/09/res…

  2. 循環時間長開銷大。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。若是JVM能支持處理器提供的pause指令那麼效率會有必定的提高,pause指令有兩個做用,第一它能夠延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它能夠避免在退出循環的時候因內存順序衝突(memory order violation)而引發CPU流水線被清空(CPU pipeline flush),從而提升CPU的執行效率。

  3. 只能保證一個共享變量的原子操做。當對一個共享變量執行操做時,咱們可使用循環CAS的方式來保證原子操做,可是對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖,或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你能夠把多個變量放在一個對象裏來進行CAS操做。

compareAndSet有點相似於以下

if (this == expect) {
this = update
return true;
} else {
return false;
}

以下附上CAS實例

volatile關鍵字 可是咱們要思考下併發編程下的倆個關鍵問題?
1 線程之間是如何通訊
1.1 共享內存
隱式通訊
1.2 消息傳遞
顯式通訊
2 線程之間是如何同步

在共享內存的併發模型中,同步是顯示作的,synchronized
在消息傳遞的併發模型,因爲消息的發送必需要在消息的接受以前,因此同步是隱式的。

2 定位內存可見性問題:
什麼對象是內存共享的,什麼不是。

主內存:共享變量

私有本地內存:存儲共享變量的副本

可見性原理
1 volatile聲明的變量進行寫操做的時候,jvm會向處理器發送一條lock前綴的指令,會把這個變量所在緩存行的數據回到系統內存。
2 在多處理器的狀況下,保證各個處理器緩存一致性的特色,就會實現緩存一致性協議。

synchronized :可重入鎖,互斥性,可見性。
volatile:原子性,可見性。不能作到複合操做的原子性。性能開銷更小。

synchronized 線程A釋放鎖以後會把本地內存的變量同步到主內存
線程B獲取鎖的時候會把主內存的共享變量同步到本地內存中。

調用monitorenter monitorexit
對象同步方法調用的時候必需要獲取到它的監視器,用完以後會釋放。

獲取不到監聽器會放在隊列中。

多進程下訪問共享變量
分佈式鎖 包括三種經常使用的使用手段
1 ) 經過數據庫的方式
create table lock(
ID
Method_Name 惟一約束
)

每次去操做文件的是否,都去插入表,獲取鎖是否才能去操做文件,
等鎖釋放【刪除這個記錄】才能insert

缺點:
刪除失敗 【等待程序不可用】
重入鎖【能夠對進程進行編號,來判斷重入】

2) 使用zookeeper

臨時有序節點
誰先寫到節點上,獲取鎖
有個watch機制,會判斷節點是否失效,失效以後會讀取下一個節點

3) redis
setnx
誰先set這個值,誰就現獲取鎖
後續set失敗的就是沒有獲取鎖
等待鎖釋放以後你才能set這個值。

線程池

線程池種類及區別:
運行executors類給咱們提供靜態方法調用不一樣的線程池
newsingleThreadExecutor:
返回一個單線程的executor,將多個任務交給這個Executor時,這個線程處理完一個任務以後接着處理下一個任務,若該線程出現異常,將會有一個新的線程替代。
newFixedThreadPool
返回一個包含指定數目線程的線程池,任務數量多於線程數目,則沒有執行的任務必須等待,直到任務完成。
newCahedThreadPool
根據用戶的任務數建立相應的線程數來處理,該線程池不會對線程數加以限制,徹底依賴於JVM能建立的線程數量,可能引發內存不足。
用法以下:

package com.Allen.TestPoolSize;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
	public static void main(String[] args) {
		//單線程化的線程池
		ExecutorService service1=Executors.newSingleThreadExecutor();
		service1.execute(new Runnable() {
			@Override
			public void run() {
				System.out.println("aaa");
			}
		});
		//可緩存線程池
		ExecutorService service2=Executors.newCachedThreadPool();
		service2.execute(new Runnable() {
			public void run() {
				System.out.println("bbb");
			}
		});
		//定長線程池
		ExecutorService service3=Executors.newFixedThreadPool(3);
		service3.execute(new Runnable() {
			public void run() {
				System.out.println("ccc");
			}
		});
		//定長線程池支持定時和週期任務
		ScheduledExecutorService service4=Executors.newScheduledThreadPool(5);
		service4.schedule(new Runnable() {
			@Override
			public void run() {
				System.out.println("ddd");	
			}
		}, 3, TimeUnit.SECONDS);	
	}
}
複製代碼

過程 原理
扒一扒源碼
首先咱們寫一個test類

package com.Allen.studyThread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class test {
	public static void main(String[] args) {
		ExecutorService threadpool=Executors.newFixedThreadPool(3);
		for(int i=0;i<10;i++){
			threadpool.submit(new testrunnable("allen_"+i));
		}
		threadpool.shutdown();
	}
}

class testrunnable implements Runnable{
	
	private String name;
	
	public testrunnable(String name){
		this.name=name;
	}
	
	public void run() {
		System.out.println(name+" start...");
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(name+" end...");
	} 
}
複製代碼

看線程池源碼,咱們入口從newFixedThreadPool入口入,咱們看看他初始化作了什麼 進入到Executors類的newFixedThreadPool方法傳入一個參數

這個參數是線程池併發執行的線程數量
例如50個任務,10個多線程,分五次執行,傳入的參數就是10。
由上面的方法能夠看到,它是實例化了ThreadPoolExecutor類
繼承關係 Executor---Executorservice--abstractExecutorservice--Threadpoolexecutor 注意這個就是咱們須要學習線程池的核心類,這裏面傳入了5個參數。
咱們進去看下:

這邊涉及到幾個核心的參數:
1)corepoolsize
2)maximumpoolsize
3)keepalivetime
4)workqueue
5)threadfactory
重要參數及其做用
corepoolsize:核心池大小,默認狀況下,建立了線程池以後,線程池中數量爲0,當任務進來以後會建立一個線程去執行任務,當線程池中的線程數到達corepoolsize以後會把任務添加到緩存隊列中。
maximumpoolsize:線程池中最多能夠建立多少線程。
keepalivetime:線程沒有任務執行,最多保存多久會終止。當線程池中線程數大於corepoolsize,這個機制纔會生效。
workqueue:阻塞隊列,用來存放等待被執行的任務。
threadfactory:線程工廠,用來建立線程。
線程池狀態
1 當線程池被建立後,初始狀態爲running
2 調用shutdown方法以後,處於shutdown狀態,不接受新的任務,等待已有任務完成。
3 調用shutdownnow方法以後,進入stop狀態,不接受新任務,而且嘗試終止正在執行的任務。
4 處於shutdown或者stop狀態,而且全部工做線程均被銷燬,任務緩存隊列被清空,線程池就被設置爲terminated狀態。
任務提交到線程池的具體操做
1 當前線程池中的線程數小於corepoolsize,則把任務建立爲線程執行。
2 當前線程池中的線程數大於等於corepoolsize,則嘗試把任務添加到緩存隊列,添加成功以後,則此任務會等待空閒線程來執行此任務,若是添加失敗,則嘗試建立線程去執行這個任務。
3 當前線程池中線程數大於等於macimumpoolsize,則採起拒絕策略(4種拒絕策略)
3.1 abortpolicy丟棄任務,拋出rejectedExecutionExceptipon
3.2 discardpolicy 拒絕執行,不拋出異常
3.3 discardoldpolicy 丟棄任務緩存隊列中最老的任務,而且嘗試從新提交新任務
3.4 callerrunspolicy 有反饋機制,讓任務提交的速度變慢

而後咱們看下他的submit方法

它其實調用的是execute方法

核心方法以下:

1)判斷運行的線程數量小於核心線程數,小於的話直接加入worker啓動
2)判斷運行線程數量大於核心線程數,上面if分支針對大於corepoolsize,而且緩存隊列加入任務操做成功的狀況,運行中而且把任務加入緩衝隊列成功,正常而言這樣就完成了處理邏輯
爲了保險起見,增長了狀態出現異常的判斷,若是異常,就會繼續remove操做,結果爲true的話,就按照拒絕策來拒絕
3)運行線程數大於corepoolsize而且緩衝隊列也已經滿了
這邊addworker傳遞的是false,意味着它會去判斷maximumpoolsize
使用拒絕策略
咱們主要看增長工做線程的流程

主要是runworker 中

執行任務,而且當執行完畢以後再去獲取新的task繼續執行,gettask方法是有ThreadPoolExecutor這個方法提供

線程調用runWoker,會while循環調用getTask方法從workerQueue裏讀取任務,而後執行任務。只要getTask方法不返回null,此線程就不會退出。咱們去看下getTask方法
若是運行線程數超過客最大的線程數,可是緩衝隊列已經空了,此時遞減worker的數量
若是設置容許線程超時或者線程數量超過了核心線程數,而且線程在規定的時間內都沒有poll到任務隊列爲空,則遞減worker數量

相關文章
相關標籤/搜索