RocketMq使用過程的那些小事

在使用rocketmq以前使用了rabbitmq,會出現丟消息的狀況,進而果斷放棄,繼續投入到大Java的懷抱,不過也遇到了一些問題,這裏總結一下:java

  1. 使用過程當中新加節點須要手動建立topic多線程

  2. 消費者處理不合理,不能實現負載均衡負載均衡

針對第二點:我以前一直使用的是pull方式,按順序來消費,一旦程序重啓則從頭一個一個消費,顯然這種效率很低,
並且由於一個代碼問題,若是我在offsize = 0的狀況獲取不了數據,則min offsize不增加,這種狀況致使若是數據隔天了則不能消費,這時候消費者至關於在空跑。fetch

List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

使用這種方法能夠穩定的pull出數據,可是這種狀況效率很低。url

使用多線程方式:線程

private Queue<List<MessageVo>> messageQueue = new LinkedBlockingQueue<List<MessageVo>>();
            
           DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(groupName);
            consumer.setNamesrvAddr(url);
            scheduleService = new MQPullConsumerScheduleService(groupName);
            scheduleService.setMessageModel(MessageModel.CLUSTERING);
            scheduleService.setDefaultMQPullConsumer(consumer);
            
          List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
          List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

經過回調的方式來定時調用。這種方式是多線程來實現的。code

使用這種方式不能部署兩個程序,由於會致使groupName衝突 rabbitmq

相關文章
相關標籤/搜索