在本人的上一篇博客文章:Netty構建分佈式消息隊列(AvatarMQ)設計指南之架構篇 中,重點向你們介紹了AvatarMQ主要構成模塊以及目前存在的優缺點。最後以一個生產者、消費者傳遞消息的例子,具體演示了AvatarMQ所具有的基本消息路由功能。而本文的寫做目的,是想從開發、設計的角度,簡單的對如何使用Netty,構建分佈式消息隊列背後的技術細節、原理,進行一下簡單的分析和說明。html
首先,在一個企業級的架構應用中,究竟什麼時候需引入消息隊列呢?本人認爲,最常常的狀況,無非這幾種:作業務解耦、事件消息廣播、消息流控處理。其中,對於業務解耦是做爲消息隊列,要解決的一個首要問題。所謂業務解耦,就是說在一個業務流程處理上,只關注具體的流程,盡到通知的責任便可,沒必要等待消息處理的結果。java
總得來看,企業級系統模塊通訊的方式一般狀況下,無非兩種。git
同步方式:REST、RPC方式實現;異步方式:消息中間件(消息隊列)方式實現。github
同步方式的優勢:能夠基於http協議之上,無需中間件代理,系統架構相對而言比較簡單。缺點是:客戶端和服務端緊密耦合,而且要實時在線通訊,不然會致使消息發送失敗。算法
異步方式的優勢:客戶端和服務端互相解耦,雙方能夠不產生依賴。缺點是:因爲引入了消息中間件,在編程的時候會增長難度係數。此外,消息中間件的可靠性、容錯性、健壯性每每成爲這類架構的決定性因素。spring
舉一個本人工做中的例子向你們說明一下:移動業務中的產品訂購中心,每當一個用戶經過某些渠道(營業廳、自助終端等等)開通、訂購了某個套餐以後,若是這些套餐涉及第三方平臺派單的話,產品訂購中心會向第三方平臺發起訂購請求操做。試想一下,若是遇到高峯受理時間段,因爲業務受理量的激增,致使一些外圍系統的響應速度下降(好比業務網關響應速度不及時、網絡延時等等緣由),最終用戶開通一個套餐花在主流程的時間會延長不少,這個會形成極很差的用戶體驗,最終可能致使受理失敗。在上述的場景裏面,咱們就能夠很好的引入一個消息隊列進行業務的解耦,具體來講,產品訂購中心只要「通知」第三方平臺,咱們的套餐開通成功了,並不必定非要同步阻塞地等待其真正的開通處理完成。正由於如此,消息隊列逐漸成爲當下系統模塊通訊的主要方式手段。apache
當今在Java的消息隊列通訊領域,有不少主流的消息中間件,好比RabbitMQ、ActiveMQ、以及煊赫一時Kafka。其中ActiveMQ是基於JMS的標準之上開發定製的一套消息隊列系統,性能穩定,訪問接口也很是友好,可是這類的消息隊列在訪問吞吐量上有所折扣;另一個方面,好比Kafka這樣,以高效吞吐量著稱的消息隊列系統,可是在穩定性和可靠性上,能力彷佛還不夠,所以更多的是用在服務日誌傳輸、短消息推送等等對於可靠性不高的業務場景之中。總結起來,不論是ActiveMQ仍是Kafka,其框架的背後涉及到不少異步網絡通訊、多線程、高併發處理方面的專業技術知識。但本文的重點,也不在於介紹這些消息中間件背後的技術細節,而是想重點闡述一下,如何透過上述消息隊列的基本原理,在必要的時候,開發定製一套符合自身業務要求的消息隊列系統時,可以得到更加全面的視角去設計、考量這些問題。編程
所以本人用心開發實現了一個,基於Netty的消息隊列系統:AvatarMQ。固然,在設計、實現AvatarMQ的時候,我會適當參考這些成熟消息中間件中用到的不少重要的思想理念。緩存
當各位從github上面下載到AvatarMQ的源代碼的時候,能夠發現,其中的包結構以下所示:服務器
如今對每一個包的主要功能進行一下簡要說明(下面省略前綴com.newlandframework.avatarmq)。
broker:消息中間件的服務器模塊,主要負責消息的路由、負載均衡,對於生產者、消費者進行消息的應答回覆處理(ACK),AvatarMQ中的中心節點,是鏈接生產者、消費者的橋樑紐帶。
consumer:消息中間件中的消費者模塊,負責接收生產者過來的消息,在設計的時候,會對消費者進行一個集羣化管理,同一個集羣標識的消費者,會構成一個大的消費者集羣,做爲一個總體,接收生產者投遞過來的消息。此外,還提供消費者接收消息相關的API給客戶端進行調用。
producer:消息中間件中的生產者模塊,負責生產特定主題(Topic)的消息,傳遞給對此主題感興趣的消費者,同時提供生產者生產消息的API接口,給客戶端使用。
core:AvatarMQ中消息處理的核心模塊,負責消息的內存存儲、應答控制、對消息進行多線程任務分派處理。
model:主要定義了AvatarMQ中的數據模型對象,好比MessageType消息類型、MessageSource消息源頭等等模型對象的定義。
msg:主要定義了具體的消息類型對應的結構模型,好比消費者訂閱消息SubscribeMessage、消費者取消訂閱消息UnSubscribeMessage,消息服務器應答給生產者的應答消息ProducerAckMessage、消息服務器應答給消費者的應答消息ConsumerAckMessage。
netty:主要封裝了Netty網絡通訊相關的核心模塊代碼,好比訂閱消息事件的路由分派策略、消息的編碼、解碼器等等。
serialize:利用Kryo這個優秀高效的對象序列化、反序列框架對消息對象進行序列化網絡傳輸。
spring:Spring的容器管理類,負責把AvatarMQ中的消息服務器模塊:Broker,進行容器化管理。這個包裏面的AvatarMQServerStartup是整個AvatarMQ消息服務器的啓動入口。
test:這個就不用多說了,就是針對AvatarMQ進行消息路由傳遞的測試demo。
AvatarMQ運行原理示意圖:
首先是消息生產者客戶端(AvatarMQ Producer)發送帶有主題的消息給消息轉發服務器(AvatarMQ Broker),消息轉發服務器確認收到生產者的消息,發送ACK應答給生產者,而後把消息繼續投遞給消費者(AvatarMQ Consumer)。同時broker服務器接收來自消費者的訂閱、取消訂閱消息,併發送ACK應該給對應的消費者,整個消息系統就是這樣周而復始的工做。
如今再來看一下,AvatarMQ中的核心模塊的組成,以下圖所示:
Producer Manage:消息的生產者,其主要代碼在(com.newlandframework.avatarmq.producer)包之下,其主要代碼模塊關鍵部分簡要說明以下:
package com.newlandframework.avatarmq.producer; import com.newlandframework.avatarmq.core.AvatarMQAction; import com.newlandframework.avatarmq.model.MessageSource; import com.newlandframework.avatarmq.model.MessageType; import com.newlandframework.avatarmq.model.RequestMessage; import com.newlandframework.avatarmq.model.ResponseMessage; import com.newlandframework.avatarmq.msg.Message; import com.newlandframework.avatarmq.msg.ProducerAckMessage; import com.newlandframework.avatarmq.netty.MessageProcessor; import java.util.concurrent.atomic.AtomicLong; /** * @filename:AvatarMQProducer.java * @description:AvatarMQProducer功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQProducer extends MessageProcessor implements AvatarMQAction { private boolean brokerConnect = false; private boolean running = false; private String brokerServerAddress; private String topic; private String defaultClusterId = "AvatarMQProducerClusters"; private String clusterId = ""; private AtomicLong msgId = new AtomicLong(0L); //鏈接消息轉發服務器broker的ip地址,以及生產出來消息附帶的主題信息 public AvatarMQProducer(String brokerServerAddress, String topic) { super(brokerServerAddress); this.brokerServerAddress = brokerServerAddress; this.topic = topic; } //沒有鏈接上消息轉發服務器broker就發送的話,直接應答失敗 private ProducerAckMessage checkMode() { if (!brokerConnect) { ProducerAckMessage ack = new ProducerAckMessage(); ack.setStatus(ProducerAckMessage.FAIL); return ack; } return null; } //啓動消息生產者 public void start() { super.getMessageConnectFactory().connect(); brokerConnect = true; running = true; } //鏈接消息轉發服務器broker,設定生產者消息處理鉤子,用於處理broker過來的消息應答 public void init() { ProducerHookMessageEvent hook = new ProducerHookMessageEvent(); hook.setBrokerConnect(brokerConnect); hook.setRunning(running); super.getMessageConnectFactory().setMessageHandle(new MessageProducerHandler(this, hook)); } //投遞消息API public ProducerAckMessage delivery(Message message) { if (!running || !brokerConnect) { return checkMode(); } message.setTopic(topic); message.setTimeStamp(System.currentTimeMillis()); RequestMessage request = new RequestMessage(); request.setMsgId(String.valueOf(msgId.incrementAndGet())); request.setMsgParams(message); request.setMsgType(MessageType.AvatarMQMessage); request.setMsgSource(MessageSource.AvatarMQProducer); message.setMsgId(request.getMsgId()); ResponseMessage response = (ResponseMessage) sendAsynMessage(request); if (response == null) { ProducerAckMessage ack = new ProducerAckMessage(); ack.setStatus(ProducerAckMessage.FAIL); return ack; } ProducerAckMessage result = (ProducerAckMessage) response.getMsgParams(); return result; } //關閉消息生產者 public void shutdown() { if (running) { running = false; super.getMessageConnectFactory().close(); super.closeMessageConnectFactory(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } }
Consumer Clusters Manage / Message Routing:消息的消費者集羣管理以及消息路由模塊,其主要模塊在包(com.newlandframework.avatarmq.consumer)之中。其中消息消費者對象,對應的核心代碼主要功能描述以下:
package com.newlandframework.avatarmq.consumer; import com.google.common.base.Joiner; import com.newlandframework.avatarmq.core.AvatarMQAction; import com.newlandframework.avatarmq.core.MessageIdGenerator; import com.newlandframework.avatarmq.core.MessageSystemConfig; import com.newlandframework.avatarmq.model.MessageType; import com.newlandframework.avatarmq.model.RequestMessage; import com.newlandframework.avatarmq.msg.SubscribeMessage; import com.newlandframework.avatarmq.msg.UnSubscribeMessage; import com.newlandframework.avatarmq.netty.MessageProcessor; /** * @filename:AvatarMQConsumer.java * @description:AvatarMQConsumer功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class AvatarMQConsumer extends MessageProcessor implements AvatarMQAction { private ProducerMessageHook hook; private String brokerServerAddress; private String topic; private boolean subscribeMessage = false; private boolean running = false; private String defaultClusterId = "AvatarMQConsumerClusters"; private String clusterId = ""; private String consumerId = ""; //鏈接的消息服務器broker的ip地址以及關注的生產過來的消息鉤子 public AvatarMQConsumer(String brokerServerAddress, String topic, ProducerMessageHook hook) { super(brokerServerAddress); this.hook = hook; this.brokerServerAddress = brokerServerAddress; this.topic = topic; } //向消息服務器broker發送取消訂閱消息 private void unRegister() { RequestMessage request = new RequestMessage(); request.setMsgType(MessageType.AvatarMQUnsubscribe); request.setMsgId(new MessageIdGenerator().generate()); request.setMsgParams(new UnSubscribeMessage(consumerId)); sendSyncMessage(request); super.getMessageConnectFactory().close(); super.closeMessageConnectFactory(); running = false; } //向消息服務器broker發送訂閱消息 private void register() { RequestMessage request = new RequestMessage(); request.setMsgType(MessageType.AvatarMQSubscribe); request.setMsgId(new MessageIdGenerator().generate()); SubscribeMessage subscript = new SubscribeMessage(); subscript.setClusterId((clusterId.equals("") ? defaultClusterId : clusterId)); subscript.setTopic(topic); subscript.setConsumerId(consumerId); request.setMsgParams(subscript); sendAsynMessage(request); } public void init() { super.getMessageConnectFactory().setMessageHandle(new MessageConsumerHandler(this, new ConsumerHookMessageEvent(hook))); Joiner joiner = Joiner.on(MessageSystemConfig.MessageDelimiter).skipNulls(); consumerId = joiner.join((clusterId.equals("") ? defaultClusterId : clusterId), topic, new MessageIdGenerator().generate()); } //鏈接消息服務器broker public void start() { if (isSubscribeMessage()) { super.getMessageConnectFactory().connect(); register(); running = true; } } public void receiveMode() { setSubscribeMessage(true); } public void shutdown() { if (running) { unRegister(); } } public String getBrokerServerAddress() { return brokerServerAddress; } public void setBrokerServerAddress(String brokerServerAddress) { this.brokerServerAddress = brokerServerAddress; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public boolean isSubscribeMessage() { return subscribeMessage; } public void setSubscribeMessage(boolean subscribeMessage) { this.subscribeMessage = subscribeMessage; } public String getDefaultClusterId() { return defaultClusterId; } public void setDefaultClusterId(String defaultClusterId) { this.defaultClusterId = defaultClusterId; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } }
消息的集羣管理模塊,主要代碼是ConsumerContext.java、ConsumerClusters.java。先簡單說一下消費者集羣模塊ConsumerClusters,主要負責定義消費者集羣的行爲,以及負責消息的路由。主要的功能描述以下所示:
package com.newlandframework.avatarmq.consumer; import com.newlandframework.avatarmq.model.RemoteChannelData; import com.newlandframework.avatarmq.model.SubscriptionData; import com.newlandframework.avatarmq.netty.NettyUtil; import io.netty.channel.Channel; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.Predicate; /** * @filename:ConsumerClusters.java * @description:ConsumerClusters功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class ConsumerClusters { //輪詢調度(Round-Robin Scheduling)位置標記 private int next = 0; private final String clustersId; private final ConcurrentHashMap<String/*生產者消息的主題*/, SubscriptionData/*消息對應的topic信息數據結構*/> subMap = new ConcurrentHashMap<String, SubscriptionData>(); private final ConcurrentHashMap<String/*消費者標識編碼*/, RemoteChannelData/*對應的消費者的netty網絡通訊管道信息*/> channelMap = new ConcurrentHashMap<String, RemoteChannelData>(); private final List<RemoteChannelData> channelList = Collections.synchronizedList(new ArrayList<RemoteChannelData>()); public ConsumerClusters(String clustersId) { this.clustersId = clustersId; } public String getClustersId() { return clustersId; } public ConcurrentHashMap<String, SubscriptionData> getSubMap() { return subMap; } public ConcurrentHashMap<String, RemoteChannelData> getChannelMap() { return channelMap; } //添加一個消費者到消費者集羣 public void attachRemoteChannelData(String clientId, RemoteChannelData channelinfo) { if (findRemoteChannelData(channelinfo.getClientId()) == null) { channelMap.put(clientId, channelinfo); subMap.put(channelinfo.getSubcript().getTopic(), channelinfo.getSubcript()); channelList.add(channelinfo); } else { System.out.println("consumer clusters exists! it's clientId:" + clientId); } } //從消費者集羣中刪除一個消費者 public void detachRemoteChannelData(String clientId) { channelMap.remove(clientId); Predicate predicate = new Predicate() { public boolean evaluate(Object object) { String id = ((RemoteChannelData) object).getClientId(); return id.compareTo(clientId) == 0; } }; RemoteChannelData data = (RemoteChannelData) CollectionUtils.find(channelList, predicate); if (data != null) { channelList.remove(data); } } //根據消費者標識編碼,在消費者集羣中查找定位一個消費者,若是不存在返回null public RemoteChannelData findRemoteChannelData(String clientId) { return (RemoteChannelData) MapUtils.getObject(channelMap, clientId); } //負載均衡,根據鏈接到broker的順序,依次投遞消息給消費者。這裏的均衡算法直接採用 //輪詢調度(Round-Robin Scheduling),後續能夠加入:加權輪詢、隨機輪詢、哈希輪詢等等策略。 public RemoteChannelData nextRemoteChannelData() { Predicate predicate = new Predicate() { public boolean evaluate(Object object) { RemoteChannelData data = (RemoteChannelData) object; Channel channel = data.getChannel(); return NettyUtil.validateChannel(channel); } }; CollectionUtils.filter(channelList, predicate); return channelList.get(next++ % channelList.size()); } //根據生產者的主題關鍵字,定位於具體的消息結構 public SubscriptionData findSubscriptionData(String topic) { return this.subMap.get(topic); } }
而ConsumerContext主要的負責管理消費者集羣的,其主要核心代碼註釋說明以下:
package com.newlandframework.avatarmq.consumer; import com.newlandframework.avatarmq.model.RemoteChannelData; import com.newlandframework.avatarmq.model.SubscriptionData; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.iterators.FilterIterator; /** * @filename:ConsumerContext.java * @description:ConsumerContext功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class ConsumerContext { //消費者集羣關係定義 private static final CopyOnWriteArrayList<ClustersRelation> relationArray = new CopyOnWriteArrayList<ClustersRelation>(); //消費者集羣狀態 private static final CopyOnWriteArrayList<ClustersState> stateArray = new CopyOnWriteArrayList<ClustersState>(); public static void setClustersStat(String clusters, int stat) { stateArray.add(new ClustersState(clusters, stat)); } //根據消費者集羣編碼cluster_id獲取一個消費者集羣的狀態 public static int getClustersStat(String clusters) { Predicate predicate = new Predicate() { public boolean evaluate(Object object) { String clustersId = ((ClustersState) object).getClusters(); return clustersId.compareTo(clusters) == 0; } }; Iterator iterator = new FilterIterator(stateArray.iterator(), predicate); ClustersState state = null; while (iterator.hasNext()) { state = (ClustersState) iterator.next(); break; } return (state != null) ? state.getState() : 0; } //根據消費者集羣編碼cluster_id查找一個消費者集羣 public static ConsumerClusters selectByClusters(String clusters) { Predicate predicate = new Predicate() { public boolean evaluate(Object object) { String id = ((ClustersRelation) object).getId(); return id.compareTo(clusters) == 0; } }; Iterator iterator = new FilterIterator(relationArray.iterator(), predicate); ClustersRelation relation = null; while (iterator.hasNext()) { relation = (ClustersRelation) iterator.next(); break; } return (relation != null) ? relation.getClusters() : null; } //查找一下關注這個主題的消費者集羣集合 public static List<ConsumerClusters> selectByTopic(String topic) { List<ConsumerClusters> clusters = new ArrayList<ConsumerClusters>(); for (int i = 0; i < relationArray.size(); i++) { ConcurrentHashMap<String, SubscriptionData> subscriptionTable = relationArray.get(i).getClusters().getSubMap(); if (subscriptionTable.containsKey(topic)) { clusters.add(relationArray.get(i).getClusters()); } } return clusters; } //添加消費者集羣 public static void addClusters(String clusters, RemoteChannelData channelinfo) { ConsumerClusters manage = selectByClusters(clusters); if (manage == null) { ConsumerClusters newClusters = new ConsumerClusters(clusters); newClusters.attachRemoteChannelData(channelinfo.getClientId(), channelinfo); relationArray.add(new ClustersRelation(clusters, newClusters)); } else if (manage.findRemoteChannelData(channelinfo.getClientId()) != null) { manage.detachRemoteChannelData(channelinfo.getClientId()); manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo); } else { String topic = channelinfo.getSubcript().getTopic(); boolean touchChannel = manage.getSubMap().containsKey(topic); if (touchChannel) { manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo); } else { manage.getSubMap().clear(); manage.getChannelMap().clear(); manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo); } } } //從一個消費者集羣中刪除一個消費者 public static void unLoad(String clientId) { for (int i = 0; i < relationArray.size(); i++) { String id = relationArray.get(i).getId(); ConsumerClusters manage = relationArray.get(i).getClusters(); if (manage.findRemoteChannelData(clientId) != null) { manage.detachRemoteChannelData(clientId); } if (manage.getChannelMap().size() == 0) { ClustersRelation relation = new ClustersRelation(); relation.setId(id); relationArray.remove(id); } } } }
ACK Queue Dispatch:主要是broker分別向對應的消息生產者、消費者發送ACK消息應答,其主要核心模塊是在:com.newlandframework.avatarmq.broker包下面的AckPullMessageController和AckPushMessageController模塊,主要職責是在broker中收集生產者的消息,確認成功收到以後,把其放到消息隊列容器中,而後專門安排一個工做線程池把ACK應答發送給生產者。
Message Queue Dispatch:生產者消息的分派,主要是由com.newlandframework.avatarmq.broker包下面的SendMessageController派發模塊進行任務的分派,其中消息分派支持兩種策略,一種是內存緩衝消息區裏面只要一有消息就通知消費者;還有一種是對消息進行緩衝處理,累計到必定的數量以後進行派發,這個是根據:MessageSystemConfig類中的核心參數:SystemPropertySendMessageControllerTaskCommitValue(com.newlandframework.avatarmq.system.send.taskcommit)決定的,默認是1。即一有消息就派發,若是改爲大於1的數值,表示消息緩衝的數量。如今給出SendMessageController的核心實現代碼:
package com.newlandframework.avatarmq.broker; import com.newlandframework.avatarmq.core.SemaphoreCache; import com.newlandframework.avatarmq.core.MessageSystemConfig; import com.newlandframework.avatarmq.core.MessageTaskQueue; import com.newlandframework.avatarmq.core.SendMessageCache; import com.newlandframework.avatarmq.model.MessageDispatchTask; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; /** * @filename:SendMessageController.java * @description:SendMessageController功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class SendMessageController implements Callable<Void> { private volatile boolean stoped = false; private AtomicBoolean flushTask = new AtomicBoolean(false); private ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>> requestCacheList = new ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>>() { protected ConcurrentLinkedQueue<MessageDispatchTask> initialValue() { return new ConcurrentLinkedQueue<MessageDispatchTask>(); } }; private final Timer timer = new Timer("SendMessageTaskMonitor", true); public void stop() { stoped = true; } public boolean isStoped() { return stoped; } public Void call() { int period = MessageSystemConfig.SendMessageControllerPeriodTimeValue; int commitNumber = MessageSystemConfig.SendMessageControllerTaskCommitValue; int sleepTime = MessageSystemConfig.SendMessageControllerTaskSleepTimeValue; ConcurrentLinkedQueue<MessageDispatchTask> queue = requestCacheList.get(); SendMessageCache ref = SendMessageCache.getInstance(); while (!stoped) { SemaphoreCache.acquire(MessageSystemConfig.NotifyTaskSemaphoreValue); MessageDispatchTask task = MessageTaskQueue.getInstance().getTask(); queue.add(task); if (queue.size() == 0) { try { Thread.sleep(sleepTime); continue; } catch (InterruptedException ex) { Logger.getLogger(SendMessageController.class.getName()).log(Level.SEVERE, null, ex); } } if (queue.size() > 0 && (queue.size() % commitNumber == 0 || flushTask.get() == true)) { ref.commit(queue); queue.clear(); flushTask.compareAndSet(true, false); } timer.scheduleAtFixedRate(new TimerTask() { public void run() { try { flushTask.compareAndSet(false, true); } catch (Exception e) { System.out.println("SendMessageTaskMonitor happen exception"); } } }, 1000 * 1, period); } return null; } }
消息分派採用多線程並行派發,其內部經過柵欄機制,爲消息派發設置一個屏障點,後續能夠暴露給JMX接口,進行對整個消息系統,消息派發狀況的動態監控。好比發現消息積壓太多,能夠加大線程並行度。消息無堆積的話,下降線程並行度,減輕系統負荷。如今給出消息派發任務模塊SendMessageTask的核心代碼:
package com.newlandframework.avatarmq.core; import com.newlandframework.avatarmq.msg.ConsumerAckMessage; import com.newlandframework.avatarmq.msg.Message; import com.newlandframework.avatarmq.broker.SendMessageLauncher; import com.newlandframework.avatarmq.consumer.ClustersState; import com.newlandframework.avatarmq.consumer.ConsumerContext; import com.newlandframework.avatarmq.model.MessageType; import com.newlandframework.avatarmq.model.RequestMessage; import com.newlandframework.avatarmq.model.ResponseMessage; import com.newlandframework.avatarmq.model.RemoteChannelData; import com.newlandframework.avatarmq.model.MessageSource; import com.newlandframework.avatarmq.model.MessageDispatchTask; import com.newlandframework.avatarmq.netty.NettyUtil; import java.util.concurrent.Callable; import java.util.concurrent.Phaser; /** * @filename:SendMessageTask.java * @description:SendMessageTask功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class SendMessageTask implements Callable<Void> { private MessageDispatchTask[] tasks; //消息柵欄器,爲後續進行消息JMX實時監控預留接口 private Phaser phaser = null; private SendMessageLauncher launcher = SendMessageLauncher.getInstance(); public SendMessageTask(Phaser phaser, MessageDispatchTask[] tasks) { this.phaser = phaser; this.tasks = tasks; } public Void call() throws Exception { for (MessageDispatchTask task : tasks) { Message msg = task.getMessage(); if (ConsumerContext.selectByClusters(task.getClusters()) != null) { RemoteChannelData channel = ConsumerContext.selectByClusters(task.getClusters()).nextRemoteChannelData(); ResponseMessage response = new ResponseMessage(); response.setMsgSource(MessageSource.AvatarMQBroker); response.setMsgType(MessageType.AvatarMQMessage); response.setMsgParams(msg); response.setMsgId(new MessageIdGenerator().generate()); try { //消息派發的時候,發現管道不可達,跳過 if (!NettyUtil.validateChannel(channel.getChannel())) { ConsumerContext.setClustersStat(task.getClusters(), ClustersState.NETWORKERR); continue; } RequestMessage request = (RequestMessage) launcher.launcher(channel.getChannel(), response); ConsumerAckMessage result = (ConsumerAckMessage) request.getMsgParams(); if (result.getStatus() == ConsumerAckMessage.SUCCESS) { ConsumerContext.setClustersStat(task.getClusters(), ClustersState.SUCCESS); } } catch (Exception e) { ConsumerContext.setClustersStat(task.getClusters(), ClustersState.ERROR); } } } //若干個並行的線程共同到達統一的屏障點以後,再進行消息統計,把數據最終彙總給JMX phaser.arriveAndAwaitAdvance(); return null; } }
Message Serialize:消息的序列化模塊,主要基於Kryo。其主要的核心代碼爲:com.newlandframework.avatarmq.serialize包下面的KryoCodecUtil、KryoSerialize完成消息的序列化和反序列化工做。其對應的主要核心代碼模塊是:
package com.newlandframework.avatarmq.serialize; import com.esotericsoftware.kryo.pool.KryoPool; import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * @filename:KryoCodecUtil.java * @description:KryoCodecUtil功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class KryoCodecUtil implements MessageCodecUtil { private KryoPool pool; public KryoCodecUtil(KryoPool pool) { this.pool = pool; } public void encode(final ByteBuf out, final Object message) throws IOException { ByteArrayOutputStream byteArrayOutputStream = null; try { byteArrayOutputStream = new ByteArrayOutputStream(); KryoSerialize kryoSerialization = new KryoSerialize(pool); kryoSerialization.serialize(byteArrayOutputStream, message); byte[] body = byteArrayOutputStream.toByteArray(); int dataLength = body.length; out.writeInt(dataLength); out.writeBytes(body); } finally { byteArrayOutputStream.close(); } } public Object decode(byte[] body) throws IOException { ByteArrayInputStream byteArrayInputStream = null; try { byteArrayInputStream = new ByteArrayInputStream(body); KryoSerialize kryoSerialization = new KryoSerialize(pool); Object obj = kryoSerialization.deserialize(byteArrayInputStream); return obj; } finally { byteArrayInputStream.close(); } } }
package com.newlandframework.avatarmq.serialize; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.pool.KryoPool; import com.google.common.io.Closer; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; /** * @filename:KryoSerialize.java * @description:KryoSerialize功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class KryoSerialize { private KryoPool pool = null; private Closer closer = Closer.create(); public KryoSerialize(final KryoPool pool) { this.pool = pool; } public void serialize(OutputStream output, Object object) throws IOException { try { Kryo kryo = pool.borrow(); Output out = new Output(output); closer.register(out); closer.register(output); kryo.writeClassAndObject(out, object); pool.release(kryo); } finally { closer.close(); } } public Object deserialize(InputStream input) throws IOException { try { Kryo kryo = pool.borrow(); Input in = new Input(input); closer.register(in); closer.register(input); Object result = kryo.readClassAndObject(in); pool.release(kryo); return result; } finally { closer.close(); } } }
Netty Core:基於Netty對producer、consumer、broker的網絡事件處理器(Handler)進行封裝處理,核心模塊在:com.newlandframework.avatarmq.netty包之下。其中broker的Netty網絡事件處理器爲ShareMessageEventWrapper、producer的Netty網絡事件處理器爲MessageProducerHandler、consumer的Netty網絡事件處理器爲MessageConsumerHandler。其對應的類圖爲:
能夠看到,他們共同的父類是:MessageEventWrapper。該類的代碼簡要說明以下:
package com.newlandframework.avatarmq.netty; import com.newlandframework.avatarmq.core.HookMessageEvent; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; /** * @filename:MessageEventWrapper.java * @description:MessageEventWrapper功能模塊 * @author tangjie<https://github.com/tang-jie> * @blog http://www.cnblogs.com/jietang/ * @since 2016-8-11 */ public class MessageEventWrapper<T> extends ChannelInboundHandlerAdapter implements MessageEventHandler, MessageEventProxy { final public static String proxyMappedName = "handleMessage"; protected MessageProcessor processor; protected Throwable cause; protected HookMessageEvent<T> hook; protected MessageConnectFactory factory; private MessageEventWrapper<T> wrapper; public MessageEventWrapper() { } public MessageEventWrapper(MessageProcessor processor) { this(processor, null); } public MessageEventWrapper(MessageProcessor processor, HookMessageEvent<T> hook) { this.processor = processor; this.hook = hook; this.factory = processor.getMessageConnectFactory(); } public void handleMessage(ChannelHandlerContext ctx, Object msg) { return; } public void beforeMessage(Object msg) { } public void afterMessage(Object msg) { } //管道鏈路激活 public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } //讀管道數據 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); ProxyFactory weaver = new ProxyFactory(wrapper); NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(); advisor.setMappedName(MessageEventWrapper.proxyMappedName); advisor.setAdvice(new MessageEventAdvisor(wrapper, msg)); weaver.addAdvisor(advisor); //具體的如何處理管道中的數據,直接由producer、consumer、broker自行決定 MessageEventHandler proxyObject = (MessageEventHandler) weaver.getProxy(); proxyObject.handleMessage(ctx, msg); } //管道鏈路失效,可能網絡鏈接斷開了,後續若是重連broker,能夠在這裏作文章 public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } public void setWrapper(MessageEventWrapper<T> wrapper) { this.wrapper = wrapper; } }
整個AvatarMQ消息隊列系統的運行狀況,能夠參考:Netty構建分佈式消息隊列(AvatarMQ)設計指南之架構篇,裏面說的很詳細了,本文就不具體演示了。
下圖是VisualVM監控AvatarMQ中broker服務器的CPU使用率曲線。
能夠發現,隨着消息的堆積,broker進行消息投遞、ACK應答的壓力增大,CPU的使用率明細提升。如今具體看下broker的CPU使用率增高的緣由是調用哪一個熱點方法呢?
從下圖能夠看出,熱點方法是:SemaphoreCache的acquire。
這個是由於broker接收來自生產者消息的同時,會先把消息緩存起來,而後利用多線程機制進行消息的分派,這個時候會對信號量維護的許可集合進行獲取操做,獲取成功以後,才能進行任務的派發,主要防止臨界區的共享資源競爭。這裏的Semaphore是用來控制多線程訪問共享資源(生產者過來的消息),相似操做系統中的PV原語,P原語至關於acquire(),V原語至關於release()。
寫在最後
本文經過一個基於Netty構建分佈式消息隊列系統(AvatarMQ),簡單地闡述了一個極簡消息中間件的內部結構、以及如何利用Netty,構建生產者、消費者消息路由的通訊模塊。一切都是從零開始,開發、實現出精簡版的消息中間件!本系列文章的主要靈感源自,本身業餘時間,閱讀到的一些消息隊列原理闡述文章以及相關開源消息中間件的源代碼,其中也結合了本身的一些理解和體會。因爲自身技術水平、理解能力方面的限制,不能可能擁有大師同樣高屋建瓴的視角,本文有說得不對、寫的很差的地方,懇請廣大同行批評指正。如今,文章寫畢,算是對本身平時學習的一些經驗總結,在這以前,對於消息中間件都是很簡單的使用別人造好的輪子,沒有更多的深刻了解背後的技術細節,只是單純的以爲別人寫的很強大、很高效。其實有的時候提高本身能力,要更多的深究其背後的技術原理,觸類旁通,而不是簡單的走馬觀花,一味地點到爲止,久而久之、日復一日,自身的技術積累就很難有質的飛躍。
AvatarMQ必定還有許多不足、瓶頸甚至是bug,確實它不是一個完美的消息中間件,真由於如此,還須要不斷地進行重構優化。後續本人還會持續更新、維護這個開源項目,但願有興趣的朋友,共同關注!
文章略長,謝謝你們的觀賞,若是以爲不錯,還請多多推薦!
附上AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ
唐潔寫於2016年9月7日 白露。