RocketMQ

       消息中間件avitiveMQ,RabbitMQ, KafKa, ZeroMQ層出不窮, 做爲如今必備的 一項技術,博客中一次也沒有說起,今天來一發RocketMQ。apache

       RocketMQ 火箭,顧名思義速度很是快。選擇RocketMQ的理由:app

       1.國產開源項目,支持國產。maven

       2.雖然吞吐量不及KafKa,相比其它MQ,性能優。性能

       3.穩定性很是好,經受了雙十一的考驗,榮譽滿滿。學習

       4. 功能強大,易用性。測試

廢話說了一堆,開始搞起。須要環境: jdk, maven, RocketMQ.ui

下載一個RocketMQ,  本人 3.5.8版本 ,而後解壓url

環境變量 設置:spa

NAMESRV_ADDRserver

127.0.0.1:9876

運行maven命令:

mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U

啓動調度管理器: mqnamesrv

{RocketMQ目錄}/bin/mqnamesrv

啓動mqbroker:

{RocketMQ目錄}/bin/mqbroker

寫producer的測試代碼:

public class Producer {
    private  void startProduce(){
        DefaultMQProducer defaultMQProducer= new DefaultMQProducer("Producer");
        //nameserver服務,多個以;分開
        defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");
        try {
            defaultMQProducer.start();
            for(int j=0;j<200;j++) {
                for (int i = 0; i < 5; i++) {
                    String str = String.valueOf(i);
                    Message message = new Message("PushTopic", "push", str, new String("content" + str).getBytes());
                    defaultMQProducer.send(message);
                    System.out.println("send OK!:" + str);
                    Thread.sleep(1000);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            defaultMQProducer.shutdown();
        }
    }

    public static void main(String[] args) {
        Producer producer=new Producer();
        producer .startProduce();
    }
}

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dennis.rocketmqroot</groupId>
  <artifactId>productor</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>productor Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.2</version>
  </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>3.4.6</version>
    </dependency>
</dependencies>
  <build>
    <finalName>productor</finalName>
  </build>
</project>

 

consumer端的代碼:

 

/**
 * Created by Dennis on 2017/8/6.
 */
public class Consumer1 {

    public static void main(String[] args) {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
                consumer.setNamesrvAddr("127.0.0.1:9876");
                try {
                        //訂閱PushTopic下Tag爲push的消息
                        consumer.subscribe("PushTopic", "push");
                        //程序第一次啓動從消息隊列頭取數據
                        consumer.setConsumeFromWhere(
                                ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                        consumer.registerMessageListener(
                                   new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(
                           List<MessageExt> list,
                            ConsumeConcurrentlyContext Context) {
                                            for(Message msg:list){
                                                System.out.print("topic:"+msg.getTopic()+",tag:"+msg.getTags()+",key:"+msg.getKeys());
                                                byte[] b=msg.getBody();
                                                System.out.print(",body:");
                                                for(byte b1:b){
                                                    System.out.print((char)b1);
                                                }
                                                System.out.println();
                                                System.out.println("*******************************************************");
                                            }
                                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                        }
                }
                        );
                            consumer.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            }


}

pom.xml 和producer 一致

致此測試代碼寫完,運行producer 和 consumer,  consumer 啓動2個類。

Server端打印:

 

consumer1 的打印:

consumer2 的打印:

至此一個簡單的 生產消費結束,RocketMQ很優秀,也很精深,你們一塊兒來學習吧.

相關文章
相關標籤/搜索