消息中間件avitiveMQ,RabbitMQ, KafKa, ZeroMQ層出不窮, 做爲如今必備的 一項技術,博客中一次也沒有說起,今天來一發RocketMQ。apache
RocketMQ 火箭,顧名思義速度很是快。選擇RocketMQ的理由:app
1.國產開源項目,支持國產。maven
2.雖然吞吐量不及KafKa,相比其它MQ,性能優。性能
3.穩定性很是好,經受了雙十一的考驗,榮譽滿滿。學習
4. 功能強大,易用性。測試
廢話說了一堆,開始搞起。須要環境: jdk, maven, RocketMQ.ui
下載一個RocketMQ, 本人 3.5.8版本 ,而後解壓url
環境變量 設置:spa
NAMESRV_ADDRserver
127.0.0.1:9876
運行maven命令:
mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U
啓動調度管理器: mqnamesrv
{RocketMQ目錄}/bin/mqnamesrv
啓動mqbroker:
{RocketMQ目錄}/bin/mqbroker
寫producer的測試代碼:
public class Producer { private void startProduce(){ DefaultMQProducer defaultMQProducer= new DefaultMQProducer("Producer"); //nameserver服務,多個以;分開 defaultMQProducer.setNamesrvAddr("127.0.0.1:9876"); try { defaultMQProducer.start(); for(int j=0;j<200;j++) { for (int i = 0; i < 5; i++) { String str = String.valueOf(i); Message message = new Message("PushTopic", "push", str, new String("content" + str).getBytes()); defaultMQProducer.send(message); System.out.println("send OK!:" + str); Thread.sleep(1000); } } }catch (Exception e){ e.printStackTrace(); } finally { defaultMQProducer.shutdown(); } } public static void main(String[] args) { Producer producer=new Producer(); producer .startProduce(); } }
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dennis.rocketmqroot</groupId> <artifactId>productor</artifactId> <packaging>war</packaging> <version>1.0-SNAPSHOT</version> <name>productor Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.4.6</version> </dependency> </dependencies> <build> <finalName>productor</finalName> </build> </project>
consumer端的代碼:
/** * Created by Dennis on 2017/8/6. */ public class Consumer1 { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); try { //訂閱PushTopic下Tag爲push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次啓動從消息隊列頭取數據 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { for(Message msg:list){ System.out.print("topic:"+msg.getTopic()+",tag:"+msg.getTags()+",key:"+msg.getKeys()); byte[] b=msg.getBody(); System.out.print(",body:"); for(byte b1:b){ System.out.print((char)b1); } System.out.println(); System.out.println("*******************************************************"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
pom.xml 和producer 一致
致此測試代碼寫完,運行producer 和 consumer, consumer 啓動2個類。
Server端打印:
consumer1 的打印:
consumer2 的打印:
至此一個簡單的 生產消費結束,RocketMQ很優秀,也很精深,你們一塊兒來學習吧.