RocketMQ在Windows平臺下環境搭建

一.  環境搭建

須要jdk1.6(以上) 64bit, maven, eclipse

二.  RocketMQ項目下載

項目地址:https://github.com/alibaba/RocketMQ,將下載的RocketMQ-master放到eclipse工做空間中java

三. 將RocketMQ-master導入到eclipse中

  1. 將項目導入eclipse,以下圖



2. 在我下載的RocketMQ-master的pom.xml文件的parent有個問題,默認以下:
    <parent>
        <groupId>com.taobao</groupId>
        <artifactId>parent</artifactId>
        <version>1.0.2</version>
    </parent>
    
    <!-- <parent>
        <groupId>org.sonatype.oss</groupId>
        <artifactId>oss-parent</artifactId>
        <version>7</version>
    </parent> -->
        
可是編譯時總報錯parent找不到,而用下面的parent,則編譯經過。

    <!--<parent>
        <groupId>com.taobao</groupId>
        <artifactId>parent</artifactId>
        <version>1.0.2</version>
    </parent>-->
    
    <parent>
        <groupId>org.sonatype.oss</groupId>
        <artifactId>oss-parent</artifactId>
        <version>7</version>
    </parent>

3.  因爲我用的是jdk1.7,故修改RocketMQ-master.pom的jdk版本

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--maven properties -->
        <maven.test.skip>true</maven.test.skip>
        <maven.jdoc.skip>true</maven.jdoc.skip>
        <downloadSources>true</downloadSources>
        <!-- compiler settings properties -->
        <java_source_version>1.7</java_source_version>
        <java_target_version>1.7</java_target_version>
        <file_encoding>UTF-8</file_encoding>
    </properties>

四.  編譯RocketMQ項目

1.  在命令行執行在RocketMQ-master文件夾下的install.bat批處理


2.  該命令會編譯整個項目,並在RocketMQ-master目錄下生成一個target文件夾

3.  進入剛生成的target文件夾下的bin目錄,在命令行中執行: start mqnamesrv.exe,會彈出一個信息窗口,顯示The name Server boot success 說明啓動成功了,接着啓動borker,在命令行中執行:start mqbroker.exe -n 127.0.0.1:9876 一樣的彈出一個窗口,看到success表示成功了。 
   

  

五.   啓動Producer和Customer

        1.   在RocketMQ-example項目中加入Producer.java

public class Producer {  
     public static void main(String[] args) throws MQClientException,  
     InterruptedException{  
  /** 
   * 一個應用建立一個Producer,由應用來維護此對象,能夠設置爲全局對象或者單例<br> 
   * 注意:ProducerGroupName須要由應用來保證惟一<br> 
   * ProducerGroup這個概念發送普通的消息時,做用不大,可是發送分佈式事務消息時,比較關鍵, 
   * 由於服務器會回查這個Group下的任意一個Producer 
   */  
  final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
  producer.setNamesrvAddr("127.0.0.1:9876");  
  producer.setInstanceName("Producer");  
  
  /** 
   * Producer對象在使用以前必需要調用start初始化,初始化一次便可<br> 
   * 注意:切記不能夠在每次發送消息時,都調用start方法 
   */  
  producer.start();  
  
  /** 
   * 下面這段代碼代表一個Producer對象能夠發送多個topic,多個tag的消息。 
   * 注意:send方法是同步調用,只要不拋異常就標識成功。可是發送成功也可會有多種狀態,<br> 
   * 例如消息寫入Master成功,可是Slave不成功,這種狀況消息屬於成功,可是對於個別應用若是對消息可靠性要求極高,<br> 
   * 須要對這種狀況作處理。另外,消息可能會存在發送失敗的狀況,失敗重試由應用來處理。 
   */  
  for (int i = 0; i < 10; i++){  
     try {  
        {  
            Message msg = new Message("TopicTest1",// topic  
                  "TagA",// tag  
                  "OrderID001",// key  
                  ("Hello MetaQA").getBytes());// body  
            SendResult sendResult = producer.send(msg);  
            System.out.println(sendResult);  
        }  
  
        {  
            Message msg = new Message("TopicTest2",// topic  
                  "TagB",// tag  
                  "OrderID0034",// key  
                  ("Hello MetaQB").getBytes());// body  
            SendResult sendResult = producer.send(msg);  
            System.out.println(sendResult);  
        }  
  
        {  
            Message msg = new Message("TopicTest3",// topic  
                  "TagC",// tag  
                  "OrderID061",// key  
                  ("Hello MetaQC").getBytes());// body  
            SendResult sendResult = producer.send(msg);  
            System.out.println(sendResult);  
        }  
     }catch(Exception e) {  
        e.printStackTrace();  
     }  
     TimeUnit.MILLISECONDS.sleep(1000);  
  }  
  
  /** 
   * 應用退出時,要調用shutdown來清理資源,關閉網絡鏈接,從MetaQ服務器上註銷本身 
   * 注意:咱們建議應用在JBOSS、Tomcat等容器的退出鉤子裏調用shutdown方法 
   */  
//producer.shutdown();  
  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  
     public void run() {  
        producer.shutdown();  
     }  
  }));  
  System.exit(0);  
}  
}  

2.  加入Customer.java

public class Consumer {  
     /**  
     * 當前例子是PushConsumer用法,使用方式給用戶感受是消息從RocketMQ服務器推到了應用客戶端。<br>  
     * 可是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ服務器拉消息,而後再回調用戶Listener方法<br>  
     */    
    public static void main(String[] args) throws InterruptedException,    
                       MQClientException{    
              /**  
               * 一個應用建立一個Consumer,由應用來維護此對象,能夠設置爲全局對象或者單例<br>  
               * 注意:ConsumerGroupName須要由應用來保證惟一  
               */    
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(    
                                "ConsumerGroupName");    
              consumer.setNamesrvAddr("127.0.0.1:9876");    
              consumer.setInstanceName("Consumber");    
  
              /**  
               * 訂閱指定topic下tags分別等於TagA或TagC或TagD  
               */    
              consumer.subscribe("TopicTest1","TagA || TagC || TagD");    
              /**  
               * 訂閱指定topic下全部消息<br>  
               * 注意:一個consumer對象能夠訂閱多個topic  
               */    
              consumer.subscribe("TopicTest2","*");    
  
              consumer.registerMessageListener(new MessageListenerConcurrently() {    
  
                       public ConsumeConcurrentlyStatus consumeMessage(    
                                          List<MessageExt>msgs, ConsumeConcurrentlyContext context) {    
  
                                System.out.println(Thread.currentThread().getName()    
                                                   +" Receive New Messages: " + msgs.size());    
  
                                MessageExt msg = msgs.get(0);    
                                if(msg.getTopic().equals("TopicTest1")) {    
                                          //執行TopicTest1的消費邏輯    
                                          if(msg.getTags() != null && msg.getTags().equals("TagA")) {    
                                                   //執行TagA的消費    
                                                   System.out.println(new String(msg.getBody()));    
                                          }else if (msg.getTags() != null    
                                                            &&msg.getTags().equals("TagC")) {    
                                                   //執行TagC的消費    
                                                   System.out.println(new String(msg.getBody()));    
                                          }else if (msg.getTags() != null    
                                                            &&msg.getTags().equals("TagD")) {    
                                                   //執行TagD的消費    
                                                   System.out.println(new String(msg.getBody()));    
                                          }    
                                }else if (msg.getTopic().equals("TopicTest2")) {    
                                          System.out.println(new String(msg.getBody()));    
                                }    
  
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    
  
                       }    
              });    
  
              /**  
               * Consumer對象在使用以前必需要調用start初始化,初始化一次便可<br>  
               */    
              consumer.start();    
  
              System.out.println("ConsumerStarted.");    
    }    
}  
3.  運行Producer

19:50:57.233 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
19:50:57.259 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 4
19:50:57.274 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
19:50:57.274 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
19:50:57.274 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
19:50:57.275 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
19:50:57.275 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows
19:50:57.276 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7
19:50:57.276 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
19:50:57.276 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
19:50:57.277 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
19:50:57.506 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: available
19:50:57.507 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\tannj\AppData\Local\Temp (java.io.tmpdir)
19:50:57.507 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
19:50:57.507 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
19:50:57.544 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
19:50:57.544 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
19:50:57.875 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0xfaf4f653b3d8be50 (took 52 ms)
19:50:57.927 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: unpooled
19:50:57.927 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
19:50:57.958 [NettyClientSelector_1] DEBUG i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated: io.netty.util.internal.__matchers__.com.alibaba.rocketmq.remoting.protocol.RemotingCommandMatcher
19:50:58.006 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacity.default: 262144
19:50:58.027 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
19:50:58.255 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A36A, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=0], queueOffset=73578]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A3F2, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=1], queueOffset=73577]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A47A, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=2], queueOffset=73577]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A502, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=3], queueOffset=73576]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A58A, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=0], queueOffset=73579]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A612, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=1], queueOffset=73578]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A69A, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=2], queueOffset=73578]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A722, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=3], queueOffset=73577]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A7AA, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=0], queueOffset=73580]
SendResult [sendStatus=SEND_OK, msgId=0A016F9600002A9F000000000272A832, messageQueue=MessageQueue [topic=TopicTest, brokerName=tannj-PC, queueId=1], queueOffset=73579]
4.  運行Customer

19:51:49.059 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
19:51:49.069 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 4
19:51:49.083 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
19:51:49.084 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
19:51:49.084 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
19:51:49.085 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
19:51:49.085 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows
19:51:49.086 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7
19:51:49.086 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
19:51:49.086 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
19:51:49.086 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
19:51:49.299 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: available
19:51:49.300 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\tannj\AppData\Local\Temp (java.io.tmpdir)
19:51:49.300 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
19:51:49.300 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
19:51:49.338 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
19:51:49.338 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
19:51:49.617 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0x51d30e47b2203417 (took 19 ms)
19:51:49.685 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: unpooled
19:51:49.685 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
19:51:49.728 [NettyClientSelector_1] DEBUG i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated: io.netty.util.internal.__matchers__.com.alibaba.rocketmq.remoting.protocol.RemotingCommandMatcher
19:51:49.779 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacity.default: 262144
19:51:49.811 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
19:51:50.034 [NettyClientSelector_1] DEBUG io.netty.util.internal.Cleaner0 - java.nio.ByteBuffer.cleaner(): available
Consumer Started.
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=73578, sysFlag=0, bornTimestamp=1421236258344, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258374, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A36A, commitLogOffset=41067370, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73581, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=73577, sysFlag=0, bornTimestamp=1421236258376, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258377, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A3F2, commitLogOffset=41067506, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73580, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=73578, sysFlag=0, bornTimestamp=1421236258387, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258387, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A612, commitLogOffset=41068050, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73580, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=73576, sysFlag=0, bornTimestamp=1421236258382, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258383, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A502, commitLogOffset=41067778, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73578, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=73577, sysFlag=0, bornTimestamp=1421236258379, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258381, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A47A, commitLogOffset=41067642, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73579, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=73579, sysFlag=0, bornTimestamp=1421236258384, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258385, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A58A, commitLogOffset=41067914, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73581, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=73580, sysFlag=0, bornTimestamp=1421236258394, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258395, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A7AA, commitLogOffset=41068458, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73581, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=73577, sysFlag=0, bornTimestamp=1421236258392, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258393, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A722, commitLogOffset=41068322, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73578, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=73578, sysFlag=0, bornTimestamp=1421236258390, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258391, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A69A, commitLogOffset=41068186, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73579, MIN_OFFSET=0}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=73579, sysFlag=0, bornTimestamp=1421236258398, bornHost=/10.1.111.150:53985, storeTimestamp=1421236258397, storeHost=/10.1.111.150:10911, msgId=0A016F9600002A9F000000000272A832, commitLogOffset=41068594, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=73580, MIN_OFFSET=0}, body=16]]]
至此,已把RocketMQ在window平臺下運行起來。
相關文章
相關標籤/搜索