這事沒完,繼續聊spring cloud stream和kafka的這些小事

上一篇文章講了如何用spring cloud stream集成kafka,而且跑起來一個demo,若是這一次宣傳spring cloud stream的文章,其實到這裏就能夠啦。但實際上,工程永遠不是簡單的技術會仍是不會的問題,在實際的開發中,咱們會遇到不少的細節問題(簡稱坑),這篇文章,會把其中一些很小的點說一下,算是用實例告訴你們,工程的複雜性,每每體如今實際的繁瑣步驟中。java

 

一、group的配置

在發送消息的配置裏面,group是不用配置的git

關於這一點的證實,能夠在源代碼的註釋裏面看到github

org.springframework.cloud.stream.config.BindingPropertiesweb

二、修改topic的partitions

配置文件以下spring

bindings:
        output:
          binder: kafka
          destination: wph-d-2 #消息發往的目的地,對應topic
          content-type: text/plain #消息的格式
          producer:  
            partitionCount: 7

partitionCount是用來設置partition的數量,默認是1,若是這個topic已經建了,修改partitionCount無效,會提示錯誤json

Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	... 14 common frames omitted

根據錯誤的提示,添加autoAddPartitions服務器

kafka: 
        binder:
          brokers: #Kafka的消息中間件服務器地址
          - localhost:9092
          autoAddPartitions: true

再次啓動就能夠看到partitions數已經改了app

autoAddPartitions屬性對應的類是org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationPropertieside

設置partitionCount屬性的類是org.springframework.cloud.stream.binder.ProducerPropertiespost

三、發送json報錯

用postman發送sendMessage/complexType報錯

在服務器端的報錯內容是:

Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]

緣由是數據傳輸格式傳輸錯誤,要改一下postman發送數據的格式

而後就能happy的發出去了

四、正確的發送json並轉換成對象

若是咱們須要傳輸json的信息,那麼在發送消息端須要設置content-type爲json(其實能夠不寫,默認content-type就是json,後面會講)

bindings:
        output:
          binder: kafka
          destination: wph-d-2 #消息發往的目的地,對應topic
          content-type: application/json #消息的格式

而後經過producer發送這個消息

@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
	public String publishMessageComplextType(@RequestBody ChatMessage payload) {
		logger.info(payload.toString());
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build());
		return "success";
	}

這裏須要注意的一點是ChatMessage的field name必需要有getter和settr方法,二者有一就能夠了,不然json轉換成對象的時候,field name收不到值。

在訂閱消息的時候,application.yml裏面content-type能夠不用配置,這個值默認就是「application/json」,這一點能夠在org.springframework.cloud.stream.config.BindingProperties類的註釋裏面看到

和上面同樣,ChatMessage的field name須要有getter或者setter的方法,兩者之一就行。

接收json並轉換成類的方法以下:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'")
	public void handle(ChatMessage message) {
		logger.info(message.toString());
}

有坑警告:若是咱們把發送消息端的content-type設置成text/plain,消息訂閱端的content-type設置成application/json,就會在消息訂閱端報這個錯誤

Caused by: java.lang.IllegalStateException: argument type mismatch
Endpoint [com.wphmoon.kscsclient.Consumer]

若是顛倒過來的話,發送消息端的content-type設置成application/json,消息訂閱端設置成text/plain,我實際測試過,是能夠收到消息,而且能轉換成ChatMessage對象,沒有問題。

源代碼

相關文章
相關標籤/搜索