慕課網《Kafka流處理平臺》學習總結
課程介紹
LinkedIn 開源
Kafka發展歷程
Kafka的特性
Kafka一般被用於
Kafka是什麼
Producer:數據生產者
Consumer:數據消費者
Consumer Group:消費者組
Broker:服務節點
Topic:主題
Partition:分區
Replication:分區的副本
Replication Leader:副本的老大
Replication Manager:副本的管理者
Partition:分區
Replication:分區的副本
Kafka功能結構
Kafka數據流勢
Kafka消息結構
Kafka特色:分佈式
Kafka特色:高性能
Kafka特色:持久性與擴展性
Kafka應用場景
Kafka簡單案例
學習筆記
1.下載與安裝 Zookeeper下載:https://zookeeper.apache.org/releases.html#download Kafka下載:http://kafka.apache.org/downloads 安裝:解壓、配置環境變量 2.Zookeeper啓動 解壓:tar -zxf zookeeper-3.4.12.tar.gz 目錄:cd zookeeper-3.4.12/bin 啓動:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties 3.Kafka啓動 解壓:tar -zxf kafka_2.12-2.0.0.tgz 目錄:cd kafka_2.12-2.0.0 啓動:sudo bin/kafka-server-start.sh config/server.properties 4.使用控制檯操作生產者與消費者 建立Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic 查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181 啓動生產者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic 啓動消費者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning 生產消息:first message 生產消息:second message
建立49-kafka-example的maven工程pom以下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Child module of the 49-kafka aggregator project. -->
    <parent>
        <artifactId>49-kafka</artifactId>
        <groupId>com.myimooc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>49-kafka-example</artifactId>
    <properties>
        <spring.boot.version>2.0.4.RELEASE</spring.boot.version>
    </properties>
    <!-- Imports Spring Boot's managed dependency versions as a BOM.
         NOTE(review): the conventional BOM artifact is spring-boot-dependencies;
         spring-boot-parent also works since it inherits the same management. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <!-- REST controller support (embedded Tomcat, Spring MVC). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- KafkaTemplate / @KafkaListener integration. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- JSON serialization of message entities. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Builds the executable fat jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.編寫MessageEntity
package com.myimooc.kafka.example.common; import java.util.Objects; /** * <br> * 標題: 消息實體<br> * 描述: 消息實體<br> * 時間: 2018/09/09<br> * * @author zc */ public class MessageEntity { /** * 標題 */ private String title; /** * 內容 */ private String body; @Override public String toString() { return "MessageEntity{" + "title='" + title + '\'' + ", body='" + body + '\'' + '}'; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } MessageEntity that = (MessageEntity) o; return Objects.equals(title, that.title) && Objects.equals(body, that.body); } @Override public int hashCode() { return Objects.hash(title, body); } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } }
2.編寫SimpleProducer
package com.myimooc.kafka.example.producer; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * <br> * 標題: 生產者<br> * 描述: 生產者<br> * 時間: 2018/09/09<br> * * @author zc */ @Component public class SimpleProducer<T> { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public void send(String topic, String key, Object entity) { logger.info("發送消息入參:{}", entity); ProducerRecord<String, Object> record = new ProducerRecord<>( topic, key, JSON.toJSONString(entity) ); long startTime = System.currentTimeMillis(); ListenableFuture<SendResult<String, Object>> future = this.kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { logger.error("消息發送失敗:{}", ex); } @Override public void onSuccess(SendResult<String, Object> result) { long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); StringBuilder record = new StringBuilder(128); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(entity).append(")") .append("send to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); logger.info("消息發送成功:{}", record.toString()); } }); } }
3.編寫SimpleConsumer
package com.myimooc.kafka.example.consumer; import com.alibaba.fastjson.JSONObject; import com.myimooc.kafka.example.common.MessageEntity; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.util.Optional; /** * <br> * 標題: 消費者<br> * 描述: 消費者<br> * 時間: 2018/09/09<br> * * @author zc */ @Component public class SimpleConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "${kafka.topic.default}") public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { //判斷是否NULL Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //獲取消息 Object message = kafkaMessage.get(); MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class); logger.info("接收消息Topic:{}", topic); logger.info("接收消息Record:{}", record); logger.info("接收消息Message:{}", messageEntity); } } }
4.編寫Response
package com.myimooc.kafka.example.common; import java.io.Serializable; /** * <br> * 標題: REST請求統一響應對象<br> * 描述: REST請求統一響應對象<br> * 時間: 2018/09/09<br> * * @author zc */ public class Response implements Serializable { private static final long serialVersionUID = -972246069648445912L; /** * 響應編碼 */ private int code; /** * 響應消息 */ private String message; public Response() { } public Response(int code, String message) { this.code = code; this.message = message; } @Override public String toString() { return "Response{" + "code=" + code + ", message='" + message + '\'' + '}'; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
5.編寫ErrorCode
package com.myimooc.kafka.example.common;

/**
 * Status codes used in REST {@code Response} objects.
 *
 * <p>Declared {@code final} with a private constructor: this is a pure
 * constant holder and must not be instantiated or subclassed.
 *
 * @author zc
 */
public final class ErrorCode {

    /** Success. */
    public static final int SUCCESS = 200;

    /** Failure. */
    public static final int EXCEPTION = 500;

    /** Constant holder; not instantiable. */
    private ErrorCode() {
    }
}
6.編寫ProducerController
package com.myimooc.kafka.example.controller; import com.alibaba.fastjson.JSON; import com.myimooc.kafka.example.common.ErrorCode; import com.myimooc.kafka.example.common.MessageEntity; import com.myimooc.kafka.example.common.Response; import com.myimooc.kafka.example.producer.SimpleProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; /** * <br> * 標題: 生產者Controller<br> * 描述: 生產者Controller<br> * 時間: 2018/09/09<br> * * @author zc */ @RestController @RequestMapping("/producer") public class ProducerController { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpleProducer simpleProducer; @Value("${kafka.topic.default}") private String topic; private static final String KEY = "key"; @PostMapping("/send") public Response sendKafka(@RequestBody MessageEntity message) { try { logger.info("kafka的消息:{}", JSON.toJSONString(message)); this.simpleProducer.send(topic, KEY, message); logger.info("kafka消息發送成功!"); return new Response(ErrorCode.SUCCESS,"kafka消息發送成功"); } catch (Exception ex) { logger.error("kafka消息發送失敗:", ex); return new Response(ErrorCode.EXCEPTION,"kafka消息發送失敗"); } } }
7.編寫application.properties
##---------- Kafka configuration
## Default topic used by both the producer controller and the consumer listener
kafka.topic.default=myimooc-kafka-topic
# Kafka broker address
spring.kafka.bootstrap-servers=192.168.0.105:9092
# Producer: number of send retries
spring.kafka.producer.retries=0
# Producer: batch size (bytes) for batched sends
spring.kafka.producer.batch-size=4096
# Producer: total buffer memory (bytes)
spring.kafka.producer.buffer-memory=40960
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer configuration
spring.kafka.consumer.group-id=myimooc
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Number of threads in the listener container, to raise consumer concurrency
spring.kafka.listener.concurrency=3
8.編寫ExampleApplication
package com.myimooc.kafka.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Application entry point; enables Spring Kafka listener support alongside
 * standard Spring Boot auto-configuration.
 *
 * @author zc
 */
@EnableKafka
@SpringBootApplication
public class ExampleApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}
爲何要支持事務
數據傳輸的事務定義
事務保證
避免殭屍實例
每一個事務Producer分配一個 transactional.id,在進程從新啓動時可以識別相同的Producer實例
Kafka增長了一個與transactional.id相關的epoch,存儲每一個transactional.id內部元數據
一旦epoch被觸發,任何具備相同的transactional.id和更舊的epoch的Producer被視爲殭屍,Kafka會拒絕來自這些Producer的後續事務性寫入
零拷貝簡介
文件傳輸到網絡的公共數據路徑
零拷貝過程(指內核空間和用戶空間的交互拷貝次數爲零)
文件傳輸到網絡的公共數據路徑演變
課程總結