構建基於SpringCloudStream的消息驅動微服務,用於處理第三方開發者接受微信大量推送消息的解決方案

事情的原由源於在使用微信公衆號服務的時候,做爲一個第三方的服務商,騰訊會將各類業務消息推送到第三方開發者的服務器上,而以前的方案是消息直接進到服務上,當使用到一些業務,好比發券等操做時,騰訊服務器會向開發者發送大量的消息,因爲消息服務的處理能力有限,尤爲是高峯的時候,消息請求會直接壓到服務上,致使服務線程繁忙,這時候會報大量服務超時,觸發微信的服務報警,服務不可用,或者服務超時,這時公衆號內的消息服務將沒法繼續爲用戶提供服務。鑑於此問題,咱們從新梳理並構建了基於Spring Cloud Stream的消息驅動的微服務spring

咱們採用 Spring Cloud Finchley.SR2,Spring Boot 2.0.6.RELEASE 版原本開發,系統的初步設計思路,是利用json

消息隊列rabbitmq來解耦服務,來減緩消息直接到服務上的壓力,咱們沒有直接對接mq來使用,而是採用了Spring Cloud Stream, 簡單的來講,Spring Cloud Stream是構建消息驅動的微服務應用程序的框架,將消息整合的處理操做進行了進一步的抽象操做, 實現了更加簡化的消息處理, 可使用不一樣的代理中間件,它抽象了事件驅動的一些概念,對於消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件,切換topic。使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。緩存

Spring Cloud Stream的一些基本概念能夠自行搜索,這裏不作過多描述,下面只是講述一下具體方案和配置方法及遇到的問題。服務器

基本結構是一個很是簡單的消息訂閱發佈模式微信

 

 

message-center 做爲接受微信消息處理中心,爲消息生產者,多線程

message-for-all 做爲消息隊列消息處理服務,爲消息消費者,併發

 

message-center 從微信的服務器接受到消息後,採用異步多線程的方式,處理部分業務邏輯,好比多客服,好比第三方的全網發佈檢測等,須要在5秒內返回給微信服務器消息的一些及時性消息,同時根據消息類型講消息分類,併發送給消息隊列中間件,消息生產者message-center經過SpringCloudStream來做爲和消息隊列中間件的粘合劑,將消息傳遞給消息隊列中間件,這裏能夠隨意切換消息中間件而不用考慮代碼的變動,咱們這裏默認採用的rabbitmq做爲消息隊列中間件服務,app

具體配置方法,新建一個接口,這個接口中能夠定義多個管道用來發送消息,能夠實現向不一樣的exchange發送消息框架

public interface SendOutputChannel {異步

    // 這裏能夠定義不一樣的通道
    String MSG_SENDER = "msgSender"; // 通道名
    @Output(SendOutputChannel.MSG_SENDER)
    MessageChannel msgSender();

}

啓動的類中要加入@EnableBinding

@SpringBootApplication
@EnableBinding(SendOutputChannel.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.yml文件中配置 

#---- mq的基本配置信息

spring:
  rabbitmq:
    host: 
    port: 
    username: 
    password: 
    virtual-host: 

  cloud:
    stream:
      bindings:
        msgSender: #生產者綁定,這個是消息通道的名稱
          destination: message-exchange    # 這裏對應的是rabbitmq中的exchange
          content-type: application/json #定義消息類型

      rabbit:
        bindings:
          msgSender:
            producer:
              delivery-mode: non-persistent   #消息不持久化  

發送消息的類中要注入發送消息的接口,當接到微信發送的消息後,通過業務邏輯處理後,在須要向mq發消息的地方,調用發送消息的方法,經過Spring Cloud Stream來實現消息發送。

    @Autowired
    private SendOutputChannel sendOutputChannel;

   public void sendMessage(Message<?> message) {
        if (!sendOutputChannel.msgSender().send(message, TimeUnit.SECONDS.toMillis(4))) {
            log.error("生產者消息發送失敗:" + message.toString());
        }
    }

這就完成了消息發送者的基本開發

這樣服務啓動之後,rabbitmq的exchange中將會出現一個  message-exchange的 交換機

 

消息接受者 message-for-all

一樣須要定義一個消息接收的管道接口,這個接口中能夠定義多個管道用來接受消息,能夠接受對應不一樣的exchange接受到的消息

public interface ReceiveInputChannel {

    // 這裏能夠定義不一樣的通道
    String MSG_RECEIVER = "msgReceiver"; // 通道名
        
    @Input(ReceiveInputChannel.MSG_RECEIVER)
    SubscribableChannel msgReceiver();
    

}

application.yml中配置

#---- mq的基本配置信息

spring:
  rabbitmq:
    host: 
    port: 
    username: 
    password: 
    virtual-host: 
  cloud:
    stream:
      bindings:
        msgReceiver: #消費者綁定 這個是接受消息通道的名稱
          group: for-all  #持久化, 也就是指定隊列名稱,等同於rabbitmq中的 queue, 同一個服務不一樣的實例,使用相同的隊列名稱,處理消息
          destination: message-exchange #和生產者的消息交換機要相同
          content-type: application/json
          consumer:
            max-attempts: 3 # The number of attempts to process the message (including the first) in the event of processing failures,Default: 3
            concurrency: 1 # The concurrency setting of the consumer. Default: 1.
      rabbit:
        bindings:
          msgReceiver:
            consumer:
              max-concurrency: 10 # maxumum concurrency of this consumer (threads)
              prefetch: 50  # number of prefetched messages pre consumer thread
              requeue-rejected: false # true to requeue rejected messages, false to discard (or route to DLQ)
              republish-to-dlq: true # republish failures to the DLQ with diagnostic headers
              # durable-subscription: false  #隊列是否要持久化

其餘一些配置都是消息每次有幾個線程處理,每一個線程處理多少數量的消息,失敗後從新嘗試處理幾回等,一些配置方案,

max-concurrency:是併發消費者數量,能夠併發處理消息。

若是對隊列的消費順序要求特備苛刻,不但願併發消費,則max-concurrency須要設置爲1,exclusive: true #惟一性 僅建立者可使用的私有隊列,斷開後自動刪除, 固然必須是在autoDelete和exclusive都爲false的時候。隊列是能夠被持久化, Exclusive參數,默認爲false, 若是設置爲true,concurrency就必須設置爲1,即只能單個消費者消費隊列裏的消息,適用於必須嚴格執行消息隊列的消費順序(先進先出)

 

啓動類裏要使用@EnableBinding綁定消息接收管道

@SpringBootApplication
@EnableBinding(ReceiveInputChannel.class)
public class Application {

@StreamListener(ReceiveInputChannel.MSG_RECEIVER)
    public void handle(Message<String> message) throws Exception {
       

// 在這裏再去整合消息處理的業務邏輯

 }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

這樣服務啓動之後,將會出現message-exchange.for-all的一個消息隊列,根據消費者服務啓動數量的不一樣,也將會出現對應的消費者

至此整個消息處理的基本結構就描述完成了,固然實際的開發過程當中,還要考慮消息的異步處理,多線程去處理等,這裏就不詳盡描述了,須要根據本身的業務須要來實現相應的開發,

 

 

有幾點注意的狀況:

@StreamListener(ReceiveInputChannel.MSG_RECEIVER) 這個方法只能放到Application 服務啓動的類中,放到別的地方會報錯:Disp org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,可能和類的加載順序有關係。這樣在啓動類中接受消息,而後能夠再經過業務拆分,將消息轉到其餘的類中實現各自業務邏輯開發。

        <!-- 添加Spring Cloud Stream與RabbitMQ消息中間件的依賴。 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- 添加Spring Cloud Stream與Kafaka消息中間件的依賴。 -->
        <!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> 
            </dependency> -->

根據不一樣的消息中間件,選擇不一樣的依賴。

 

當接收消息過多的時候,能夠增長消息生產者實例來加大消息的接受能力,當消費者處理大量阻塞消息時,處理能力降低,能夠經過增長負載的消費者服務實例數量來加大消費能力,這個須要經過實際狀況找到平衡點,消息隊列做爲緩存,下降了因爲消息直接壓到服務器上而致使的服務崩潰問題的風險。

相關文章
相關標籤/搜索