一個簡單的消息處理框架

項目裏碰到一個不錯的消息處理框架,把核心代碼抽出來作個備註。核心源碼分幾塊。java

一、一個消息管理類,在hashmap裏建立了多個阻塞式消息隊列緩存

package com.fredric.demo;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueManager {

    private Map map;

    public QueueManager(){

        map = new HashMap();

        //建立兩個阻塞式隊列
        map.put("queue_type1", new ArrayBlockingQueue(100));
        map.put("queue_type2", new ArrayBlockingQueue(100));
    }


    public boolean put(String queueType, Map data){
        BlockingQueue queue = (BlockingQueue) map.get(queueType);
        if(null == queue){
            throw new RuntimeException();
        }else{
            try {
                queue.put(data);
                return true;

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return false;
        }
    }

    public Map take(String queueType){
        BlockingQueue queue = (BlockingQueue) map.get(queueType);
        if(null == queue){
            throw new RuntimeException();
        }else{
            try {
                Map map = (Map) queue.take();
                return map;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return null;
        }
    }
}

二、構築消息的處理,即消息的消費者。這裏要注意的是消息具體的處理業務是單獨再起線程的,以下:框架

package com.fredric.demo;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Type1Consumer implements Runnable{

    private ExecutorService executorService;

    public Type1Consumer(){

        //建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程
        executorService = Executors.newCachedThreadPool();
    }


    public void run() {
        do{

            final Map msg = MsgProducer.queueManager.take("queue_type1");

            if(msg != null){
                executorService.execute(new Runnable() {
                    public void run() {
                        System.out.println("queue_type1 do msg handler" + msg.toString());
                    }
                });
            }

        }while(true);
    }
}

三、消息的生產者,即往消息隊列管理類中插入數據測試

package com.fredric.demo;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;

public class MsgProducer {

    public static QueueManager queueManager = new QueueManager();


    public void initJobQueue(){
        //建立一個單線程化的線程池
        Executors.newSingleThreadExecutor().execute(new Type1Consumer());
        Executors.newSingleThreadExecutor().execute(new Type2Consumer());
    }


    public void sendType1Msg(String data){
        Map map = new HashMap<String, String>();
        map.put("time", new Date().toString());
        map.put("data", data);
        queueManager.put("queue_type1", map);
    }

    public void sendType2Msg(String data){
        Map map = new HashMap<String, String>();
        map.put("time", new Date().toString());
        map.put("data", data);
        queueManager.put("queue_type2", map);
    }
}

四、Main方法測試以下:spa

public class App {

    public static void main(String[] args){

        MsgProducer producer = new MsgProducer();
        producer.initJobQueue();

        for(int i = 0; i < 10; i++){
            producer.sendType1Msg("type1_msg");
            producer.sendType2Msg("type2_msg");
        }
    }
}
相關文章
相關標籤/搜索