《java.util.concurrent 包源碼閱讀》23 Fork/Join框架之Fork的冰山一角

上篇文章一直追蹤到了ForkJoinWorkerThread的pushTask方法,仍然沒有辦法解釋Fork的原理,那麼不妨來看看ForkJoinWorkerThread的run方法:算法

    public void run() { Throwable exception = null; try { // 初始化任務隊列  onStart(); // 線程運行 pool.work(this); } catch (Throwable ex) { exception = ex; } finally { // 結束後的工做  onTermination(exception); } }

所以咱們須要再次回到ForkJoinPool,看看work方法:多線程

    final void work(ForkJoinWorkerThread w) { boolean swept = false; // 下面scan方法沒有掃描到任務返回true long c; // ctl是一個64位長的數據,它的格式以下: // 48-63:AC,正在運行的worker線程數減去系統的併發數(減去系統的併發得出的實際是在某一瞬間等待併發資源的線程數量) // 32-47:TC,全部的worker線程數減去系統的併發數 // 31: ST,1表示線程池正在關閉 // 16-30:EC,第一個等待線程的等待數 // 0- 15:ID,Treiber棧(存儲等待線程)頂的worker線程在線程池的線程隊列中的索引 // (int)(c = ctl) >= 0表示ST位爲0,即線程池不是正在關閉的狀態 while (!w.terminate && (int)(c = ctl) >= 0) { int a; // 正在運行的worker線程數,ctl中的AC部分 // swept爲false可能有三種: // 1. scan返回false // 2. 首次循環 // 3. tryAwaitWork成功 if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) swept = scan(w, a); else if (tryAwaitWork(w, c)) swept = false; } }

接下來分析scan方法,我認可我看得有點暈。併發

    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        // 代碼看起來暈啊,看來當前的ForkJoinWorkerThread不必定是運行本身的
        // Task,能夠運行其餘ForkJoinWorkerThread的Task。
        // 彷佛有點明白了,這樣能夠實現Fork出來的任務被多線程執行
        // 看起來這是一個較爲複雜的算法
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;
            }
            return true;                         // all queues empty
        }
    }


可是起碼能看出來,Fork出來的任務是如何被其餘線程運行以實現多線程運行的了。面對這麼個有點複雜的算法,我只能先去查查,發現原來叫作Work-Stealing,好吧,下一篇來研究這個Work-Stealing。less

相關文章
相關標籤/搜索