【java多線程】Java1.5後的多線程框架

內容主要包括之Executors,Executor,ExecutorService,CompletionService,Future,CountDownLauch,Callable,Runnablejava

在講1.5以後的多線程模式以前,先簡單的說一下1.5以前的線程模式,簡單的說就是Thread+Runnable模式。Thread是線程,Runnable能夠看作是線程要執行的任務。1.5以前的線程模式的一些缺點:linux

1 管理多線程有諸多不便,好比:多個線程建立,須要屢次new Thread;線程生命週期的管理,和多線程同步,須要對Java多線程的實現有比較深刻的瞭解多線程

2 Runnable沒有結果返回,須要某種機制等待線程執行結束,經過通知或者回調獲取線程執行的結果框架

Executors線程框架
因爲時間關係,UML簡化了,這裏畫出了本人認爲最重要的幾個類dom



類的簡要說明
Executors
能夠看作是工廠類,主要用於建立和管理Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callableide

interface ExecutrService
我的以爲看以把ExecutorService當作是多線程的容器,主要用於管理多線程,能夠往容器中提交線程,由容器負責線程的生命週期,包括啓動,執行,結束線程. 經過上面的類圖能夠看到咱們能夠往容器中提交實現Callable或者Runnable接口的類;往容器中提交Callable的實例,將會返回一個Future的對象,這個Future對象能夠用於簡單的控制Callable對應的線程函數

interface Callable
跟Runable相似,一樣是封裝了線程須要的執行任務,執行任務在Callable的call方法中實現,call方法能夠當作是Runnable的run方法,惟一不一樣的是call方法,有一個返回值。咱們知道call或者run方法,是由線程框架本身去調用的,那麼線程執行結束以後,咱們如何獲取call方法的返回值呢?咱們知道,當往容器中提交Callable實例時,會返回Future類的一個實例。經過Future類的實例,咱們能夠獲取線程結束後(也就是call函數執行完畢)的返回值this

interface Future
這個類的基本功能其實在ExecutorService和Callable中已經有提到了,每次往容器中提交Callable或者Runnable任務,都會返回Future的實例;該實例能夠簡單的管理執行Callable或者Runnable的線程,主要用於獲取線程的執行結果,或者查看線程的當前執行狀態spa

interface CompletionService<V>
當咱們向容器中提交多個任務時,建設咱們提交任務的順序是task1,task2,task3,咱們知道容器是多線程執行任務的,因此任務完成的順序有可能和任務提交的順序不同,好比任務完成的順序有多是task2,task1,task3;CompletionService提供了一種機制,能夠理解成維護了一個任務完成隊列,當容器中有任務完成時,會加入到CompletionService的完成隊列中,那麼咱們經過CompletionService的take 或者 poll方法,每次都能獲取剩餘任務中,最早執行完的任務線程

Interface ScheduledExecutorService
用於定時,週期性的執行某一個任務,某些狀況下能夠代替Timer類

CountDownLatch
這個類在類圖中沒有畫出來,主要用於多線程之間的同步。有點相似Semaphore,當計數值等於0時,將觸發事件。

舉例1:

Executor executor = Executors.newFixedThreadPool(10);
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("task executing");
}
};
executor.execute(task);

executor = Executors.newScheduledThreadPool(10);
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);

舉例2:

package com.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 內核多線程求和,將數據劃分紅多個數據塊,每一個數據塊交給一個cpu內核線程去計數和,最後在彙總全部數據的和
* http://www.linuxidc.com/Linux/2014-02/96879.htm
* Java1.5後的多線程框架
**/
public class ConcurrentCalculator {
	private ExecutorService exec;
	private CompletionService<Long> completionService;
	private int cpuCoreNumber;

	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numers, int start, int end) {
			this.numbers = numers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}

	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);
	}

	public Long sum(final int[] numbers) {
		// 根據CPU核心個數拆分任務,建立FutureTask並提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);
			}
		}
		return getResult();
	}

	/**
	 * 迭代每一個只任務,得到部分和,相加返回
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {
			try {
				Long subSum = completionService.take().get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}


舉例3:

package com.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * http://www.linuxidc.com/Linux/2014-02/96879.htm
 * Java1.5後的多線程框架
 * @author liushuo
 */
public class CountDownLatchDemo {
	private static final int PLAYER_AMOUNT = 5;

	public CountDownLatchDemo() {
		// TODO Auto-generated constructor stub
	}
	
	public void test() {
		// 對於每位運動員,CountDownLatch減1後即結束比賽
		CountDownLatch begin = new CountDownLatch(1);
		// 對於整個比賽,全部運動員結束後纔算結束
		CountDownLatch end = new CountDownLatch(PLAYER_AMOUNT);
		Player[] plays = new Player[PLAYER_AMOUNT];
		for (int i = 0; i < PLAYER_AMOUNT; i++)
			plays[i] = new Player(i + 1, begin, end);
		// 設置特定的線程池,大小爲5
		ExecutorService exe = Executors.newFixedThreadPool(PLAYER_AMOUNT);
		for (Player p : plays)
			exe.execute(p); // 分配線程
		System.out.println("Race begins!");
		begin.countDown();
		try {
			end.await(); // 等待end狀態變爲0,即爲比賽結束
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		} finally {
			System.out.println("Race ends!");
		}
		exe.shutdown();
	}

	public class Player implements Runnable {
		private int id;
		private CountDownLatch begin;
		private CountDownLatch end;

		public Player(int i, CountDownLatch begin, CountDownLatch end) {
			// TODO Auto-generated constructor stub
			super();
			this.id = i;
			this.begin = begin;
			this.end = end;
		}

		@Override
		public void run() {  
			// TODO Auto-generated method stub
			try {
				begin.await(); // 等待begin的狀態爲0
				Thread.sleep((long) (Math.random() * 100)); // 隨機分配時間,即運動員完成時間
				System.out.println("Play" + id + " arrived.");
			} catch (InterruptedException e) {
				// TODO: handle exception
				e.printStackTrace();
			} finally {
				end.countDown(); // 使end狀態減1,最終減至0
			}
		}
	}
}
相關文章
相關標籤/搜索