一:什麼是MQjava
MQ 就是 消息中間件。apache
二:爲何使用MQide
場景:電商雙十一 零點的秒殺。在那一瞬間,來自用戶的請求將會激增,若是不作任何措施,那服務極可能會被壓垮。可是咱們又不能直接把這些請求丟棄,而爲了這個很小的時間段去擴容機器又顯得大題小作。因而咱們天然而言的想到,能不能把這些請求先放到一個消息隊列裏面,而後系統從消息隊列裏面拿出來請求作邏輯的處理和響應。經過拉長時間維度來保證服務的穩定性。這就是MQ。ui
使用MQ只要解決的就是 在生產者消費者模式中,生產者生產的數據可能會忽然激增,消費者來不及消費的問題。spa
三:rocketMQ中間件
rocketMQ是一個MQ的實現。咱們在開發中一直在強調,不要重複造輪子。既然咱們須要一個MQ,那就找個別人實現過的MQ來用就好了。rocketMQ就是其中的一種。固然,還有其餘的MQ組件,好比的 ActiveMQ、RabbitMQ,Kafka。blog
四:rocketMQ下載隊列
http://rocketmq.apache.org/release_notesip
下載bin的包,好比 rocketmq-all-4.3.2-bin-release.zip 開發
五:安裝
將下載的文件解壓到對應目錄。好比我解壓到 C:\rocketmq-all-4.3.2
六:啓動NAMESERVER
去 C:\rocketmq-all-4.3.2\bin目錄下找到 mqnamesrv.cmd,雙擊運營便可。
七:啓動BROKER
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
( 假如彈出提示框提示‘錯誤: 找不到或沒法加載主類 xxxxxx’。打開runbroker.cmd,而後將‘%CLASSPATH%’加上英文雙引號。保存並從新執行start語句。)
至此爲止,rocketMQ就安裝啓動完成了。下面咱們寫的demo來使用rocket作一個helloWord
八:寫一個生產者,發消息
public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i < 10; i++) { Thread.sleep(2000); //每2秒發送一次消息 Message msg = new Message("TopicA-test",// topic "TagA",// tag (new Date() + "Hello RocketMQ ,QuickStart" + i) .getBytes()// body ); SendResult sendResult = producer.send(msg); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
九:寫一個消費者,用來監聽消息
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "rmq-group"); consumer.setNamesrvAddr("127.0.0.1:9876");//設置rocketMQ服務的部署地址 consumer.setInstanceName("consumer"); /** * 被訂閱消息的topic 和 subExpression。 * 注意:必定要與消息發佈者的topic 和 subExpression 一致 */ consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() {//監聽器實現 @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody()));//每次拿到消息我就打印出來 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
一個簡單的demo就OK了