RocketMQ入門(消費者)_3 RocketMQ入門(生產者)_2

消費者角色:

1. 推式(通常建議用推式)

2. 拉式

消費模式:

1. 集羣(cluster)                --均衡負載消費

2. 廣播(broadcasting) --發佈和訂閱者模式

MQ的消費不會清除broker中的數據,數據會一直保留在隊列中,隊列offset會一直遞增,所以可以通過回查來獲取丟失的數據。這種情況下採用pull形式較好。

push形式,MQ會記錄訪問的偏移量,即使宕機,下次重啓也會按照順序繼續消費,不會出現重複消費。

RocketMQ入門(生產者)_2中已經寫過一個推式的代碼,接下來看一下拉式。

/**
 * 普通拉式消費者,代碼編寫
 * @author DennyZhao
 *
 */
public class PullConsumer {
    
    /**
     * 暫時以map做爲offset入庫看待。<queueId, offset>
     */
    private static Map<String, Long> offsetMap = new HashMap<String, Long>();

    public static void main(String[] args) throws UnsupportedEncodingException {
        //建立拉式消費者
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("pullConsumerGroup");
        pullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
        try {
            pullConsumer.start();
            Set<MessageQueue> mqSet= pullConsumer.fetchSubscribeMessageQueues("fruit");
            while(true) {
            //循環隊列
            for(MessageQueue mq: mqSet) {
                // 從隊列中獲取固定偏移值
                PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, "*", getOffset(mq), 32);
                setOffset(mq, pullResult.getNextBeginOffset());
                switch(pullResult.getPullStatus()) {
                case FOUND:
                    List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                    for(MessageExt msg : msgFoundList) {
                        String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println(fruit + "   -----fruit");
                    }
                    break;
                case NO_NEW_MSG:
                    break;
                case NO_MATCHED_MSG:
                    break;
                }
            }
            Thread.sleep(2000);
            }
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (RemotingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (MQBrokerException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * set offset
     * @param mq
     * @param nextBeginOffset
     */
    private static void setOffset(MessageQueue mq, long nextBeginOffset) {
        String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
        offsetMap.put(queueId, nextBeginOffset);
    }

    /**
     * 獲取固定偏移值
     * @param mq queueId
     * @return int
     */
    private static long getOffset(MessageQueue mq) {
        String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
        Long offset =  offsetMap.get(queueId);
        if(offset == null) {
            offset = 0l;
        }
        System.out.println(offset + "---------------");
        return offset;
    }

}

 

 使用Schedule拉式:

/**
 * Pull-mode consumption driven by {@code MQPullConsumerScheduleService}.
 *
 * <p>Registers a pull task for the {@code fruit} topic; the task pulls up to 32 messages
 * per invocation, prints them, stores the next offset and asks to be run again after one
 * second.
 *
 * @author DennyZhao
 */
public class PullScheduleService {

    /**
     * Configures and starts the schedule service.
     *
     * @param args unused
     * @throws MQClientException if registering the callback or starting the service fails
     */
    public static void main(String[] args) throws MQClientException {
        MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("scheduleConsumers");
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.setPullThreadNums(4);
        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pullConsumer");
        defaultMQPullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
        scheduleService.setDefaultMQPullConsumer(defaultMQPullConsumer);
        scheduleService.registerPullTaskCallback("fruit", new PullTaskCallback() {
            /**
             * Pulls one batch from the given queue and prints every message body.
             */
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer pullConsumer = context.getPullConsumer();
                try {
                    long offset = pullConsumer.fetchConsumeOffset(mq, false);
                    // A negative value means no offset is known yet (same guard as the
                    // official RocketMQ example); start from the beginning of the queue.
                    if (offset < 0) {
                        offset = 0;
                    }
                    PullResult pull = pullConsumer.pull(mq, "*", offset, 32);
                    switch (pull.getPullStatus()) {
                    case FOUND:
                        // Print the pulled messages.
                        List<MessageExt> msgFoundList = pull.getMsgFoundList();
                        for (MessageExt msg : msgFoundList) {
                            String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("result:   " + fruit);
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    default:
                        break;
                    }
                    // Store the offset the next pull should start from.
                    pullConsumer.updateConsumeOffset(mq, pull.getNextBeginOffset());
                    // Delay (in milliseconds) before this task is scheduled again.
                    context.setPullNextDelayTimeMillis(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the scheduling thread can react to it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (MQClientException | RemotingException | MQBrokerException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        });
        scheduleService.start();
    }

}

 參數說明:

// Main push-consumer settings (illustrative snippet; topic, key, etc. are placeholders)
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumerGroup");
// Where to start consuming; CONSUME_FROM_LAST_OFFSET is the default
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
pushConsumer.setConsumeThreadMin(2); // minimum number of consume threads
pushConsumer.setConsumeThreadMax(8); // maximum number of consume threads
// Consume timeout (not a connection timeout).
// NOTE(review): RocketMQ documents this value in minutes - confirm 5000 is intended.
pushConsumer.setConsumeTimeout(5000);
pushConsumer.setMessageModel(MessageModel.CLUSTERING); // message model: CLUSTERING (default) or BROADCASTING
pushConsumer.setConsumeConcurrentlyMaxSpan(1000); // max offset span per queue for concurrent consumption
pushConsumer.setConsumeMessageBatchMaxSize(1); // max messages handed to the listener per call
pushConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;"); // name server addresses
pushConsumer.setHeartbeatBrokerInterval(2000); // heartbeat interval to the broker
pushConsumer.setMaxReconsumeTimes(3); // max retries after a failed consumption
pushConsumer.queryMessage(topic, key, maxNum, begin, end); // query messages by key
pushConsumer.fetchSubscribeMessageQueues(topic); // fetch the message queues of a topic (does not subscribe)
// Register ONE listener; both types are interfaces, so supply your own implementation.
pushConsumer.registerMessageListener(new MessageListenerConcurrently()); // concurrent consumption
pushConsumer.registerMessageListener(new MessageListenerOrderly()); // strictly ordered consumption

// Commonly used pull-consumer calls
pullConsumer.setMessageModel(MessageModel.CLUSTERING); // message model: CLUSTERING (default) or BROADCASTING
pullConsumer.fetchSubscribeMessageQueues(topic); // fetch the message queues of a topic
pullConsumer.fetchConsumeOffset(mq, false); // current consume offset of the queue
pullConsumer.pullBlockIfNotFound(mq, subExpression, offset, maxNums); // pull messages from the queue
pullConsumer.updateConsumeOffset(mq, offset); // update the consume offset
pullConsumer.setConsumerPullTimeoutMillis(5000); // pull timeout in milliseconds (not a connection timeout)
相關文章
相關標籤/搜索