這裏還要講解一下什麼是Spring Integration ? Integration 集成node
企業應用集成(EAI)是集成應用之間數據和服務的一種應用技術。四種集成風格:web
1.文件傳輸:兩個系統生成文件,文件的有效負載就是由另外一個系統處理的消息。該類風格的例子之一是針對文件輪詢目錄或FTP目錄,並處理該文件。spring
2.共享數據庫:兩個系統查詢同一個數據庫以獲取要傳遞的數據。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。數據庫
3.遠程過程調用:兩個系統都暴露另外一個能調用的服務。該類例子有EJB服務,或SOAP和REST服務。服務器
4.消息:兩個系統鏈接到一個公用的消息系統,互相交換數據,並利用消息調用行爲。該風格的例子就是衆所周知的中心輻射式的(hub-and-spoke)JMS架構。架構
比方說咱們用到了RabbitMQ和Kafka,因爲這兩個消息中間件的架構上的不一樣,像RabbitMQ有exchange,kafka有Topic,partitions分區,這些中間件的差別性致使咱們實際項目開發給咱們形成了必定的困擾,咱們若是用了兩個消息隊列的其中一種,app
後面的業務需求,我想往另一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要從新推倒從新作,由於它跟咱們的系統耦合了,這時候springcloud Stream給咱們提供了一種解耦合的方式。框架
Spring Cloud Stream由一箇中間件中立的核組成。應用經過Spring Cloud Stream插入的input(至關於消費者consumer,它是從隊列中接收消息的)和output(至關於生產者producer,它是從隊列中發送消息的。)通道與外界交流。spring-boot
通道經過指定中間件的Binder實現與外部代理鏈接。業務開發者再也不關注具體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務便可。微服務
Binder
經過定義綁定器做爲中間層,實現了應用程序與消息中間件(Middleware)細節之間的隔離。經過嚮應用程序暴露統一的Channel經過,使得應用程序不須要再考慮各類不一樣的消息中間件的實現。當須要升級消息中間件,或者是更換其餘消息中間件產品時,咱們須要作的就是更換對應的Binder綁定器而不須要修改任何應用邏輯 。甚至能夠任意的改變中間件的類型而不須要修改一行代碼。目前只提供了RabbitMQ和Kafka的Binder實現。
Springcloud Stream還有個好處就是像Kafka同樣引入了一點分區的概念,像RabbitMQ不支持分區的隊列,你用了SpringCloud Stream技術,它就會幫RabbitMQ引入了分區的特性,SpringCloud Stream就是自然支持分區的,咱們用起來仍是很方便的。後面會詳細講解
接下來進行一個Demo進行演練。
首先咱們要在先前的工程中新建三個子模塊,分別是springcloud-stream,springcloud-stream1,springcloud-stream2 這三個模塊,其中springcloud-stream做爲生產者進行發消息模塊,springcloud-stream1,springcloud-stream2做爲消息接收模塊。
以下圖所示:
分別在springcloud-stream,springcloud-stream1,springcloud-stream2 這三個模塊引入以下依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> <version>1.3.0.RELEASE</version> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
接着進行application.yml進行配置以下:
server: port: 7888 spring: application: name: producer cloud: stream: kafka: binder: #Kafka的消息中間件服務器 brokers: localhost:9092 #Zookeeper的節點,若是集羣,後面加,號分隔 zk-nodes: localhost:2181 #若是設置爲false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。 auto-create-topics: true bindings: #這裏用stream給咱們提供的默認output,後面會講到自定義output output: #消息發往的目的地 destination: stream-demo #消息發送的格式,接收端不用指定格式,可是發送端要 content-type: text/plain
接下來進行第一個springcloud-stream模塊的代碼編寫,在該模塊下定義一個SendService,以下:
package hjc.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; /** * Created by cong on 2018/5/28. */ //這個註解給咱們綁定消息通道的,Source是Stream給咱們提供的,能夠點進去看源碼,能夠看到output和input,這和配置文件中的output,input對應的。 @EnableBinding(Source.class) public class SendService { @Autowired private Source source; public void sendMsg(String msg){ source.output().send(MessageBuilder.withPayload(msg).build()); } }
springcloud-stream 的controller層代碼以下:
package hjc.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * Created by cong 2018/5/28 */ @RestController public class ProducerController { @Autowired private SendService sendService; @RequestMapping("/send/{msg}") public void send(@PathVariable("msg") String msg){ sendService.sendMsg(msg); } }
接下來進行springcloud-stream1,springcloud-stream2兩個模塊的代碼編寫
首先須要引入的依賴,上面已經提到。
接着進行springcloud-stream1和springcloud-stream2模塊application.yml的配置,以下:
springcloud-stream1配置以下:
server: port: 7889 spring: application: name: consumer_1 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: #input是接收,注意這裏不能再像前面同樣寫output了 input: destination: stream-demo
springcloud-stream2模塊application.yml的配置以下:
server: port: 7890 spring: application: name: consumer_2 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: input: destination: stream-demo
好了接下來進行springcloud-stream1模塊和springcloud-stream2模塊的消息接受代碼的編寫,springcloud-stream1模塊和springcloud-stream2模塊的消息接受代碼都是同樣的,以下:
//消息接受端,stream給咱們提供了Sink,Sink源碼裏面是綁定input的,要跟咱們配置文件的imput關聯的。 @EnableBinding(Sink.class) public class RecieveService { @StreamListener(Sink.INPUT) public void recieve(Object payload){ System.out.println(payload); } }
好了接着咱們首先要啓動上一篇隨筆所提到的zookeeper,和Kafka,以下:
接着分別現後啓動啓動springcloud-stream,springcloud-stream1,springcloud-stream2,模塊運行結果以下:
首先進行springcloud-stream模塊的訪問,以下:
回車後能夠看到,Kafka CommitId,說明消息發送成功,再看一下,那兩個消息接受模塊的輸出,以下:
能夠看到這兩消息模塊都接收到了消息而且打印了出來。
好了到如今爲止,咱們進行了一個簡單的消息發送和接收,用的是Stream給咱們提供的默認Source,Sink,接下來咱們要本身進行自定義,這種方式在工做中仍是用的比較多的,由於咱們要往不一樣的消息通道發消息,
必然不能全都叫input,output的,那樣的話就亂套了,所以首先自定義一個接口,以下:
/** * Created by cong on 2018/5/28. */ public interface MySource { @Output("myOutput") MessageChannel myOutput(); }
這裏要注意一下,能夠看到上面的代碼,其中myOutput是要和你的配置文件的消息發送端配置對應的,所以修改springcloud-stream中application.yml配置,以下:
server: port: 7888 spring: application: name: producer cloud: stream: kafka: binder: #Kafka的消息中間件服務器 brokers: localhost:9092 #Zookeeper的節點,若是集羣,後面加,號分隔 zk-nodes: localhost:2181 #若是設置爲false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。 auto-create-topics: true bindings: #自定義output myOutput: #消息發往的目的地 destination: stream-demo #消息發送的格式,接收端不用指定格式,可是發送端要 content-type: text/plain
這樣還不行,還必須改造springcloud-stream消息發送端的SendService這個類,代碼以下:
package hjc.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; /** * Created by cong on 2018/5/28. */ @EnableBinding(MySource.class) public class SendService { @Autowired private MySource source; public void sendMsg(String msg){ source.myOutput().send(MessageBuilder.withPayload(msg).build()); } }
接下來從新啓動那三個模塊,運行結果以下:
能夠看到兩個消息接收端仍是依然能接受消息。
接收端的自定義接收也是相似的修改的,這裏就不演示了。
springcloud-stream還給咱們提供了一個Processor接口,用於進行消息處理後再進行發送出去,至關於一個消息中轉站。下面咱們進行演示
首先咱們須要改造springcloud-stream1模塊,把它做爲一個消息中轉站。用於springcloud-stream1消息處理後再進行發送給springcloud-stream2模塊
首先修改springcloud-stream1模塊的配置,以下:
server: port: 7889 spring: application: name: consumer_1 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: #input是接收,注意這裏不能再像前面同樣寫output了 input: destination: stream-demo #進行消息中轉處理後,在進行轉發出去 output: destination: stream-demo-trans
接着在新建一個消息中轉類,代碼以下:
package hjc.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.integration.annotation.ServiceActivator; /** * Created by cong on 2018/5/28. */ @EnableBinding(Processor.class) public class TransFormService { @ServiceActivator(inputChannel = Processor.INPUT,outputChannel = Processor.OUTPUT) public Object transform(Object payload){ System.out.println("消息中轉站:"+payload); return payload; } }
接着要修改消息中轉站發送消息出去的接收端springcloud-stream2的配置,以下:
server: port: 7890 spring: application: name: consumer_2 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: input: destination: stream-demo-trans
這裏要強調一下,要把先前RecieveService類的綁定註解全都註釋掉,否則,會綁定衝突的,接下來分別重啓這三個模塊,運行結果以下:
先進性springcloud-stream模塊的訪問。
中轉站運行結果取下:
接下來,看中轉後的的接受端Springcloud-stream2的消息,到底有沒有消息過來,以下:
能夠看到,中轉後消息被接受到了。
咱們還可能會遇到一個場景就是,咱們接收到消息後,給別人一個反饋ACK,SpringCloud stream 給咱們提供了一個SendTo註解能夠幫咱們幹這些事情。
首先咱們先實現一個接口SendToBinder去實現output和input,代碼以下:
package hjc.consumer; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * Created by cong on 2018/5/28. */ public interface SendToBinder { @Output("output") MessageChannel output(); @Input("input") SubscribableChannel input(); }
接着再新建一個SendToService類來綁定本身的SendToBinder接口,而後監聽input,返回ACK表示中轉站收到消息了,再轉發消息出去,代碼以下:
package hjc.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; /** * Created by cong on 2018/5/28. */ @EnableBinding(SendToBinder.class) public class SendToService { @StreamListener("input") @SendTo("output") public Object receiveFromInput(Object payload){ System.out.println("中轉消息。。"+payload); return "xxxxx"; } }
這裏要注意一點就是,啓動前下那邊以前的用到的哪些綁定註解,先註釋掉,否則與這裏會發生衝突。
運行結果以下:
能夠看到發送端受到一個ACK
能夠看到先前的例子,咱們都是一端發消息,兩個消息接受者都接收到了,可是有時候有些業務場景我只想讓其中一個消息接收者接收到消息,那麼該怎麼辦呢?
這時候就涉及一個消息分組(Consumer Groups)的概念了。
「Group」,若是使用過 Kafka 的讀者並不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是很是必須的。對於這種狀況,同一個事件防止被重複消費,
只要把這些應用放置於同一個 「group」 中,就可以保證消息只會被其中一個應用消費一次。不一樣的組是能夠消費的,同一個組內會發生競爭關係,只有其中一個能夠消費。
server: port: 7889 spring: application: name: consumer_1 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: #input是接收,注意這裏不能再像前面同樣寫output了 input: destination: stream-demo #分組的組名 group: group
接着修改springcloud-stream2模塊的配置,代碼以下:
server: port: 7890 spring: application: name: consumer_2 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: input: destination: stream-demo-trans group: group
能夠看到springcloud-stream1和springcloud-stream2是屬於同一組的。springcloud-stream模塊的發的消息只能被springcloud-stream1或springcloud-stream2其中一個接收到,這樣避免了重複消費。
springcloud-stream1模塊代碼恢復成以下代碼:
package hjc.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; /** * Created by cong on 2018/5/28. */ //消息接受端,stream給咱們提供了Sink,Sink源碼裏面是綁定input的,要跟咱們配置文件的imput關聯的。 @EnableBinding(Sink.class) public class RecieveService { @StreamListener(Sink.INPUT) public void recieve(Object payload){ System.out.println(payload); } }
springcloud-stream2的接收端代碼不變,依然跟上面代碼同樣。
接着,運行結果以下:
控制檯以下:
能夠看到只有其中一個受到消息。避免了消息重複消費。
有時候咱們只想給特定的消費者消費消息,那麼又該真麼作呢?
這是後又涉及到消息分區的概念了。
Spring Cloud Stream對給定應用的多個實例之間分隔數據予以支持。在分隔方案中,物理交流媒介(如:代理主題)被視爲分隔成了多個片(partitions)。一個或者多個生產者應用實例給多個消費者應用實例發送消息並確保相同特徵的數據被同一消費者實例處理。
Spring Cloud Stream對分割的進程實例實現進行了抽象。使得Spring Cloud Stream 爲不具有分區功能的消息中間件(RabbitMQ)也增長了分區功能擴展。
那麼咱們就要進行一些配置了,好比我只想要springcloud-stream2模塊接收到消息,
springcloud-stream2配置以下:
server: port: 7890 spring: application: name: consumer_2 cloud: stream: kafka: binder: brokers: localhost:9092 zk-nodes: localhost:2181 auto-create-topics: true bindings: input: destination: stream-demo-trans group: group consumer: #開啓分區 partitioned: true #分區數量 instance-count: 2
生產者端springcloud-stream模塊配置以下:
server: port: 7888 spring: application: name: producer cloud: stream: kafka: binder: #Kafka的消息中間件服務器 brokers: localhost:9092 #Zookeeper的節點,若是集羣,後面加,號分隔 zk-nodes: localhost:2181 #若是設置爲false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。 auto-create-topics: true bindings: #自定義output myOutput: #消息發往的目的地 destination: stream-demo #消息發送的格式,接收端不用指定格式,可是發送端要 content-type: text/plain producer: #分區的主鍵,根據什麼來分區,下面的payload.id只是一個對象的id用於作爲Key,用來講明的。但願不要誤解 partitionKeyExpression: payload.id #Key和分區數量進行取模去分配消息,這裏分區數量配置爲2 partitionCount: 2
其餘的代碼基本不變,這裏就不演示了。這裏要給你們說明一下,好比分區的Key是一個對象的id,好比說id=1,每次發送消息的對象的id爲相同值1,則消息只會被同一個消費者消費,好比說Key和分區數量取模計算的結果是分到stream2模塊中,那麼下一次進行進行消息發送,
只要分組的key即id的值依然仍是1的話,消息永遠只會分配到stream2模塊中。