生產者和消費者模式應用於異步處理場景,異步處理的好處是生產者和消費者解耦,不互相依賴,生產者不須要等待消費者處理完,就能夠持續生產消費內容,效率大大提升。java
生產者和消費者代碼類結構以下:編程
2.Producer是生產者,在這裏是一個抽象類,子類須要實現generateTask方法。設計模式
3.Consumer是消費者,在這裏是一個抽象類,子類須要實現exec方法。緩存
4.這裏的Producer和Consumer只是一個抽象後的代碼模板,邏輯比較簡單,落地時可根據實際須要編寫合適的模板。安全
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * @ClassName BlockedQueue * @Description 阻塞任務隊列,添加任務時若是已經達到容量上限,則會阻塞等待 * @Author 鏗然一葉 * @Date 2019/10/5 11:32 * @Version 1.0 * javashizhan.com **/
public class BlockedQueue<T>{
//鎖
private final Lock lock = new ReentrantLock();
// 條件變量:隊列不滿
private final Condition notFull = lock.newCondition();
// 條件變量:隊列不空
private final Condition notEmpty = lock.newCondition();
//任務集合
private Vector<T> taskQueue = new Vector<T>();
//隊列容量
private final int capacity;
/** * 構造器 * @param capacity 隊列容量 */
public BlockedQueue(int capacity) {
this.capacity = capacity;
}
/** * 入隊操做 * @param t */
public void enq(T t) {
lock.lock();
try {
System.out.println("size: " + taskQueue.size() + " capacity: " + capacity);
while (taskQueue.size() == this.capacity) {
// 隊列滿了以後等待,等待隊列不滿
notFull.await();
}
System.out.println(Thread.currentThread().getName() + " add task: " + t.toString());
taskQueue.add(t);
// 入隊後, 通知隊列不空了,能夠出隊
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/** * 出隊操做 * @return */
public T deq(){
lock.lock();
try {
try {
while (taskQueue.size() == 0) {
// 隊列爲空時等待,等待隊列不空
notEmpty.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
T t = taskQueue.remove(0);
// 出隊後,通知隊列不滿,能夠繼續入隊
notFull.signal();
return t;
}finally {
lock.unlock();
}
}
}
複製代碼
/** * @ClassName Producer * @Description 生產者,這個類比較簡單,使用繼承也省不了多少代碼,可繼承,也能夠自行實現。 * @Author 鏗然一葉 * @Date 2019/10/5 11:19 * @Version 1.0 * javashizhan.com **/
public abstract class Producer<T> implements Runnable {
private BlockedQueue<T> taskQueue;
public Producer(BlockedQueue<T> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
while(true) {
T[] tasks = generateTask();
if (null != tasks && tasks.length > 0) {
for(T task: tasks) {
if (null != task) {
this.taskQueue.enq(task);
}
}
}
}
}
/** * 生成任務,使用了「模板方法」設計模式,子類只要實現此方法則可。 * @return */
public abstract T[] generateTask();
}
複製代碼
/** * @ClassName Consumer * @Description 消費者,這個類比較簡單,使用繼承也省不了多少代碼,可繼承,也能夠自行實現。 * @Author 鏗然一葉 * @Date 2019/10/5 11:10 * @Version 1.0 * javashizhan.com **/
public abstract class Consumer<T> implements Runnable {
private BlockedQueue<T> taskQueue;
public Consumer(BlockedQueue<T> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
while(true) {
T task = taskQueue.deq();
exec(task);
}
}
/** * 執行任務,使用了「模板方法」設計模式,子類只要實現此方法則可 * @param task */
public abstract void exec(T task);
}
複製代碼
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * @ClassName BlockedQueue * @Description TODO * @Author 鏗然一葉 * @Date 2019/10/5 9:13 * @Version 1.0 * javashizhan.com **/
public class LockTest {
public static void main(String[] args) {
BlockedQueue<String> taskQueue = new BlockedQueue<String>(10);
for (int i = 0; i < 3; i++) {
String producerName = "Producder-" + i;
Thread producer = new Thread(new Producer<String>(taskQueue) {
@Override
public String[] generateTask() {
String[] tasks = new String[20];
for (int i = 0; i < tasks.length; i++) {
long timestamp = System.currentTimeMillis();
tasks[i] = "Task_" + timestamp + "_" + i;
}
return tasks;
}
}, producerName);
producer.start();
}
for (int i = 0; i < 5; i++) {
String consumerName = "Consumer-" + i;
Thread consumer = new Thread(new Consumer<String>(taskQueue) {
@Override
public void exec(String task) {
System.out.println(Thread.currentThread().getName() + " do task [" + task + "]");
//休眠一會,模擬任務執行耗時
sleep(2000);
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, consumerName);
consumer.start();
}
}
}
複製代碼
輸出日誌:bash
size: 0 capacity: 10
Producder-1 add task: Task_1570250409102_0
size: 1 capacity: 10
Producder-1 add task: Task_1570250409103_1
size: 2 capacity: 10
Producder-1 add task: Task_1570250409103_2
size: 3 capacity: 10
Producder-1 add task: Task_1570250409103_3
size: 4 capacity: 10
Producder-1 add task: Task_1570250409103_4
size: 5 capacity: 10
Producder-1 add task: Task_1570250409103_5
size: 6 capacity: 10
Producder-1 add task: Task_1570250409103_6
size: 7 capacity: 10
Producder-1 add task: Task_1570250409103_7
size: 8 capacity: 10
Producder-1 add task: Task_1570250409103_8
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_9
size: 10 capacity: 10
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-0 do task [Task_1570250409102_0]
Consumer-4 do task [Task_1570250409103_1]
Consumer-3 do task [Task_1570250409103_2]
Producder-1 add task: Task_1570250409103_10
Consumer-1 do task [Task_1570250409103_3]
Producder-0 add task: Task_1570250409102_0
size: 8 capacity: 10
Producder-0 add task: Task_1570250409103_1
size: 9 capacity: 10
Producder-0 add task: Task_1570250409103_2
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_4]
Producder-0 add task: Task_1570250409103_3
size: 10 capacity: 10
Consumer-3 do task [Task_1570250409103_6]
Producder-2 add task: Task_1570250409103_0
Consumer-1 do task [Task_1570250409103_5]
size: 9 capacity: 10
Producder-2 add task: Task_1570250409103_1
size: 10 capacity: 10
Consumer-4 do task [Task_1570250409103_7]
Consumer-0 do task [Task_1570250409103_8]
Producder-1 add task: Task_1570250409103_11
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_12
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_9]
Producder-1 add task: Task_1570250409103_13
size: 10 capacity: 10
複製代碼
1.這裏用到了Lock來加鎖,Lock相比synchronized關鍵字加鎖更靈活一些,若是有特殊須要,方便改造。併發
2.synchronized實現生產者和消費者模式的例子可參考「Java併發編程入門(七)輕鬆理解wait和notify以及使用場景」,那個代碼還不夠通用,你能夠修改得通用一些。異步
3.就當前這個例子而言,使用Lock加鎖和「Java併發編程入門(七)輕鬆理解wait和notify以及使用場景」中使用synchronized加鎖沒有多大區別,這裏僅僅是爲了體會下Lock的使用方法。ide
4.使用有界阻塞隊列時須要注意生產者生產任務過程是否可控,若是是第三方不可控調用,當生產任務速度遠遠大於消費者處理任務速度時,可能因爲阻塞致使長時間掛起,要麼掛起時間過長,致使等待線程太多,要麼超時失敗。這時就不適合使用阻塞方式,應該在隊列滿時拋出異常以通知調用方不要再等待。post
end.
相關閱讀:
Java併發編程(一)知識地圖
Java併發編程(二)原子性
Java併發編程(三)可見性
Java併發編程(四)有序性
Java併發編程(五)建立線程方式概覽
Java併發編程入門(六)synchronized用法
Java併發編程入門(七)輕鬆理解wait和notify以及使用場景
Java併發編程入門(八)線程生命週期
Java併發編程入門(九)死鎖和死鎖定位
Java併發編程入門(十)鎖優化
Java併發編程入門(十一)限流場景和Spring限流器實現
Java併發編程入門(十三)讀寫鎖和緩存模板
Java併發編程入門(十四)CountDownLatch應用場景
Java併發編程入門(十五)CyclicBarrier應用場景
Java併發編程入門(十六)秒懂線程池差異
Java併發編程入門(十七)一圖掌握線程經常使用類和接口
Java併發編程入門(十八)再論線程安全
Java極客站點: javageektour.com/