Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated messaging system that relies on ZooKeeper for coordination. Its defining strength is processing large volumes of data in real time, which lets it serve a wide range of scenarios: Hadoop-based batch systems, low-latency real-time systems, Storm/Spark streaming engines, web/nginx logs, access logs, messaging services, and more. It is written in Scala; LinkedIn donated it to the Apache Software Foundation, where it became a top-level open-source project.
As the introduction above suggests, Kafka positions itself not merely as a messaging system, but as a distributed streaming platform built on a publish-subscribe messaging model.
Producer: the data producer, which publishes messages to topics
Consumer: the data consumer, which subscribes to topics and reads messages
Consumer Group: a group of consumers that share the consumption of a topic's partitions
Broker: a Kafka server node
Topic: a named category to which messages are published
Partition: one of the ordered, append-only segments a topic is split into
Replication: a replica of a partition
Replication Leader: the replica that handles all reads and writes for a partition
Replication Manager: the component that manages partition replicas
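To make the Partition concept concrete, here is a minimal Java sketch of how a message key can be mapped to a partition: the same key always lands on the same partition, which is how Kafka preserves per-key ordering. This is a simplified illustration using `hashCode()`; Kafka's actual default partitioner uses murmur2 hashing, so the class and method names here are illustrative, not Kafka APIs.

```java
// Illustrative sketch only: Kafka's DefaultPartitioner uses murmur2, not hashCode().
public class PartitionSketch {

    // Map a message key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // matches the --partitions 3 used later in this article
        // Repeated sends with the same key hit the same partition.
        System.out.println(partitionFor("order-1001", partitions));
        System.out.println(partitionFor("order-1001", partitions));
    }
}
```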
Kafka relies on ZooKeeper for distributed coordination, so ZooKeeper must be installed as well. Download both packages from their official websites.
In the extracted ZooKeeper directory, enter the conf folder, copy the file zoo_sample.cfg, and name the copy zoo.cfg. zoo.cfg contains five configuration entries in total; the defaults are fine.
Enter the config folder under the Kafka root directory, open server.properties, and check the following entries (they usually already hold these default values, so no change is needed):
zookeeper.connect=localhost:2181
broker.id=0
log.dirs=/tmp/kafka-logs
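To experiment with replication, you can run a second broker on the same machine: each broker needs its own copy of server.properties with a unique id, listener port, and log directory. A sketch of such a copy — the file name, port, and paths below are example choices, not required values:

```properties
# server-1.properties — a second broker on the same host
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181
```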
The config folder also contains a configuration file for ZooKeeper itself; you can set options there and pass the file to ZooKeeper on startup for a customized setup.
Kafka's bin directory contains startup scripts for most features, which you can use to control Kafka.
Starting Kafka
Create a topic:
sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-kafka-topic

List topics:
sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181

Start a console producer:
sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic

Start a console consumer:
sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning

Produce a message: first message
Produce a message: second message
Add the dependencies in pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zang</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The corresponding entities
package com.zang.kafka.common;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Message entity.
 */
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class MessageEntity {
    /**
     * Title
     */
    private String title;
    /**
     * Body
     */
    private String body;
}
package com.zang.kafka.common;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
 * Unified response object for REST requests.
 */
@Getter
@Setter
public class Response implements Serializable {
    private static final long serialVersionUID = -1523637783561030117L;
    /**
     * Response code
     */
    private int code;
    /**
     * Response message
     */
    private String message;

    public Response(int code, String message) {
        this.code = code;
        this.message = message;
    }
}
package com.zang.kafka.common;

/**
 * Error codes.
 */
public class ErrorCode {
    /**
     * Success
     */
    public final static int SUCCESS = 200;
    /**
     * Failure
     */
    public final static int EXCEPTION = 500;
}
Producer
package com.zang.kafka.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Producer.
 */
@Component
public class SimpleProducer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, Object entity) {
        logger.info("Sending message, payload: {}", entity);
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, JSON.toJSONString(entity));

        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, Object>> future = this.kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("Message send failed", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                long elapsedTime = System.currentTimeMillis() - startTime;
                RecordMetadata metadata = result.getRecordMetadata();
                // Renamed from "record" to avoid shadowing the ProducerRecord above.
                StringBuilder sb = new StringBuilder(128);
                sb.append("message(")
                  .append("key = ").append(key).append(", ")
                  .append("message = ").append(entity).append(") ")
                  .append("sent to partition(").append(metadata.partition()).append(") ")
                  .append("with offset(").append(metadata.offset()).append(") ")
                  .append("in ").append(elapsedTime).append(" ms");
                logger.info("Message sent successfully: {}", sb.toString());
            }
        });
    }
}
Consumer
package com.zang.kafka.consumer;

import com.alibaba.fastjson.JSONObject;
import com.zang.kafka.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Consumer.
 */
@Component
public class SimpleConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = "${kafka.topic.default}")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // Guard against a null payload
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            // Extract the message body
            Object message = kafkaMessage.get();
            MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class);
            logger.info("Received topic: {}", topic);
            logger.info("Received record: {}", record);
            logger.info("Received message: {}", messageEntity);
        }
    }
}
Controller
package com.zang.kafka.controller;

import com.alibaba.fastjson.JSON;
import com.zang.kafka.common.ErrorCode;
import com.zang.kafka.common.MessageEntity;
import com.zang.kafka.common.Response;
import com.zang.kafka.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * Producer controller.
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${kafka.topic.default}")
    private String topic;

    private static final String KEY = "key";

    /**
     * Send a message.
     * @param message the message to send
     * @return the response
     */
    @PostMapping("/send")
    public Response sendKafka(@RequestBody MessageEntity message) {
        try {
            logger.info("Kafka message: {}", JSON.toJSONString(message));
            this.simpleProducer.send(topic, KEY, message);
            logger.info("Kafka message sent successfully!");
            return new Response(ErrorCode.SUCCESS, "Kafka message sent successfully");
        } catch (Exception ex) {
            logger.error("Kafka message send failed:", ex);
            return new Response(ErrorCode.EXCEPTION, "Kafka message send failed");
        }
    }
}
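Once the application is running, the endpoint above can be exercised with a plain HTTP call. A sketch, assuming Spring Boot's default port 8080 (not configured explicitly in this article) and the two fields of MessageEntity:

```
curl -X POST http://localhost:8080/producer/send \
     -H "Content-Type: application/json" \
     -d '{"title": "hello", "body": "first message"}'
```

On success the controller returns the JSON form of Response, i.e. code 200 with the success message, and the consumer logs the received record.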
Configuration: application.properties
##---------- Kafka configuration
## Topic
kafka.topic.default=my-kafka-topic
# Kafka broker address
spring.kafka.bootstrap-servers=47.88.156.142:9092
# Producer configuration
spring.kafka.producer.retries=0
# Number of messages to batch per send
spring.kafka.producer.batch-size=4096
# Buffer memory
spring.kafka.producer.buffer-memory=40960
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer configuration
spring.kafka.consumer.group-id=my
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Number of threads in the listener container, to increase concurrency
spring.kafka.listener.concurrency=3
Application entry point
package com.zang.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
Avoiding zombie instances
Source:
imooc course: https://www.imooc.com/learn/1043
References:
https://blog.csdn.net/liyiming2017/article/details/82790574
https://blog.csdn.net/YChenFeng/article/details/74980531