實戰java高併發程序設計第三章(一)

1. 同步控制

synchronized的擴展:重入鎖

同步控制不只有synchronized配合object.wait()以及object.notify(),也有加強版的reentrantLock(重入鎖)java

public class ReenterLock implements Runnable{
    public static ReentrantLock lock=new ReentrantLock();
    public static int i=0;
    @Override
    public void run() {
        for(int j=0;j<10000000;j++){
            lock.lock();
            lock.lock();        //此處演示重入性
            try{
                i++;
            }finally{
                lock.unlock();        //退出臨界區必須解鎖
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl=new ReenterLock();
        Thread t1=new Thread(tl);
        Thread t2=new Thread(tl);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);    //計算結果爲 20000000

    }
}

咱們來看下reentrantlock相比synchronized鎖有何優勢:算法

中斷響應

面對死鎖,彷佛synchronized沒有任何主動解決策略,而reentrantlock則能夠輕鬆解決緩存

public class IntLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    /**
     * 控制加鎖順序,方便構造死鎖
     * @param lock
     */
    public IntLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();        //可中斷的加鎖
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(Thread.currentThread().getName()+":線程被中斷");
        } finally {
            if (lock1.isHeldByCurrentThread())
                lock1.unlock();
            if (lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getName()+":線程退出");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        IntLock r1 = new IntLock(1);
        IntLock r2 = new IntLock(2);
        Thread t1 = new Thread(r1,"線程1");
        Thread t2 = new Thread(r2,"線程2");
        t1.start();t2.start();
        Thread.sleep(1000);
        //中斷其中一個線程
        t2.interrupt();
    }
}
//    輸出結果:
//    java.lang.InterruptedException
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
//        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
//        at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
//        at java.lang.Thread.run(Thread.java:745)
//    線程2:線程被中斷
//    線程2:線程退出
//    線程1:線程退出

由上可知,當t1,t2造成死鎖時,能夠主動利用中斷來解開,但完成任務的只有t1,t2被中斷. 而若是換成synchronized則將沒法進行中斷多線程

鎖申請等待時限

lock1.tryLock();                            //嘗試獲取鎖,得到當即返回true,未得到當即返回false
lock1.tryLock(5, TimeUnit.SECONDS);         //嘗試獲取鎖,5秒內未得到則返回false,得到返回true
public class TryLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    public TryLock(int lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        if (lock == 1) {
            while (true) {
                if (lock1.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock2.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock2.unlock();
                            }
                        }
                    } finally {
                        lock1.unlock();
                    }
                }
            }
        } else {
            while (true) {
                if (lock2.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock1.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock1.unlock();
                            }
                        }
                    } finally {
                        lock2.unlock();
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        TryLock r1 = new TryLock(1);
        TryLock r2 = new TryLock(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
}
//    15:My Job done
//    14:My Job done

使用trylock能夠有效地避免產生死鎖框架

公平鎖

synchronized鎖爲非公平鎖,而reentrantLock既能夠是公平鎖也能夠是非公平鎖
非公平鎖容易產生飢餓,公平鎖先進先出,但效率不敵非公平鎖dom

public ReentrantLock(boolean fair)

重入鎖的搭檔Condition

Condition和object.wait(),object.notify()方法相似
condition的基本方法以下:ide

void await() throws InterruptedException;        //使當前線程等待,釋放鎖,能響應signal和signalAll方法,響應中斷
void awaitUninterruptibly();                    //相似 await,但不響應中斷
long awaitNanos(long nanosTimeout)throws InterruptedException;    //等待一段時間
boolean await (long time,TimeUnit unit)throws InterruptedException;
boolean awaitUntil(Date deadline)throws InterruptedException;
void signal();    //喚醒一個等待中的線程
void signalAll();  //喚醒全部等待中的線程

JDK內部就有不少對於ReentrantLock的使用,如ArrayBlockingQueue函數

//在 ArrayBlockingQueue中的一些定義
    boolean fair = true;
    private final ReentrantLock lock = new ReentrantLock(fair);
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    //put(方法的實現  
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();       //put方法作同步
        try {
            try {
                while (count == items.length) //隊列已滿
                    notFull.await();        //等待隊列有足夠的空間
            } catch (InterruptedException ie) {
                notFull.signal();
                throw ie;
            }
            insert(e);                  //notFull被通知時,說明有足夠的空間
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notFull.signal();           //通知take方法的線程,隊列已有數據
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();         //對take()方法作同步
        try {
            try {
                while (count == 0)          //若是隊列爲空
                    notEmpty.await();        //則消費者隊列要等待一個非空的信號
            } catch (InterruptedException ie) {
                notEmpty.signal();
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();    //通知put線程隊列已有空閒空間
        return x;
    }

多線程同時訪問:信號量(semaphore)

同步鎖只能容許一個線程進行訪問,信號量能夠指定多個線程同時訪問同一個資源.工具

//構造方法
    public Semaphore(int permits)                //傳入int表示能同時訪問的線程數
    public Semaphore(int permits, boolean fair)  //線程數,是否公平鎖
    
    //實例方法
    public void acquire() throws InterruptedException        //獲取一個訪問權限,會阻塞線程,會被打斷
    public void acquireUninterruptibly()                     //獲取一個訪問權限,會阻塞線程,不會被打斷
    public boolean tryAcquire()                              //獲取一個訪問權限,當即返回
    public boolean tryAcquire(long timeout, TimeUnit unit)   //獲取一個訪問權限,嘗試一段時間
    public void release()                                    //釋放一個訪問權限
public class SemapDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);
    @Override
    public void run() {
        try {
            semp.acquire();                
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();                //使用完後要釋放,不然會引發信號量泄漏
        }
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

    //輸出結果
    //每次輸出5個結果,對應信號量的5個許可

讀寫鎖ReadWriteLock

讀寫鎖適用於讀多寫少的場景,讀讀之間爲並行,讀寫之間爲串行,寫寫之間也爲串行
public class ReadWriteLockDemo {
    private static Lock lock=new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();       //獲取讀寫鎖
    private static Lock readLock = readWriteLock.readLock();                                //讀鎖
    private static Lock writeLock = readWriteLock.writeLock();                              //寫鎖
    private int value;
    public Object handleRead(Lock lock) throws InterruptedException{
        try{
            lock.lock();                //模擬讀操做
            Thread.sleep(1000);            //讀操做的耗時越多,讀寫鎖的優點就越明顯
            return value;                
        }finally{
        lock.unlock();
        }
    }
    public void handleWrite(Lock lock,int index) throws InterruptedException{
        try{
            lock.lock();                //模擬寫操做
            Thread.sleep(1000);
            value=index;
        }finally{
        lock.unlock();
        }
    }
    public static void main(String[] args) {
        final ReadWriteLockDemo demo=new ReadWriteLockDemo();
        Runnable readRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readLock);
//                    demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable writeRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(writeLock,new Random().nextInt());
//                    demo.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for(int i=0;i<18;i++){
            new Thread(readRunnale).start();
        }
        
        for(int i=18;i<20;i++){
            new Thread(writeRunnale).start();
        }    
    }
}

//結果:
//讀寫鎖明顯要比單純的鎖要更快結束,說明讀寫鎖確實提高很多效率

倒計數器CountDownLatch

讓一個線程等待,知道倒計時結束
public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);        //構造倒計時器,倒計數爲10
    static final CountDownLatchDemo demo=new CountDownLatchDemo();   
    @Override
    public void run() {
        try {
            //模擬檢查任務
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check complete");
            end.countDown();                                        //倒計時器減1
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            exec.submit(demo);
        }
        //等待檢查
        end.await();                        //主線程阻塞,待其餘線程所有完成後再喚醒主線程
        //發射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

循環柵欄CyclicBarrier

循環柵欄相似於倒計時器,可是計數器能夠反覆使用,cyclicBarrier比CountDownLatch稍微強大些,能夠傳入一個barrierAction,barrierAction指每次完成計數便出發一次
public CyclicBarrier(int parties,Runnable barrierAction)  //構造方法
public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;
        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }
        public void run() {
            try {
                //等待全部士兵到齊
                cyclic.await();                        //觸發一次循環柵欄,達到計數器後纔會進行下一步工做
                doWork();
                //等待全部士兵完成工做
                cyclic.await();                        //再次觸發循環柵欄,達到計數器後纔會進行下一步工做
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt()%10000));        //模擬工做
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任務完成");
        }
    }
    public static class BarrierRun implements Runnable {            //用於傳入CyclicBarrier的構造方法,做爲達到計數器數值後的觸發任務, 能夠被屢次調用
        boolean flag;
        int N;
        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }
        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "個,任務完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "個,集合完畢!]");
                flag = true;
            }
        }
    }
    public static void main(String args[]) throws InterruptedException {
        final int N = 10;
        Thread[] allSoldier=new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //設置屏障點,主要是爲了執行這個方法
        System.out.println("集合隊伍!");
        for (int i = 0; i < N; ++i) {
            System.out.println("士兵 "+i+" 報道!");
            allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }
}
注意:
一旦其中一個被interrupt後,極可能會拋出一個interruptExpection和9個BrokenBarrierException,表示該循環柵欄已破損,防止其餘線程進行無所謂的長久等待

線程阻塞工具LockSupport

LockSupport是一個很是實用的線程阻塞工具,不須要獲取某個對象的鎖(如wait),也不會拋出interruptedException異常
public static void park()        //掛起當前線程,
public static void park(Object blocker)        //掛起當前線程,顯示阻塞對象,parking to wait for <地址值>
public class LockSupportDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park(this);                
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);    //即便unpark發生在park前,也可使程序正常結束
        t1.join();
        t2.join();
    }
}
LockSupport使用了相似信號量的機制,它爲每一個線程準備一個許可,若是許可可用,park當即返回,而且消費這個許可(轉爲不可用),若是許可不可用,就會阻塞,而unpark方法就是使一個許可變爲可用
locksupport.park()能夠相應中斷,可是不會拋出interruptedException,咱們能夠用Thread.interrupted等方法中獲取中斷標記.
public class LockSupportIntDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park();
                if(Thread.interrupted()){                        //檢測到中斷位,並清除中斷狀態
                    System.out.println(getName()+" 被中斷了");
                }
                if (Thread.currentThread().isInterrupted()){    //中斷狀態已被清除,沒法檢測到
                    System.out.println(1);
                }
            }
            System.out.println(getName()+"執行結束");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
//輸出:
//in t1
//t1 被中斷了
//t1執行結束
//in t2
//t2執行結束

Guava和Limiter限流

限流算法通常有兩種:漏桶算法和令牌桶算法
漏桶算法: 利用緩存區,全部請求進入系統,都在緩存區中保存,而後以固定的流速流出緩存區進行處理.
令牌桶算法: 桶中存放令牌,每一個請求拿到令牌後才能進行處理,若是沒有令牌,請求要麼等待,要麼丟棄.RateLimiter就是採用這種算法
public class RateLimiterDemo {
    static RateLimiter limiter = RateLimiter.create(2);        //每秒處理2個請求
    public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis());
        }
    }
    public static void main(String args[]) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            limiter.acquire();            //過剩流量會等待
            new Thread(new Task()).start();
        }
    }
}
//  某些場景傾向於丟棄過剩流量,tryAcquire則是當即返回,不會阻塞
//        for (int i = 0; i < 50; i++) {        
//            if(!limiter.tryAcquire()) {
//                continue;
//            }
//            new Thread(new Task()).start();
//        }

2. 線程池

Executors框架

Executor框架提供了各類類型的線程池,主要有如下工廠方法:性能

//固定線程數量,當有新任務提交時,若池中有空閒線程則當即執行,若沒有空閒線程,任務會被暫存在一個任務隊列中,直到有空閒線程
public static ExecutorService newFixedThreadPool(int nThreads)
//返回只有一個線程的線程池,多餘任務被保存到一個任務隊列中,線程空閒時,按先入先出的順序執行隊列中的任務
public static ExecutorService newSingleThreadPoolExecutor()
//線程數量不固定,優先使用空閒線程,多餘任務會建立新線程
public static ExecutorService newCachedThreadPool()
//線程數量爲1,給定時間執行某任務,或週期性執行任務
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//線程數量能夠指定,定時或週期性執行任務
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

計劃任務:newScheduledThreadPool主要方法

//給定時間,對任務進行一次調度
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//週期調度,以任務完成後間隔固定時間調度下一個任務,(二者相加)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
//週期調度,兩個任務開始的時間差爲固定間隔,若是任務時間大於間隔時間則以任務時間爲準(二者取其大者)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
注意:
任務異常時後續全部任務都將中止調度,所以必須保證全部任務異常均被正常處理.
  • 核心線程池 ThreadPoolExecutor

ThreadPoolExecutor構造函數:

public ThreadPoolExecutor(int corePoolSize,         //核心線程池大小
                              int maximumPoolSize,  //最大線程池大小
                              long keepAliveTime,  //線程池中超過corePoolSize數目的空閒線程最大存活時間;能夠allowCoreThreadTimeOut(true)使得核心線程有效時間
                              TimeUnit unit,  //keepAliveTime時間單位
                              BlockingQueue<Runnable> workQueue, //阻塞任務隊列
                              ThreadFactory threadFactory,  //新建線程工廠
                              RejectedExecutionHandler handler ) //當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

workQueue指被提交可是未執行的任務隊列,是BlockingQueue接口的對象
1.直接提交隊列:SynchronousQueue,該隊列沒有容量,每一個插入操做對應一個刪除操做,即提交的任務老是會交給線程執行,若是沒有空閒進程,則建立新線程,數量達最大則執行拒絕策略,通常須要設置很大的maximumPoolSize
2.有界任務隊列:ArrayBlockingQueue,有新任務時,若線程池的實際線程數小於corePoolSize,優先建立新線程,若大於corePoolSize,加入到等待隊列,若隊列已滿,不大於maximumPoolSize前提下,建立新線程執行;當且僅當等待隊列滿時纔會建立新線程,不然數量一直維持在corePoolSize
3.無界任務隊列:LinkedBlockingQueue,小於corePoolSize時建立線程,達到corePoolSize則加入隊列直到資源消耗殆盡
4.優先任務隊列:PriorityBlockingQueue,特殊無界隊列,老是保證高優先級的任務先執行.

Executors分析

newFixedThreadPool: corePoolSize=maximumPoolSize,線程不會超過corePoolSize,使用LinkedBlockingQueue
newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1
newCachedThreadPool: corePoolSize=0,maximumPoolSize爲無窮大,空閒線程60秒回收,使用SynchronousQueue隊列

ThreadPoolExecutor的execute()方法執行邏輯

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {    //檢查是否小於corePoolSize
            if (addWorker(command, true))    //添加線程,執行任務
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {    //添加進隊列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))   //雙重校驗
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))    //提交線程池失敗
            reject(command);                    //拒絕執行
    }

clipboard.png

拒絕策略

**AbortPolicy**:該策略會直接拋出異常,阻止系統正常工做。  
**CallerRunsPolicy**:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可  能會急劇降低。    任務,並嘗試再次提交當前任務。     
**DiscardOldestPolicy**:該策略將丟棄最老的一個請求,也就是即將被執行的一個    
**DiscardPolicy**:該策略默默地丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,我以爲這多是最好的一種方案了吧!

自定義ThreadFactory

public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory(){
                    @Override
                    public Thread newThread(Runnable r) {    //自定義建立線程的方法
                        Thread t= new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create "+t);
                        return t;
                    }
                }
               );
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }

擴展線程池

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId()
                    + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:" + ((MyTask) r).name);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執行完成:" + ((MyTask) r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();               //等待全部任務執行完畢後再關閉
    }
}

異常堆棧消息

線程池中的異常堆棧可能不會拋出,須要咱們本身去包裝

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    private Runnable wrap(final Runnable task, final Exception clientStack,
            String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();                //外層包裹trycatch,便可打印出異常
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }
}

Fork/Join框架

相似於mapreduce,用於大數據量,fork()創造子線程,join表示等待,
clipboard.png

clipboard.png

clipboard.png

public class CountTask extends RecursiveTask<Long>{
    private static final int THRESHOLD = 10000;        //任務分解規模
    private long start;
    private long end;
    public CountTask(long start,long end){
        this.start=start;
        this.end=end;
    }
    @Override
    public Long compute(){
        long sum=0;
        boolean canCompute = (end-start)<THRESHOLD;     //表示計算極限10000,超出此值須要使用forkjoin
        if(canCompute){
            for(long i=start;i<=end;i++){
                sum +=i;
            }
        }else{
            //分紅100個小任務
            long step=(start+end)/100;
            ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
            long pos=start;
            for(int i=0;i<100;i++){
                long lastOne=pos+step;
                if(lastOne>end)lastOne=end;     //最後一個任務可能小於step,故須要此步
                CountTask subTask=new CountTask(pos,lastOne);   //子任務
                pos+=step+1;        //調整下一個任務
                subTasks.add(subTask);
                subTask.fork();     //fork子任務
            }
            for(CountTask  t:subTasks){
                sum+=t.join();      //聚合任務
            }
        }
        return sum;
    }
    public static void main(String[]args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000000000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try{
            long res = result.get();
            System.out.println("sum="+res);
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ExecutionException e){
            e.printStackTrace();
        }
    }
}
注意:
若是任務的劃分層次不少,一直得不到返回,可能有兩種緣由:
1.系統內線程數量越積越多,致使性能嚴重降低
2.函數調用層次變多,致使棧溢出

Guava對線程池的拓展

1.特殊的DirectExecutor線程池

Executor executor=MoreExecutors.directExecutor();    // 僅在當前線程運行,用於抽象

2.Daemon線程池
提供將普通線程轉換爲Daemon線程.不少狀況下,咱們不但願後臺線程池阻止程序的退出

public class MoreExecutorsDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
        MoreExecutors.getExitingExecutorService(exceutor);
        exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName()));
    }
}

3.future模式擴展待續....

相關文章
相關標籤/搜索