🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: java
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
Filtersrv
,負責自定義規則過濾 Consumer
從 Broker
拉取的消息。後端
爲何 Broker
不提供過濾消息的功能呢?咱們來看看官方的說法:微信
- Broker 端消息過濾
在 Broker 中,按照 Consumer 的要求作過濾,優勢是減小了對於 Consumer 無用消息的網絡傳輸。 缺點是增長了 Broker 的負擔,實現相對複雜。
(1). 淘寶 Notify 支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎能夠知足最苛刻的過濾需求。
(2). 淘寶 RocketMQ 支持按照簡單的 Message Tag 過濾,也支持按照 Message Header、body 進行過濾。
(3). CORBA Notification 規範中也支持靈活的語法表達式過濾。- Consumer 端消息過濾
這種過濾方式可由應用徹底自定義實現,可是缺點是不少無用的消息要傳輸到 Consumer 端。
就是在這種考慮下,Filtersrv
出現了。減小了 Broker
的負擔,又減小了 Consumer
接收無用的消息。固然缺點也是有的,多了一層 Filtersrv
網絡開銷。網絡
Filtersrv
只對應一個 Broker
。Broker
能夠對應多個 Filtersrv
。Filtersrv
的高可用經過啓動多個 Filtersrv
實現。Filtersrv
註冊失敗時,主動退出關閉。核心代碼以下:app
1: // ⬇️⬇️⬇️【FiltersrvController.java】
2: public boolean initialize() {
3: // ....(省略代碼)
4:
5: // 固定間隔註冊到Broker
6: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
7:
8: @Override
9: public void run() {
10: FiltersrvController.this.registerFilterServerToBroker();
11: }
12: }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay時間過短,可能致使初始化失敗。從3=》15
13:
14: // ....(省略代碼)
15: }
16:
17: /** 18: * 註冊Filtersrv 到 Broker 19: * !!!若是註冊失敗,關閉Filtersrv 20: */
21: public void registerFilterServerToBroker() {
22: try {
23: RegisterFilterServerResponseHeader responseHeader =
24: this.filterServerOuterAPI.registerFilterServerToBroker(
25: this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
26: this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
27: .setDefaultBrokerId(responseHeader.getBrokerId());
28:
29: if (null == this.brokerName) {
30: this.brokerName = responseHeader.getBrokerName();
31: }
32:
33: log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
34: this.localAddr(),
35: this.filtersrvConfig.getConnectWhichBroker(),
36: responseHeader.getBrokerName(),
37: responseHeader.getBrokerId());
38: } catch (Exception e) {
39: log.warn("register filter server Exception", e);
40:
41: log.warn("access broker failed, kill oneself");
42: System.exit(-1); // 異常退出
43: }
44: }複製代碼
Consumer
針對每一個 Topic
能夠訂閱不一樣的 過濾類代碼
。1: // ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
2: @Override
3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
4: this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
5: }複製代碼
Consumer
心跳註冊到 Broker
的同時,上傳 過濾類代碼
到 Broker
對應的全部 Filtersrv
。1: // ⬇️⬇️⬇️【MQClientInstance.java】
2: /** 3: * 發送心跳到Broker,上傳過濾類源碼到Filtersrv 4: */
5: public void sendHeartbeatToAllBrokerWithLock() {
6: if (this.lockHeartbeat.tryLock()) {
7: try {
8: this.sendHeartbeatToAllBroker();
9: this.uploadFilterClassSource();
10: } catch (final Exception e) {
11: log.error("sendHeartbeatToAllBroker exception", e);
12: } finally {
13: this.lockHeartbeat.unlock();
14: }
15: } else {
16: log.warn("lock heartBeat, but failed.");
17: }
18: }
19:
20: /** 21: * 上傳過濾類到Filtersrv 22: */
23: private void uploadFilterClassSource() {
24: Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
25: while (it.hasNext()) {
26: Entry<String, MQConsumerInner> next = it.next();
27: MQConsumerInner consumer = next.getValue();
28: if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
29: Set<SubscriptionData> subscriptions = consumer.subscriptions();
30: for (SubscriptionData sub : subscriptions) {
31: if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
32: final String consumerGroup = consumer.groupName();
33: final String className = sub.getSubString();
34: final String topic = sub.getTopic();
35: final String filterClassSource = sub.getFilterClassSource();
36: try {
37: this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
38: } catch (Exception e) {
39: log.error("uploadFilterClassToAllFilterServer Exception", e);
40: }
41: }
42: }
43: }
44: }
45: }複製代碼
Filtersrv
處理 Consumer
上傳的 過濾類代碼
,並進行編譯使用。核心代碼以下:dom
1: // ⬇️⬇️⬇️【FilterClassManager.java】
2: /** 3: * 註冊過濾類 4: * 5: * @param consumerGroup 消費分組 6: * @param topic Topic 7: * @param className 過濾類名 8: * @param classCRC 過濾類源碼CRC 9: * @param filterSourceBinary 過濾類源碼 10: * @return 是否註冊成功 11: */
12: public boolean registerFilterClass(final String consumerGroup, final String topic, 13: final String className, final int classCRC, final byte[] filterSourceBinary) {
14: final String key = buildKey(consumerGroup, topic);
15: // 判斷是否要註冊新的過濾類
16: boolean registerNew = false;
17: FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
18: if (null == filterClassInfoPrev) {
19: registerNew = true;
20: } else {
21: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
22: if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 類有變化
23: registerNew = true;
24: }
25: }
26: }
27: // 註冊新的過濾類
28: if (registerNew) {
29: synchronized (this.compileLock) {
30: filterClassInfoPrev = this.filterClassTable.get(key);
31: if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
32: return true;
33: }
34: try {
35: FilterClassInfo filterClassInfoNew = new FilterClassInfo();
36: filterClassInfoNew.setClassName(className);
37: filterClassInfoNew.setClassCRC(0);
38: filterClassInfoNew.setMessageFilter(null);
39:
40: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
41: String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
42: // 編譯新的過濾類
43: Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
44: // 建立新的過濾類對象
45: Object newInstance = newClass.newInstance();
46: filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
47: filterClassInfoNew.setClassCRC(classCRC);
48: }
49:
50: this.filterClassTable.put(key, filterClassInfoNew);
51: } catch (Throwable e) {
52: String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
53: consumerGroup, topic, className);
54: log.error(info, e);
55: return false;
56: }
57: }
58: }
59:
60: return true;
61: }複製代碼
Consumer
拉取 使用過濾類方式訂閱 的消費消息時,從 Broker
對應的 Filtersrv
列表隨機選擇一個拉取消息。若是選擇不到 Filtersrv
,則沒法拉取消息。所以,Filtersrv
必定要作高可用。1: // ⬇️⬇️⬇️【PullAPIWrapper.java】
2: /** 3: * 拉取消息核心方法 4: * 5: * @param mq 消息嘟列 6: * @param subExpression 訂閱表達式 7: * @param subVersion 訂閱版本號 8: * @param offset 拉取隊列開始位置 9: * @param maxNums 批量拉 取消息數量 10: * @param sysFlag 拉取系統標識 11: * @param commitOffset 提交消費進度 12: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間 13: * @param timeoutMillis 請求broker超時時間 14: * @param communicationMode 通信模式 15: * @param pullCallback 拉取回調 16: * @return 拉取消息結果。只有通信模式爲同步時,才返回結果,不然返回null。 17: * @throws MQClientException 當尋找不到 broker 時,或發生其餘client異常 18: * @throws RemotingException 當遠程調用發生異常時 19: * @throws MQBrokerException 當 broker 發生異常時。只有通信模式爲同步時纔會發生該異常。 20: * @throws InterruptedException 當發生中斷異常時 21: */
22: protected PullResult pullKernelImpl( 23: final MessageQueue mq, 24: final String subExpression, 25: final long subVersion, 26: final long offset, 27: final int maxNums, 28: final int sysFlag, 29: final long commitOffset, 30: final long brokerSuspendMaxTimeMillis, 31: final long timeoutMillis, 32: final CommunicationMode communicationMode, 33: final PullCallback pullCallback 34: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
35: // // ....(省略代碼)
36: // 請求拉取消息
37: if (findBrokerResult != null) {
38: // ....(省略代碼)
39: // 若訂閱topic使用過濾類,使用filtersrv獲取消息
40: String brokerAddr = findBrokerResult.getBrokerAddr();
41: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
42: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
43: }
44:
45: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
46: brokerAddr,
47: requestHeader,
48: timeoutMillis,
49: communicationMode,
50: pullCallback);
51:
52: return pullResult;
53: }
54:
55: // Broker信息不存在,則拋出異常
56: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
57: }
58:
59: /** 60: * 計算filtersrv地址。若是有多個filtersrv,隨機選擇一個。 61: * 62: * @param topic Topic 63: * @param brokerAddr broker地址 64: * @return filtersrv地址 65: * @throws MQClientException 當filtersrv不存在時 66: */
67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) 68: throws MQClientException {
69: ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
70: if (topicRouteTable != null) {
71: TopicRouteData topicRouteData = topicRouteTable.get(topic);
72: List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
73: if (list != null && !list.isEmpty()) {
74: return list.get(randomNum() % list.size());
75: }
76: }
77: throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
78: + topic, null);
79: }複製代碼
Filtersrv
拉取消息後,會建議 Consumer
向 Broker主節點
拉取消息。Filtersrv
能夠理解成一個 Consumer
,向 Broker
拉取消息時,實際使用的 DefaultMQPullConsumer.java
的方法和邏輯。1: // ⬇️⬇️⬇️【DefaultRequestProcessor.java】
2: /** 3: * 拉取消息 4: * 5: * @param ctx 拉取消息context 6: * @param request 拉取消息請求 7: * @return 響應 8: * @throws Exception 當發生異常時 9: */
10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
11: final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
12: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
13: final PullMessageRequestHeader requestHeader =
14: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
15:
16: final FilterContext filterContext = new FilterContext();
17: filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
18:
19: response.setOpaque(request.getOpaque());
20:
21: DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
22:
23: // 校驗Topic過濾類是否完整
24: final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
25: if (null == findFilterClass) {
26: response.setCode(ResponseCode.SYSTEM_ERROR);
27: response.setRemark("Find Filter class failed, not registered");
28: return response;
29: }
30: if (null == findFilterClass.getMessageFilter()) {
31: response.setCode(ResponseCode.SYSTEM_ERROR);
32: response.setRemark("Find Filter class failed, registered but no class");
33: return response;
34: }
35:
36: // 設置下次請求從 Broker主節點。
37: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
38:
39: MessageQueue mq = new MessageQueue();
40: mq.setTopic(requestHeader.getTopic());
41: mq.setQueueId(requestHeader.getQueueId());
42: mq.setBrokerName(this.filtersrvController.getBrokerName());
43: long offset = requestHeader.getQueueOffset();
44: int maxNums = requestHeader.getMaxMsgNums();
45:
46: final PullCallback pullCallback = new PullCallback() {
47:
48: @Override
49: public void onSuccess(PullResult pullResult) {
50: responseHeader.setMaxOffset(pullResult.getMaxOffset());
51: responseHeader.setMinOffset(pullResult.getMinOffset());
52: responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
53: response.setRemark(null);
54:
55: switch (pullResult.getPullStatus()) {
56: case FOUND:
57: response.setCode(ResponseCode.SUCCESS);
58:
59: List<MessageExt> msgListOK = new ArrayList<MessageExt>();
60: try {
61: for (MessageExt msg : pullResult.getMsgFoundList()) {
62: // 使用過濾類過濾消息
63: boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
64: if (match) {
65: msgListOK.add(msg);
66: }
67: }
68:
69: if (!msgListOK.isEmpty()) {
70: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
71: return;
72: } else {
73: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
74: }
75: } catch (Throwable e) {
76: final String error =
77: String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
78: requestHeader.getConsumerGroup(), requestHeader.getTopic());
79: log.error(error, e);
80:
81: response.setCode(ResponseCode.SYSTEM_ERROR);
82: response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
83: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
84: return;
85: }
86:
87: break;
88: case NO_MATCHED_MSG:
89: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
90: break;
91: case NO_NEW_MSG:
92: response.setCode(ResponseCode.PULL_NOT_FOUND);
93: break;
94: case OFFSET_ILLEGAL:
95: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
96: break;
97: default:
98: break;
99: }
100:
101: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
102: }
103:
104: @Override
105: public void onException(Throwable e) {
106: response.setCode(ResponseCode.SYSTEM_ERROR);
107: response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
108: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
109: return;
110: }
111: };
112:
113: // 拉取消息
114: pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
115: return null;
116: }複製代碼