RocketMQ入門(生產者)_2

從 RocketMQ環境搭建_1 咱們已經創建了MQ的Server,接下來就是簡單的生產和消費的過程。java

1. rocketMQ的源碼中有個示例代碼example  ,咱們從Apache官網中能夠下載源碼source找到example,進行學習。數據庫

下載地址:http://rocketmq.apache.org/docs/quick-start/apache

創建簡單的工程,mvn最主要依賴clientapi

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.1</version>
</dependency>

 在作transactionProducer時,發現沒法消費,問題是須要依賴parent,所以借鑑demo中的mvn依賴:安全

    <parent>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-all</artifactId>
        <version>4.3.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <artifactId>rocketmq-example</artifactId>
    <name>rocketmq-example ${project.version}</name>

    <dependencies>
        <dependency>
            <groupId>${project.groupId}</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>${project.groupId}</groupId>
            <artifactId>rocketmq-srvutil</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
         <dependency>
            <groupId>org.javassist</groupId>
            <artifactId>javassist</artifactId>
        </dependency>
        <dependency>
            <groupId>io.openmessaging</groupId>
            <artifactId>openmessaging-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-openmessaging</artifactId>
            <version>4.3.0</version> 
        </dependency>
    </dependencies>

建立Producer類:微信

/**
 * 簡單生產者
 * 
 * @author DennyZhao
 * @since 2018/10/29
 * @version 1.0
 */
public class Producer {

    /**
     * main方法
     * 
     * @param args
     * @throws InterruptedException
     * @throws MQBrokerException
     * @throws RemotingException
     * @throws MQClientException
     * @throws UnsupportedEncodingException
     */
    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException, UnsupportedEncodingException {
        //建立生產者實例,並肯定生產組
        DefaultMQProducer producer = new DefaultMQProducer("fruitProducerGroup");
        // 指定服務NameServer服務
        producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
        // 生產者啓動
        producer.start();
        String[] fruitArray = { "apple", "strawbarry", "pear", "banana", "orange" };
        for (String fruit : fruitArray) {
            // 建立消息
            Message message = new Message("fruit", "common", fruit.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 發送消息
            SendResult result = producer.send(message);
            SendStatus sendStatus = result.getSendStatus();
            // 獲取回執
            System.out.println(result);
            if (sendStatus == SendStatus.SEND_OK) {
                System.out.println("信息發送成功!");
            } else {
                System.out.println("信息發送失敗!");
            }
        }
        // 關閉生產者
        producer.shutdown();
    }
}

 建立Consumer類:多線程

/**
 * 消費者羣體
 * @author DennyZhao
 * @since 2018/10/29
 * @version 1.0
 */
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        //建立消費者實例和組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("fruitConsumerGroup");
        // 指定nameServer服務地址
        consumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
        // 訂閱消費Topic
        consumer.subscribe("fruit", "*");
        // 訂閱從何地方開始讀(先進先出,仍是先進後出)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 添加監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //獲取數據,防止一次獲取太多沒法消化,可一次取單個條數。
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext arg1) {
                if(msgList != null && msgList.size() > 0) {
                    MessageExt msg = msgList.get(0);
                    System.out.println(msg);
                    try {
                        System.out.println(new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
        });
        // 啓動消費者
        consumer.start();
    }
}

 

※注意:啓動最好先啓動消費者而後再啓動服務者。app

從RocketMQ-console中能夠看到建立的消費者羣和生產者羣。至此,簡單的生產消費就算大功告成。分佈式

配置說明:

1.producer-生產者類型post

  • 1. NormalProducer         (普通生產者)
  • 2. OrderProducer           (嚴格順序生產者,例如:訂單建立,付款,發貨等)
  • 3. TransactionProducer  (事務生產者)

msgId和msgKey很是關鍵:msgId是mq自動生成的,可在控制檯message中查找數據。

                                            msgKey大可能是業務主鍵key,用於跟蹤數據,好比訂單號等。

msg中有個很重要的屬性:在producer端放置:msg.putUserProperty([key],  [value]);  //是個map,可放內容,在consumer端獲取

主要可選參數:

        // 設置超過多大進行compress壓縮
        producer.setCompressMsgBodyOverHowmuch(1024 * 10);
        // 設置發送失敗的嘗試次數。
        producer.setRetryTimesWhenSendFailed(3);
        // 設置若是返回值不是send_ok,是否要從新發送
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
        // 設置限制最大的文件大小
        producer.setMaxMessageSize(1024*50);
        // 設置默認主題對應的隊列數
        producer.setDefaultTopicQueueNums(4);
        //建立新的topic
        producer.createTopic("1121", "vegetables", 4);
        // 設置發送超時時間 ms
        producer.setSendMsgTimeout(1000);

OrderProducer 採用將有序內容放在單個queue,保證消費的順序進行。可參見示例中的order代碼。

         Producer類不一樣點展現,發送消息:

            // 發送消息
            SendResult result = producer.send(message, new MessageQueueSelector() {
                public MessageQueue select(List<MessageQueue> msgList, Message message, Object queueId) {
                    return msgList.get(Integer.valueOf(queueId.toString()));
                }}, 0); //這個0表示將這些msg放入到隊列0中

         Consumer類不一樣點展現,接受消息:

// 添加監聽
        consumer.registerMessageListener(new MessageListenerOrderly() {

            //獲取數據,防止一次獲取太多沒法消化,可一次取單個條數。
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext arg1) {
                if(msgList != null && msgList.size() > 0) {
                    MessageExt msg = msgList.get(0);
                    System.out.println(msg);
                        System.out.println(msg.getBody());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
            
        });

 TransactionProducer :

       用於解決事務同步,尤爲是金融方面,在單體應用中咱們能夠經過數據庫事務來控制,但大的電商系統表和表可能分屬於不一樣的數據庫,數據庫事務則失效。

       好比微信轉銀行操做:咱們從銀行扣款100元到微信,若是微信增長100元,而此時出現問題,銀行扣款沒有成功這樣理論上是要回滾微信的增長100元。

       通常分佈式採用2pc(2 phase commit)模式(兩階段提交協議:預留和確認),安全性高,可是由於長鏈接致使長時間等待。

      而RocketMQ採用兩階段補償型,TCC(Try-Confirm-Cancel)的簡稱。

     應用場景:買2張票(春運回家,從天津->上海->武漢),先買預留,而後在規定時間內付款則commit,不然過時後rollback.(不管哪一個)

    異常回溯:在3.2.6+版本的非商業版已經取消,

須要手動回查參見別人的文章RocketMQ事務消息回查設計方案

 常對應參數:

/**
 * 作數據反查輪詢用 UN KNOW,時會用到反查目前已經 deprecated
 */
producer.setCheckThreadPoolMaxSize(5);
producer.setCheckThreadPoolMinSize(2);
producer.setCheckRequestHoldMax(200);//回查最大數

 生產者修改:添加

sendMessage變化:TransactionSendResult 
producer變化: TransactionMQProducer
添加監聽: TransactionListener
因非商業3.2.6取消回查:所以 producer.setExecutorService(executorService);沒有做用,原本是用於開啓多線程進行回查用
/**
 * 事務生產者
 * 
 * @author DennyZhao
 * @since 2018/10/31
 * @version 1.0
 */
public class Producer {

    /**
     * main方法
     * 
     * @param args
     * @throws InterruptedException
     * @throws MQBrokerException
     * @throws RemotingException
     * @throws MQClientException
     * @throws UnsupportedEncodingException
     */
    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException, UnsupportedEncodingException {
        //建立生產者實例,並肯定生產組
        TransactionMQProducer producer = new TransactionMQProducer("transProducerGroup");
        TransactionListener transListener = new FruitTransactionListener();
        // 指定服務NameServer服務
        producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
//        // 設置超過多大進行compress壓縮
//        producer.setCompressMsgBodyOverHowmuch(1024 * 10);
//        // 設置發送失敗的嘗試次數。
//        producer.setRetryTimesWhenSendFailed(3);
//        // 設置若是返回值不是send_ok,是否要從新發送
//        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
//        // 設置限制最大的文件大小
//        producer.setMaxMessageSize(1024*50);
//        // 設置默認主題對應的隊列數
//        producer.setDefaultTopicQueueNums(4);
//        // 設置發送超時時間 ms
//        producer.setSendMsgTimeout(1000);
        /**
         * 作數據反查輪詢用
         */
//        producer.setCheckThreadPoolMaxSize(5);
//        producer.setCheckThreadPoolMinSize(2);
//        producer.setCheckRequestHoldMax(200);//回查最大數
        
        
        
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {

            public Thread newThread(Runnable r) {
                Thread th = new Thread(r);
                th.setName("client-transaction-msg-check-thread");
                System.out.println("id:" + th.getId());
                System.out.println("name:" + th.getName());
                return th;
            }
       });
        producer.setExecutorService(executorService);
        /**
         * 添加監聽
         */
        producer.setTransactionListener(transListener);
        
        // 生產者啓動
        producer.start();
        String[] fruitArray = { "apple-蘋果", "strawbarry-草莓", "pear-梨子", "banana-香蕉", "orange-橘子"};
        for (String fruit : fruitArray) {
            // 建立消息
            Message message = new Message("transactionFruit", "common", "key"+fruit, fruit.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 發送消息
            TransactionSendResult result = producer.sendMessageInTransaction(message, "abcd");
             Thread.sleep(10);
            SendStatus sendStatus = result.getSendStatus();
            // 獲取回執
            System.out.println(result);
            if (sendStatus == SendStatus.SEND_OK) {
                System.out.println("信息發送成功!");
            } else {
                System.out.println("信息發送失敗!");
            }
        }
        int j = 0;
        while(j <500) {
            Thread.sleep(1000);
            j++;
        }
        // 關閉生產者
        producer.shutdown();
    }

 

 監聽:

因回查被取消所以:checkLocalTransaction(MessageExt msg)沒有做用了,因此若是
LocalTransactionState.UNKNOW 將沒法處理,會使得topic一直處於不顯示狀態。
/**
 * 事務執行監聽
 * @author DennyZhao
 *
 */
public class FruitTransactionListener implements TransactionListener {

    /**
     * 執行事務,事務成功commit,不成功rollback,未知unknown
     * msg Message
     * arg 附加參數,用於處理傳遞內容加以判斷,使用
     * 
     */
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println((String) arg);
        // 執行事務處
        // 假設以水果入庫爲例:蘋果,香蕉 commit,梨子 rollback, 橘子和草莓不知道怎麼處理
        System.out.println(msg + "---executeLocal");
        System.out.println(msg.getTransactionId());
        String fruit = "";
        try {
            fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        if(StringUtils.contains(fruit, "蘋果")
                || StringUtils.contains(fruit, "香蕉")) {//提交
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(fruit, "梨")) {//回滾
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else { //經過輪詢去處理
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * 輪詢反查,對於unknow的內容,進行反查獲取結果
     */
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println(msg + "---checkAgain");
        String fruit = "";
        try {
            fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //從庫中反查知道庫中 存在葡萄,不存在橘子 
        if(StringUtils.contains(fruit, "葡萄")) {//提交
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(fruit, "橘子")) {//回滾
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }

 

 

 

錯誤說明:

1. No route info of this topic

因在啓動broker時,參數中未設置可自動建立topic,所以生產者建立topic被認爲不合法,須要在console中先建立topic,或者服務端先建立topic。

2. transactionProducer 生產後沒法消費

mvn依賴中缺乏 <parent>rocketmq-all</parent>致使。

3. 事務回查無效

 版本 3.2.6 後rocketmq取消了事務回查機制,若是丟失須要本身手動經過key值回查.

相關文章
相關標籤/搜索