spring-cloud-stream-binder-kafka property configuration

This article gives a brief overview of some of the configuration properties of spring-cloud-stream-binder-kafka.

maven

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>1.0.3.RELEASE</version>
</dependency>

stream properties

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java

spring:
  cloud:
     stream:
       instanceIndex: 0 ## the environment variable INSTANCE_INDEX is also supported
                        ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka
       instanceCount: 1 ## The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
                        ## used to partition data across different consumers.

Logically, a topic can be thought of as a queue. Every message must specify its topic, which can simply be understood as declaring which queue the message is put into. To let Kafka's throughput scale horizontally, a topic is physically split into one or more partitions; each partition corresponds to a directory on disk that stores all the messages and index files of that partition. Partitions are named topic name + sequence number: the first partition is numbered 0, and the highest number is the number of partitions minus 1.

Messages within a partition can only be consumed by one consumer of the same consumer group.

When there are more consumers than partitions, the extra consumers sit idle.

When there are fewer consumers than partitions (or exactly as many), some consumers are assigned more than one partition, so a few consumers will consume more than the others.

instanceCount is mainly for the consumer side; it should generally be less than or equal to the number of partitions of the topic, and it is mainly used to spread consumption across partitions.
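To make this concrete, here is a rough sketch (a simplified illustration only, not the binder's actual source) of how a partition key maps to a partition by default and, roughly, which instance ends up consuming that partition:

// PartitioningSketch.java -- simplified illustration of the behaviour described above
public class PartitioningSketch {

    // default partition selection: hashCode of the partition key modulo partitionCount
    static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode()) % partitionCount;
    }

    // roughly: instance instanceIndex (0..instanceCount-1) consumes the partitions
    // whose index maps back to it
    static boolean consumedBy(int partition, int instanceIndex, int instanceCount) {
        return partition % instanceCount == instanceIndex;
    }

    public static void main(String[] args) {
        int partitionCount = 4;
        int instanceCount = 2;
        int p = selectPartition("order-42", partitionCount);
        System.out.println("key order-42 -> partition " + p
                + ", consumed by instance " + (p % instanceCount));
    }
}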

bindings properties

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/BindingProperties.java

spring:
  cloud:
     stream:
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            #group: test  ## consumer property
            #producer:
            #consumer:

producer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ProducerProperties.java

spring:
  cloud:
     stream:
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              partitionCount: 1
              partitionKeyExtractorClass: org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
              partitionSelectorClass: org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass
              headerMode: raw
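partitionKeyExtractorClass and partitionSelectorClass point at user-provided strategy classes implementing spring-cloud-stream's PartitionKeyExtractorStrategy and PartitionSelectorStrategy interfaces. A minimal sketch (the class names and the "partitionKey" header are made up for illustration; the configuration above would then reference these classes) might look like this:

// MyPartitionKeyExtractor.java (hypothetical class referenced by partitionKeyExtractorClass)
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

public class MyPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {
        // use a custom header as the partition key, falling back to the payload
        Object key = message.getHeaders().get("partitionKey");
        return key != null ? key : message.getPayload();
    }
}

// MyPartitionSelector.java (hypothetical class referenced by partitionSelectorClass)
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;

public class MyPartitionSelector implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        // map the extracted key to a partition index in [0, partitionCount)
        return Math.abs(key.hashCode()) % partitionCount;
    }
}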
  • kafka producer extension properties
    spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaProducerProperties.java

spring:
  cloud:
     stream:
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              bufferSize: 16384
              maxRequestSize: 1048576
              sync: true
              batchTimeout: 0
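For context, these producer properties apply to messages sent through the bound output channel. A minimal sketch of such a producer (EventProducer and the message body are illustrative names; @EnableBinding, Source and MessageBuilder are standard Spring Cloud Stream / Spring Messaging API):

// EventProducer.java -- minimal sketch of a producer using the "output" binding configured above
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class EventProducer {

    private final Source source;

    @Autowired
    public EventProducer(Source source) {
        this.source = source;
    }

    public void publish(String body) {
        // sent to the configured destination (event-demo); the binder applies the
        // producer properties above (sync, bufferSize, batchTimeout, ...)
        source.output().send(MessageBuilder.withPayload(body).build());
    }
}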

consumer

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/ConsumerProperties.java

spring:
  cloud:
     stream:
        bindings:
          input:
            destination: event-demo
            content-type: text/plain
            consumer:
              concurrency: 1 ## The concurrency of the inbound consumer.
              partitioned: false ## Whether the consumer receives data from a partitioned producer. Default: false.
              headerMode: raw
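These consumer properties control how messages are delivered to the bound input channel; a listener on that channel can be as simple as the following sketch (EventConsumer is an illustrative name; @StreamListener and the Sink binding are part of the spring-cloud-stream API):

// EventConsumer.java -- minimal sketch of a consumer on the "input" binding configured above
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class EventConsumer {

    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        // concurrency / partitioned / headerMode above determine how the binder
        // dispatches messages to this method
        System.out.println("received: " + payload);
    }
}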
  • kafka consumer extension properties
    spring-cloud-stream-binder-kafka-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/binder/kafka/KafkaConsumerProperties.java

spring:
  cloud:
     stream:
        bindings:
          input:
            destination: event-demo
            content-type: text/plain
            consumer:
              autoCommitOffset: false
              resetOffsets: true
              startOffset: earliest
              enableDlq: false
              recoveryInterval: 5000

Native API

ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
        .createMessageStreams(topicCountMap);

Here, topicCountMap tells Kafka how many threads we will use in the consumer to consume the topic: the key of topicCountMap is the topic name, and the value is the number of threads for that topic.
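Continuing that snippet, a sketch of consuming each returned KafkaStream on its own thread (variable names follow the snippet above; requires java.util.List, java.util.concurrent.*, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata):

// one thread per stream: consumerCount streams -> consumerCount consuming threads
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
for (final KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new Runnable() {
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> record = it.next();
                System.out.println("partition=" + record.partition()
                        + " offset=" + record.offset()
                        + " value=" + new String(record.message()));
            }
        }
    });
}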

Summary

Overall, Spring Cloud Stream abstracts part of this away, but one serious drawback is spring.cloud.stream.instanceIndex: it is not very friendly, because it makes service instances stateful, which is awkward for Docker-based deployments and arguably worse than just using the native API. If there are not many partitions, or each consumer is powerful enough, then deploying at least two instances and configuring them this way is still acceptable.
