Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.java
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.node
野生翻譯:spring cloud stream是打算統一消息中間件後宮的男人,他身手靈活,身後有靠山spring,會使十八般武器(消息訂閱模式啦,消費者組,stateful partitions什麼的),目先後宮有東宮娘娘kafka和西宮娘娘rabbitMQ。git
八卦黨:今天咱們扒一扒spring cloud stream和kafka的關係,rabbitMQ就讓她在冷宮裏面呆着吧。github
A streaming platform has three key capabilities:web
野生翻譯:老孃是個流處理平臺,能幹的活可多了:spring
總結一句話,就是快、穩、準。apache
kafka的運行很是簡單,從這裏下載,而後先運行zookeeper。在最新的kafka的下載包裏面也包含了一個zookeeper,能夠直接用裏面的。zookeeper啓動後,須要在kafka的配置文件裏面配置好zookeeper的ip和端口,配置文件是config/server.properties。windows
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000複製代碼
而後運行bin目錄下的命令,啓動kafka就能夠啦bash
bin/kafka-server-start.sh -daemon config/server.properties複製代碼
kafka雖然啓動了,但咱們須要瞭解她的話,仍是須要一個總管來彙報狀況,我這邊用的就是kafka-manager,下載地址在這裏。很惋惜的是隻有源代碼的下載,沒有可運行版本的,須要自行編譯,這個編譯速度還挺慢的,我這邊提供一個編譯好的版本給你們,點這裏。服務器
kafka-manager一樣須要配置一下和kafka的關係,在conf/application.conf文件裏面,不過配置的不是kafka本身,而是kafka掛載的zookeeper。
kafka-manager.zkhosts="localhost:2181"複製代碼
而後啓動bin/kafka-manager就能夠了(windows環境下也有kafka-manager.bat能夠運行)
這裏有個坑,在windows下面運行的話,可能啓動失敗,提示輸入行太長
這個是由於目錄太長,把kafak-manager-2.0.0.2目錄名縮短就能夠正常運行了。
啓動後經過Add Cluster把Cluster Zookeeper Host把zookeeper的地址端口填上,Kafka Version的版本必定要和正在使用的kafka版本對上,不然可能看不到kafka的內容。
而後咱們就能看到kafka的broker,topic,consumers,partitions等信息了。
一切的起點,還在start.spring.io
這黑乎乎的界面是spring爲了萬聖節搞的事情。和咱們相關的是右邊這兩個依賴,這兩個依賴在pom.xml裏面對應的是這些
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>複製代碼
不過只憑這些還不行,直接運行的話,會提示
Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka複製代碼
還須要加上一個依賴包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>複製代碼
spring cloud stream項目框架搭好後,咱們須要分兩個部分,一個是發消息的部分,一個是收消息的地方。咱們先看發消息的部分,首先是配置文件,application.yml
spring:
cloud:
stream:
default-binder: kafka #默認的綁定器,
kafka: #若是用的是rabbitMQ這裏填 rabbit
binder:
brokers: #Kafka的消息中間件服務器地址
- localhost:9092
bindings:
output: #通道名稱
binder: kafka
destination: test1 #消息發往的目的地,對應topic
group: output-group-1 #對應kafka的group
content-type: text/plain #消息的格式複製代碼
注意這裏的output,表示是發佈消息的,和後面訂閱消息是對應的。這個output的名字是消息通道名稱,是能夠自定義的,後面會講到。
而後咱們須要建立一個發佈者
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
public class Producer {
private Source mySource;
public Producer(Source mySource) {
super();
this.mySource = mySource;
}
public Source getMysource() {
return mySource;
}
public void setMysource(Source mysource) {
mySource = mySource;
}
}複製代碼
@EnableBinding 按字面理解就知道是綁定通道的,綁定的通道名就是上面的output,Soure.class是spring 提供的,表示這是一個可綁定的發佈通道,它的通道名稱就是output,和application.yml裏面的output對應
源碼能夠看的很清楚
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* Bindable interface with one output channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Source {
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
* @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();
}
複製代碼
若是咱們須要定義咱們本身的通道,能夠本身寫一個類,好比下面這種,通道名就改爲了my-out
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MySource {
String INPUT = "my-in";
String OUTPUT = "my-out";
@Input(INPUT)
SubscribableChannel myInput();
@Output(OUTPUT)
MessageChannel myOutput();
}複製代碼
這樣的話,application.yml就要改了
my-out:
binder: kafka
destination: mytest #消息發往的目的地,對應topic
group: output-group-2 #對應kafka的group
content-type: text/plain #消息的格式複製代碼
Product.class的@EnableBinding也須要改,爲了作對應,我另外寫了一個MyProducer
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(MySource.class)
public class MyProducer {
private MySource mySource;
public MyProducer(MySource mySource) {
super();
this.mySource = mySource;
}
public MySource getMysource() {
return mySource;
}
public void setMysource(MySource mysource) {
mySource = mySource;
}
}複製代碼
這樣,發佈消息的部分就寫好了,咱們寫個controller來發送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;
@RestController
public class MyController {
@Autowired
private Producer producer;
@Autowired
private MyProducer myProducer;
// get the String message via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
public String publishMessageString(@RequestBody String payload) {
// send message to channel output
producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
return "success";
}
@RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
public String publishMyMessageString(@RequestBody String payload) {
// send message to channel myoutput
myProducer.getMysource().myOutput().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
return "success";
}
}複製代碼
很簡單,直接調用producer發送一個字符串就好了,我使用postman來發起這個動做
消息發送出去了,咱們怎麼收消息呢?往下看。
一樣的,咱們用以前的spring cloud stream項目框架作收消息的部分,首先是application.yml文件
server:
port: 8081
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers:
- localhost:9092
bindings:
input:
binder: kafka
destination: test1
content-type: text/plain
group: input-group-1
my-in:
binder: kafka
destination: mytest
content-type: text/plain
group: input-group-2
複製代碼
重點關注的就是input和my-in ,這個和以前的output和my-out一一對應。
默認和Source類對應的是Sink,這個是官方提供的,代碼以下
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* Bindable interface with one input channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Sink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
SubscribableChannel input();
}
複製代碼
調用它的類Consumer用來接收消息,代碼以下
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;
@EnableBinding(Sink.class)
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
logger.info("recieved a string message : " + message);
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
public void handle(@Payload ChatMessage message) {
final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
.withZone(ZoneId.systemDefault());
final String time = df.format(Instant.ofEpochMilli(message.getTime()));
logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
}
}複製代碼
而咱們自定義channel的類MySink和MyConsumer代碼以下:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String INPUT = "my-in";
@Input(INPUT)
SubscribableChannel myInput();
}複製代碼
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;
@EnableBinding(MySink.class)
public class MyConsumer {
private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);
@StreamListener(target = MySink.INPUT)
public void consume(String message) {
logger.info("recieved a string message : " + message);
}
@StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
public void handle(@Payload ChatMessage message) {
final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
.withZone(ZoneId.systemDefault());
final String time = df.format(Instant.ofEpochMilli(message.getTime()));
logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
}
}複製代碼
這樣就OK了,當上面咱們用postman發了消息後,這邊就能直接在日誌裏面看到
2019-10-29 18:42:39.455 INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer : recieved a string message : 你瞅啥
2019-10-29 18:43:17.017 INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer : recieved a string message : 你瞅啥複製代碼
咱們在application.yml裏面定義的destination,就是kafka的topic,在kafka-manager的topic list裏面能夠看到
而接收消息的consumer也能夠看到
這就是spring cloud stream和kafka的帝后之戀,不過他們這種政治聯姻哪有這麼簡單,裏面複雜的部分咱們後面再講,敬請期待,起駕回宮(野生翻譯:The Return of the King)