併發隊列之:BlockingQueue和ConcurrentLinkedQueue

一.並行和併發區別:

  • 並行:是指二者同時執行一件事。好比賽跑,兩我的都在不停的往前跑;

  • 併發:是指資源有限的狀況下,二者交替輪流使用資源。好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。

二.什麼叫線程安全

  • 線程安全就是說多線程訪問同一代碼,不會產生不肯定的結果。

  • 反之線程不安全就是,多線程在訪問同一代碼時,會發生不肯定因素,例如死鎖,數據不一致性等。

三.LinkedBlockingQueue

   LinkedBlockingQueue是一個線程安全的阻塞隊列,它實現了BlockingQueue接口,BlockingQueue接口繼承自java.util.Queue接口,並在這個接口的基礎上增長了take和put方法,這兩個方法正是隊列操做的阻塞版本。

因爲LinkedBlockingQueue實現是線程安全的,實現了先進先出等特性,是做爲生產者消費者的首選;LinkedBlockingQueue 能夠指定容量,也能夠不指定,不指定的話,默認最大是Integer.MAX_VALUE;其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。java

示例代碼:apache

package com.lky.test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.junit.Test;

/**
 * @Title: testBlockQueue.java
 * @Package com.lky.test
 * @Description:多線程模擬實現生產者消費者模型(阻塞隊列)
 * @author lky
 * @date 2015年10月24日 下午5:08:01
 * @version V1.0
 */
public class testBlockQueue {

    private Log log = LogFactory.getLog(testBlockQueue.class);

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定義阻塞隊列
     * @author lky
     * @date 2015年10月24日 下午5:07:28
     * @version V1.0
     */
    public class Basket {
        // 隊列的最大容量爲3
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 若是隊列不滿,則放入,不然阻塞等待
        public void produce(String apple) throws InterruptedException {
            basket.put(apple);
        }

        // 若是隊列不爲空,則取出,不然阻塞等待
        public String consumer() throws InterruptedException {
            return basket.take();
        }
    }

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定義生產者
     * @author lky
     * @date 2015年10月24日 下午5:18:17
     * @version V1.0
     */
    public class Produce implements Runnable {
        private Basket basket;
        private String fruit;

        public Produce(String fruit, Basket basket) {
            this.basket = basket;
            this.fruit = fruit;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    log.info("[" + Thread.currentThread().getName() + "]" + "開始生產apple----->" + this.fruit);
                    basket.produce(fruit);
                    log.info("apple生產完畢!!!!");
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                log.error("生產蘋果異常!!!!!");
            }
        }
    }

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定義消費者
     * @author lky
     * @date 2015年10月24日 下午5:24:31
     * @version V1.0
     */
    public class Consumer implements Runnable {
        private Basket basket;

        public Consumer(Basket basket) {
            this.basket = basket;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String fruit = basket.consumer();
                    log.info("[" + Thread.currentThread().getName() + "]" + "取到一個水果: " + fruit);
                    Thread.sleep(1000);
                }

            } catch (Exception e) {
                log.error("消費者取蘋果異常!!!!");
            }
        }
    }

    @Test
    public void test() {
        System.out.println(Runtime.getRuntime().availableProcessors());//獲取當前系統的cpu數目
        
        Basket basket = new Basket();
        Produce produce1 = new Produce("apple", basket);
        Produce produce2 = new Produce("banna", basket);
        Consumer consumer = new Consumer(basket);

        // 新建一個線程池
        ExecutorService service = Executors.newCachedThreadPool();

        service.submit(produce1);
        service.submit(produce2);
        service.submit(consumer);

        try {
            Thread.sleep(20000);
        } catch (Exception e) {
            log.error("程序異常錯誤!!!!");
        }
        service.shutdown();
    }
}

四 .ConcurrentLinkedQueue

     ConcurrentLinkedQueue是Queue的一個安全實現.Queue中元素按FIFO原則進行排序.採用CAS操做,來保證元素的一致性。

示例代碼:安全

package com.lky.test;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* @Title: testNBlockQueue.java 
* @Package com.lky.test 
* @Description: 多線程模擬實現生產者消費者模型(非阻塞式隊列)
* @author lky 
* @date 2015年10月24日 下午8:02:14 
* @version V1.0
 */
public class testNBlockQueue {
    private static Log log = LogFactory.getLog(testNBlockQueue.class);
    private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

    private static int count = 2;
    private static CountDownLatch latch = new CountDownLatch(count);

    private static class Poll implements Runnable {
        @Override
        public void run() {
                while (!queue.isEmpty()) {
                    log.info(Thread.currentThread().getName() + "消費一個商品: " + queue.poll());
                }
                latch.countDown();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService eService = Executors.newFixedThreadPool(4);

        // 生產商品
        for (int i = 0; i < 100000; ++i) {
            queue.offer(i);
        }

        // 消費者
        for (int i = 0; i < count; ++i) {
            eService.submit(new Poll());
        }

        latch.await();// 使得主線程阻塞,直到latch.getCount()爲0
        System.out.println("Cost time: " + (System.currentTimeMillis() - timeStart));
        eService.shutdown();
    }
}
      

     總結:在Java多線程應用中,隊列的使用率很高,多數生產消費模型的首選數據結構就是隊列(先進先出)。Java提供的線程安全的Queue能夠分爲阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際須要選用阻塞隊列或者非阻塞隊列。

相關文章
相關標籤/搜索