自定義消息隊列

  傳統的網統統信,通常是請求---響應式,以TCP模式爲例,在高併發狀況下,每每伴隨大量的客戶端Sokcet請求,服務器要不斷處理來自客戶端的請求,ServerSocket要不斷產生新的子線程去響應客戶端的請求,會給服務器帶來很大的訪問壓力。java

  在這種狀況下,消息隊列可謂爲咱們提供了一種新的思路。隊列是數據結構中的一種線性表,隊列中存儲的元素遵照FIFO(First In First Out,先進先出)的規則,這使得隊列中的元素是有序的。咱們能夠將隊列中的元素入隊與出隊視爲生產和消費,此外,再結合發佈訂閱模式(監聽模式),就能夠經過隊列在不一樣的線程之間進行通訊。服務器

  我在這裏爲你們提供一個簡單的自定義消息隊列,謹供參考。數據結構

package com.itszt.mq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 一個消息隊列監聽器,只要生產者生產出消息並推入隊列,就會通知處理器執行消費操做
 */
public class PushBlockQueue extends LinkedBlockingQueue<Object> {
    //多線程執行,採用線程池
    private static ExecutorService es = Executors.newFixedThreadPool(10);
    //單例中的餓漢模式,實例化一個隊列單例
    private static PushBlockQueue pbq = new PushBlockQueue();
    //狀態標識位
    private boolean flag = false;

    private PushBlockQueue() {
    }

    public static PushBlockQueue getInstance() {
        return pbq;
    }

    /**
     * 隊列監聽啓動
     */
    public void start() {
        if (!this.flag) {
            flag = true;
        } else {
            throw new IllegalArgumentException("隊列已啓動,不可重複啓動!");
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (flag) {
                        //從隊列中取消息
                        Object obj = take();
                        //線程池派出線程來消費取出的消息
                        es.execute(new PushBlockQueueHandler(obj));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 中止隊列監聽
     */
    public void stop(){
        this.flag=false;
    }
}
----------------------------------------------------
package com.itszt.mq;

/**
 *至關於隊列消息的消費者
 */
public class PushBlockQueueHandler implements Runnable{
    //消費的對象
    private Object obj;

    public PushBlockQueueHandler(Object obj){
        this.obj=obj;
    }
    //消費線程
    @Override
    public void run() {
        doBusiness();
    }
    //消費行爲
    private void doBusiness() {
        System.out.println(Thread.currentThread().getName()+"-收到消息:"+obj);
    }
}
-----------------------------------------------
package com.itszt.mq;

import java.util.Scanner;

/**
 * 自定義消息隊列測試類
 */
public class MQTest {
    public static void main(String[] args) {
        //獲取消息隊列的單例,並啓動隊列監聽器
        PushBlockQueue.getInstance().start();
        //循環向隊列寫入數據
        /**
         * 生產者----生產消息----》入隊列----監聽器----通知消費者---》消費
         */
        Scanner sc=new Scanner(System.in);
        try {
            while (true){
                String content = sc.nextLine();
                if(content.trim().equals("stop")){
                    System.exit(1);
                }
                PushBlockQueue.getInstance().put(content);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   啓動程序,你(主線程)就能夠在控制檯裏經過消息隊列向其餘線程發佈消息了!多線程

中國夢,你們的夢!
pool-1-thread-1-收到消息:中國夢,你們的夢!
爲中華民族的偉大復興而不懈奮鬥!
pool-1-thread-2-收到消息:爲中華民族的偉大復興而不懈奮鬥! 
相關文章
相關標籤/搜索