ForkJoinPool 主要用於實現「分而治之」的算法,特別是分治以後遞歸調用的函數,例如 quick sort 等。
ForkJoinPool 最適合的是計算密集型的任務,若是存在 I/O,線程間同步,sleep() 等會形成線程長時間阻塞的狀況時,最好配合使用 ManagedBlocker。java
static { // initialize field offsets for CAS etc try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ForkJoinPool.class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); RUNSTATE = U.objectFieldOffset (k.getDeclaredField("runState")); STEALCOUNTER = U.objectFieldOffset (k.getDeclaredField("stealCounter")); Class<?> tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class<?> wk = WorkQueue.class; QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); QSCANSTATE = U.objectFieldOffset (wk.getDeclaredField("scanState")); QPARKER = U.objectFieldOffset (wk.getDeclaredField("parker")); QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); QCURRENTJOIN = U.objectFieldOffset (wk.getDeclaredField("currentJoin")); Class<?> ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { public ForkJoinPool run() { return makeCommonPool(); }}); int par = common.config & SMASK; // report 1 even if threads disabled commonParallelism = par > 0 ? par : 1; }
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }
經過代碼指定,必須得在commonPool初始化以前(parallel的stream被調用以前,通常可在系統啓動後設置)注入進去,不然沒法生效。
經過啓動參數指定無此限制,較爲安全算法
parallelism(即配置線程池個數
)
能夠經過java.util.concurrent.ForkJoinPool.common.parallelism進行配置,最大值不能超過MAX_CAP,即32767.編程
static final int MAX_CAP = 0x7fff; //32767
若是沒有指定,則默認爲Runtime.getRuntime().availableProcessors() - 1.安全
代碼指定(必須得在commonPool初始化以前注入進去,不然沒法生效
)併發
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
或者參數指定app
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
threadFactory
默認爲defaultForkJoinWorkerThreadFactory,沒有securityManager的話。less
/** * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread. */ static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } }
代碼指定(必須得在commonPool初始化以前注入進去,不然沒法生效
)dom
System.setProperty("java.util.concurrent.ForkJoinPool.common.threadFactory",YourForkJoinWorkerThreadFactory.class.getName());
參數指定ide
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.xxx.xxx.YourForkJoinWorkerThreadFactory
exceptionHandler
若是沒有設置,默認爲null函數
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
代碼指定(必須得在commonPool初始化以前注入進去,不然沒法生效
)
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",YourUncaughtExceptionHandler.class.getName());
參數指定
-Djava.util.concurrent.ForkJoinPool.common.exceptionHandler=com.xxx.xxx.YourUncaughtExceptionHandler
// Mode bits for ForkJoinPool.config and WorkQueue.config static final int MODE_MASK = 0xffff << 16; // top half of int static final int LIFO_QUEUE = 0; static final int FIFO_QUEUE = 1 << 16; static final int SHARED_QUEUE = 1 << 31; // must be negative
控制是FIFO仍是LIFO
/** * Takes next task, if one exists, in order specified by mode. */ final ForkJoinTask<?> nextLocalTask() { return (config & FIFO_QUEUE) == 0 ? pop() : poll(); }
ForkJoinPool 的每一個工做線程都維護着一個工做隊列(WorkQueue),這是一個雙端隊列(Deque),裏面存放的對象是任務(ForkJoinTask)。
每一個工做線程在運行中產生新的任務(一般是由於調用了 fork())時,會放入工做隊列的隊尾,而且工做線程在處理本身的工做隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
每一個工做線程在處理本身的工做隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool的任務,或是來自於其餘工做線程的工做隊列),竊取的任務位於其餘線程的工做隊列的隊首,也就是說工做線程在竊取其餘工做線程的任務時,使用的是 FIFO 方式。
queue capacity
/** * Capacity of work-stealing queue array upon initialization. * Must be a power of two; at least 4, but should be larger to * reduce or eliminate cacheline sharing among queues. * Currently, it is much larger, as a partial workaround for * the fact that JVMs often place arrays in locations that * share GC bookkeeping (especially cardmarks) such that * per-write accesses encounter serious memory contention. */ static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum size for queue arrays. Must be a power of two less * than or equal to 1 << (31 - width of array entry) to ensure * lack of wraparound of index calculations, but defined to a * value a bit less than this to help users trap runaway * programs before saturating systems. */ static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
超出報異常
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */ final ForkJoinTask<?>[] growArray() { ForkJoinTask<?>[] oldA = array; int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array ForkJoinTask<?> x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); if (x != null && U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); } while (++b != t); } return a; }