在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。web
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。算法
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。數據庫
這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程當中,若是先找到這個模式的第三者,能幫助咱們快速熟悉一個設計模式。編程
我和同事一塊兒利用業餘時間開發的Yuna工具中使用了生產者和消費者模式。首先我先介紹下Yuna工具,在阿里巴巴不少同事都喜歡經過郵件分享技術文章,由於經過郵件分享很方便,同窗們在網上看到好的技術文章,複製粘貼發送就完成了分享,可是咱們發現技術文章不能沉澱下來,對於新來的同窗看不到之前分享的技術文章,你們也很難找到之前分享過的技術文章。爲了解決這問題,咱們開發了Yuna工具。Yuna取名自我喜歡的一款遊戲最終幻想裏的女主角。設計模式
首先咱們申請了一個專門用來收集分享郵件的郵箱,好比share@alibaba.com,同窗將分享的文章發送到這個郵箱,讓同窗們每次都抄送到這個郵箱確定很麻煩,因此咱們的作法是將這個郵箱地址放在部門郵件列表裏,因此分享的同窗只須要象之前同樣向整個部門分享文章就行,Yuna工具經過讀取郵件服務器裏該郵箱的郵件,把全部分享的郵件下載下來,包括郵件的附件,圖片,和郵件回覆,咱們可能會從這個郵箱裏下載到一些非分享的文章,因此咱們要求分享的郵件標題必須帶有一個關鍵字,好比[內貿技術分享],下載完郵件以後,經過confluence的web service接口,把文章插入到confluence裏,這樣新同事就能夠在confluence裏看之前分享過的文章,而且Yuna工具還能夠自動把文章進行分類和歸檔。服務器
爲了快速上線該功能,當時咱們花了三天業餘時間快速開發了Yuna1.0版本。在1.0版本中我並無使用生產者消費模式,而是使用單線程來處理,由於當時只須要處理咱們一個部門的郵件,因此單線程明顯夠用,整個過程是串行執行的。在一個線程裏,程序先抽取所有的郵件,轉化爲文章對象,而後添加所有的文章,最後刪除抽取過的郵件。代碼以下:多線程
public void extract() { logger.debug("開始" + getExtractorName() + "。。"); //抽取郵件 List<Article> articles = extractEmail(); //添加文章 for (Article article : articles) { addArticleOrComment(article); } //清空郵件 cleanEmail(); logger.debug("完成" + getExtractorName() + "。。"); }
Yuna工具在推廣後,愈來愈多的部門使用這個工具,處理的時間愈來愈慢,Yuna是每隔5分鐘進行一次抽取的,而當郵件多的時候一次處理可能就花了幾分鐘,因而我在Yuna2.0版本里使用了生產者消費者模式來處理郵件,首先生產者線程按必定的規則去郵件系統裏抽取郵件,而後存放在阻塞隊列裏,消費者從阻塞隊列裏取出文章後插入到conflunce裏。代碼以下:併發
public class QuickEmailToWikiExtractor extends AbstractExtractor { private ThreadPoolExecutor threadsPool; private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue; public QuickEmailToWikiExtractor() { emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>(); int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000)); } public void extract() { logger.debug("開始" + getExtractorName() + "。。"); long start = System.currentTimeMillis(); //抽取全部郵件放到隊列裏 new ExtractEmailTask().start(); // 把隊列裏的文章插入到Wiki insertToWiki(); long end = System.currentTimeMillis(); double cost = (end - start) / 1000; logger.debug("完成" + getExtractorName() + ",花費時間:" + cost + "秒"); } /** * 把隊列裏的文章插入到Wiki */ private void insertToWiki() { //登陸wiki,每間隔一段時間須要登陸一次 confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD); while (true) { //2秒內取不到就退出 ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS); if (email == null) { break; } threadsPool.submit(new insertToWikiTask(email)); } } protected List<Article> extractEmail() { List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails(); if (allEmails == null) { return null; } for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) { emailQueue.offer(exchangeEmailShallowDTO); } return null; } /** * 抽取郵件任務 * * @author tengfei.fangtf */ public class ExtractEmailTask extends Thread { public void run() { extractEmail(); } } }
在多核時代,多線程併發處理速度比單線程處理速度更快,因此咱們可使用多個線程來生產數據,一樣可使用多個消費線程來消費數據。而更復雜的狀況是,消費者消費的數據,有可能須要繼續處理,因而消費者處理完數據以後,它又要做爲生產者把數據放在新的隊列裏,交給其餘消費者繼續處理。以下圖:ide
咱們在一個長鏈接服務器中使用了這種模式,生產者1負責將全部客戶端發送的消息存放在阻塞隊列1裏,消費者1從隊列裏讀消息,而後經過消息ID進行hash獲得N個隊列中的一個,而後根據編號將消息存放在到不一樣的隊列裏,每一個阻塞隊列會分配一個線程來消費阻塞隊列裏的數據。若是消費者2沒法消費消息,就將消息再拋回到阻塞隊列1中,交給其餘消費者處理。工具
如下是消息總隊列的代碼;
/** * 總消息隊列管理 * * @author tengfei.fangtf */ public class MsgQueueManager implements IMsgQueue{ private static final Logger LOGGER = LoggerFactory.getLogger(MsgQueueManager.class); /** * 消息總隊列 */ public final BlockingQueue<Message> messageQueue; private MsgQueueManager() { messageQueue = new LinkedTransferQueue<Message>(); } public void put(Message msg) { try { messageQueue.put(msg); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public Message take() { try { return messageQueue.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return null; } }
啓動一個消息分發線程。在這個線程裏子隊列自動去總隊列裏獲取消息。
/** * 分發消息,負責把消息從大隊列塞到小隊列裏 * * @author tengfei.fangtf */ static class DispatchMessageTask implements Runnable { @Override public void run() { BlockingQueue<Message> subQueue; for (;;) { //若是沒有數據,則阻塞在這裏 Message msg = MsgQueueFactory.getMessageQueue().take(); //若是爲空,則表示沒有Session機器鏈接上來, 須要等待,直到有Session機器鏈接上來 while ((subQueue = getInstance().getSubQueue()) == null) { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } //把消息放到小隊列裏 try { subQueue.put(msg); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
使用Hash算法獲取一個子隊列。
/** * 均衡獲取一個子隊列。 * * @return */ public BlockingQueue<Message> getSubQueue() { int errorCount = 0; for (;;) { if (subMsgQueues.isEmpty()) { return null; } int index = (int) (System.nanoTime() % subMsgQueues.size()); try { return subMsgQueues.get(index); } catch (Exception e) { //出現錯誤表示,在獲取隊列大小以後,隊列進行了一次刪除操做 LOGGER.error("獲取子隊列出現錯誤", e); if ((++errorCount) < 3) { continue; } } } }
使用的時候咱們只須要往總隊列裏發消息。
//往消息隊列裏添加一條消息 IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue(); Packet msg = Packet.createPacket(Packet64FrameType. TYPE_DATA, "{}".getBytes(), (short) 1); messageQueue.put(msg);
本章講解了生產者消費者模式,並給出了實例。讀者能夠在平時的工做中思考下哪些場景可使用生產者消費者模式,我相信這種場景應該很是之多,特別是須要處理任務時間比較長的場景,好比上傳附件並處理,用戶把文件上傳到系統後,系統把文件丟到隊列裏,而後馬上返回告訴用戶上傳成功,最後消費者再去隊列裏取出文件處理。好比調用一個遠程接口查詢數據,若是遠程服務接口查詢時須要幾十秒的時間,那麼它能夠提供一個申請查詢的接口,這個接口把要申請查詢任務放數據庫中,而後該接口馬上返回。而後服務器端用線程輪詢並獲取申請任務進行處理,處理完以後發消息給調用方,讓調用方再來調用另一個接口拿數據。
另外Java中的線程池類其實就是一種生產者和消費者模式的實現方式,可是實現方法更高明。生產者把任務丟給線程池,線程池建立線程並處理任務,若是將要運行的任務數大於線程池的基本線程數就把任務扔到阻塞隊列裏,這種作法比只使用一個阻塞隊列來實現生產者和消費者模式顯然要高明不少,由於消費者可以處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。
咱們的系統也可使用線程池來實現多生產者消費者模式。好比建立N個不一樣規模的Java線程池來處理不一樣性質的任務,好比線程池1將數據讀到內存以後,交給線程池2裏的線程繼續處理壓縮數據。線程池1主要處理IO密集型任務,線程池2主要處理CPU密集型任務。