Kafka is a distributed publish-subscribe messaging system, originally developed at LinkedIn and later donated to the Apache Foundation; it is written in Scala and Java. Kafka is a fast, scalable commit-log service that is distributed by design, partitioned, and replicated.
Before following this article you need a working Kafka environment. If you are on CentOS, see my earlier article: CentOS7下Kafka的安裝介紹. On other platforms, a quick search on Baidu or Google will turn up installation guides.
In that environment, edit server.properties so the broker listens on port 9092:
listeners=PLAINTEXT://your.host.name:9092
Because Spring Cloud is built on Spring Boot, we first integrate Kafka with plain Spring Boot and record the steps, before moving on to Spring Cloud Bus.
Create a Spring Boot project named kafka-hello and add the following dependencies:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.2</version>
    </dependency>
</dependencies>
```
```java
@Data
public class Message {
    private Long id;        // message id
    private String msg;     // message body
    private Date sendTime;  // time the message was sent
}
```
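Since the sender serializes this class with Gson, it helps to know the payload shape on the wire. The sketch below builds an equivalent JSON string with only the JDK: the field names match Gson's defaults (the declared field names), but Gson formats the `Date` field differently, so treat this purely as an illustration of the layout, not Gson's exact output.

```java
public class MessagePayloadSketch {
    // Hand-built stand-in for the JSON produced from the Message POJO (illustration only)
    static String toJson(long id, String msg, String sendTime) {
        return String.format("{\"id\":%d,\"msg\":\"%s\",\"sendTime\":\"%s\"}",
                id, msg, sendTime);
    }

    public static void main(String[] args) {
        // Example of what a payload on the topic roughly looks like
        System.out.println(toJson(1L, "3f2a0d7e", "2018-01-01 12:00:00"));
    }
}
```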
In this class, create a send method that publishes messages with KafkaTemplate.send(); wqh is the name of the Kafka topic.
```java
@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(Long i) {
        Message message = new Message();
        message.setId(i);
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("======== sending message {} >>>>{}<<<<==========", i, gson.toJson(message));
        kafkaTemplate.send("wqh", gson.toJson(message));
    }
}
```
In this class, create a consumer method and use the @KafkaListener annotation to listen on the specified topics; here it listens on the two topics wanqh and wqh.
```java
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"wanqh", "wqh"})
    public void consumer(ConsumerRecord<?, ?> consumerRecord) {
        // the record value may be null, so wrap it in an Optional
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.info(">>>>>>>>>> record = {}", kafkaMessage);
        if (kafkaMessage.isPresent()) {
            // unwrap the value held by the Optional
            Object message = kafkaMessage.get();
            log.info(">>>>>>>> received message = {}", message);
        }
    }
}
```
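The null check in the listener relies on `Optional.ofNullable`: a null record value yields an empty `Optional`, so `isPresent()` guards the logging. A minimal JDK-only sketch of that pattern (the method name `describe` is just for this illustration):

```java
import java.util.Optional;

public class OptionalCheckDemo {
    // Same guard as in the consumer: wrap a possibly-null value, act only when present
    static String describe(Object value) {
        Optional<?> maybe = Optional.ofNullable(value);
        return maybe.isPresent() ? "received: " + maybe.get() : "empty record";
    }

    public static void main(String[] args) {
        System.out.println(describe("hello")); // received: hello
        System.out.println(describe(null));    // empty record
    }
}
```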
```java
@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaSender kafkaSender;

    @PostConstruct
    public void init() {
        for (int i = 0; i < 10; i++) {
            // send ten test messages through the sender
            kafkaSender.send((long) i);
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
```
```properties
spring.application.name=kafka-hello
server.port=8080
#============== kafka ===================
# Kafka broker address list (comma-separated for multiple brokers)
spring.kafka.bootstrap-servers=192.168.18.136:9092
#=============== producer =======================
spring.kafka.producer.retries=0
# number of messages sent per batch
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
Now start the project:
A previous article used RabbitMQ with Spring Cloud Bus to implement a message bus and tested dynamic refresh of configuration. RabbitMQ is wired in through the spring-cloud-starter-bus-amqp module; to implement the bus with Kafka instead, simply replace spring-cloud-starter-bus-amqp with spring-cloud-starter-bus-kafka.
Here I copy the earlier config-client project and rename it config-client-kafka. See: SpingCloudBus整合RabbitMQ.
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>
</dependencies>
```
```properties
# Kafka broker list; defaults to localhost
spring.cloud.stream.kafka.binder.brokers=192.168.18.136:9092
# default broker port, used when an entry in brokers carries no port; defaults to 9092
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
# ZooKeeper node list the Kafka binder connects to; defaults to localhost
spring.cloud.stream.kafka.binder.zkNodes=192.168.18.136:2181
# default ZooKeeper port, used when an entry in zkNodes carries no port; defaults to 2181
spring.cloud.stream.kafka.binder.defaultZkPort=2181
```
The test procedure is the same as in the previous article, so it is not repeated here.
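For quick reference, the dynamic-refresh test from the previous article boils down to pushing a configuration change and then POSTing to the bus refresh endpoint on any instance; the event is propagated to the other instances over the Kafka topic. A sketch, assuming the config client runs on localhost:8080 (adjust host and port to your setup):

```shell
# Trigger a bus-wide configuration refresh; all instances listening on the bus topic pick it up
curl -X POST http://localhost:8080/bus/refresh
```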
Reference:
Project address:
Original article address: