本文簡單介紹下spring-cloud-stream-binder-kafka的一些屬性配置。html
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> <version>1.0.3.RELEASE</version> </dependency>
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.javajava
spring: cloud: stream: instanceIndex: 0 ##支持環境變量INSTANCE_INDEX ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka. ## used to partition data across different consumers.
Topic在邏輯上能夠被認爲是一個queue。每條消費都必須指定它的topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得 Kafka的吞吐率能夠水平擴展,物理上把topic分紅一個或多個partition,每一個partition在物理上對應一個文件夾,該文件夾下存儲 這個partition的全部消息和索引文件。partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。spring
同一個partition內的消息只能被同一個組中的一個consumer消費。docker
當消費者數量多於partition的數量時,多餘的消費者空閒。api
消費者少於和等於partition的數量時,會出現多個partition對應一個消費者的狀況,個別消費者消費量會比其餘的多。app
instanceCount主要是consumer用的,通常小於或等於topic的partition數量,主要用做消費者的消費分區用。框架
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.javamaven
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain #group: test ##consumer屬性 #producer: #consumer:
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java微服務
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain producer: partitionCount: 1 headerMode partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass headerMode: raw
spring: cloud: stream: bindings: output: destination: event-demo content-type: text/plain producer: bufferSize: 16384 maxRequestSize: 1048576 sync: true batchTimeout: 0
spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java性能
spring: cloud: stream: bindings: input: destination: event-demo content-type: text/plain consumer: concurrency: 1 ## The concurrency of the inbound consumer. partitioned: false ## Whether the consumer receives data from a partitioned producer.Default: false. headerMode: raw
spring: cloud: stream: bindings: input: destination: event-demo content-type: text/plain consumer: autoCommitOffset: false resetOffsets: true startOffset: earliest enableDlq: false recoveryInterval: 5000
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props); ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, consumerCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector .createMessageStreams(topicCountMap);
這裏頭topicCountMap告訴Kafka咱們在Consumer中將用多少個線程來消費該topic。topicCountMap的key是topic name,value針對該topic是線程的數量。
總體的話,spring cloud stream本身抽象了一部分,可是有個硬傷就是spring.cloud.stream.instanceIndex這個不大友好,這樣就形成服務的實例是有狀態的了,在基於docker部署起來比較麻煩,還不如直接原生api。若是partition很少,或者每一個consumer性能強悍的話,那麼至少部署兩個,配置起來也還能夠接受。