中間件notify

 

基本概念

Topic和MessageType

兩者都是用來區別不一樣業務發送方發送的消息:java

  • Topic:一級消息類型(又名消息主題)。如TRADE,ic-common等不一樣的消息主題;
  • MessageType:二級消息類型,區別同一Topic主題下的不一樣類型的消息; 如:TRADE(交易主題)下有2300-trade-created-done(交易建立),2300-trade-success(交易成功)等不一樣類型的消息

GroupId

又稱組名, notify server根據groupId來識別客戶端機器, 配置爲同一groupId視爲同一集羣的機器.git

所以, 各個業務之間,發送方和訂閱方之間的GroupId不能重複.spring

MessageType和GroupId命名長度最大爲64個字符, 且不能包含空格.app

消息ID

消息ID是消息惟一標識.maven

發送方和訂閱最好都在日誌中記錄消息ID,便於排查問題,獲取消息ID方法ide

UniqId.getInstance().bytes2string(stringMessage.getMessageId()) 若是能夠建議同時記錄下TOPIC、MessageTypegitlab

訂閱關係

訂閱方的GroupId和Topic的映射關係.測試

一個GroupId能夠對應多個Topic.ui

客戶端經過spring配置文件配置訂閱方須要訂閱的消息,在應用啓動時,訂閱關係會推送到Notify Server.this

Notify Server端將這份訂閱關係保存,並根據保存在Server上的訂閱關係將不斷到達消息投給訂閱方.

接入申請

申請須知

  • 平常和線上分開申請。預發、線上環境會同時配置到位。

申請地址

平常申請地址 (新平臺新規範)

線上申請地址 (線上配置自動同步到預發環境)

預發問題排查地址 (僅供問題排查,接入申請直接在線上完成,會自動同步到預發)

項目接入示例

maven依賴

maven依賴版本詳見 (推薦使用Pandora容器2.x + notify-tr-client)

demo工程

  • com.taobao.notify.example.publish.SimplePublisher 發送消息示例
  • com.taobao.notify.example.subscribe.SimpleSubscriber 訂閱消息示例
  • com.taobao.notify.example.subscribe.HeaderSubscriber 訂閱消息(消息屬性過濾表達式訂閱)示例
  • com.taobao.notify.example.subscribe.unit.SimpleUnitSubscriber 單元化訂閱示例

http://gitlab.alibaba-inc.com/middleware/notify-example

注意

若是是使用HSF1.x老版本,注意要使用com.taobao.hsf.notify.client.NotifyManagerBean

發消息

spring配置片斷

<beans>
    <bean id="notifyPublishManager" class="com.taobao.notify.remotingclient.NotifyManagerBean" init-method="init">
        <property name="publishTopics"> <!-- 配置所需發佈的topics -->
            <list>
                <value>TOPIC-01</value>
                <value>TOPIC-02</value>
            </list>
        </property>
        <property name="groupId" value="${GroupID}"/> <!-- 發送方的GroupId -->
    </bean>
</beans>

java代碼片斷

private NotifyManagerBean notifyPublishManager;  

public void sendMessage() {   
    StringMessage stringMessage = new StringMessage();

    //必填屬性
    stringMessage.setBody("測試發送消息");
    stringMessage.setTopic(topic);
    stringMessage.setMessageType(messageType);

    //可選屬性
    stringMessage.setStringProperty("customHeader", "customValue"); // 設置用戶自定義的屬性值
    stringMessage.setStringProperty("bizOrderId", "業務主健、方便經過消息軌跡查詢");//用long型數字,不支持字符串 
    SendResult result = notifyManager.sendMessage(stringMessage);

    if (result.isSuccess()) {
        // 發送成功後處理
        System.out.println("消息發送成功");
    }else {
        log.warn("消息發送失敗,緣由:" + result.getErrorMessage()+"messageId:" + UniqId.getInstance().bytes2string(stringMessage.getMessageId()));
    }
}

public NotifyManagerBean getNotifyPublishManager() {
    return notifyPublishManager;
}
public void setNotifyPublishManager(NotifyManagerBean notifyPublishManager) {
    this.notifyPublishManager = notifyPublishManager;
}

收消息

保證消息不丟 就必然可能出現重投的狀況,須要訂閱者在消息監聽器裏作好 冪等 ,保證消息收到一次和收到屢次的結果是同樣的,須要對消息id去重 spring代碼片斷

<beans>
    <bean id="notifySubscribeManager" class="com.taobao.notify.remotingclient.NotifyManagerBean" init-method="init">
        <property name="subscribeMessages">
            <map>
                <entry key="Topic-01">  <!-- topic -->
                    <map>
                        <entry key="messageType-01">  <!-- messageType-->
                            <bean class="com.taobao.notify.config.SubscriptMsgDetailInfo" />
                        </entry>
                    </map>
                </entry>
            </map>
        </property>
        <property name="groupId" value="${GroupID}"/> <!-- 訂閱方的GroupId -->
        <property name="messageListener" ref="messageListener" />   <!-- 消息監聽處理器 -->
    </bean>
    <bean id="messageListener" class="com.taobao.notify.comexample.SimpleMessageListener" />
</beans>

訂閱方消息監聽處理器 SimpleMessageListener.java

import com.taobao.notify.remotingclient.MessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.message.BytesMessage;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.StringMessage;

public class SimpleMessageListener implements MessageListener{

    public void receiveMessage(Message message, MessageStatus status){
        StringBuilder generalInfo = new StringBuilder();
        generalInfo.append("收到消息");
        generalInfo.append("messageId is:").append(UniqId.getInstance().bytes2string(message.getMessageId()));
        generalInfo.append("topic is:").append(message.getTopic());
        generalInfo.append("messageType is:").append(message.getMessageType());
        generalInfo.append("用戶自定義屬性:").append(message.getStringProperty("customHeader"));

        /**
         * 只讀屬性
         */
        generalInfo.append("消息在客戶端產生時間:").append(message.getBornTime());
        generalInfo.append("消息到達notify server時間:").append(message.getGMTCreate());
        generalInfo.append("消息最後一次投遞時間:").append(message.getGMTLastDelivery());

        //消息 body 處理
        if (message instanceof StringMessage) {
            generalInfo.append("String消息body內容:");
            StringMessage stringMessage = (StringMessage) message;
            generalInfo.append(stringMessage.getBody());
        }
        else if (message instanceof BytesMessage) {
            generalInfo.append("Byte消息body內容:");
            BytesMessage bytesMessage = (BytesMessage) message;
            generalInfo.append(new String(bytesMessage.getBody()));
        }

        System.out.println(generalInfo.toString());

        if(業務處理失敗,須要重投){
            status.setRollbackOnly();
            status.setReason("失敗緣由");
        }
    }
}

關於發送消息超時及重試次數

推薦不修改

1.默認是重試三次

在發送配置裏能夠配置重試次數

<property name="maxRetry"   value="n"></property>

n爲重試次數

2.超時時間是3秒

能夠在發送消息手動設置超時時間:message.setClientPostTimeout(MILLISECONDS);

相關文章
相關標籤/搜索