項目裏碰到一個不錯的消息處理框架,把核心代碼抽出來作個備註。核心源碼分幾塊。java
一、一個消息管理類,在hashmap裏建立了多個阻塞式消息隊列緩存
package com.fredric.demo; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class QueueManager { private Map map; public QueueManager(){ map = new HashMap(); //建立兩個阻塞式隊列 map.put("queue_type1", new ArrayBlockingQueue(100)); map.put("queue_type2", new ArrayBlockingQueue(100)); } public boolean put(String queueType, Map data){ BlockingQueue queue = (BlockingQueue) map.get(queueType); if(null == queue){ throw new RuntimeException(); }else{ try { queue.put(data); return true; } catch (InterruptedException e) { e.printStackTrace(); } return false; } } public Map take(String queueType){ BlockingQueue queue = (BlockingQueue) map.get(queueType); if(null == queue){ throw new RuntimeException(); }else{ try { Map map = (Map) queue.take(); return map; } catch (InterruptedException e) { e.printStackTrace(); } return null; } } }
二、構築消息的處理,即消息的消費者。這裏要注意的是消息具體的處理業務是單獨再起線程的,以下:框架
package com.fredric.demo; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Type1Consumer implements Runnable{ private ExecutorService executorService; public Type1Consumer(){ //建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程 executorService = Executors.newCachedThreadPool(); } public void run() { do{ final Map msg = MsgProducer.queueManager.take("queue_type1"); if(msg != null){ executorService.execute(new Runnable() { public void run() { System.out.println("queue_type1 do msg handler" + msg.toString()); } }); } }while(true); } }
三、消息的生產者,即往消息隊列管理類中插入數據測試
package com.fredric.demo; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; public class MsgProducer { public static QueueManager queueManager = new QueueManager(); public void initJobQueue(){ //建立一個單線程化的線程池 Executors.newSingleThreadExecutor().execute(new Type1Consumer()); Executors.newSingleThreadExecutor().execute(new Type2Consumer()); } public void sendType1Msg(String data){ Map map = new HashMap<String, String>(); map.put("time", new Date().toString()); map.put("data", data); queueManager.put("queue_type1", map); } public void sendType2Msg(String data){ Map map = new HashMap<String, String>(); map.put("time", new Date().toString()); map.put("data", data); queueManager.put("queue_type2", map); } }
四、Main方法測試以下:spa
public class App { public static void main(String[] args){ MsgProducer producer = new MsgProducer(); producer.initJobQueue(); for(int i = 0; i < 10; i++){ producer.sendType1Msg("type1_msg"); producer.sendType2Msg("type2_msg"); } } }