一、group:java
組內只有1個實例消費。若是不設置group,則stream會自動爲每一個實例建立匿名且獨立的group——因而每一個實例都會消費spring
組內單次只有1個實例消費,而且會輪詢負載均衡。一般,在將應用程序綁定到給定目標時,最好始終指定consumer group負載均衡
二、destination binder:ide
與外部消息系統通訊的組件,爲構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。Binder使Spring Cloud Stream應用程序能夠靈活地鏈接到中間件,目前spring爲kafka、rabbitmq提供bindercode
三、destination binding:orm
Binding 是鏈接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder建立中間件
四、partitionblog
一個或多個生產者將數據發送到多個消費者,並確保有共同特徵標識的數據由同一個消費者處理。默認是對消息進行hashCode,而後根據分區個數取餘,因此對於相同的消息,總會落到同一個消費者上接口
注:嚴格來講partition不屬於概念,而是一種Stream提升伸縮性、吞吐量的一種方式rabbitmq
一、@Input,使用示例:
public interface MySink { @Input("my-input") SubscribableChannel input(); }
做用:
二、@Output,使用示例:
public interface MySource { @Output("my-output") MessageChannel output(); }
做用:
@Input
相似,只不過是用來生產消息三、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'") public void receive(String messageBody) { log.info("Received: {}", messageBody); }
做用:
四、@SendTo,使用示例:
// 接收INPUT這個channel的消息,並將返回值發送到OUTPUT這個channel @StreamListener(Sink.INPUT) @SendTo(Source.OUTPUT) public String receive(String receiveMsg) { return "handle..."; }
做用:
四、@InboundChannelAdapter,使用示例:
@Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> producer() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); }
做用:
五、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); }
做用:
六、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); }
做用:
@ServiceActivator
相似,標註該註解的方法可以轉換消息,消息頭,或消息有效內容PollableMessageSource容許消費者能夠控制消費速率。舉個例子簡單演示一下,首先定義一個接口:
public interface PolledProcessor { @Input("pollable-input") PollableMessageSource input(); }
使用示例:
@Autowired private PolledProcessor polledProcessor; @Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }
參考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers