事情的原由源於在使用微信公衆號服務的時候,做爲一個第三方的服務商,騰訊會將各類業務消息推送到第三方開發者的服務器上,而以前的方案是消息直接進到服務上,當使用到一些業務,好比發券等操做時,騰訊服務器會向開發者發送大量的消息,因爲消息服務的處理能力有限,尤爲是高峯的時候,消息請求會直接壓到服務上,致使服務線程繁忙,這時候會報大量服務超時,觸發微信的服務報警,服務不可用,或者服務超時,這時公衆號內的消息服務將沒法繼續爲用戶提供服務。鑑於此問題,咱們從新梳理並構建了基於Spring Cloud Stream的消息驅動的微服務spring
咱們採用 Spring Cloud Finchley.SR2,Spring Boot 2.0.6.RELEASE 版原本開發,系統的初步設計思路,是利用json
消息隊列rabbitmq來解耦服務,來減緩消息直接到服務上的壓力,咱們沒有直接對接mq來使用,而是採用了Spring Cloud Stream, 簡單的來講,Spring Cloud Stream是構建消息驅動的微服務應用程序的框架,將消息整合的處理操做進行了進一步的抽象操做, 實現了更加簡化的消息處理, 可使用不一樣的代理中間件,它抽象了事件驅動的一些概念,對於消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件,切換topic。使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。緩存
Spring Cloud Stream的一些基本概念能夠自行搜索,這裏不作過多描述,下面只是講述一下具體方案和配置方法及遇到的問題。服務器
基本結構是一個很是簡單的消息訂閱發佈模式微信
message-center 做爲接受微信消息處理中心,爲消息生產者,多線程
message-for-all 做爲消息隊列消息處理服務,爲消息消費者,併發
message-center 從微信的服務器接受到消息後,採用異步多線程的方式,處理部分業務邏輯,好比多客服,好比第三方的全網發佈檢測等,須要在5秒內返回給微信服務器消息的一些及時性消息,同時根據消息類型講消息分類,併發送給消息隊列中間件,消息生產者message-center經過SpringCloudStream來做爲和消息隊列中間件的粘合劑,將消息傳遞給消息隊列中間件,這裏能夠隨意切換消息中間件而不用考慮代碼的變動,咱們這裏默認採用的rabbitmq做爲消息隊列中間件服務,app
具體配置方法,新建一個接口,這個接口中能夠定義多個管道用來發送消息,能夠實現向不一樣的exchange發送消息框架
public interface SendOutputChannel {異步
// 這裏能夠定義不一樣的通道
String MSG_SENDER = "msgSender"; // 通道名
@Output(SendOutputChannel.MSG_SENDER)
MessageChannel msgSender();
}
啓動的類中要加入@EnableBinding
@SpringBootApplication
@EnableBinding(SendOutputChannel.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
application.yml文件中配置
#---- mq的基本配置信息
spring:
rabbitmq:
host:
port:
username:
password:
virtual-host:
cloud:
stream:
bindings:
msgSender: #生產者綁定,這個是消息通道的名稱
destination: message-exchange # 這裏對應的是rabbitmq中的exchange
content-type: application/json #定義消息類型
rabbit:
bindings:
msgSender:
producer:
delivery-mode: non-persistent #消息不持久化
發送消息的類中要注入發送消息的接口,當接到微信發送的消息後,通過業務邏輯處理後,在須要向mq發消息的地方,調用發送消息的方法,經過Spring Cloud Stream來實現消息發送。
@Autowired
private SendOutputChannel sendOutputChannel;
public void sendMessage(Message<?> message) {
if (!sendOutputChannel.msgSender().send(message, TimeUnit.SECONDS.toMillis(4))) {
log.error("生產者消息發送失敗:" + message.toString());
}
}
這就完成了消息發送者的基本開發
這樣服務啓動之後,rabbitmq的exchange中將會出現一個 message-exchange的 交換機
消息接受者 message-for-all
一樣須要定義一個消息接收的管道接口,這個接口中能夠定義多個管道用來接受消息,能夠接受對應不一樣的exchange接受到的消息
public interface ReceiveInputChannel {
// 這裏能夠定義不一樣的通道
String MSG_RECEIVER = "msgReceiver"; // 通道名
@Input(ReceiveInputChannel.MSG_RECEIVER)
SubscribableChannel msgReceiver();
}
application.yml中配置
#---- mq的基本配置信息
spring:
rabbitmq:
host:
port:
username:
password:
virtual-host:
cloud:
stream:
bindings:
msgReceiver: #消費者綁定 這個是接受消息通道的名稱
group: for-all #持久化, 也就是指定隊列名稱,等同於rabbitmq中的 queue, 同一個服務不一樣的實例,使用相同的隊列名稱,處理消息
destination: message-exchange #和生產者的消息交換機要相同
content-type: application/json
consumer:
max-attempts: 3 # The number of attempts to process the message (including the first) in the event of processing failures,Default: 3
concurrency: 1 # The concurrency setting of the consumer. Default: 1.
rabbit:
bindings:
msgReceiver:
consumer:
max-concurrency: 10 # maxumum concurrency of this consumer (threads)
prefetch: 50 # number of prefetched messages pre consumer thread
requeue-rejected: false # true to requeue rejected messages, false to discard (or route to DLQ)
republish-to-dlq: true # republish failures to the DLQ with diagnostic headers
# durable-subscription: false #隊列是否要持久化
其餘一些配置都是消息每次有幾個線程處理,每一個線程處理多少數量的消息,失敗後從新嘗試處理幾回等,一些配置方案,
max-concurrency:是併發消費者數量,能夠併發處理消息。
若是對隊列的消費順序要求特備苛刻,不但願併發消費,則max-concurrency須要設置爲1,exclusive: true #惟一性 僅建立者可使用的私有隊列,斷開後自動刪除, 固然必須是在autoDelete和exclusive都爲false的時候。隊列是能夠被持久化, Exclusive參數,默認爲false, 若是設置爲true,concurrency就必須設置爲1,即只能單個消費者消費隊列裏的消息,適用於必須嚴格執行消息隊列的消費順序(先進先出)
啓動類裏要使用@EnableBinding綁定消息接收管道
@SpringBootApplication
@EnableBinding(ReceiveInputChannel.class)
public class Application {
@StreamListener(ReceiveInputChannel.MSG_RECEIVER)
public void handle(Message<String> message) throws Exception {
// 在這裏再去整合消息處理的業務邏輯
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
這樣服務啓動之後,將會出現message-exchange.for-all的一個消息隊列,根據消費者服務啓動數量的不一樣,也將會出現對應的消費者
至此整個消息處理的基本結構就描述完成了,固然實際的開發過程當中,還要考慮消息的異步處理,多線程去處理等,這裏就不詳盡描述了,須要根據本身的業務須要來實現相應的開發,
有幾點注意的狀況:
@StreamListener(ReceiveInputChannel.MSG_RECEIVER) 這個方法只能放到Application 服務啓動的類中,放到別的地方會報錯:Disp org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,可能和類的加載順序有關係。這樣在啓動類中接受消息,而後能夠再經過業務拆分,將消息轉到其餘的類中實現各自業務邏輯開發。
<!-- 添加Spring Cloud Stream與RabbitMQ消息中間件的依賴。 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- 添加Spring Cloud Stream與Kafaka消息中間件的依賴。 -->
<!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> -->
根據不一樣的消息中間件,選擇不一樣的依賴。
當接收消息過多的時候,能夠增長消息生產者實例來加大消息的接受能力,當消費者處理大量阻塞消息時,處理能力降低,能夠經過增長負載的消費者服務實例數量來加大消費能力,這個須要經過實際狀況找到平衡點,消息隊列做爲緩存,下降了因爲消息直接壓到服務器上而致使的服務崩潰問題的風險。