04.Java多線程併發庫API使用3

1.java5的Semaphere同步工具

Semaphore能夠維護當前訪問自身的線程個數,並提供了同步機制。使用Semaphore能夠控制同時訪問資源的線程個數,例如,實現一個文件容許的併發訪問數。java

Semaphore實現的功能就相似銀行有6個窗口,12我的有業務要操做,那麼同時只能有6我的佔用窗口,當有的人業務操做完畢以後,讓開位置,其它等待的人羣中,有一人能夠佔用當前窗口,操做本身的業務。併發

另外等待的5我的中能夠是隨機得到優先機會,也能夠是按照先來後到的順序得到機會,這取決於構造Semaphore對象時傳入的參數選項。dom

單個信號量的Semaphore對象能夠實現互斥鎖的功能,而且能夠是由一個線程得到了「鎖」,再由另外一個線程釋放「鎖」,這可應用於死鎖恢復的一些場合。ide

package com.chunjiangchao.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 同步信號量的使用
 * @author chunjaingchao
 * 隨時能夠調整Semaphore中可併發的數量
 */
public class SemaphoreDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(1);//容許的併發數
        for(int i=0;i<10;i++){
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"正在執行,當前已經有"+(1-semaphore.availablePermits())+"個併發");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    semaphore.release();
                    
                }
                
            });
        }
    }

}

2.CyclicBarrier同步工具

表示你們彼此等待,你們集合好後纔開始出發,分散活動後又在指定地點集合碰面,這就比如整個公司的人員利用週末時間集體郊遊同樣,先各自從家出發到公司集合後,再同時出發到公園遊玩,在指定地點集合後再同時開始就餐,…。工具

一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。ui

package com.chunjiangchao.thread;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CyclicBarrier的使用
 * @author chunjiangchao
 *
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        final int threadNum = 5;
        ExecutorService threadPool = Executors.newCachedThreadPool();
        final CyclicBarrier cb = new CyclicBarrier(threadNum);
        for(int i=0;i<threadNum;i++){
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("線程"
                                + Thread.currentThread().getName()
                                + "運行到Barrier1,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達,"
                                + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊"
                                        : "正在等候"));
                        cb.await();//障礙點1:當前線程在await這個障礙地點停頓,等着其它線程運行到這
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("線程"
                                + Thread.currentThread().getName()
                                + "運行到Barrier2,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達,"
                                + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊"
                                        : "正在等候"));
                        cb.await();//障礙點2:
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("線程"
                                + Thread.currentThread().getName()
                                + "運行到Barrier3,已有"+ (cb.getNumberWaiting() + 1)+ "個已經到達,"
                                + (cb.getNumberWaiting() == threadNum-1 ? "都到齊了,繼續走啊"
                                        : "正在等候"));
                        cb.await();//障礙點3:
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
                
            });
        }
        //正在執行的任務接着執行,後續不容許添加任務
        threadPool.shutdown();
    }

}

3.java5的CountDownLatch同步工具

一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。this

猶如倒計時計數器,調用CountDownLatch對象的countDown方法就將計數器減1,當計數到達0時,則全部等待者或單個等待者開始執行。spa

能夠實現一我的(也能夠是多我的)等待其餘全部人都來通知他,這猶如一個計劃須要多個領導都簽字後才能繼續向下實施。還能夠實現一我的通知多我的的效果,相似裁判一聲口令,運動員同時開始奔跑。用這個功能作百米賽跑的遊戲程序不錯哦!線程

package com.chunjiangchao.thread;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * 建立這個CountDownLatch對象的時候,會傳入計數器個數,當前線程調用await方法進行等待其它線程的操做,
 * 當其餘線程操做計數器的值直到0的時候,纔會繼續執行後續操做
 * @author chuangjiangchao
 *
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        for(int i=0;i<3;i++){
            new WorkThread(countDownLatch).start();
        }
        System.out.println("老大在這兒等着");
        countDownLatch.await();
        System.out.println("大家都跑完了,該我走人了");
        
    }
    private static class WorkThread extends Thread{
        private CountDownLatch countDownLatch;
        public WorkThread(CountDownLatch countDownLatch){
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            System.out.println("執行耗時的操做");
            try {
                Thread.sleep(100*new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            System.out.println(Thread.currentThread().getName()+"跑完任務,計數器值改變");
        }
    }

}

4.java5的Exchanger同步工具

用於實現兩個線程之間的數據交換,每一個線程在完成必定的事務後想與對方交換數據,第一個先拿出數據的線程將一直等待第二個線程拿着數據到來時,才能彼此交換數據。code

誰先到達,誰就等待另一個線程到達,而後開始交換數據。最後執行各自的動做。

package com.chunjiangchao.thread;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.Exchanger;

/**
 * 兩個線程之間的數據交換
 * @author chunjiangchao
 *
 */
public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<String>();//經過這玩意兒來交換數據
//        ExecutorService threadPool = Executors.newCachedThreadPool();
//        threadPool.execute(command);
        new ThreadA(exchanger).start();
        new ThreadB(exchanger).start();
    }
    private static class ThreadA extends Thread{
        private Exchanger<String> exchanger;
        public ThreadA(Exchanger<String> exchanger){
            this.exchanger = exchanger;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"執行以前"+new Date().toLocaleString());
                Thread.sleep(new Random().nextInt(100)*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println(Thread.currentThread().getName()+"開始交換數據"+new Date().toLocaleString());
                String exchange = exchanger.exchange("A的數據");//給我等着,直到須要交換數據的線程B到來
                System.out.println(Thread.currentThread().getName()+"交換後獲得的數據:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }
    private static class ThreadB extends Thread{
        private Exchanger<String> exchanger;
        public ThreadB(Exchanger<String> exchanger){
            this.exchanger = exchanger;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"執行以前"+new Date().toLocaleString());
                Thread.sleep(new Random().nextInt(100)*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println(Thread.currentThread().getName()+"開始交換數據"+new Date().toLocaleString());
                String exchange = exchanger.exchange("B的數據");
                System.out.println(Thread.currentThread().getName()+"交換後獲得的數據:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }

}

 

5.java5阻塞隊列的應用(ArrayBlockingQueue)

阻塞隊列與Semaphore有些類似,但也不一樣,阻塞隊列是一方存放數據,另外一方釋放數據,Semaphore一般則是由同一方設置和釋放信號量(主要控制訪問資源的線程數)。

ArrayBlockingQueue :只有put方法和take方法才具備阻塞功能。

package com.chunjiangchao.thread;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * 兩個生產者,一個消費者
 * @author chunjiangchao
 *
 */
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
        for(int i=0;i<2;i++){
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(3000);
                            int nextInt = new Random().nextInt(100);
                            System.out.println(Thread.currentThread().getName()+"添加數據爲:"+nextInt);
                            queue.put(nextInt);//當前操做與take操做是阻塞的
                            System.out.println("當前數據個數"+queue.size());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }

        new Thread(new Runnable() {
            
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName()+"獲取數據");
                        Integer take = queue.take();//取數據,若是queue裏面沒有數據,就會一直等,等queue裏面存放數據了,纔會執行後續的代碼
                        System.out.println("獲取到的數據爲"+take+" ,當前數據個數"+queue.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    
    }

}

問題:用兩個具備1個空間的隊列來實現同步通知的功能

package com.chunjiangchao.thread;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 使用BlockingQueue實現同步通知的功能
 * @author zzb
 *
 */
public class BlockingQueueCommunicationDemo {

    public static void main(String[] args) {
        final Business business = new Business();
        Thread threadMain = new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        business.main();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        });
        threadMain.setName("threadMain");
        threadMain.start();
        Thread threadSub = new Thread(new Runnable(){
            
            @Override
            public void run() {
                while(true){
                    try {
                        business.sub();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        });
        threadSub.setName("threadSub");
        threadSub.start();
        /*
         * 你一下我一下,你一下,我一下
             threadMain執行耗時操做
            threadSub執行耗時操做
            threadMain執行耗時操做
            threadSub執行耗時操做
            threadMain執行耗時操做
            threadSub執行耗時操做
 
         */
    }
    private static class Business{
        private static ArrayBlockingQueue<Integer> mainQueue = new ArrayBlockingQueue<>(1);
        private static ArrayBlockingQueue<Integer> subQueue = new ArrayBlockingQueue<>(1);
        static{
            try {
                mainQueue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public void sub() throws InterruptedException{
            subQueue.take();//sub在subQueue取出數據,取不到數據,就等着
            System.out.println(Thread.currentThread().getName()+"執行耗時操做");
            Thread.sleep(3000);
            mainQueue.put(1);//存放數據到subQueue,方便
        }
        public void main() throws InterruptedException{
            mainQueue.take();//main在mainQueue取出數據,取不到數據,就等着
            System.out.println(Thread.currentThread().getName()+"執行耗時操做");
            Thread.sleep(3000);
            subQueue.put(1);//存放數據到subQueue,方便
        }
    }

}

 

未完待續……

相關文章
相關標籤/搜索