工做中,常常有將文件中的數據導入數據庫的表中,或者將數據庫表中的記錄保存到文件中。爲了提升程序的處理速度,能夠設置讀線程和寫線程,這些線程經過消息隊列進行數據交互。本例就是使用了LinkedBlockingQueue來模仿生產者線程和消費者線程進行數據生產和消費。
爲了方便,這些不一樣的類被寫在了一個類中,實際使用的時候,能夠單獨拆開,觸類旁通地使用。java
如下是例子:數據庫
LinkedBlockingQueueDemo.java安全
import java.util.Date; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class LinkedBlockingQueueDemo { // 生產者線程數量 private final static int providerThreadAmount = 5; // 記錄每個生產者線程是否處理完畢的標記 private static boolean[] providerDoneFlag = new boolean[providerThreadAmount]; // 整個全部的生產者線程所有結束的標記 private static boolean done = false; // 一個線程安全的隊列,用於生產者和消費者異步地信息交互 private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>(); static class ProviderThread extends Thread { private Thread thread; private String threadName; private int threadNo; public ProviderThread(String threadName2, int threadNo) { this.threadName = threadName2; this.threadNo = threadNo; } public void start() { if (thread == null) { thread = new Thread(this, threadName); } thread.start(); System.out.println( (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); } @Override public void run() { int rows = 0; for (int i = 0; i < 100; i++) { String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName()); // offer不會去阻塞線程,put會 //linkedBlockingQeque.offer(string); linkedBlockingQeque.put(string); rows++; /* * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch * (InterruptedException e) { e.printStackTrace(); } */ } // 本線程處理完畢的標記 LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true; System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t" + Thread.currentThread().getName()); } } static class ConsumerThread implements Runnable { private Thread thread; private String threadName; public ConsumerThread(String threadName2) { this.threadName = threadName2; } public void start() { if (thread == null) { thread = new Thread(this, threadName); } thread.start(); System.out.println( (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); } @Override public void run() { int rows = 0; // 生產者線程沒有結束,或者消息隊列中有元素的時候,去隊列中取數據 while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) { try { //在甘肅電信的實際應用中發現,當數據的處理量達到千萬級的時候,帶參數的poll會將主機的幾百個G的內存耗盡,jvm會提示申請內存失敗,並將進程退出。網上說,這是這個方法的一個bug。 //String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS); String string = linkedBlockingQeque.poll(); if (string == null) { continue; } rows++; System.out .println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is " + string + "\t" + Thread.currentThread().getName()); /* * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch * (InterruptedException e) { e.printStackTrace(); } */ } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t" + Thread.currentThread().getName()); } } public static synchronized void setDone(boolean flag) { LinkedBlockingQueueDemo.done = flag; } public static synchronized boolean getDone() { return LinkedBlockingQueueDemo.done; } public static void main(String[] args) { System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName()); System.out.println( (new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode()); // 啓動若干生產者線程 for (int i = 0; i < providerThreadAmount; i++) { String threadName = String.format("%s-%d", "ProviderThread", i); ProviderThread providerThread = new ProviderThread(threadName, i); providerThread.start(); } // 啓動若干個消費者線程 for (int i = 0; i < 10; i++) { String threadName = String.format("%s-%d", "ConsumerThread", i); ConsumerThread consumerThread = new ConsumerThread(threadName); consumerThread.start(); } // 循環檢測生產者線程是否處理完畢 do { for (boolean b : providerDoneFlag) { if (b == false) { /* * try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) + * " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque. * size() + "\t" + Thread.currentThread().getName()); } catch * (InterruptedException e) { e.printStackTrace(); } */ // 只要有一個生產者線程沒有結束,則整個生產者線程檢測認爲沒有結束 break; } LinkedBlockingQueueDemo.setDone(true); } // 生產者線程所有結束的時候,跳出檢測 if (LinkedBlockingQueueDemo.getDone() == true) { break; } } while (true); System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName()); } }
結果略。dom