使用LinkedBlockingQueue來實現生產者消費者的例子

工做中,常常有將文件中的數據導入數據庫的表中,或者將數據庫表中的記錄保存到文件中。爲了提升程序的處理速度,能夠設置讀線程和寫線程,這些線程經過消息隊列進行數據交互。本例就是使用了LinkedBlockingQueue來模仿生產者線程和消費者線程進行數據生產和消費。
爲了方便,這些不一樣的類被寫在了一個類中,實際使用的時候,能夠單獨拆開,觸類旁通地使用。java

如下是例子:數據庫

LinkedBlockingQueueDemo.java安全

 

import java.util.Date;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
 
public class LinkedBlockingQueueDemo {
    // 生產者線程數量
    private final static int providerThreadAmount = 5;
 
    // 記錄每個生產者線程是否處理完畢的標記
    private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];
 
    // 整個全部的生產者線程所有結束的標記
    private static boolean done = false;
 
    // 一個線程安全的隊列,用於生產者和消費者異步地信息交互
    private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();
 
    static class ProviderThread extends Thread {
        private Thread thread;
        private String threadName;
        private int threadNo;
 
        public ProviderThread(String threadName2, int threadNo) {
            this.threadName = threadName2;
            this.threadNo = threadNo;
        }
 
        public void start() {
            if (thread == null) {
                thread = new Thread(this, threadName);
            }
 
            thread.start();
            System.out.println(
                    (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());
        }
 
        @Override
        public void run() {
            int rows = 0;
            for (int i = 0; i < 100; i++) {
                String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());
                // offer不會去阻塞線程,put會
                //linkedBlockingQeque.offer(string);
                linkedBlockingQeque.put(string);
                rows++;
                /*
                 * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
                 * (InterruptedException e) { e.printStackTrace(); }
                 */
            }
 
            // 本線程處理完畢的標記
            LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;
            System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"
                    + Thread.currentThread().getName());
        }
    }
 
    static class ConsumerThread implements Runnable {
        private Thread thread;
        private String threadName;
 
        public ConsumerThread(String threadName2) {
            this.threadName = threadName2;
        }
 
        public void start() {
            if (thread == null) {
                thread = new Thread(this, threadName);
            }
 
            thread.start();
            System.out.println(
                    (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());
        }
 
        @Override
        public void run() {
            int rows = 0;
            // 生產者線程沒有結束,或者消息隊列中有元素的時候,去隊列中取數據
            while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) {
                try {
                    //在甘肅電信的實際應用中發現,當數據的處理量達到千萬級的時候,帶參數的poll會將主機的幾百個G的內存耗盡,jvm會提示申請內存失敗,並將進程退出。網上說,這是這個方法的一個bug。
                    //String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);
                    String string = linkedBlockingQeque.poll();
                    if (string == null) {
                        continue;
                    }
 
                    rows++;
 
                    System.out
                            .println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "
                                    + string + "\t" + Thread.currentThread().getName());
                    /*
                     * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
                     * (InterruptedException e) { e.printStackTrace(); }
                     */
 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
            }
            System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"
                    + Thread.currentThread().getName());
        }
    }
 
    public static synchronized void setDone(boolean flag) {
        LinkedBlockingQueueDemo.done = flag;
    }
    
    public static synchronized boolean getDone() {
        return LinkedBlockingQueueDemo.done;
    }
 
    public static void main(String[] args) {
        System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());
        System.out.println(
                (new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());
 
        // 啓動若干生產者線程
        for (int i = 0; i < providerThreadAmount; i++) {
            String threadName = String.format("%s-%d", "ProviderThread", i);
            ProviderThread providerThread = new ProviderThread(threadName, i);
            providerThread.start();
        }
 
        // 啓動若干個消費者線程
        for (int i = 0; i < 10; i++) {
            String threadName = String.format("%s-%d", "ConsumerThread", i);
            ConsumerThread consumerThread = new ConsumerThread(threadName);
            consumerThread.start();
        }
 
        // 循環檢測生產者線程是否處理完畢
        do {
            for (boolean b : providerDoneFlag) {
                if (b == false) {
                    /*
                     * try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +
                     * " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque.
                     * size() + "\t" + Thread.currentThread().getName()); } catch
                     * (InterruptedException e) { e.printStackTrace(); }
                     */
 
                    // 只要有一個生產者線程沒有結束,則整個生產者線程檢測認爲沒有結束
                    break;
                }
 
                LinkedBlockingQueueDemo.setDone(true);
            }
 
            // 生產者線程所有結束的時候,跳出檢測
            if (LinkedBlockingQueueDemo.getDone() == true) {
                break;
            }
        } while (true);
 
        System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());
    }
}

 

 

結果略。dom

相關文章
相關標籤/搜索