[高併發Java 六] JDK併發包2

1. 線程池的基本使用

1.1.爲何須要線程池

平時的業務中,若是要使用多線程,那麼咱們會在業務開始前建立線程,業務結束後,銷燬線程。可是對於業務來講,線程的建立和銷燬是與業務自己無關的,只關心線程所執行的任務。所以但願把儘量多的cpu用在執行任務上面,而不是用在與業務無關的線程建立和銷燬上面。而線程池則解決了這個問題,線程池的做用就是將線程進行復用。 java

1.2.JDK爲咱們提供了哪些支持


JDK中的相關類圖如上圖所示。

其中要提到的幾個特別的類。 算法

Callable類和Runable類類似,可是區別在於Callable有返回值。 設計模式

ThreadPoolExecutor是線程池的一個重要實現。 api

而Executors是一個工廠類。 緩存

1.3.線程池的使用

1.3.1.線程池的種類

  • new FixedThreadPool 固定數量的線程池,線程池中的線程數量是固定的,不會改變。
  • new SingleThreadExecutor 單一線程池,線程池中只有一個線程。
  • new CachedThreadPool 緩存線程池,線程池中的線程數量不固定,會根據需求的大小進行改變。
  • new ScheduledThreadPool 計劃任務調度的線程池,用於執行計劃任務,好比每隔5分鐘怎麼樣,
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}


public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}


public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
從方法上來看,顯然 FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是ThreadPoolExecutor的不一樣實例,只是參數不一樣。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
下面來簡述下 ThreadPoolExecutor構造函數中參數的含義。
  • corePoolSize 線程池中核心線程數的數目
  • maximumPoolSize 線程池中最多能容納多少個線程
  • keepAliveTime 當如今線程數目大於corePoolSize時,超過keepAliveTime時間後,多出corePoolSize的那些線程將被終結。
  • unit keepAliveTime的單位
  • workQueue 當任務數量很大,線程池中線程沒法知足時,提交的任務會被放到阻塞隊列中,線程空閒下來則會不斷從阻塞隊列中取數據。

這樣在來看上面所說的FixedThreadPool,它的線程的核心數目和最大容納數目都是同樣的,以致於在工做期間,並不會建立和銷燬線程。當任務數量很大,線程池中的線程沒法知足時,任務將被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。這就意味着,任務不斷地添加,會使內存消耗愈來愈大。 安全

CachedThreadPool則不一樣,它的核心線程數量是0,最大容納數目是Integer.MAX_VALUE,它的阻塞隊列是SynchronousQueue,這是一個特別的隊列,它的大小是0。因爲核心線程數量是0,因此必然要將任務添加到SynchronousQueue中,這個隊列只有一個線程在從中添加數據,同時另外一個線程在從中獲取數據時,才能成功。獨自往這個隊列中添加數據會返回失敗。當返回失敗時,則線程池開始擴展線程,這就是爲何CachedThreadPool的線程數目是不固定的。當60s該線程仍未被使用時,線程則被銷燬。 多線程

1.4.線程池使用的小例子

1.4.1.簡單線程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + "Thread ID:"
					+ Thread.currentThread().getId());
			try {
				Thread.sleep(1000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		MyTask myTask = new MyTask();
		ExecutorService es = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			es.submit(myTask);
		}
	}
}
因爲使用的newFixedThreadPool(5),可是啓動了10個線程,因此每次執行5個,而且 能夠很明顯的看到線程的複用,ThreadId是重複的,也就是前5個任務和後5個任務都是同一批線程去執行的。

這裏用的是 併發

es.submit(myTask);
還有一種提交方式:
es.execute(myTask);
區別在於submit會返回一個Future對象,這個將在之後介紹。

1.4.2.ScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class ThreadPoolDemo {
	public static void main(String[] args) {
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
		//若是前面的任務還未完成,則調度不會啓動。
		ses.scheduleWithFixedDelay(new Runnable() {
			
			@Override
			public void run() {
				try {
					Thread.sleep(1000);
					System.out.println(System.currentTimeMillis()/1000);
				} catch (Exception e) {
					// TODO: handle exception
				}
				
			}
		}, 0, 2, TimeUnit.SECONDS);//啓動0秒後執行,而後週期2秒執行一次
	}
}

輸出: 分佈式

1454832514
1454832517
1454832520
1454832523
1454832526
...
因爲任務執行須要1秒,任務調度必須等待前一個任務完成。也就是這裏的每隔2秒的意思是,前一個任務完成後2秒再開啓新的一個任務。

2. 擴展和加強線程池

2.1.回調接口

線程池中有一些回調的api來給咱們提供擴展的操做。 ide

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>()){

					@Override
					protected void beforeExecute(Thread t, Runnable r) {
						System.out.println("準備執行");
					}

					@Override
					protected void afterExecute(Runnable r, Throwable t) {
						System.out.println("執行完成");
					}

					@Override
					protected void terminated() {
						System.out.println("線程池退出");
					}
			
		};

咱們能夠經過實現ThreadPoolExecutor的子類去覆蓋ThreadPoolExecutor的beforeExecute,afterExecute,terminated方法來實如今線程執行先後,線程池退出時的日誌管理或其餘操做。

2.2.拒絕策略

有時候,任務很是繁重,致使系統負載太大。在上面說過,當任務量愈來愈大時,任務都將放到FixedThreadPool的阻塞隊列中,致使內存消耗太大,最終致使內存溢出。這樣的狀況是應該要避免的。所以當咱們發現線程數量要超過最大線程數量時,咱們應該放棄一些任務。丟棄時,咱們應該把任務記下來,而不是直接丟掉。

ThreadPoolExecutor中還有另外一個構造函數。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
threadFactory咱們在後面再介紹。

而handler就是拒絕策略的實現,它會告訴咱們,若是任務不能執行了,該怎麼作。


共有以上4種策略。

AbortPolicy:若是不能接受任務了,則拋出異常。

CallerRunsPolicy:若是不能接受任務了,則讓調用的線程去完成。

DiscardOldestPolicy:若是不能接受任務了,則丟棄最老的一個任務,由一個隊列來維護。

DiscardPolicy:若是不能接受任務了,則丟棄任務。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(),
				new RejectedExecutionHandler() {

					@Override
					public void rejectedExecution(Runnable r,
							ThreadPoolExecutor executor) {
						System.out.println(r.toString() + "is discard");
					}
				});
固然咱們也能夠本身實現RejectedExecutionHandler接口來本身定義拒絕策略。

2.3.自定義ThreadFactory

剛剛已經看到了,在ThreadPoolExecutor的構造函數中能夠指定threadFactory

線程池中的線程都是由線程工廠建立出來,咱們能夠自定義線程工廠。

默認的線程工廠:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

3. ForkJoin

3.1.思想

就是分而治之的思想。


fork/join相似MapReduce算法,二者區別是:Fork/Join 只有在必要時如任務很是大的狀況下才分割成一個個小任務,而 MapReduce老是在開始執行第一步進行分割。看來,Fork/Join更適合一個JVM內線程級別,而MapReduce適合分佈式系統。

4.2.使用接口

RecursiveAction:無返回值

RecursiveTask:有返回值

4.3.簡單例子

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;


public class CountTask extends RecursiveTask<Long>{

	private static final int THRESHOLD = 10000;
	private long start;
	private long end;
	
	public CountTask(long start, long end) {
		super();
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;
		if(canCompute)
		{
			for (long i = start; i <= end; i++) {
				sum = sum + i;
			}
		}else
		{
			//分紅100個小任務
			long step = (start + end)/100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for (int i = 0; i < 100; i++) {
				long lastOne = pos + step;
				if(lastOne > end )
				{
					lastOne = end;
				}
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step + 1;
				subTasks.add(subTask);
				subTask.fork();//把子任務推向線程池
			}
			for (CountTask t : subTasks) {
				sum += t.join();//等待全部子任務結束
			}
		}
		return sum;
	}
	
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0, 200000L);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			long res = result.get();
			System.out.println("sum = " + res);
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}

}
上述例子描述了一個累加和的任務。將累加任務分紅100個任務,每一個任務只執行一段數字的累加和,最後join後,把每一個任務計算出的和再累加起來。

4.4.實現要素

4.4.1.WorkQueue與ctl

每個線程都會有一個工做隊列

static final class WorkQueue
在工做隊列中,會有一系列對線程進行管理的字段
volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int nsteals;               // number of steals
        int hint;                  // steal index hint
        short poolIndex;           // index of this queue in pool
        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed
這裏要注意的是,JDK7和JDK8在ForkJoin的實現上有了很大的差異。咱們這裏介紹的是JDK8中的。 在線程池中,有時不是全部的線程都在執行的,部分線程會被掛起,那些掛起的線程會被存放到一個棧中。內部經過一個鏈表表示。

nextWait會指向下一個等待的線程。

poolIndex線程在線程池中的下標索引。

eventCount 在初始化時,eventCount與poolIndex有關。總共32位,第一位表示是否被激活,15位表示被掛起的次數eventCount,剩下的表示poolIndex。用一個字段來表示多個意思。

工做隊列WorkQueue用ForkJoinTask<?>[] array來表示。而top,base來表示隊列的兩端,數據在這二者之間。

在ForkJoinPool中維護着ctl(64位long型)

volatile long ctl;

* Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
AC表示活躍的線程數減去並行度(大概就是CPU個數)

TC表示總的線程數減去並行度

ST表示線程池自己是不是激活的

EC表示頂端等待線程的掛起數

ID表示頂端等待線程的poolIndex

很明顯ST+EC+ID就是咱們剛剛所說的 eventCount 。

那麼爲何明明5個變量,非要合成一個變量呢。其實用5個變量佔用容量也差很少。

用一個變量代碼的可讀性上會差不少。

那麼爲何用一個變量呢?其實這點纔是最巧妙的地方,由於這5個變量是一個總體,在多線程中,若是用5個變量,那麼當修改其中一個變量時,如何保證5個變量的總體性。那麼用一個變量則就解決了這個問題。若是用鎖解決,則會下降性能。

用一個變量則保證了數據的一致性和原子性。

在ForkJoin中隊ctl的更改都是使用CAS操做,在前面系列的文章中已經介紹過,CAS是無鎖的操做,性能很好。

因爲CAS操做也只能針對一個變量,因此這種設計是最優的。

4.4.2.工做竊取

接下來要介紹下整個線程池的工做流程。

每一個線程都會調用runWorker

final void runWorker(WorkQueue w) {
        w.growArray(); // allocate queue
        for (int r = w.hint; scan(w, r) == 0; ) {
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
scan()函數是掃描是否有任務要作。

r是一個相對隨機的數字。

private final int scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        long c = ctl;                            // for consistency check
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
            for (int j = m + m + 1, ec = w.eventCount;;) {
                WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                if ((q = ws[(r - j) & m]) != null &&
                    (b = q.base) - q.top < 0 && (a = q.array) != null) {
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null) {
                        if (ec < 0)
                            helpRelease(c, ws, w, q, b);
                        else if (q.base == b &&
                                 U.compareAndSwapObject(a, i, t, null)) {
                            U.putOrderedInt(q, QBASE, b + 1);
                            if ((b + 1) - q.top < 0)
                                signalWork(ws, q);
                            w.runTask(t);
                        }
                    }
                    break;
                }
                else if (--j < 0) {
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }
            }
        }
        return 0;
    }
咱們接下來看看scan方法,scan的一個參數是WorkQueue,上面已經說過,每一個線程都會擁有一個WorkQueue,那麼多個線程的WorkQueue就會保存在workQueues裏面,r是一個隨機數,經過r來找到某一個 WorkQueue,在WorkQueue裏面有所要作的任務。

而後經過WorkQueue的base,取得base的偏移量。

b = q.base
..
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
..
而後經過偏移量獲得最後一個的任務,運行這個任務

t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))
..
w.runTask(t);
..
經過這個大概的分析理解了過程,咱們發現,當前線程調用scan方法後,不會執行當前的WorkQueue中的任務,而是經過一個隨機數r,來獲得其餘 WorkQueue的任務。這就是ForkJoinPool的主要的一個機理。

當前線程不會只着眼於本身的任務,而是優先完成其餘任務。這樣作來,防止了飢餓現象的發生。這樣就預防了某些線程由於卡死或者其餘緣由而沒法及時完成任務,或者某個線程的任務量很大,其餘線程卻沒事可作。

而後來看看runTask方法

final void runTask(ForkJoinTask<?> task) {
            if ((currentSteal = task) != null) {
                ForkJoinWorkerThread thread;
                task.doExec();
                ForkJoinTask<?>[] a = array;
                int md = mode;
                ++nsteals;
                currentSteal = null;
                if (md != 0)
                    pollAndExecAll();
                else if (a != null) {
                    int s, m = a.length - 1;
                    ForkJoinTask<?> t;
                    while ((s = top - 1) - base >= 0 &&
                           (t = (ForkJoinTask<?>)U.getAndSetObject
                            (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
                        top = s;
                        t.doExec();
                    }
                }
                if ((thread = owner) != null) // no need to do in finally clause
                    thread.afterTopLevelExec();
            }
        }
有一個有趣的命名:currentSteal,偷得的任務,的確是剛剛解釋的那樣。

task.doExec();
將會完成這個任務。

完成了別人的任務之後,將會完成本身的任務。

經過獲得top來得到本身任務第一個任務

while ((s = top - 1) - base >= 0 && (t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) != null)
{
       top = s;
       t.doExec();
}
接下來,經過一個圖來總結下剛剛線程池的流程


好比有T1,T2兩個線程,T1會經過T2的base來得到T2的最後一個任務(固然其實是經過一個隨機數r來取得某個線程最後一個任務),T1也會經過本身的top來執行本身的第一個任務。反之,T2也會如此。

拿其餘線程的任務都是從base開始拿的,本身拿本身的任務是從top開始拿的。這樣能夠減小衝突

若是沒有找到其餘任務

else if (--j < 0) {
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }
那麼首先會經過一系列運行來改變ctl的值,得到了nc,而後用CAS將新的值賦值。而後就調用awaitWork()將線程進入等待狀態(調用的 前面系列文章中提到的unsafe的park方法)。

這裏要說明的是改變ctl值這裏,首先是將ctl中的AC-1,AC是佔ctl的前16位,因此不能直接-1,而是經過AC_UNIT(0x1000000000000)來達到使ctl的前16位-1的效果。

前面說過eventCount中有保存poolIndex,經過poolIndex以及WorkQueue中的nextWait,就能遍歷全部的等待線程。





系列:

[高併發Java 一] 前言

[高併發Java 二] 多線程基礎

[高併發Java 三] Java內存模型和線程安全

[高併發Java 四] 無鎖

[高併發Java 五] JDK併發包1

[高併發Java 六] JDK併發包2

[高併發Java 七] 併發設計模式

[高併發Java 八] NIO和AIO

[高併發Java 九] 鎖的優化和注意事項

[高併發Java 十] JDK8對併發的新支持


Reference:

1. http://www.jdon.com/45364
相關文章
相關標籤/搜索