下載kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz java
kafka最爲重要三個配置依次爲:broker.id、log.dir、zookeeper.connectgit
在kafka server端 config/server.properties中設置web
必需要配置:spring
advertised.listeners=PLAINTEXT://192.168.3.201:9092 # 公佈訪問地址和端口 apache
啓動kafka bootstrap
bin/kafka-server-start.sh ../config/server.properties api
檢測是否啓動 app
netstat -tunlp | egrep " (2181|9092)"tcp
或 lsof -i:9092post
測試發送信息和消費消息
建立主題
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 - topic test
生產者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
消費者
./kafkaconsole-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
若是想在外部使用kafka必須 9092 端口加入到防火牆列表
firewall-cmd --list-ports 查詢全部放行端口
firewall-cmd --add-port=9092/tcp # 臨時端口放行
firewall-cmd --add-port=9092/tcp --permanent # 永久放行
firewall-cmd --reload # 從新載入放行列表
引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
編寫生成者
package com.example.springkafka.api; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @Date: 2018/11/6 20:25 * @Description: 生產者 */ public class KafkaProducerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers","192.168.3.221:9092"); properties.setProperty("key.serializer", StringSerializer.class.getName()); properties.setProperty("value.serializer", StringSerializer.class.getName()); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); String topic = "message"; // 主題 Integer partition = 0; // 指定分區 long timeMillis = System.currentTimeMillis(); // 毫秒值 15分鐘 String key = "key-message"; // key String value = "value-message"; // value // 建立ProducerRecord ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, partition, timeMillis, key, value); // 生產消息 kafkaProducer.send(producerRecord); kafkaProducer.close(); } }
編寫消費者
package com.example.springkafka.api; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; /** * @Date: 2018/11/6 20:25 * @Description: 消費者 */ public class KafkaConsumerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.3.221:9092"); properties.setProperty("group.id", "group-1"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer", StringDeserializer.class.getName()); // 建立kafka的消費者對象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 訂閱kafka主題 kafkaConsumer.subscribe(Arrays.asList("message")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) System.out.printf("========offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
生成者與消費者配置
# 生成者配置 spring: kafka: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: 192.168.3.221:9092 consumer: # 消費者 group-id: gerry-1 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer kafka: topic: gerry
生成者代碼
package com.example.springcloudkafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @Date: 2018/11/6 21:03 * @Description: */ @RestController public class KafkaProducerController { public final KafkaTemplate<String, String> kafkaTemplate; private final String topic; public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic}") String topic) { this.kafkaTemplate = kafkaTemplate; this.topic = topic; } @PostMapping("message/send") // 這種方式只支持post public boolean sendMessage(@RequestParam String message) { kafkaTemplate.send(topic,message); return true; } }
消費者代碼
package com.example.springcloudkafka.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @Date: 2018/11/6 21:20 * @Description: */ @Component public class KafkaConsumerListener { @KafkaListener(topics={"${kafka.topic}"}) public void getMessage(String message) { System.out.println("kafka 消費者監聽,接收到消息:" + message); } }
官方定義三個接口
Source=> 發送者 Producer、Publisher
Sink=> 接收器 Consumer、 Subscriber Processor: 上流而言Sink、下流而言Souce
引入依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
配置:
# 生成者配置
spring:
kafka:
bootstrap-servers: 192.168.3.221:9092
cloud:
stream:
bindings:
output:
destination: ${kafka.topic}
input:
destination: ${kafka.topic}
kafka:
topic: cloud-stream
生產者:
package com.example.springcloudstreamkafkademo.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component @EnableBinding(Source.class) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel messageChannel; @Autowired private Source source; /** * 發送信息 * @param message */ public void send(String message) { // 經過消息管道發送消息 // messageChannel.send(MessageBuilder.withPayload(message).build()); source.output().send(MessageBuilder.withPayload(message).build()); } }
消費者
package com.example.springcloudstreamkafkademo.consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component @EnableBinding(value={Sink.class}) public class MessageConsumerBean { @Autowired @Qualifier(Sink.INPUT) private SubscribableChannel subscribableChannel; //一、 當subscribableChannel注入完成後完成回調 @PostConstruct public void init() { subscribableChannel.subscribe(message->{ System.out.println(message.getPayload()); }); } // 二、@ServiceActivator @ServiceActivator(inputChannel=Sink.INPUT) public void message(String message) { System.out.println("@ServiceActivator:"+message); } //三、@StreamListener @StreamListener(Sink.INPUT) public void onMessage(String message) { System.out.println("@StreamListener:"+message); } }
引入依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
配置
spring:
cloud:
stream:
bindings:
output:
destination: ${rabbit.queue}
input:
destination: ${rabbit.queue}
rabbitmq:
host: 192.168.3.221
port: 5672
username: rabbit
password: rabbit
rabbit:
queue: cloud-stream-queue
代碼同kafka
完整代碼詳見:https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20