java併發編程

基礎篇

基礎概念

進程:資源分配的最小單位

線程: cpu調度的最小單位,共享進程中的資源,必須依附進程

並行: 同一時刻運行進行的任務

併發:在單位時間段內處理的任務數(例: 每秒的併發數)

啓動線程

啓動線程的三種方式 1.extends Thread 2.implements Runnable 3.implements Callable

Callable 允許有返回值

//方式1.extends Thread
	public static class MyThread extends Thread {
		@Override
		public void run() {
			super.run();
			System.out.println(Thread.currentThread().getName() + "extends thread");
		}
	}
	//方式2.implements Runnable
	public static class MyRunnable implements Runnable {
		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName() + "implements Runnable");
		}
	}
	//方式3.implements Callable - 有返回值
	public static class MyCalladle implements Callable<String> {
		@Override
		public String call() throws Exception {
			System.out.println(Thread.currentThread().getName() + "implements Callable");
			return "CallResult";
		}
	}

	public static void main(String[] args) 
        throws InterruptedException, ExecutionException {
        
		MyThread myThread = new MyThread();
		myThread.start(); //啓動 extends Thread

		MyRunnable myRunnable = new MyRunnable();
		new Thread(myRunnable).start();//啓動 implements Runnable

		MyCalladle myCalladle = new MyCalladle();
		FutureTask<String> futureTask = new FutureTask<>(myCalladle);
		new Thread(futureTask).start(); //啓動 implements Callable
        
        /*futureTask.get() 獲取線程返回結果. 該方法爲阻塞方法,只有當對應的線程運行結束後纔會繼續運行*/
		System.out.println("GET myCalladle result = " + futureTask.get());
	}
安全的停止線程

1.線程結束後,自動停止

2.interrupt() 不會去真正中斷線程,只是把一箇中斷標誌符改爲true. 使用isInterrupted() 或 static方法interrupted() 來檢查中斷標誌位

3.stop() 過時的,已經不贊成使用,因爲該方法會使線程進入休眠狀態,但該線程佔用的資源不會釋放,容易造成死鎖和資源浪費的情況

/** * 使用interrupt()停止線程 示例 * * 在某些情況下我們需要無限循環的方式處理業務,這種業務處理我們往往把它放在獨立的線程中運行. * 但因爲某些原因可能需要停止該線程(業務處理),這時就需要用 interrupt 方法實現,使方法退出無限 * 循環,讓線程達到運行結束而自動停止的狀態 */
	private static class MyThread extends Thread {

		public MyThread(String name) {
			super(name);
		}

		@Override
		public void run() {
			while (!isInterrupted()) {
				System.out.println(Thread.currentThread().getName() + "run...");
			}
			System.out.println("stop run..." + Thread.currentThread().isInterrupted());
		}
        
	}

	public static void main(String[] args) {
		MyThread myThread = new MyThread("tianjieyi");
		myThread.start();
		
        try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		myThread.interrupt();
	}
線程常用方法和線程狀態

在這裏插入圖片描述

yield() 是讓當前線程讓出cpu時間,os依然可以選中當前線程

join() 當一個線程(A)調用另一個線程(B)的join()後,線程A會等待線程B運行結束後獲得執行權.阻塞方法

join()方法還有, join(long millis)和join(longmillis,int nanos)兩個具備超時特性的,當時間一到不管線程B是否執行結束,線程A都會獲得執行權

線程的優先級

Thread的 setPriority() 方法可以設置線程的優先級,,優先級的範圍從1~10,默認優先級是5.

但在不同的JVM和操作系統上,線程規則存在差異,線程優先級不一定正確.有些操作系統甚至會忽略線程優先級的設定.所以setPriority()並不可靠.

守護線程

Thread的 setDaemon(true) 方法可以設置線程爲守護線程.

守護線程: 是一種支持型線程,因爲它主要被用作程序中後臺調度以及支持性工作,比如垃圾回收線程就是一個很稱職的守護者.

當所有非守護線程停止後,Java虛擬機將會退出,這時守護線程也將會退出.但因Java虛擬機退出而造成的守護線程停止情況下,守護線程中的 finally不能得到執行.因此在構建Daemon線程時不能依賴finally來確保執行關閉和資源釋放的操作

//示例: 測試因Java虛擬機退出而造成的守護線程停止情況下,守護線程中的 finally不能得到執行
	private static class myThread extends Thread {

		@Override
		public void run() {
			try {
				while (!isInterrupted()) {
					System.out.println(Thread.currentThread().getName() + "run...");
				}
			} finally {
				System.out.println("finally...");
			}
		}

	}
	
	public static void main(String[] args) throws InterruptedException {
		myThread myThread = new myThread();
		myThread.setDaemon(true);
		myThread.start();
		SleepTools.ms(1);

		// myThread.interrupt(); // 主動停止守護線程 finally 會得到執行
		System.out.println("main...");
	}

協作通信

synchronized

synchronized 是java提供的內置鎖

在併發運行時,使用synchronized修飾的變量,方法,代碼塊.可以保證被修飾部分在同一時間只能有一個線程可以訪問

//synchronized的基本使用
public class SynTest {

	private int age = 100000;

	public int getAge() {
		return age;
	}

	public synchronized void add() { // 實例方法, 鎖的是對象
		age++;
	}

	public synchronized void remove() {
		age--;
	}

	public static class myThread extends Thread {

		private SynTest synTest;

		public myThread(String name, SynTest synTest) {
			super(name);
			this.synTest = synTest;
		}

		@Override
		public void run() {
			for (int i = 0; i < 100000; i++) {
				synTest.add();
			}
		}

	}

	public static void main(String[] args) {
		SynTest synTest = new SynTest();
		myThread myThread = new myThread("endThread", synTest);
		myThread.start();

		for (int i = 0; i < 100000; i++) {
			synTest.remove();
		}
		System.out.println(Thread.currentThread().getName() + " = " + synTest.getAge());
	}

}
/** * 鎖與鎖之間的關係 * * <p>同一個類中同時存在類鎖和實例鎖,併發運行時兩者會不影響可以同時訪問運行 * * <p>同一個對象的不同實例,併發運行時鎖是互不影響的可以同時訪問運行 * 例: synClassAndInstance 和 synClassAndInstance2是同一個對象的不同實例 * 當兩個線程併發訪問SynClassAndInstance對象的實例鎖1instance1方法時,兩個線程是可以同時 * 訪問instance1方法的 * * SynClassAndInstance synClassAndInstance = new SynClassAndInstance(); * Instance1 instance11 = new Instance1(synClassAndInstance); * new Thread(instance11).start(); * * SynClassAndInstance synClassAndInstance2 = new SynClassAndInstance(); * Instance1 instance12 = new Instance1(synClassAndInstance2); * new Thread(instance12).start(); * * * <P>實例(對象)鎖,鎖的是一個實例.當一個實例(對象)鎖定時,該對象的任何方法都不會被調用. * 例:instance1 和 instance2 是同一個對象的兩個不同的方法,當兩個不同的線程(A B)併發去執行 * instance1和instance2 方法時,只有等線程A執行完instance1方法後釋放了鎖線程B纔會去執行 * instance2方法 * * SynClassAndInstance synClassAndInstance = new SynClassAndInstance(); * Instance1 instance1 = new Instance1(synClassAndInstance); * new Thread(instance1).start(); * * Instance2 instance2 = new Instance2(synClassAndInstance); * new Thread(instance2).start(); */
public class SynClassAndInstance {

	private static synchronized void synClass() { // 類鎖
		SleepTools.second(1);
		System.out.println("synClass go…………");
		SleepTools.second(1);
		System.out.println("synClass end…………");
	}

	private synchronized void instance1() { // 實例(對象)鎖1
		SleepTools.second(1);
		System.out.println("instance1 go…………");
		SleepTools.second(1);
		System.out.println("instance1 end…………");
	}

	private synchronized void instance2() { // 實例(對象)鎖2
		SleepTools.second(1);
		System.out.println("instance2 go…………");
		SleepTools.second(1);
		System.out.println("instance2 end…………");
	}

	private static class SynClass extends Thread {
		@Override
		public void run() {
			System.out.println("SynClass start…………");
			synClass();
		}
	}

	private static class Instance1 implements Runnable {
		private SynClassAndInstance synClassAndInstance;

		public Instance1(SynClassAndInstance synClassAndInstance) {
			this.synClassAndInstance = synClassAndInstance;
		}

		@Override
		public void run() {
			System.out.println("Instance1 start…………");
			synClassAndInstance.instance1();
		}
	}

	private static class Instance2 implements Runnable {

		private SynClassAndInstance synClassAndInstance;

		public Instance2(SynClassAndInstance synClassAndInstance) {
			this.synClassAndInstance = synClassAndInstance;
		}

		@Override
		public void run() {
			System.out.println("Instance2 start…………");
			synClassAndInstance.instance2();
		}

	}

	public static void main(String[] args) {
		SynClass synClass = new SynClass();
		synClass.start();

		SynClassAndInstance synClassAndInstance = new SynClassAndInstance();
		Instance1 instance1 = new Instance1(synClassAndInstance);
		new Thread(instance1).start();

		SynClassAndInstance synClassAndInstance2 = new SynClassAndInstance();
		Instance1 instance12 = new Instance1(synClassAndInstance2);
		new Thread(instance12).start();

		Instance2 instance2 = new Instance2(synClassAndInstance);
		new Thread(instance2).start();

		Instance2 instance22 = new Instance2(synClassAndInstance2);
		new Thread(instance22).start();
	}

}
volatile

多線程去訪問 volatile 修飾的變量時,會去訪問共享內存中 變量的值,但不會在修改後寫回內存

volatile關鍵字,最輕量的同步機制

ThreadLocal

使用 ThreadLocal 定義的變量 在多線程中就像線程中本身的變量一樣,在各個線程中的使用,是互不影響的

// ThreadLocal的使用示例
public class MyThreadLocal {

	/* 聲明一個所有線程都共享的 ThreadLocal類型的static變量,並初始化爲1 */
	ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
		protected Integer initialValue() {
			return 1;
		}
	};

	private static class MyThread extends Thread {

		private MyThreadLocal myThreadLocal;

		public MyThread(MyThreadLocal myThreadLocal, String name) {
			super(name);
			this.myThreadLocal = myThreadLocal;
		}

		@Override
		public void run() {
			int a = myThreadLocal.threadLocal.get();
			a = a + Integer.valueOf(Thread.currentThread().getName());
			myThreadLocal.threadLocal.set(a);
			System.out.println("Thread - " + Thread.currentThread().getName() + " threadLocal的值:"+ myThreadLocal.threadLocal.get());
		}

	}

	public static void main(String[] args) {
		MyThreadLocal myThreadLocal = new MyThreadLocal();
		new MyThread(myThreadLocal, "1").start();
		new MyThread(myThreadLocal, "2").start();
		new MyThread(myThreadLocal, "3").start();
		new MyThread(myThreadLocal, "4").start();
		new MyThread(myThreadLocal, "5").start();
	}

}
等待和通知 - wait/notify/notifyAll

當一個線程調用wait()方法後,將會進入等待狀態,阻塞運行

當調用notify/notifyAll方法後,等待狀態下的線程將會得到通知繼續運行

notify() 喚醒一個線程,notifyAll()喚醒所有線程.因CPU的調度機制notify()不能指定線程喚醒,所以應儘量使用notifyAll()方法,除非可以確定當前等待的線程只有一個

// wait()/notifyAll() 方法的使用示例

// 快遞實體類
public class Express {

	public final static String CITY = "ShangHai";
	private int km;/* 快遞運輸里程數 */
	private String site;/* 快遞到達地點 */

	public Express() {
	}

	public Express(int km, String site) {
		this.km = km;
		this.site = site;
	}

	/* 變化公里數,然後通知處於wait狀態並需要處理公里數的線程進行業務處理 */
	public synchronized void changeKm() {
		this.km = 101;
		notifyAll();
	}

	/* 變化地點,然後通知處於wait狀態並需要處理地點的線程進行業務處理 */
	public synchronized void changeSite() {
		this.site = "BeiJing";
		notifyAll();
	}

	public synchronized void waitKm() {
		while (this.km <= 100) {// 公里數小於100不做處理
			try {
				wait();
				System.out.println("Check Km thread[" + Thread.currentThread().getId() + "] is be notified");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the Km is " + this.km + ",I will change db");
	}

	public synchronized void waitSite() {
		while (this.site.equals(CITY)) {// 快遞到達目的地
			try {
				wait();
				System.out.println("Check Site thread[" + Thread.currentThread().getId() + "] is be notified");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("the site is " + this.site + ",I will call user");
	}

}


//測試類
public class TestWN {
	private static Express express = new Express(0, Express.CITY);

	/* 檢查里程數變化的線程,不滿足條件,線程一直等待 */
	private static class CheckKm extends Thread {
		@Override
		public void run() {
			express.waitKm();
		}
	}

	/* 檢查地點變化的線程,不滿足條件,線程一直等待 */
	private static class CheckSite extends Thread {
		@Override
		public void run() {
			express.waitSite();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 3; i++) {
			new CheckSite().start();
		}
		for (int i = 0; i < 3; i++) {
			new CheckKm().start();
		}

		SleepTools.ms(1000);
		express.changeKm();// 快遞地點變化
	}
}
join

join() 當一個線程(A)調用另一個線程(B)的join()後,線程A會等待線程B運行結束後獲得執行權,阻塞方法

join()方法還有, join(long millis)和join(longmillis,int nanos)兩個具備超時特性的,當時間一到不管線程B是否執行結束,線程A都會獲得執行權

//join()方法的使用示例
public class MyJoin {

	static class JumpQueue implements Runnable {

		private Thread thread;

		public JumpQueue(Thread thread) {
			this.thread = thread;
		}

		public void run() {
			try {
                 // 調用傳入線程的join方法,必須等這個方法返回後,當前線程才能繼續執行
				thread.join();
			} catch (InterruptedException e) {
			}
			System.out.println(Thread.currentThread().getName() + " terminate.");
		}
	}

	public static void main(String[] args) throws Exception {
		Thread previous = Thread.currentThread();// 現在previous是主線程
		for (int i = 0; i < 10; i++) {
			// 每個線程擁有前一個線程的引用,需要等待前一個線程終止,才能從等待中返回
			Thread thread = new Thread(new JumpQueue(previous), String.valueOf(i));
			System.out.println("Thread:" + thread.getName() + " wait the Thread: " + previous.getName());
			thread.start();
			previous = thread;
		}

		SleepTools.second(2);// 讓主線程休眠2秒
		System.out.println(Thread.currentThread().getName() + " terminate.");
	}
}
yield() 、sleep()、wait()、notify()/notifyAll() 等方法 和鎖之間的關係與影響

yield() 調用前如果持有鎖, 調用後不會釋放鎖
sleep() 調用前如果持有鎖, 調用後不會釋放鎖
wait() 調用前必須持有鎖,調用了wait()方法以後,鎖會被釋放
notify() 調用前必須持有鎖, 包含了notify()的方法結束以後,鎖纔會被釋放

//yield() 、sleep()、wait()、notify()/notifyAll()等方法和鎖之間的關係與影響 示例
public class Myyswn {

	public synchronized void test() {
		System.out.println(Thread.currentThread().getName() + " -----------");

		// Thread.yield();
		// SleepTools.second(1);

		// try {
		// Thread.sleep(1000);
		// } catch (InterruptedException e) {
		// }

		try {
			wait();
		} catch (InterruptedException e) {
		}

		System.out.println(Thread.currentThread().getName() + " ............");
	}

	public synchronized void test1() {
		notifyAll();
		SleepTools.second(2);
		System.out.println(Thread.currentThread().getName() + " notifyAll....");
	}

	private static class MyRunnable implements Runnable {

		private Myyswn myYield;

		public MyRunnable(Myyswn myYield) {
			super();
			this.myYield = myYield;
		}

		@Override
		public void run() {
			myYield.test();
		}
	}

	public static void main(String[] args) {
		Myyswn myYield = new Myyswn();
		new Thread(new MyRunnable(myYield)).start();
		new Thread(new MyRunnable(myYield)).start();

		SleepTools.ms(100);
		myYield.test1();
	}

}

工具類

Fork/Join

Fork/Join框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務,再將一個個的小任務的運算結果進行join彙總

RecursiveTask 和 RecursiveAction 是繼承 ForkJoinTask 的不同實現

其中 RecursiveTask 有返回值,RecursiveAction 沒有返回值

1545789384597

1545789410652

//RecursiveTask使用示例,獲取指定目錄下的文件個數
public class SumDirsFiles extends RecursiveTask<Integer> {

	private File file;

	public SumDirsFiles(File file) {
		this.file = file;
	}

	@Override
	protected Integer compute() {
		File[] listFiles = file.listFiles();
		if (listFiles == null) {
			return 0;
		}

		int count = 0; //文件個數
		int dirCount = 0; //目錄個數
		List<SumDirsFiles> list = new ArrayList<>();
		for (File file : listFiles) {
			if (file.isDirectory()) {
				list.add(new SumDirsFiles(file));
				dirCount++;
			} else {
				count++;
			}
		}
		System.out.println("線程: " + Thread.currentThread().getName() + ",目錄 :" + file.getAbsolutePath() + "下包含目錄個數:"
				+ dirCount + "文件個數:" + count);
		
        if (!list.isEmpty()) {
             //將本次目錄下的目錄全部invokeAll,再次執行SumDirsFiles的運算
			Collection<SumDirsFiles> invokeAll = invokeAll(list); 
			for (SumDirsFiles sumDirsFiles : invokeAll) {
				count = count + sumDirsFiles.join();
			}
		}
        
		return count;
	}

	public static void main(String[] args) {
		SumDirsFiles sumDirsFiles = new SumDirsFiles(new File("G:/old/olddesk/"));
		ForkJoinPool forkJoinPool = new ForkJoinPool();
        
		long currentTimeMillis = System.currentTimeMillis();
		forkJoinPool.invoke(sumDirsFiles); // invoke同步執行,會阻塞

		SleepTools.ms(100);
		long currentTimeMillis2 = System.currentTimeMillis();
		System.out.println("I am do work" + currentTimeMillis2 + "相差 ……" + (currentTimeMillis2 - currentTimeMillis));
		System.out.println("總文件數 = " + sumDirsFiles.join());

		long currentTimeMillis3 = System.currentTimeMillis();
		System.out.println("end " + currentTimeMillis3 + "相差 …… " + (currentTimeMillis3 - currentTimeMillis));
	}

}
// 使用RecursiveAction的示例,查詢指定文件夾下所有的txt文件
public class FindDirsFiles extends RecursiveAction {

	private File path;

	public FindDirsFiles(File path) {
		this.path = path;
	}

	@Override
	protected void compute() {
		List<FindDirsFiles> subTasks = new ArrayList<>();

		File[] files = path.listFiles();
		if (files != null) {
			for (File file : files) {
				if (file.isDirectory()) {
					subTasks.add(new FindDirsFiles(file));
				} else {
					if (file.getAbsolutePath().endsWith("txt")) {
						System.out.println(Thread.currentThread().getName() + "," + file.getAbsolutePath());
					}
				}
			}
			if (!subTasks.isEmpty()) {
				invokeAll(subTasks);
			}
		}
	}

	public static void main(String[] args) {
		FindDirsFiles findDirsFiles = new FindDirsFiles(new File("G:/old/olddesk/"));
		ForkJoinPool forkJoinPool = new ForkJoinPool();

		long currentTimeMillis = System.currentTimeMillis();
		System.out.println("Task is Running......" + currentTimeMillis);
		forkJoinPool.execute(findDirsFiles); // execute 異步執行,不會阻塞

		SleepTools.ms(100);
		long currentTimeMillis2 = System.currentTimeMillis();
		System.out.println("I am do work" + currentTimeMillis2 + "相差 …… " + (currentTimeMillis2 - currentTimeMillis));

		findDirsFiles.join(); // join 獲取執行結果,會阻塞

		long currentTimeMillis3 = System.currentTimeMillis();
		System.out.println("end " + currentTimeMillis3 + "相差 …… " + (currentTimeMillis3 - currentTimeMillis));
	}

}
CountDownLatch

允許一個或多個線程等待其他線程完成各自的操作後繼續自己的線程,類似加強版的join

這裏的等待其他線程完成各自的操作,並不需要其他線程都執行結束,只需要countDown()方法執行完畢就行

//CountDownLatch的使用示例,共5個子線程,6個閉鎖釦除點,扣除完畢後,主線程和業務線程才能繼續執行
public class MyCountDownLatch {

	static CountDownLatch countDownLatch = new CountDownLatch(6);

	// 線程1,運行一次扣除一個閉鎖釦除點
	private static class initRunnable implements Runnable {

		@Override
		public void run() {
			SleepTools.ms(5);
			System.out.println("Thread " + Thread.currentThread().getName() + " init work...... 扣除了一個");
			countDownLatch.countDown(); // 減少(扣除)一個 閉鎖釦除點,不會阻塞方法的繼續運行

			SleepTools.ms(5); // 需要5毫秒處理邏輯
			System.out.println("Thread " + Thread.currentThread().getName() + " ready other work......");
		}

	}

	// 線程1,運行一次扣除兩個閉鎖釦除點
	private static class initRunnable2 implements Runnable {

		@Override
		public void run() {
			SleepTools.ms(5);
			System.out.println("兩個扣除點的Thread " + Thread.currentThread().getName() + " - init work1...... 扣除了一個");
			countDownLatch.countDown(); // 減少(扣除)一個 閉鎖釦除點

			SleepTools.ms(5);
			System.out.println("兩個扣除點的Thread " + Thread.currentThread().getName() + " - init work2......扣除了一個");
			countDownLatch.countDown(); // 減少(扣除)一個 閉鎖釦除點

			SleepTools.ms(5);
			System.out.println("兩個扣除點的Thread " + Thread.currentThread().getName() + " ready other work......");
		}

	}

	/** * 業務線程,需要等待其他線程的運行 */
	private static class businessRunnable implements Runnable {

		@Override
		public void run() {
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("Thread " + Thread.currentThread().getName() + " handing business.....");
		}
	}

	public static void main(String[] args) throws InterruptedException {
		new Thread(new initRunnable2()).start();
		for (int i = 0; i < 4; i++) {
			new Thread(new initRunnable()).start();
		}
		new Thread(new businessRunnable()).start(); // 啓動業務線程

		countDownLatch.await(); // 主線程需要等待其他線程的運行完畢再繼續運行
		System.out.println("Thread " + Thread.currentThread().getName() + " goto other work......");
	}

}
CyclicBarrier

CyclicBarrier(int parties)

讓一組線程到達一個屏障點(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,所有被屏障攔截的線程纔會繼續運行.

CyclicBarrier(int parties, Runnable barrierAction)

可以在一組線程到達屏障點時,優先執行barrierAction線程,當barrierAction線程執行完畢後,屏障纔會開門,所有被屏障攔截的線程纔會繼續執行

//使用CyclicBarrier(int parties, Runnable barrierAction)的示例
public class MyCyclicBarrier {

	private static CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new CollectThread());
	private static ConcurrentMap<String, Long> chm = new ConcurrentHashMap<>();

	private static class MyRunnable implements Runnable {

		@Override
		public void run() {

			System.out.println("Thread " + Thread.currentThread().getId() + " ....init " + System.currentTimeMillis());

			long id = Thread.currentThread().getId();
			chm.put("" + id, id);
			Random r = new Random();
			if (r.nextBoolean()) {
				SleepTools.second(2); // 模擬業務,執行消耗時間
				System.out.println("Thread " + id + " ....do something ");
			}

			try {
				cyclicBarrier.await();
			} catch (InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}

			System.out.println("Thread " + id + " ....do other something " + System.currentTimeMillis());
		}

	}

	private static class CollectThread implements Runnable {

		@Override
		public void run() {
			StringBuilder result = new StringBuilder();
			for (Map.Entry<String, Long> workResult : chm.entrySet()) {
				result.append("[" + workResult.getValue() + "] ");
			}
			SleepTools.second(2); // 模擬業務,執行消耗時間
			System.out.println("Result ... " + result.toString());
		}

	}

	public static void main(String[] args) {
		for (int i = 0; i < 6; i++) {
			new Thread(new MyRunnable()).start();
		}
	}

}
Semaphore

用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。常用於做流量控制,特別是公共資源有限的應用場景,比如數據庫連接池,有界緩存等。

//使用Semaphore實現的一個數據庫連接池
public class DBPoolSemaphore {

	private final static int POOL_SIZE = 10;
	private static LinkedList<Connection> pool = new LinkedList<>();

	private final Semaphore useful, useless;// 兩個指示器,分別表示池子還有可用連接和已用連接

	static {
		for (int i = 0; i < POOL_SIZE; i++) {
			pool.addLast(SqlConnectImpl.fetchConnection());
		}
	}

	public DBPoolSemaphore() {
		this.useful = new Semaphore(10);
		this.useless = new Semaphore(0);
	}

	/** * 從池中獲取一個連接 * * @return * @throws InterruptedException */
	public Connection takeConnect() throws InterruptedException {
		useful.acquire(); // useful - 1
		Connection connection;
		synchronized (pool) {
			connection = pool.removeFirst();
		}
		useless.release(); // useless + 1
		return connection;
	}

	// 歸還連接
	public void returnConnect(Connection connection) throws InterruptedException {
		if (connection == null) {
			return;
		}

		System.out.println("當前有" + useful.getQueueLength() + "個線程等待數據庫連接!!" + "可用連接數:" + useful.availablePermits());
		useless.acquire(); // useless - 1
		synchronized (pool) {
			pool.addLast(connection);
		}
		useful.release(); // useful + 1
	}

}
//數據庫連接的平庸實現
public class SqlConnectImpl implements Connection {

	/* 拿一個數據庫連接 */
	public static final Connection fetchConnection() {
		return new SqlConnectImpl();
	}
    
    ........
}
//測試,Semaphore
public class AppTest {

	private static DBPoolSemaphore dbPoolSemaphore = new DBPoolSemaphore();

	private static class myRunnable implements Runnable {

		@Override
		public void run() {
			try {
				long start = System.currentTimeMillis();
				Random r = new Random();
				Connection takeConnect = dbPoolSemaphore.takeConnect();
				System.out.println(Thread.currentThread().getName() + " 獲取了數據庫連接,共耗時["
						+ (System.currentTimeMillis() - start) + "ms]");

				SleepTools.ms(100 + r.nextInt(100)); // 模擬數據查詢耗時
				dbPoolSemaphore.returnConnect(takeConnect);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		for (int i = 0; i < 50; i++) {
			new Thread(new myRunnable()).start();
		}
	}

}
Exchange

用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。

建議同時只能有兩個線程進行Exchange數據交換,當多餘兩個線程時不能確定獲得的數據是從你想要的線程中獲取的

Exchange還提供了一個超時的exchange(V x, long timeout, TimeUnit unit)方法

//使用Exchange的示例
public class MyExchange {

	private static Exchanger<String> ex = new Exchanger<>();

	private static class myRunnable1 implements Runnable {

		@Override
		public void run() {
			String s = "A";
			try {
				String exchange = ex.exchange(s);
				System.out.println(Thread.currentThread().getName() + " 交換前 : " + s + " 交換後 :" + exchange);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private static class myRunnable2 implements Runnable {

		@Override
		public void run() {
			String s = "B";
			try {
				String exchange = ex.exchange(s);
				System.out.println(Thread.currentThread().getName() + " 交換前 : " + s + " 交換後 :" + exchange);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}
	}

	public static void main(String[] args) {
		new Thread(new myRunnable1()).start();
		new Thread(new myRunnable2()).start();
	}

}
Callable、Future和FutureTask

Future接口用來表示異步計算的結果.可以對具體的任務進行取消,查詢是否完成和獲得任務結果
​ boolean cancel(boolean mayInterruptIfRunning); //試圖去取消任務返回true取消成功,false取消失敗
​ boolean isCancelled(); //如果任務在完成前取消,返回true
​ boolean isDone(); //任務是否結束,結束返回true。結束的原因有多種,可能是正常結束,異常,取消
​ V get() ; //等待任務結束獲取結果
​ V get(long timeout, TimeUnit unit) ; //get() 的超時方法

FutureTask爲Future 和 Runnable 的實現類.提供了FutureTask(Callable callable) 和FutureTask(Runnable runnable, V result) 的實現

// Callable、Future、FutureTask的使用示例 
public class MyFutureTask {

	private static class myCalable implements Callable<Integer> {

		@Override
		public Integer call() throws Exception {
			int sum = 0;
			Thread.sleep(1000);
			// SleepTools.ms(1000); throws InterruptedException
			for (int i = 0; i < 5000; i++) {
				sum = sum + i;
			}
			System.out.println(Thread.currentThread().getName() + " 計算結果= " + sum);
			return sum;
		}

	}

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		myCalable myCalable = new myCalable();
		FutureTask<Integer> futureTask = new FutureTask<>(myCalable);
		new Thread(futureTask).start();

		Random random = new Random();
		if (random.nextBoolean()) {
			System.out.println("獲取線程結果=" + futureTask.get());
		} else {
            /*cancel(true)中斷線程 cancel()是用interrupt()實現的 所以要注意當線程中拋出InterruptedException異常時,線程不會中斷,需要在拋出異常時再次調用中斷 操作纔可*/
            futureTask.cancel(true);
		   System.out.println("中斷了計算線程");
		}
	}

}

線程安全

什麼是線程安全

不管調用者如何調用一個類,這個類都能表現出正常的行爲

如何保證線程安全
  • 無狀態類

    ​ 沒有成員變量只有成員方法的類被稱之爲無狀態類

  • 讓類不可變

    ​ 有成員變量,但類中的成員變量都是使用final修飾的不可變變量,稱該類不可變

  • volatile

  • 加鎖

  • CAS

死鎖

簡單的死鎖

解決方式:在多個線程中,保證所有線程拿鎖的順序一直

//反例 :簡單的死鎖
public class NormalDeadLock {

	private static Object o1 = new Object(); // 第一個鎖
	private static Object o2 = new Object(); // 第一個鎖

	private static class MyRunnable implements Runnable {

		@Override
		public void run() {

			synchronized (o1) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				SleepTools.ms(100);
				synchronized (o2) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				}
			}

		}

	}

	private static class MyRunnable1 implements Runnable {

		@Override
		public void run() {

			synchronized (o2) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				SleepTools.ms(100);
				synchronized (o1) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				}
			}

		}

	}

	public @Override
		public void run() {

			synchronized (o1) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				SleepTools.ms(100);
				synchronized (o2) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				}
			}

		}

	}

	private static class MyRunnable1 implements Runnable {

		@Override
		public void run() {

			synchronized (o2) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				SleepTools.ms(100);
				synchronized (o1) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				}
			}

		}

	}

	public static void main(String[] args) {
		new Thread(new MyRunnable()).start();
		new Thread(new MyRunnable1()).start();
	}

}
//正例:避免簡單的死鎖
public class NormalDeadLock {

	private static Object o1 =@Override
		public void run() {

			synchronized (o1) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				SleepTools.ms(100);
				synchronized (o2) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				}
			}

		}

	}

	private static class MyRunnable1 implements Runnable {

		@Override
		public void run() {

			synchronized (o2) {
				System.out.println(Thread.currentThread().getName() + " I get hold of o2...");
				SleepTools.ms(100);
				synchronized (o1) {
					System.out.println(Thread.currentThread().getName() + " I get hold of o1...");
				}
			}

		}

	}

	public static void main(String[] args) {
		new Thread(new MyRunnable()).start();
		new Thread(new MyRunnable1()).start();
	}

}
//正例:避免簡單的死鎖
public class NormalDeadLock {

	private static Object o1 = new Object(); // 第一個鎖
	private static Object o2 = new Object(); // 第一個鎖

	private static class MyRunnable implements Runnable {

		@Override
		public void run() {

			synchronized (o1) {
				System.out.println(Thread.currentThread(
相關文章
相關標籤/搜索