Java編程思想 第21章 併發

這是在2013年的筆記整理。如今從新拿出來,放在網上,從新總結下。html

兩種基本的線程實現方式 以及中斷

 

package thread; java

 

 

/** 編程

*小程序

* @author zjf windows

* @create_time 2013-12-18 緩存

* @use測試基本的兩種線程的實現方式安全

*         測試中斷多線程

*/ 併發

public class BasicThreadTest { app

 

    public static void main(String[] args) {

        Counter c1 = new Counter();

        c1.start();

        Thread c2 = new Thread(new Countable());

        c2.start();

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        //中斷

        c1.interrupt();

        c2.interrupt();

          

        

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        //此時c1線程已經終止不能再次start 屢次啓動一個線程是非法的。java.lang.IllegalThreadStateException

        c1.start();

    }

      

    

    /**

     *

     * @author zjf

     * @create_time 2013-12-18

     * @use Runnable接口方式的實現

     */

    static class Countable implements Runnable{

        public void run() {

            int i = 0;

            while(!Thread.currentThread().isInterrupted())

            {

                System.out.println(this.toString() + "-------------" + i);

                i ++;

            }

        }

    }

    /**

     *

     * @author zjf

     * @create_time 2013-12-18

     * @use Thread繼承方式的實現

     */

    static class Counter extends Thread{

        public void run() {

            int i = 0;

            while(!Thread.currentThread().isInterrupted())

            {

                System.out.println(this.toString() + "-------------" + i);

                i ++;

            }

        }

    }

}

中斷
  • public void interrupt()

中斷線程。

若是線程在調用 Object 類的 wait()wait(long)wait(long, int) 方法,或者該類的 join()join(long)join(long, int)sleep(long)sleep(long, int) 方法過程當中受阻,則其中斷狀態將被清除,它還將收到一個 InterruptedException

  • public static boolean interrupted()

測試當前線程是否已經中斷。線程的中斷狀態 由該方法清除。

  • public boolean isInterrupted()

測試線程是否已經中斷。線程的中斷狀態 不受該方法的影響。

測試睡眠被中斷

sleep是靜態方法。

 

package thread;

 

 

/**

*

* @author zjf

* @create_time 2013-12-18

* @use測試Sleep方法被中斷

*/

public class SleepTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use測試目的:睡眠時是否能夠被中斷

     * @param args

     */

    public static void main(String[] args) {

        Thread t = new Thread(new Sleepable());

        t.start();

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        t.interrupt();

    }

 

    

    static class Sleepable implements Runnable{

        public void run() {

            try {

                //睡眠10可是線程開始1秒後被中斷當前線程在睡眠時可以接收到中斷而後退出

                Thread.sleep(10000);

            } catch (InterruptedException e) {

                //若是被中斷就退出

                System.out.println("exit");//一秒後退出

            }

        }

    }

}

 

測試使用yield讓步

yield是靜態方法。

 

package thread;

 

public class YieldTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use測試yield方法

     * @param args

     */

    public static void main(String[] args) {

        new Thread() {

            @Override

            public void run() {

                for(int i = 1; i < 100; i++)

                {

                    System.out.println("yield-----" + i);

                    //測試結果顯示使用yield讓步與不使用差異不大

                    Thread.yield();

                }

            }

        }.start();

        

        new Thread() {

            @Override

            public void run() {

                for(int i = 1; i < 100; i++)

                {

                    System.out.println("notyield-----" + i);

                }

            }

        }.start();

    }

 

}

 

測試cached線程池

 

newCachedThreadPool:建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們。對於執行不少短時間異步任務的程序而言,這些線程池一般可提升程序性能。調用 execute 將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。所以,長時間保持空閒的線程池不會使用任何資源。

 

CachedThreadPool通常會建立所需數量的線程,而且會複用,這是選擇的首選。

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class CachedThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use測試Cached線程池

     * @param args

     */

    public static void main(String[] args) {

 

        /*

         * cached線程池不能設置擁有線程的數量

         */

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            es.execute(new Countable(i));

        }

        

        /*

         * 由於要複用線程因此線程執行完任務以後不會馬上關閉而是等待一分鐘(可配置)

         * 的時間若是在這一分鐘期間沒有新的任務要執行會自動關閉

         * shutdown標明不會再有新的任務加入因此加入shutdown代碼以後任務執行以後就會關閉線程

         * 不會等待一分鐘

         */

        es.shutdown();

 

    }

 

    static class Countable implements Runnable {

 

        private int i;

 

        public Countable(int i) {

            this.i = i;

        }

 

        public void run() {

 

            System.out.println("" + i + "啓動的線程的ID"

                    + Thread.currentThread().getId());

            

            /**

             *輸出爲

                 0啓動的線程的ID7

                2啓動的線程的ID9

                1啓動的線程的ID8

                3啓動的線程的ID10

                4啓動的線程的ID11

                5啓動的線程的ID12

                6啓動的線程的ID13

                8啓動的線程的ID8

                7啓動的線程的ID9

                9啓動的線程的ID10

                

                可見在地8 9 10個線程的時候複用了第1 2 3個線程。

                這創建在第1 2 3個線程已經運行完的基礎上。

             */

 

        }

    }

 

}

shutdown和shutdownnow

shutdown:

  • 阻止加入新的任務。
  • 結束已經完成任務的空閒線程,直到全部任務執行完畢,關閉全部線程爲止。

shutdownnow:

  1. 完成shutdown的功能。
  2. 向每一個未完成的線程發佈中斷命令。
  3. 返回未執行的任務列表
shutdownnow

package thread;

 

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class ShutdownNowTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newFixedThreadPool(3);

        for(int i = 0; i < 10; i ++)

        {

            es.execute(new Countable(i));

        }

        TimeUnit.SECONDS.sleep(1);

        //返回等待的任務列表

        List<Runnable> countList = es.shutdownNow();

        for(Runnable r : countList)

        {

            System.out.println(r.toString() + " is not done...");

        }

    }

 

}

 

class Countable implements Runnable{

    private int i;

    public Countable(int i) {

        this.i = i;

    }

    

    public int getI() {

        return i;

    }

 

    @Override

    public String toString() {

        

        return "thread, id : " + i;

    }

    

    public void run() {

        

        try {

            System.out.println(this.toString() + " is start to run...");

            TimeUnit.MILLISECONDS.sleep(500);

            System.out.println(this.toString() + " is done...");

        } catch (InterruptedException e) {

            System.out.println(this.toString() + " is interrupted...");

        }

    }

    

}

 

 

/**輸出

 

thread, id : 0 is start to run...

thread, id : 1 is start to run...

thread, id : 2 is start to run...

thread, id : 0 is done...

thread, id : 1 is done...

thread, id : 2 is done...

thread, id : 3 is start to run...

thread, id : 4 is start to run...

thread, id : 5 is start to run...

thread, id : 5 is done...

thread, id : 3 is done...

thread, id : 4 is done...

thread, id : 6 is start to run...

thread, id : 6 is interrupted...

thread, id : 7 is not done...

thread, id : 8 is not done...

thread, id : 9 is not done...

 

*/

測試ThreadFactory

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

 

public class ThreadFactoryTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use測試Cached線程池

     * @param args

     */

    public static void main(String[] args) {

 

        ThreadFactory threadFactory = new MyThreadFactory();

        ExecutorService es = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 10; i++) {

            es.execute(new Countable(i));

        }

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        private int i;

        public Countable(int i) {

            this.i = i;

        }

        public void run() {

            System.out.println("" + i + "個任務正在運行!");

        }

    }

 

    static class MyThreadFactory implements ThreadFactory {

        private static int count = 0;

        public Thread newThread(Runnable r) {

            return new MyThread(r,count++);

        }

    };

    

    static class MyThread extends Thread

    {

        private Runnable target;

        private int count;

        public MyThread(Runnable target, int count) {

            super();

            this.target = target;

            this.count = count;

        }

        @Override

        public void run() {

            System.out.println("" + count + "個線程啓動!" );

            if(target != null)

            {

                target.run();

            }

            System.out.println("" + count + "個線程結束!" );

        }

    }

}

/*

* 輸出結果

        0個線程啓動!

        1個線程啓動!

        2個線程啓動!

        3個線程啓動!

        0個任務正在運行!

        1個任務正在運行!

        2個任務正在運行!

        4個線程啓動!

        3個任務正在運行!

        5個線程啓動!

        4個任務正在運行!

        5個任務正在運行!

        8個任務正在運行!

        6個線程啓動!

        7個任務正在運行!

        7個線程啓動!

        6個任務正在運行!

        9個任務正在運行!

        7個線程結束!

        0個線程結束!

        3個線程結束!

        6個線程結束!

        5個線程結束!

        1個線程結束!

        4個線程結束!

        2個線程結束!

        

    證實:    Countable中的run方法被執行了10

            MyThread中的run方法只被執行了9

            緣由:CachedThreadPool在須要的時候會調用ThreadFactorynewThread方法可是也會用到緩存

            */

測試FixedThreadPool

 

newFixedThreadPool:建立一個可重用固定線程集合的線程池,以共享的無界隊列方式來運行這些線程。若是在關閉前的執行期間因爲失敗而致使任何線程終止,那麼一個新線程將代替它執行後續的任務(若是須要)。在某個線程被顯式地關閉以前,池中的線程將一直存在。(這與cacheThreadPool不同)

 

 

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class FixedThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use測試Fixed線程池

     * @param args

     */

    public static void main(String[] args) {

 

        

        ExecutorService es = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {

            es.execute(new Countable(i));

        }

        es.shutdown();

 

    }

 

    static class Countable implements Runnable {

 

        private int i;

 

        public Countable(int i) {

            this.i = i;

        }

 

        public void run() {

 

            System.out.println("" + i + "啓動的線程的ID"

                    + Thread.currentThread().getId());

 

        }

    }

 

}

/*

0啓動的線程的ID7

2啓動的線程的ID9

1啓動的線程的ID8

3啓動的線程的ID7

4啓動的線程的ID9

*/

 

SingleThreadExecutor

newSingleThreadExecutor():

建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,若是由於在關閉前的執行期間出現失敗而終止了此單個線程,那麼若是須要,一個新線程將代替它執行後續的任務)。可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。與其餘等效的 newFixedThreadPool(1) 不一樣,可保證無需從新配置此方法所返回的執行程序便可使用其餘的線程(備註:應該是內部實現的差別 外部的使用沒什麼差別)。

 

由於一個任務執行完畢以後,線程纔會空閒下來去執行另一個任務,因此能夠保證順序執行任務。

測試ScheduledExecutorService

scheduled

adj. 預約的;已排程的

v. 把…列表;把…列入計劃;安排(schedule的過去分詞)

 

上面演示的線程執行器或者線程池都是ExecutorService,下面看看ScheduledExecutorService。ScheduledExecutorService集成而且擴展了ExecutorService,可安排在給定的延遲後運行或按期執行的命令。

 

package thread;

 

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

 

public class SingleThreadScheduledTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use測試SingleThreadScheduled線程池

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ScheduledExecutorService es = Executors

                .newSingleThreadScheduledExecutor();

        //ScheduledThreadPool須要傳參控制池中所保存的線程數(即便線程是空閒的也包括在內)

        //ScheduledExecutorService es = Executors.newScheduledThreadPool(1);

        // 給定時間延遲後執行

        // es.schedule(new Countable(), 1, TimeUnit.SECONDS);

        // 傳入一個任務而後按照給定頻率循環執行在每次任務開始執行的時間點之間存在固定間隔

        //es.scheduleAtFixedRate(new Countable(), 2, 1, TimeUnit.SECONDS);

        // 傳入一個任務而後按照給定頻率循環執行每一次執行終止和下一次執行開始之間都存在給定的間隔

        es.scheduleWithFixedDelay(new Countable(), 2, 1, TimeUnit.SECONDS);

        // 若是沒有這句代碼將沒有任何反應,由於----|

        // 下面的shutdown代碼將會阻止執行新加入任務包含延遲執行而未執行的任務

        TimeUnit.SECONDS.sleep(10);

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        public void run() {

            System.out.println("一個任務運行開始!");

            try {

                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {

            }

            System.out.println("一個任務運行結束!");

        }

    }

 

}

 

package thread;

 

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

 

public class ScheduledThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use測試SingleThreadScheduled線程池

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        //ScheduledThreadPool須要傳參控制池中所保存的線程數(即便線程是空閒的也包括在內)

        ScheduledExecutorService es = Executors.newScheduledThreadPool(1);

        es.scheduleAtFixedRate(new Countable(), 0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        public void run() {

            System.out.println("一個任務運行開始!");

            try {

                TimeUnit.SECONDS.sleep(3);

            } catch (InterruptedException e) {

            }

            System.out.println("一個任務運行結束!");

        }

    }

}

/*

* 線程池中只有一個線程 + 每隔1秒要執行一個任務 + 一個任務要運行3秒才結束

* 結果是每隔3秒才能執行一次

*/

 

優先級

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class PriorTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use測試優先級

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                Thread.currentThread().setPriority(Thread.MAX_PRIORITY);

                int i = 0;

                while (!Thread.currentThread().isInterrupted()) {

                    System.out.println("MAX_PRIORITY" + i);

                    i++;

                }

            }

        });

        

        es.execute(new Runnable() {

            public void run() {

                Thread.currentThread().setPriority(Thread.MIN_PRIORITY);

                int i = 0;

                while (!Thread.currentThread().isInterrupted()) {

                    System.out.println("MIN_PRIORITY" + i);

                    i++;

                }

            }

        });

        TimeUnit.SECONDS.sleep(1);

        es.shutdownNow();

    }

}

/*

* 最後一次輸出結果是

*     MAX_PRIORITY32525

*    MIN_PRIORITY31289

* 差異並不大調整優先級適用於作適當的強弱調整不能用來控制流程走勢

* windows7個優先級 java能夠設置10個優先級

*/

測試Callable

package thread;

 

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

public class CallableTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use

     * @param args

     */

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        //CallableFuture都是泛型設計的 T表明返回值的類型

        Future<String> future = es.submit(new Callable<String>() {

            //call方法返回T 而且能夠拋出異常到主線程

            public String call() throws Exception {

                System.out.println("running.");

                TimeUnit.SECONDS.sleep(1);

                return "hello world!";

            }

        });

 

        es.shutdown();

        

        //若是被調用線程尚未完成 get方法將阻塞也可使用isDone()方法來判斷是否完成

        try {

            System.out.println(future.get());

        } catch (InterruptedException e) {

            e.printStackTrace();//異常處理

        } catch (ExecutionException e) {

            e.printStackTrace();//異常處理

        }

    }

 

}

submit解析

package thread;

 

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

public class SubmitTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     */

 

    /*

     * submit方法提交了一個任務給es去執行 es將分配一個線程來執行若是遇到ruturn或者拋出了異常信息都將記錄到Future對象中

     * 注意異常不會馬上拋出只是記錄到future 在調用futureget方法時候才拋出

     */

 

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

 

        // 方式1

        Future<String> future1 = es.submit(new Callable<String>() {

            public String call() throws Exception {

                return "done";

            }

        });

 

        try {

            future1.get();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

 

        // 方式2 這種方式的get永遠爲null 可是能夠拋出異常

        Future<?> future2 = es.submit(new Runnable() {

 

            public void run() {

 

            }

        });

 

        try {

            future2.get();

        } catch (InterruptedException e) {

 

            e.printStackTrace();

        } catch (ExecutionException e) {

 

            e.printStackTrace();

        }

 

        // 方式3 由於run方法是void即便加上String.class 只能得到一個 Future<String>對象而已 get的結果仍然是String型的null

        es.submit(new Runnable() {

            public void run() {

                

            }

        }, String.class);

    }

 

}

join方法

若是在一個線程的run方法中調用t.join,那麼將會在t執行結束以後纔會繼續當前線程。

 void

join()
          等待該線程終止。

 void

join(long millis)
          等待該線程終止的時間最長爲 millis 毫秒。

Join拋出InterruptedException。能夠被中斷。

package thread;

 

public class JoinTest {

 

    public static void main(String[] args) {

        Thrd thrd = new Thrd();

        thrd.start();

        try {

            thrd.join();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("after join");

        System.out.println("exit");

    }

 

    static class Thrd extends Thread {

        @Override

        public void run() {

            for (int i = 0; i < 10; i++) {

                System.out.println("running " + i);

                try {

                    Thread.sleep(1000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

}

 

未捕捉異常處理器

經過futureget方法也能夠獲取異常不知道這兩種方式有何差異?

 

package thread;

 

import java.lang.Thread.UncaughtExceptionHandler;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

 

public class UncaughtExceptionHandlerTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use

     * @param args

     */

    public static void main(String[] args) {

 

        ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {

            public Thread newThread(Runnable r) {

                Thread t = new Thread(r);

                t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

                    public void uncaughtException(Thread t, Throwable e) {

                        System.out.println("線程" + t.getId() + "發生了異常:"

                                + e.getMessage());

                    }

                });

                return t;

            }

        });

 

        es.execute(new Runnable() {

            public void run() {

                throw new RuntimeException("自定義異常");

            }

        });

 

        es.shutdown();

    }

}

經過Funture來捕捉異常:

package thread;

 

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class ExceptionTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     */

    public static void main(String[] args) {

 

        ExecutorService es = Executors.newCachedThreadPool();

        try {

            es.submit(new Runnable() {

                public void run() {

                    throw new RuntimeException("error");

                }

            }).get();

        } catch (InterruptedException e) {

            

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

            System.out.println("-------------------------------------------------------------------------------------------------------------");

            e.getCause().printStackTrace();

        }

        es.shutdown();

    }

}

java.util.concurrent.ExecutionException: java.lang.RuntimeException: error

    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)

    at java.util.concurrent.FutureTask.get(FutureTask.java:80)

    at thread.ExceptionTest.main(ExceptionTest.java:23)

Caused by: java.lang.RuntimeException: error

    at thread.ExceptionTest$1.run(ExceptionTest.java:21)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)

    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)

    at java.util.concurrent.FutureTask.run(FutureTask.java:123)

    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)

    at java.lang.Thread.run(Thread.java:595)

-------------------------------------------------------------------------------------------------------------

java.lang.RuntimeException: error

    at thread.ExceptionTest$1.run(ExceptionTest.java:21)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)

    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)

    at java.util.concurrent.FutureTask.run(FutureTask.java:123)

    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)

    at java.lang.Thread.run(Thread.java:595)

 

20131224 週二

java編程思想 第21章 併發

監視器

內部的默認鎖:對象內部的鎖 用於對象synchronized方法。class內部的鎖用於類的static synchronized方法。

何時使用同步

對於共享資源的訪問:

若是一個線程正在寫入一個數據,而這個數據有可能會被另一個線程讀取,那麼當前寫入操做應該同步。

若是一個線程正在讀取一個數據,而這個數據有可能以前被另一個線程寫入,那麼當前讀取操做應該同步。

ReentrantLock

可重入鎖,累計進入鎖定數值累計,須要屢次解鎖才能完全解鎖。

保持計數:當前線程重入鎖的次數。

默認爲非公平鎖:有一個構造函數能夠接受公平參數,若是設置爲true,將會盡力保證將鎖資源分配給等待時間最長的線程以保證公平。若是使用無參構造函數,那麼將採用非公平鎖。

lock():嘗試獲取鎖,成功後將保持計數+1,若是鎖被另一個線程持有,將等待。

lockInterruptibly():和lock用法意同樣,區別是它在獲取鎖的等待過程當中能夠被中斷。

tryLock():和lock的差異:1.它將忽略公平設置。永遠不公平。2. 若是鎖被另一個線程持有,當即返回或者按照傳遞的等待時間超時後返回。

unlock():若是當前線程是此鎖定全部者,則將保持計數減 1。若是保持計數如今爲 0,則釋放該鎖定。

 

何時須要使用鎖

使用默認的synchronized方法,或者synchronized塊,是基於默認的ReentrantLock實現的。可是ReentrantLock的功能要更多。並且不帶參數的synchronized是針對當前對象或者當前類的默認鎖的,若是一個類有多個方法要同步,可是不是每一個方法都相互牽制,那麼應該使用lock來區別對待。

原子操做

對非long和double以外的基本類型的讀取和賦值操做時原子操做

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class AtomiTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newCachedThreadPool();

        for(int i =0; i <runCount; i++)

        {

            //新建100個線程來執行countAdd的操做

            es.execute(new Runnable(){

                public void run() {

                    for(int i =0; i <100; i++)

                    {

                        countAdd();

                    }

                    //標示當前線程已經運行完畢

                    iAmDone();

                }});

        }

        es.shutdown();

        //全部線程運行完畢後打印出結果

        while(!isAllDone())

        {

            TimeUnit.MILLISECONDS.sleep(500);

        }

        System.out.println(getCount());

    }

 

    private static int count = 0;

    

    private static int done = 0;

    

    private static int runCount = 100;

    

    private static Lock countLock = new ReentrantLock();

    

    //此處不加上synchronized 結果將不是10000(比10000小)

    public synchronized static void iAmDone()

    {

        done ++;

    }

    

    public synchronized static boolean isAllDone()

    {

        return done == 100;

    }

    

    public static void countAdd()

    {

        //使用lock 不在方法中加synchronized 這樣不會與上面的done方法公用thislock 能夠提高性能

        countLock.lock();

        count ++;

        countLock.unlock();

    }

    

    public static int getCount()

    {

        return count;

    }

}

 

原子性和可視性

在多核處理器中,多任務被分配到多核上去處理,一個核上的更改不會便可刷新到其餘核。加上volatile關鍵字能夠保證可視性。

java能夠保證對除了long和double以外的基本類型的簡單的賦值和讀取操做的原子性。可是不能保證可視性。

若是加上了volatile關鍵字,不只能夠保證可視性,同時也能夠保證long和double的原子性。

synchronized能夠保證可視性。

若是對於基本類型(不包含long和double)執行簡單的(一句代碼的)讀寫操做(比較抽象,++就不是),能夠保證原子性和可視性。可是既然是一句話那麼不如使用sychronized,也不會耗費多少資源,並且穩定。

對原子性的誤用

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class UseAtomiWrong {

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     */

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 100; i++) {

            es.execute(new Runnable() {

                public void run() {

                    for (int i = 0; i < 100; i++) {

                        add();

                    }

                }

            });

        }

        es.execute(new Runnable(){

            public void run() {

                int i = getEvent();

                while(i%2 == 0)

                {

                    i = getEvent();

                }

                System.out.println(i);//打印出了奇數

            }});

 

        es.shutdown();

    }

 

    public volatile static int even = 0;

 

    //雖然對even加上了volatile 可是可能會讀到只執行了一次even++以後的不穩定狀態仍是要加synchronized

    public static int getEvent()

    {

        return even;

    }

    public synchronized static void add() {

        even++;

        even++;

    }

}

原子類

AtomicBooleanAtomicIntegerAtomicLongAtomicReference 的實例各自提供對相應類型單個變量的訪問和更新。每一個類也爲該類型提供適當的實用工具方法。例如,類 AtomicLong 和 AtomicInteger 提供了原子增量方法。一個應用程序將按如下方式生成序列號:

class Sequencer {

private AtomicLong sequenceNumber = new AtomicLong(0);

public long next() { return sequenceNumber.getAndIncrement(); }

}

原子訪問和更新的內存效果通常遵循如下可變規則:

get 具備讀取 volatile 變量的內存效果。

set 具備寫入(分配) volatile 變量的內存效果。

weakCompareAndSet 以原子方式讀取和有條件地寫入變量,並對於該變量上的其餘內存操做進行排序,不然將充當普通的非可變內存操做。

compareAndSet 和全部其餘的讀取和更新操做(如 getAndIncrement)都有讀取和寫入 volatile 變量的內存效果。

臨界區

Object obj = new Object();

        Lock lock = new ReentrantLock();

        //使用當前對象的默認所

        synchronized (this) {

            //代碼

        }

        //使用其餘對象的默認鎖

        synchronized (obj) {

            //代碼

        }

        //使用指定鎖

        synchronized (lock) {

            //代碼

        }

判斷ExecutorService中的全部任務是否完成

ExecutorService.isTerminated():

Returns true if all tasks have completed following shut down. Note that isTerminated is never true unless either shutdown or shutdownNow was called first.

 

ExecutorService.awaitTermination(long timeout, TimeUnit unit) throws InterruptedException:

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

 

線程狀態

新建:最初創建須要經歷的一個極其短暫的臨時狀態。

就緒:只要給了時間片就能夠運行。

阻塞:sleep wait或者等待鎖資源的時候處於的狀態。

死亡。運行完成的線程進入死亡狀態。不能夠再運行其餘任務。

關閉線程的方法:

經過interrupt來結束線程。es.shutdownnow能夠給全部線程發送中斷指令。若是須要對某一個線程中斷,可使用Future。如:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class FutureInterruptTest {

 

    public static Lock lock = new ReentrantLock();

    

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                lock.lock();

                System.out.println("locking...");

                try {

                    //鎖定5秒鐘

                    TimeUnit.SECONDS.sleep(5);

                } catch (InterruptedException e) {

                    

                }

                lock.unlock();

            }

        });

        

        Future<?> future = es.submit(new Runnable() {

            public void run() {

                try {

                    //陷入等待鎖的狀態

                    lock.lockInterruptibly();

                    lock.unlock();

                } catch (InterruptedException e) {

                    

                }

                System.out.println("done...");

            }

        });

        

        TimeUnit.SECONDS.sleep(1);

        //cancel方法若是線程時新建未運行狀態那麼就結束它若是已經運行那麼中斷它

        //若是把上面的lock.lockInterruptibly();改成lock那麼將接收不到中斷響應,直到得到鎖。

        future.cancel(true);

    }

}

 

使用volatile的boolean變量:

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class TerminateThreadUserBoolean {

 

    public static volatile boolean stop = false;

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            es.execute(new Runnable() {

                public void run() {

                    

                    

                    while(!stop)

                    {

                        System.out.println("running...");

                        try {

                            TimeUnit.SECONDS.sleep(1);

                        } catch (InterruptedException e) {

                            System.out.println("InterruptedException");

                        }

                        System.out.println("runed");

                    }

                    

                }

            });

        }

        TimeUnit.SECONDS.sleep(3);

        stop = true;

        System.out.println("try to shutdown");

        // shutdown以後才能測試awaitTermination

        es.shutdown();

        boolean success = es.awaitTermination(1, TimeUnit.SECONDS);

        if(success)

        {

            System.out.println("全部線程已經關閉..");

        }

        else {

            System.out.println("部分線程沒有關閉..");

        }

        

    }

 

}

 

中斷循環線程:

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class ThreadCloseTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-21

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                //try放在while的外層

                try {

                    //while中檢測中斷

                    while(!Thread.interrupted())

                    {

                        System.out.println("i am running");

                        TimeUnit.MILLISECONDS.sleep(500);

                    }

                } catch (InterruptedException e) {

                    

                }

                finally

                {

                    System.out.println("i am interrupted");

                }

            }

        });

        TimeUnit.SECONDS.sleep(3);

        es.shutdownNow();

    }

 

}

 

synchronized使用默認lock.lock;在等待默認鎖的過程當中不能被中斷。
interrupt

public void interrupt()

中斷線程。

public static boolean interrupted()

測試當前線程是否已經中斷。線程的中斷狀態 由該方法清除。換句話說,若是連續兩次調用該方法,則第二次調用將返回 false(在第一次調用已清除了其中斷狀態以後,且第二次調用檢驗完中斷狀態前,當前線程再次中斷的狀況除外)。

public boolean isInterrupted()

測試線程是否已經中斷。線程的中斷狀態 不受該方法的影響。

備註:若是使用執行器,在每一個任務結束以後 ,將會自動將線程的中斷狀態清除,而後再去執行下一個任務。示例代碼:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

public class InterruptTest {

 

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newSingleThreadExecutor();

        Future<?> future = es.submit(new Runnable() {

 

            public void run() {

                //只判斷不清除中斷

                while(!Thread.currentThread().isInterrupted())

                {

                    System.out.println("1st running");

                }

            }

        });

        TimeUnit.SECONDS.sleep(1);

        //發送中斷

        future.cancel(true);

        es.execute(new Runnable() {

 

            public void run() {

                while(!Thread.currentThread().isInterrupted())

                {

                    //若是中斷標誌沒有清楚將不會打印出下面代碼

                    System.out.println("2st running");

                }

            }

        });

        es.shutdown();

    }

 

}

Wait和notify

鎖和同步塊是用來解決線程互斥的問題。wait和notify是用來解決線程協做的問題。

wait和notify是針對一個鎖的,wait和notify的對象是這個鎖上的其餘等待對象。

wait方法能夠被中斷。notify不會等待,因此不須要被中斷。

sleep和yield不會釋放鎖(它們跟鎖沒有關係,只跟線程有關),而wait和notify必須放在sychronized塊內,由於跟鎖是關聯在一塊兒的。wait將釋放鎖,而後等待。

notify會喚醒在這個鎖上等待的其餘程序。可是不會釋放鎖。直到當前的線程放棄此對象上的鎖定,才能繼續執行被喚醒的線程。被喚醒的線程將以常規方式與在該對象上主動同步的其餘全部線程進行競爭;例如,喚醒的線程在獲取notify線程釋放的鎖方面沒有特權。

 

wait(long timeout):此方法致使當前線程(稱之爲 T將其自身放置在對象的等待集中,而後放棄此對象上的全部同步要求(包含放棄鎖)。出於線程調度目的,線程 T 被禁用,且處於休眠狀態,直到發生如下四種狀況之一:
  • 其餘某個線程調用此對象的 notify 方法,而且線程 T 碰巧被任選爲被喚醒的線程。
  • 其餘某個線程調用此對象的 notifyAll 方法。
  • 其餘某個線程中斷線程 T。
  • 已經到達指定的實際時間。可是,若是 timeout 爲零,則不考慮實際時間,該線程將一直等待,直到得到通知。

而後,從對象的等待集中刪除線程 T,並從新進行線程調度。而後,該線程以常規方式與其餘線程競爭,以得到在該對象上同步的權利;一旦得到對該對象的控制權,該對象上的全部其同步聲明都將被還原到之前的狀態 - 這就是調用 wait 方法時的狀況。而後,線程 T 從 wait 方法的調用中返回。因此,從 wait 方法返回時,該對象和線程 T 的同步狀態與調用 wait 方法時的狀況徹底相同。

如:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class NotifyTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-30

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        final NotifyTest t = new NotifyTest();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

 

            public void run() {

                try {

                    t.testWait();

                } catch (InterruptedException e) {

                    

                    e.printStackTrace();

                }

            }

        });

          

        

        es.execute(new Runnable() {

 

            public void run() {

                t.testNotify();

            }

        });

        

    }

 

    public synchronized void testWait() throws InterruptedException {

        System.out.println("pre wait");

        wait();

        System.out.println("after wait");

    }

 

    public synchronized void testNotify() {

        System.out.println("pre notify");

        //雖然喚醒了testWait 可是沒有釋放所資源 testWait仍然沒法運行

        notify();

        try {

            //20秒以後程序執行完畢而後釋放了鎖這時纔會輸出 "after wait"

            TimeUnit.SECONDS.sleep(20);

        } catch (InterruptedException e) {

            

            e.printStackTrace();

        }

        System.out.println("after notify");

    }

}

Wait的簡單例子

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class WaxTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-30

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        Car car = new Car();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Waxing(car));

        es.execute(new Buffering(car));

        TimeUnit.SECONDS.sleep(6);

        es.shutdownNow();

    }

 

    static class Car{

        private boolean isWaxOn = false;

        

        //通常將InterruptedException拋出到任務的run方法中去控制

        //這裏不加synchronized也能夠應爲它是在synchronizedwaxing方法中被調用的

        public synchronized void waitForWaxing() throws InterruptedException

        {

            //使用while

            while(isWaxOn == true)

            {

                wait();

            }

        }

        

        public synchronized void waitForBuffing() throws InterruptedException

        {

            //使用while

            while(isWaxOn == false)

            {

                wait();

            }

        }

          

        

        public synchronized void waxing() throws InterruptedException

        {

            waitForWaxing();

            TimeUnit.MILLISECONDS.sleep(200);

            System.out.println("waxing on");

            isWaxOn = true;

            notifyAll();

        }

        

        public synchronized void buffing() throws InterruptedException

        {

            waitForBuffing();

            TimeUnit.MILLISECONDS.sleep(200);

            System.out.println("buffing over");

            isWaxOn = false;

            notifyAll();

        }

    }

    

    //塗蠟任務

    static class Waxing implements Runnable{

 

        private Car car;

        

        public Waxing(Car car) {

            super();

            this.car = car;

        }

 

        public void run() {

            try {

                while(!Thread.interrupted())

                {

                    car.waxing();

                }

            } catch (InterruptedException e) {

                

            }

        }

        

    }

    

    //拋光任務

    static class Buffering implements Runnable{

 

        private Car car;

        

        public Buffering(Car car) {

            super();

            this.car = car;

        }

 

        public void run() {

            try {

                while(!Thread.interrupted())

                {

                    car.buffing();

                }

            } catch (InterruptedException e) {

                

            }

        }

    }

    

}

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class RestanurantTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-31

     * @use

     * @param args

     */

    public static void main(String[] args) {

        new Restanurant();

    }

 

    static class Meal {

        private int orderNumber;

 

        public Meal(int orderNumber) {

            super();

            this.orderNumber = orderNumber;

        }

 

        @Override

        public String toString() {

 

            return "Meal:" + orderNumber;

        }

    }

 

    static class Restanurant {

        Meal meal;

        WaitPerson waiter = new WaitPerson(this);

        Chef chef = new Chef(this);

        ExecutorService es = Executors.newCachedThreadPool();

 

        public Restanurant() {

            es.execute(waiter);

            es.execute(chef);

            es.shutdown();

        }

    }

 

    static class WaitPerson implements Runnable {

 

        private Restanurant restanurant;

 

        public WaitPerson(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    //使用共有的restanurant來控制同步

                    synchronized (restanurant) {

                        while (restanurant.meal == null) {

                            //由於是synchronizedrestanurant 因此wait方法也是調用restanurant

                            restanurant.wait();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = null;

                        System.out.println("服務員上餐結束!");

                        restanurant.notifyAll();

                    }

 

                }

            } catch (InterruptedException e) {

 

            }

 

        }

 

    }

 

    static class Chef implements Runnable {

 

        private Restanurant restanurant;

 

        public Chef(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        private int orderNumber = 0;

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    synchronized (restanurant) {

                        while (restanurant.meal != null) {

                            restanurant.wait();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = new Meal(++orderNumber);

                        System.out.println("廚師作飯完畢!");

                        restanurant.notifyAll();

                        if(orderNumber >= 10)

                        {

                            restanurant.es.shutdownNow();

                        }

                    }

                }

            } catch (InterruptedException e) {

 

            }

        }

 

    }

}

 

使用顯式的condition

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class RestanurantTest1 {

 

    /**

     * @author zjf

     * @create_time 2013-12-31

     * @use

     * @param args

     */

    public static void main(String[] args) {

        new Restanurant();

    }

 

    static class Meal {

        private int orderNumber;

 

        public Meal(int orderNumber) {

            super();

            this.orderNumber = orderNumber;

        }

 

        @Override

        public String toString() {

 

            return "Meal:" + orderNumber;

        }

    }

 

    static class Restanurant {

        Lock mealLock = new ReentrantLock();

        Condition condition = mealLock.newCondition();

        Meal meal;

        WaitPerson waiter = new WaitPerson(this);

        Chef chef = new Chef(this);

        ExecutorService es = Executors.newCachedThreadPool();

 

        public Restanurant() {

            es.execute(waiter);

            es.execute(chef);

            es.shutdown();

        }

    }

 

    static class WaitPerson implements Runnable {

 

        private Restanurant restanurant;

 

        public WaitPerson(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    restanurant.mealLock.lockInterruptibly();

                        while (restanurant.meal == null) {

                            restanurant.condition.await();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = null;

                        System.out.println("服務員上餐結束!");

                        restanurant.condition.signalAll();

                        restanurant.mealLock.unlock();

 

                }

            } catch (InterruptedException e) {

 

            }

 

        }

 

    }

 

    static class Chef implements Runnable {

 

        private Restanurant restanurant;

 

        public Chef(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        private int orderNumber = 0;

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    restanurant.mealLock.lockInterruptibly();

                        while (restanurant.meal != null) {

                            restanurant.condition.await();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = new Meal(++orderNumber);

                        System.out.println("廚師作飯完畢!");

                        restanurant.condition.signalAll();

                        if(orderNumber >= 10)

                        {

                            restanurant.es.shutdownNow();

                        }

                        restanurant.mealLock.unlock();

                }

            } catch (InterruptedException e) {

 

            }

        }

 

    }

}

BlockingQueue

接口 BlockingQueue<E>

E take()

throws InterruptedException

檢索並移除此隊列的頭部,若是此隊列不存在任何元素,則一直等待。

void put(E o)

throws InterruptedException

將指定元素添加到此隊列尾部,若是沒有可用空間,將一直等待(若是有必要)。

實現類:ArrayBlockingQueue<E> 固定數目, LinkedBlockingQueue<E>不固定數目。

20140224 週一

java編程思想 第21章 併發

 

使用管道在線程間傳遞數據

package thread;

 

import java.io.IOException;

import java.io.PipedReader;

import java.io.PipedWriter;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class PipedReaderTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-24

     * @use

     * @param args

     * @throws IOException

     * @throws InterruptedException

     */

    public static void main(String[] args) throws IOException,

            InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        final PipedWriter writer = new PipedWriter();

        final PipedReader reader = new PipedReader();

        //兩個關聯上

        writer.connect(reader);

        es.execute(new Runnable() {

            public void run() {

                char[] strs = "my name is zjf".toCharArray();

                try {

                    for (char c : strs) {

                        writer.write(c);

                        TimeUnit.MILLISECONDS.sleep(300);

                    }

 

                } catch (IOException e) {

 

                    e.printStackTrace();

                } catch (InterruptedException e) {

 

                    e.printStackTrace();

                } finally {

                    try {

                        //輸出完成後關閉writer

                        writer.close();

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

        });

 

        es.execute(new Runnable() {

 

            public void run() {

                try {

                    while (!Thread.interrupted()) {

                        int c;

                        //writer關閉以後將會獲取-1 循環被終止陷入外層interruptedwhile循環中

                        while ((c = reader.read()) != -1) {

                            System.out.println((char) c);

                        }

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        });

 

        //5秒後發送中斷指令

        TimeUnit.SECONDS.sleep(5);

        es.shutdownNow();

    }

}

 

20140225 週二

java編程思想 第21章 併發

CountDownLatch

用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier

CountDownLatch 是一個通用同步工具,它有不少用途。將計數 1 初始化的 CountDownLatch 用做一個簡單的開/關鎖存器,或入口:在經過調用 countDown() 的線程打開入口前,全部調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可使一個線程在 N 個線程完成某項操做以前一直等待,或者使其在某項操做完成 N 次以前一直等待。

 

示例用法: 下面給出了兩個類,其中一組 worker 線程使用了兩個倒計數鎖存器:

第一個類是一個啓動信號,在 driver 爲繼續執行 worker 作好準備以前,它會阻止全部的 worker 繼續執行。

第二個類是一個完成信號,它容許 driver 在完成全部 worker 以前一直等待。

class Driver { // ...

void main() throws InterruptedException {

CountDownLatch startSignal = new CountDownLatch(1);

CountDownLatch doneSignal = new CountDownLatch(N);

 

for (int i = 0; i < N; ++i) // create and start threads

new Thread(new Worker(startSignal, doneSignal)).start();

 

doSomethingElse(); // don't let run yet

startSignal.countDown(); // let all threads proceed

doSomethingElse();

doneSignal.await(); // wait for all to finish

}

}

 

class Worker implements Runnable {

private final CountDownLatch startSignal;

private final CountDownLatch doneSignal;

Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {

this.startSignal = startSignal;

this.doneSignal = doneSignal;

}

public void run() {

try {

startSignal.await();

doWork();

doneSignal.countDown();

} catch (InterruptedException ex) {} // return;

}

 

void doWork() { ... }

}

 

 

另外一種典型用法是,將一個問題分紅 N 個部分,用執行每一個部分並讓鎖存器倒計數的 Runnable 來描述每一個部分,而後將全部 Runnable 加入到 Executor 隊列。當全部的子部分完成後,協調線程就可以經過 await。(當線程必須用這種方法反覆倒計數時,可改成使用 CyclicBarrier。)

class Driver2 { // ...

void main() throws InterruptedException {

CountDownLatch doneSignal = new CountDownLatch(N);

Executor e = ...

 

for (int i = 0; i < N; ++i) // create and start threads

e.execute(new WorkerRunnable(doneSignal, i));

 

doneSignal.await(); // wait for all to finish

}

}

 

class WorkerRunnable implements Runnable {

private final CountDownLatch doneSignal;

private final int i;

WorkerRunnable(CountDownLatch doneSignal, int i) {

this.doneSignal = doneSignal;

this.i = i;

}

public void run() {

try {

doWork(i);

doneSignal.countDown();

} catch (InterruptedException ex) {} // return;

}

 

void doWork() { ... }

}

 

 

個人例子:

 

package thread;

 

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class CountDownLatchTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-25

     * @use計算1 + 2 + 3 + ... + 1000000000

     * @param args

     */

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        long sum = sum(1, 1000000000, 10);

        long endTime = System.currentTimeMillis();

        System.out.println("計算結果是" + sum + ",耗時" + (endTime - startTime) + "毫秒!");

    }

 

    public static long sum(int start, int end, int concurrentSize) {

        ConcurrentSumer sumer = new ConcurrentSumer(start, end, concurrentSize);

        return sumer.sum();

    }

}

 

class ConcurrentSumer {

    private long sum = 0;

    private int start;

    private int end;

    private int concurrentSize;

    CountDownLatch countDownLatch;

    

    public ConcurrentSumer(int start, int end, int concurrentSize) {

        super();

        this.start = start;

        this.end = end;

        this.concurrentSize = concurrentSize;

        countDownLatch = new CountDownLatch(concurrentSize);

    }

    

    private synchronized void addSum(long add)

    {

        sum += add;

    }

    

    public long sum() {

        ExecutorService es = Executors.newCachedThreadPool();

        int extend = (end - start)/concurrentSize +1;

        while(start <= end)

        {

            es.execute(new SumTask(start,(start + extend) > end ? end : (start + extend)));

            start = start + extend + 1;

        }

        es.shutdown();

        try {

            //等待全部任務完成

            countDownLatch.await();

            System.out.println("全部任務已經完成...");

        } catch (InterruptedException e) {

            //若是沒有等到全部任務都完成就被中斷那麼返回0

            sum = 0;

        }

        return sum;

    }

 

    class SumTask implements Runnable {

        private int st;

        private int ed;

        

        public SumTask(int st, int ed) {

            super();

            this.st = st;

            this.ed = ed;

        }

 

        public void run() {

            long s = 0;

            for(int i = st; i <= ed; i++ )

            {

                s += i;

            }

            addSum(s);

            System.out.println("一個線程已經完成...");

            countDownLatch.countDown();

        }

    }

 

}

 

CyclicBarrier

cyclic ['saiklik]

adj.

1. 週期的,構成周期的

2. 循環的,輪轉的;往復運動的

 

barrier ['bæriə]

n.

1. (阻礙通道的)障礙物,屏障(如柵欄、擋板、擋牆、壁壘、障壁、十字轉門等)

 

 

一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。

public CyclicBarrier(int parties,
Runnable barrierAction)

建立一個新的 CyclicBarrier

parties:並行運行的任務數。

barrierAction:每當並行任務的任務調用的barrier的await方法的次數到達parties此的時候,那麼barrierAction方法將會被執行一次,此時全部的並行任務處於等待狀態,等待barrierAction執行完畢,全部卡在await方法的並行任務得以繼續執行。

public int await()
throws InterruptedException,
BrokenBarrierException

在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。

 

賽馬例子:

package thread;

 

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

/**

*

* @author zjf

* @create_time 2014-2-26

* @use賽馬模擬小程序

*/

public class CyclicBarrierTest {

    public static void main(String[] args) {

        //賽馬數量

        int horseCount = 10;

        //目標距離

        final int targetLine = 20;

        final List<Horse> horses = new ArrayList<Horse>();

        final ExecutorService es = Executors.newCachedThreadPool();

        //每一組馬匹走完一步以後統計是否已經到達終點

        CyclicBarrier barrier = new CyclicBarrier(horseCount, new Runnable() {

            public void run() {

                System.out.println("");

                for (Horse horse : horses) {

                    if (horse.getComplete() >= targetLine) {

                        System.out.println(horse + " wone!");

                        //若是有到達終點的發送中斷

                        es.shutdownNow();

                        break;

                    }

 

                }

            }

        });

 

        for (int i = 0; i < horseCount; i++) {

            Horse horse = new Horse(i, barrier);

            horses.add(horse);

            es.execute(horse);

        }

    }

}

 

class Horse implements Runnable {

    private int id;

    private CyclicBarrier barrier;

    private int complete = 0;

    private Random random = new Random();

 

    public synchronized int getComplete() {

        return complete;

    }

 

    public synchronized void oneStep() {

        //模擬一步的距離

        complete += random.nextInt(3);

        System.out.print(this + " : " + complete + "--");

    }

 

    public Horse(int i, CyclicBarrier barrier) {

        this.id = i;

        this.barrier = barrier;

    }

 

    public void run() {

        try {

            while (!Thread.interrupted()) {

                oneStep();

                //每執行一步以後等待並行的都執行完await以後才能繼續執行

                barrier.await();

                TimeUnit.MILLISECONDS.sleep(300);

            }

        } catch (InterruptedException e) {

 

        } catch (BrokenBarrierException e) {

 

        }

    }

 

    @Override

    public String toString() {

        return "horse" + id;

    }

}

 

20140226 週三

java編程思想 第21章 併發

PriorityBlockingQueue

一個無界的阻塞隊列,它使用與類 PriorityQueue 相同的順序規則,而且提供了阻塞檢索的操做。雖然此隊列邏輯上是無界的,可是因爲資源被耗盡,因此試圖執行添加操做可能會失敗(致使 OutOfMemoryError)。此類不容許使用 null 元素。

 

例子:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.PriorityBlockingQueue;

import java.util.concurrent.TimeUnit;

 

public class PriorityBlockingQueueTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-26

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        //Task類實現了Comparable接口,按照排序來決定優先級。

        final PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<Task>();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                try {

                    while(!Thread.interrupted())

                    {

                        Task task = taskQueue.take();

                        System.out.println(task);

                        //每隔200秒取出一個

                        TimeUnit.MILLISECONDS.sleep(200);

                    }

                } catch (InterruptedException e) {

                    

                }

            }

        });

        for (final TASKLEVLE taskLevel : TASKLEVLE.values()) {

            es.execute(new Runnable() {

                public void run() {

                    try {

                        for (int i = 0; i < 5; i++) {

                            //5個任務每隔300秒放入一個

                            taskQueue.put(new Task(taskLevel, i));

                            TimeUnit.MILLISECONDS.sleep(300);

                        }

                    } catch (Exception e) {

 

                    }

                }

            });

        }

        

        TimeUnit.SECONDS.sleep(10);

        es.shutdownNow();

    }

 

}

/*

* Task要實現Comparable接口

*/

class Task implements Comparable<Task> {

    private TASKLEVLE taskLevel = TASKLEVLE.MIDDLE;

 

    private final long id;

 

    public Task(TASKLEVLE taskLevel, long id) {

        super();

        this.taskLevel = taskLevel;

        this.id = id;

    }

 

    public int compareTo(Task o) {

        return o.taskLevel.compareTo(taskLevel);

    }

 

    @Override

    public String toString() {

        return "task-" + id + "level-" + taskLevel;

    }

 

}

 

enum TASKLEVLE {

    LOW, MIDDLE, HIGH, SUPER

};

併發

優先權
  • 線程的"優先權"priority)能告訴調度程序其重要性如何。儘管處理器處理現有線程集的順序是不肯定的,可是若是有許多線程被阻塞並在等待運行,那麼調度程序將傾向於讓優先權最高的線程先執行。然而,這並非意味着優先權較低的線程將得不到執行(也就是說,優先權不會致使死鎖)。優先級較低的線程僅僅是執行的頻率較低。
  • 對於已存在的線程,你能夠用getPriority( )方法獲得其優先權,也能夠在任什麼時候候使用setPriority( )方法更改其優先權
  • 儘管JDK10個優先級別,但它與多數操做系統都不能映射得很好。好比,Windows 20007個優先級且不是固定的,因此這種映射關係也是不肯定的(儘管SunSolaris231個優先級)。惟一可移植的策略是當你調整優先級的時候,只使用MAX_PRIORITYNORM_PRIORITY,和MIN_PRIORITY三種級別。
來自JDK API

線程 是程序中的執行線程。Java 虛擬機容許應用程序併發地運行多個執行線程。

每一個線程都有一個優先級,高優先級線程的執行優先於低優先級線程。每一個線程均可以或不能夠標記爲一個守護程序。當某個線程中運行的代碼建立一個新 Thread 對象時,該新線程的初始優先級被設定爲建立線程的優先級,而且當且僅當建立線程是守護線程時,新線程纔是守護程序。

當 Java 虛擬機啓動時,一般都會有單個非守護線程(它一般會調用某個指定類的 main 方法)。Java 虛擬機會繼續執行線程,直到下列任一狀況出現時爲止: 調用了 Runtime 類的 exit 方法,而且安全管理器容許退出操做發生。

非守護線程的全部線程都已中止運行,不管是經過從對 run 方法的調用中返回,仍是經過拋出一個傳播到 run 方法以外的異常。

守護線程

public final void setDaemon(boolean on)

將該線程標記爲守護線程或用戶線程。當正在運行的線程都是守護線程時,Java 虛擬機退出。

該方法必須在啓動線程前調用。

Thread和Runnable
  • 你的類也許已經繼承了其它的類,在這種狀況下,就不可能同時繼承ThreadJava並不支持多重繼承)。這時,你可使用"實現Runnable接口"的方法做爲替代。
  • Thread也是從Runnable接口實現而來的。
  • Runnable類型的類只需一個run( )方法,可是若是你想要對這個Thread對象作點別的事情(好比在toString( )裏調用getName( )),那麼你就必須經過調用Thread.currentThread( )方法明確獲得對此線程的引用。
相關文章
相關標籤/搜索