1. Start ZooKeeper
Unpack the archive. ZooKeeper has to be started first: edit zoo_sample.cfg under zookeeper/conf, set dataDir=/opt/zkdata (as an example), then rename the file to zoo.cfg or copy it to a new file named zoo.cfg.
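After the edit, zoo.cfg looks roughly like this (a minimal sketch for a standalone node; tickTime and clientPort keep the defaults that ship in zoo_sample.cfg):
tickTime=2000
clientPort=2181
dataDir=/opt/zkdata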
Start ZooKeeper:
./bin/zkServer.sh start
2. Start Kafka
Edit the server.properties file under kafka/config and change:
log.dirs=/opt/kadata
advertised.listeners=PLAINTEXT://192.168.113.136:9092 (this step is important: it advertises an externally reachable address, so the host machine can reach the broker running inside the VM)
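The relevant part of server.properties then looks roughly like this (a minimal sketch; broker.id and zookeeper.connect are the stock defaults and are shown only for context):
broker.id=0
log.dirs=/opt/kadata
advertised.listeners=PLAINTEXT://192.168.113.136:9092
zookeeper.connect=localhost:2181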
Start Kafka:
./bin/kafka-server-start.sh -daemon config/server.properties
Be sure to add -daemon; it makes the broker run in the background.
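Because -daemon suppresses console output, you can confirm the broker actually came up by checking the Java processes or tailing the broker log (log path assumes the default logs directory under the Kafka installation):
jps | grep Kafka
tail -f logs/server.log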
3. Create a Kafka topic
Create a topic named test:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
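Before moving on to Spring, the setup can be sanity-checked with the stock console tools in the same bin directory (localhost works when they are run on the broker machine):
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning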
4. Add Spring dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.9.RELEASE</version>
</dependency>
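Note that spring-kafka already pulls in the kafka-clients library transitively, while the kafka_2.12 artifact is the full broker/Scala jar. If in doubt, the resolved versions can be checked with:
mvn dependency:tree | grep kafka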
5. Modify the Spring Boot configuration file
Add the following settings to application.properties:
spring.kafka.bootstrap-servers=192.168.113.136:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.bootstrap-servers=192.168.113.136:9092
spring.kafka.consumer.auto-offset-reset=earliest
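With these properties in place, Spring Boot auto-configures the KafkaTemplate and listener container factory used in the next two steps. As an optional aside, a configuration class like the hypothetical KafkaTopicConfig below (a sketch only, not part of the original setup) would let the auto-configured KafkaAdmin create the topic at startup, making step 3 unnecessary:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // Same topic name, partition count and replication factor as the command-line example above.
    @Bean
    public NewTopic testTopic() {
        return new NewTopic("test", 1, (short) 1);
    }
}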
6. Write a ProducerController
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class ProducerController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendKafka(@RequestParam("message") String message) {
        try {
            logger.info("kafka message = {}", message);
            kafkaTemplate.send("test", "key", message);
            logger.info("Message sent to Kafka successfully.");
            return "success";
        } catch (Exception e) {
            logger.error("Failed to send message to Kafka", e);
            return "failure";
        }
    }
}
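Once the application is running (on Spring Boot's default port 8080, assuming it has not been changed), the endpoint can be exercised with curl:
curl "http://localhost:8080/kafka/send?message=hello"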
7. Write a TestConsumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Auth justinniu
 * @Date 2018/9/3
 * @Desc A simple consumer that listens on the "test" topic.
 */
@Component
public class TestConsumer {

    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic = %s, key = %s, value = %s \n", record.topic(), record.key(), record.value());
    }
}
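If everything is wired correctly, sending a message through the controller makes the consumer print a line like the following (the value depends on what was sent):
topic = test, key = key, value = hello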