MQ:
消息總線(Message Queue),是一種跨進程的通訊機制,用於上下游傳遞消息。在互聯網架構中,MQ是一種很是常見的上下游「邏輯解耦+物理解耦」的消息通訊服務。使用MQ以後,消息發送上游只須要依賴MQ,邏輯上和物理上都不用依賴其餘服務。
MQ的不足
(1)系統更加複雜,多了一個MQ組件
(2)消息傳遞路徑更長,延時會增長
(3)消息可能會被重複消費
(4)上游沒法知道下游的執行結果(所以,調用方實時依賴執行結果的業務場景,請使用調用,而不是MQ)docker
使用場景
(1)上游不關注執行結果
(2)上游關注結果,但執行時間比較長。舉個例子,微信支付,跨公網調用微信的接口,執行時間會比較長,但調用方又很是關注執行結果,此時通常怎麼玩呢?
通常採用「回調網關+MQ」方案來解耦:微信
a、調用方直接跨公網調用微信接口 b、微信返回調用成功,此時並不表明返回成功 c、微信執行完成後,回調統一網關 d、網關將返回結果通知MQ e、請求方收到結果通知
支持發佈/訂閱(Pub/Sub)和點對點(P2P)消息模型
在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
支持拉(pull)和推(push)兩種消息模式
單一隊列百萬消息的堆積能力
支持多種消息協議,如 JMS、MQTT 等
分佈式高可用的部署架構,知足至少一次消息傳遞語義
提供 docker 鏡像用於隔離測試和雲集羣部署
提供配置、指標和監控等功能豐富的 Dashboard架構
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //須要一個producer group名字做爲構造方法的參數,這裏爲producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設置NameServer地址,此處應改成實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,可是也能夠經過環境變量的方式設置,不必定非得寫死在代碼裏 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調用start()方法啓動一個producer實例 producer.start(); //發送10條消息到Topic爲TopicTest,tag爲TagA,消息內容爲「Hello RocketMQ」拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調用producer的send()方法發送消息 //這裏調用的是同步的方式,因此會有返回結果 SendResult sendResult = producer.send(msg); //打印返回結果,能夠看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息以後,調用shutdown()方法關閉producer producer.shutdown(); } }
在開發過程當中,若是想測試生產者是否發出了mq,能夠編寫一個消費者進行測試分佈式
@Test public void testMqConsumer() throws Exception { String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876"; int threadNum = 5; String topics = "WechatUnionCoreTemplateNotifyTopic"; String instanceName = "TemplateComsumer"; String groupName = "wechatUnionTemplateNotifyConsumer"; DefaultMQPushConsumer consumer = null; consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(rocketmqAddress);//MQ地址 consumer.setClientCallbackExecutorThreads(threadNum);//消費現場數量 consumer.setInstanceName(instanceName);//實例名稱 consumer.subscribe(topics, "*"); //註冊監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (int i = 0; i < msgs.size(); i++) { MessageExt msgExt = msgs.get(i); String msgId = msgExt.getMsgId(); Integer flag = msgExt.getFlag(); TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class); logger.info("receive new Msg: " + " msgId=" + msgId + " flag=" + flag + " templateNotifyItem=" + templateNotifyItem); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); logger.info("監聽執行中"); Thread.sleep(1000000); }
參考:
http://blog.csdn.net/manzhizh...
https://www.jianshu.com/p/824...
架構師之路ide