RocketMQ源碼解析:Filtersrv

🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: java

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右

1. 概述

Filtersrv ,負責自定義規則過濾 ConsumerBroker 拉取的消息。後端

Filtersrv.png
Filtersrv.png

爲何 Broker 不提供過濾消息的功能呢?咱們來看看官方的說法:微信

  • Broker 端消息過濾
    在 Broker 中,按照 Consumer 的要求作過濾,優勢是減小了對於 Consumer 無用消息的網絡傳輸。 缺點是增長了 Broker 的負擔,實現相對複雜。
    (1). 淘寶 Notify 支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎能夠知足最苛刻的過濾需求。
    (2). 淘寶 RocketMQ 支持按照簡單的 Message Tag 過濾,也支持按照 Message Header、body 進行過濾。
    (3). CORBA Notification 規範中也支持靈活的語法表達式過濾。
  • Consumer 端消息過濾
    這種過濾方式可由應用徹底自定義實現,可是缺點是不少無用的消息要傳輸到 Consumer 端。

就是在這種考慮下,Filtersrv 出現了。減小了 Broker 的負擔,又減小了 Consumer 接收無用的消息。固然缺點也是有的,多了一層 Filtersrv 網絡開銷。網絡

2. Filtersrv 註冊到 Broker

  • 🦅 一個 Filtersrv 對應一個 Broker
  • 🦅 一個 Broker 能夠對應多個 FiltersrvFiltersrv 的高可用經過啓動多個 Filtersrv 實現
  • 🦅 Filtersrv 註冊失敗時,主動退出關閉

核心代碼以下:app

1: // ⬇️⬇️⬇️【FiltersrvController.java】
  2: public boolean initialize() {
  3:     // ....(省略代碼)
  4: 
  5:     // 固定間隔註冊到Broker
  6:     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  7: 
  8:         @Override
  9:         public void run() {
 10:             FiltersrvController.this.registerFilterServerToBroker();
 11:         }
 12:     }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay時間過短,可能致使初始化失敗。從3=》15
 13: 
 14:     // ....(省略代碼)
 15: }
 16: 
 17: /** 18: * 註冊Filtersrv 到 Broker 19: * !!!若是註冊失敗,關閉Filtersrv 20: */
 21: public void registerFilterServerToBroker() {
 22:     try {
 23:         RegisterFilterServerResponseHeader responseHeader =
 24:             this.filterServerOuterAPI.registerFilterServerToBroker(
 25:                 this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
 26:         this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
 27:             .setDefaultBrokerId(responseHeader.getBrokerId());
 28: 
 29:         if (null == this.brokerName) {
 30:             this.brokerName = responseHeader.getBrokerName();
 31:         }
 32: 
 33:         log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
 34:             this.localAddr(),
 35:             this.filtersrvConfig.getConnectWhichBroker(),
 36:             responseHeader.getBrokerName(),
 37:             responseHeader.getBrokerId());
 38:     } catch (Exception e) {
 39:         log.warn("register filter server Exception", e);
 40: 
 41:         log.warn("access broker failed, kill oneself");
 42:         System.exit(-1); // 異常退出
 43:     }
 44: }複製代碼

3. 過濾類

Filtersrv過濾類
Filtersrv過濾類

3.1 Consumer 訂閱時設置 過濾類代碼

  • 🦅 Consumer 針對每一個 Topic 能夠訂閱不一樣的 過濾類代碼
1: // ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
  2: @Override
  3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
  4:     this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
  5: }複製代碼

3.2 Consumer 上傳 過濾類代碼

  • 🦅 Consumer 心跳註冊到 Broker 的同時,上傳 過濾類代碼Broker 對應的全部 Filtersrv
1: // ⬇️⬇️⬇️【MQClientInstance.java】
  2: /** 3: * 發送心跳到Broker,上傳過濾類源碼到Filtersrv 4: */
  5: public void sendHeartbeatToAllBrokerWithLock() {
  6:     if (this.lockHeartbeat.tryLock()) {
  7:         try {
  8:             this.sendHeartbeatToAllBroker();
  9:             this.uploadFilterClassSource();
 10:         } catch (final Exception e) {
 11:             log.error("sendHeartbeatToAllBroker exception", e);
 12:         } finally {
 13:             this.lockHeartbeat.unlock();
 14:         }
 15:     } else {
 16:         log.warn("lock heartBeat, but failed.");
 17:     }
 18: }
 19: 
 20: /** 21: * 上傳過濾類到Filtersrv 22: */
 23: private void uploadFilterClassSource() {
 24:     Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
 25:     while (it.hasNext()) {
 26:         Entry<String, MQConsumerInner> next = it.next();
 27:         MQConsumerInner consumer = next.getValue();
 28:         if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
 29:             Set<SubscriptionData> subscriptions = consumer.subscriptions();
 30:             for (SubscriptionData sub : subscriptions) {
 31:                 if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
 32:                     final String consumerGroup = consumer.groupName();
 33:                     final String className = sub.getSubString();
 34:                     final String topic = sub.getTopic();
 35:                     final String filterClassSource = sub.getFilterClassSource();
 36:                     try {
 37:                         this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
 38:                     } catch (Exception e) {
 39:                         log.error("uploadFilterClassToAllFilterServer Exception", e);
 40:                     }
 41:                 }
 42:             }
 43:         }
 44:     }
 45: }複製代碼

3.3 Filter 編譯 過濾類代碼

  • 🦅 Filtersrv 處理 Consumer 上傳的 過濾類代碼,並進行編譯使用。

核心代碼以下:dom

1: // ⬇️⬇️⬇️【FilterClassManager.java】
  2: /** 3: * 註冊過濾類 4: * 5: * @param consumerGroup 消費分組 6: * @param topic Topic 7: * @param className 過濾類名 8: * @param classCRC 過濾類源碼CRC 9: * @param filterSourceBinary 過濾類源碼 10: * @return 是否註冊成功 11: */
 12: public boolean registerFilterClass(final String consumerGroup, final String topic, 13: final String className, final int classCRC, final byte[] filterSourceBinary) {
 14:     final String key = buildKey(consumerGroup, topic);
 15:     // 判斷是否要註冊新的過濾類
 16:     boolean registerNew = false;
 17:     FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
 18:     if (null == filterClassInfoPrev) {
 19:         registerNew = true;
 20:     } else {
 21:         if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
 22:             if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 類有變化
 23:                 registerNew = true;
 24:             }
 25:         }
 26:     }
 27:     // 註冊新的過濾類
 28:     if (registerNew) {
 29:         synchronized (this.compileLock) {
 30:             filterClassInfoPrev = this.filterClassTable.get(key);
 31:             if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
 32:                 return true;
 33:             }
 34:             try {
 35:                 FilterClassInfo filterClassInfoNew = new FilterClassInfo();
 36:                 filterClassInfoNew.setClassName(className);
 37:                 filterClassInfoNew.setClassCRC(0);
 38:                 filterClassInfoNew.setMessageFilter(null);
 39: 
 40:                 if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
 41:                     String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
 42:                     // 編譯新的過濾類
 43:                     Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
 44:                     // 建立新的過濾類對象
 45:                     Object newInstance = newClass.newInstance();
 46:                     filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
 47:                     filterClassInfoNew.setClassCRC(classCRC);
 48:                 }
 49: 
 50:                 this.filterClassTable.put(key, filterClassInfoNew);
 51:             } catch (Throwable e) {
 52:                 String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
 53:                             consumerGroup, topic, className);
 54:                 log.error(info, e);
 55:                 return false;
 56:             }
 57:         }
 58:     }
 59: 
 60:     return true;
 61: }複製代碼

4. 過濾消息

Filtersrv.png
Filtersrv.png

4.1 Consumer 從 Filtersrv 拉取消息

  • 🦅 Consumer 拉取 使用過濾類方式訂閱 的消費消息時,從 Broker 對應的 Filtersrv 列表隨機選擇一個拉取消息。若是選擇不到 Filtersrv,則沒法拉取消息。所以,Filtersrv 必定要作高可用
1: // ⬇️⬇️⬇️【PullAPIWrapper.java】
  2: /** 3: * 拉取消息核心方法 4: * 5: * @param mq 消息嘟列 6: * @param subExpression 訂閱表達式 7: * @param subVersion 訂閱版本號 8: * @param offset 拉取隊列開始位置 9: * @param maxNums 批量拉 取消息數量 10: * @param sysFlag 拉取系統標識 11: * @param commitOffset 提交消費進度 12: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間 13: * @param timeoutMillis 請求broker超時時間 14: * @param communicationMode 通信模式 15: * @param pullCallback 拉取回調 16: * @return 拉取消息結果。只有通信模式爲同步時,才返回結果,不然返回null。 17: * @throws MQClientException 當尋找不到 broker 時,或發生其餘client異常 18: * @throws RemotingException 當遠程調用發生異常時 19: * @throws MQBrokerException 當 broker 發生異常時。只有通信模式爲同步時纔會發生該異常。 20: * @throws InterruptedException 當發生中斷異常時 21: */
 22: protected PullResult pullKernelImpl( 23: final MessageQueue mq, 24: final String subExpression, 25: final long subVersion, 26: final long offset, 27: final int maxNums, 28: final int sysFlag, 29: final long commitOffset, 30: final long brokerSuspendMaxTimeMillis, 31: final long timeoutMillis, 32: final CommunicationMode communicationMode, 33: final PullCallback pullCallback 34: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 35:     // // ....(省略代碼)
 36:     // 請求拉取消息
 37:     if (findBrokerResult != null) {
 38:         // ....(省略代碼)
 39:         // 若訂閱topic使用過濾類,使用filtersrv獲取消息
 40:         String brokerAddr = findBrokerResult.getBrokerAddr();
 41:         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
 42:             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
 43:         }
 44: 
 45:         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
 46:             brokerAddr,
 47:             requestHeader,
 48:             timeoutMillis,
 49:             communicationMode,
 50:             pullCallback);
 51: 
 52:         return pullResult;
 53:     }
 54: 
 55:     // Broker信息不存在,則拋出異常
 56:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
 57: }
 58: 
 59: /** 60: * 計算filtersrv地址。若是有多個filtersrv,隨機選擇一個。 61: * 62: * @param topic Topic 63: * @param brokerAddr broker地址 64: * @return filtersrv地址 65: * @throws MQClientException 當filtersrv不存在時 66: */
 67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) 68: throws MQClientException {
 69:     ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
 70:     if (topicRouteTable != null) {
 71:         TopicRouteData topicRouteData = topicRouteTable.get(topic);
 72:         List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
 73:         if (list != null && !list.isEmpty()) {
 74:             return list.get(randomNum() % list.size());
 75:         }
 76:     }
 77:     throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
 78:         + topic, null);
 79: }複製代碼

4.2 Filtersrv 從 Broker 拉取消息

  • 🦅 Filtersrv 拉取消息後,會建議 ConsumerBroker主節點 拉取消息。
  • 🦅 Filtersrv 能夠理解成一個 Consumer,向 Broker 拉取消息時,實際使用的 DefaultMQPullConsumer.java 的方法和邏輯。
1: // ⬇️⬇️⬇️【DefaultRequestProcessor.java】
  2: /** 3: * 拉取消息 4: * 5: * @param ctx 拉取消息context 6: * @param request 拉取消息請求 7: * @return 響應 8: * @throws Exception 當發生異常時 9: */
 10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
 11:     final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
 12:     final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
 13:     final PullMessageRequestHeader requestHeader =
 14:         (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
 15: 
 16:     final FilterContext filterContext = new FilterContext();
 17:     filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
 18: 
 19:     response.setOpaque(request.getOpaque());
 20: 
 21:     DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
 22: 
 23:     // 校驗Topic過濾類是否完整
 24:     final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
 25:     if (null == findFilterClass) {
 26:         response.setCode(ResponseCode.SYSTEM_ERROR);
 27:         response.setRemark("Find Filter class failed, not registered");
 28:         return response;
 29:     }
 30:     if (null == findFilterClass.getMessageFilter()) {
 31:         response.setCode(ResponseCode.SYSTEM_ERROR);
 32:         response.setRemark("Find Filter class failed, registered but no class");
 33:         return response;
 34:     }
 35: 
 36:     // 設置下次請求從 Broker主節點。
 37:     responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
 38: 
 39:     MessageQueue mq = new MessageQueue();
 40:     mq.setTopic(requestHeader.getTopic());
 41:     mq.setQueueId(requestHeader.getQueueId());
 42:     mq.setBrokerName(this.filtersrvController.getBrokerName());
 43:     long offset = requestHeader.getQueueOffset();
 44:     int maxNums = requestHeader.getMaxMsgNums();
 45: 
 46:     final PullCallback pullCallback = new PullCallback() {
 47: 
 48:         @Override
 49:         public void onSuccess(PullResult pullResult) {
 50:             responseHeader.setMaxOffset(pullResult.getMaxOffset());
 51:             responseHeader.setMinOffset(pullResult.getMinOffset());
 52:             responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
 53:             response.setRemark(null);
 54: 
 55:             switch (pullResult.getPullStatus()) {
 56:                 case FOUND:
 57:                     response.setCode(ResponseCode.SUCCESS);
 58: 
 59:                     List<MessageExt> msgListOK = new ArrayList<MessageExt>();
 60:                     try {
 61:                         for (MessageExt msg : pullResult.getMsgFoundList()) {
 62:                             // 使用過濾類過濾消息
 63:                             boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
 64:                             if (match) {
 65:                                 msgListOK.add(msg);
 66:                             }
 67:                         }
 68: 
 69:                         if (!msgListOK.isEmpty()) {
 70:                             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
 71:                             return;
 72:                         } else {
 73:                             response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
 74:                         }
 75:                     } catch (Throwable e) {
 76:                         final String error =
 77:                             String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
 78:                                 requestHeader.getConsumerGroup(), requestHeader.getTopic());
 79:                         log.error(error, e);
 80: 
 81:                         response.setCode(ResponseCode.SYSTEM_ERROR);
 82:                         response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
 83:                         returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
 84:                         return;
 85:                     }
 86: 
 87:                     break;
 88:                 case NO_MATCHED_MSG:
 89:                     response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
 90:                     break;
 91:                 case NO_NEW_MSG:
 92:                     response.setCode(ResponseCode.PULL_NOT_FOUND);
 93:                     break;
 94:                 case OFFSET_ILLEGAL:
 95:                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
 96:                     break;
 97:                 default:
 98:                     break;
 99:             }
100: 
101:             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
102:         }
103: 
104:         @Override
105:         public void onException(Throwable e) {
106:             response.setCode(ResponseCode.SYSTEM_ERROR);
107:             response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
108:             returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
109:             return;
110:         }
111:     };
112: 
113:     // 拉取消息
114:     pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
115:     return null;
116: }複製代碼

5. Filtersrv 高可用

Filtersrv過可用
Filtersrv過可用
相關文章
相關標籤/搜索