慕課網_《Kafka流處理平臺》學習總結

慕課網《Kafka流處理平臺》學習總結html

第一章:課程介紹

1-1 課程介紹

課程介紹java

  • Kafka概念解析
  • Kafka結構設計
  • Kafka場景應用
  • Kafka高級特性

第二章:概念解析

2-1 發展背景

LinkedIn 開源git

  • Databus 分佈式數據同步系統
  • Cubert 高性能計算引擎
  • ParSeq Java異步處理框架
  • Kafka 分佈式發佈訂閱消息系統,流處理平臺

Kafka發展歷程github

  • LinkedIn 開發
  • 2011年初開源,加入Apache基金會
  • 2012年從Apache Incubator畢業
  • Apache頂級開源項目

Kafka的特性web

  • 能夠發佈和訂閱且記錄數據的流,相似於消息隊列
  • 數據流存儲的平臺,具有容錯能力
  • 在數據產生時就能夠進行處理

Kafka一般被用於spring

  • 構建實時數據流管道
  • 構建實時數據流處理

Kafka是什麼apache

  • 面向於數據流的生產、轉換、存儲、消費總體的流處理平臺
  • Kafka不單單是一個消息隊列

2-2 基本概念

Producer:數據生產者json

  • 消息和數據的生產者
  • 向Kafka的一個topic發佈消息的進程或代碼或服務

Consumer:數據消費者bootstrap

  • 消息和數據的消費者
  • 向Kafka訂閱數據(topic)而且處理其發佈的消息的進程或代碼或服務

Consumer Group:消費者組緩存

  • 對於同一個topic,會廣播給不一樣的Group
  • 一個Group中,只有一個Consumer能夠消費該消息

Broker:服務節點

  • Kafka集羣中的每一個Kafka節點

Topic:主題

  • Kafka消息的類別
  • 對數據進行區分、隔離

Partition:分區

  • Kafka中數據存儲的基本單元
  • 一個topic數據,會被分散存儲到多個Partition
  • 一個Partition只會存在一個Broker上
  • 每一個Partition是有序的

Replication:分區的副本

  • 同一個Partition可能會有多個Replication
  • 多個Replication之間數據是同樣的

Replication Leader:副本的老大

  • 一個Partition的多個Replication上
  • 須要一個Leader負責該Partition上與Producer和Consumer交互

Replication Manager:副本的管理者

  • 負責管理當前Broker全部分區和副本的信息
  • 處理KafkaController發起的一些請求
  • 副本狀態的切換
  • 添加、讀取消息等

2-3 概念延伸

Partition:分區

  • 每個Topic被切分爲多個Partition
  • 消費者數目少於或等於Partition的數目
  • Broker Group中的每個Broker保存Topic的一個或多個Partition
  • Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partition,而且是唯一的Consumer

Replication:分區的副本

  • 當集羣中有Broker掛掉的狀況,系統能夠主動地使Replication提供服務
  • 系統默認設置每個Topic的Replication係數爲1,能夠在建立Topic時單獨設置
  • Replication的基本單位是Topic的Partition
  • 全部的讀和寫都從Replication Leader進行,Replication Followers只是做爲備份
  • Replication Followers必須可以及時複製Replication Leader的數據
  • 增長容錯性與可擴展性

第三章:結構設計

3-1 基本結構

Kafka功能結構

clipboard.png

Kafka數據流勢

clipboard.png

Kafka消息結構

clipboard.png

  • Offset:當前消息所處於的偏移
  • Length:消息的長度
  • CRC32:校驗字段,用於校驗當前信息的完整性
  • Magic:不少分佈式系統都會設計該字段,固定的數字,用於快速斷定當前信息是否爲Kafka消息
  • attributes:可選字段,消息的屬性
  • Timestamp:時間戳
  • Key Length:Key的長度
  • Key:Key
  • Value Length:Value的長度
  • Value:Value

3-2 功能特色

Kafka特色:分佈式

  • 多分區
  • 多副本
  • 多訂閱者
  • 基於Zookeeper調度

Kafka特色:高性能

  • 高吞吐量
  • 低延遲
  • 高併發
  • 時間複雜度爲O(1)

Kafka特色:持久性與擴展性

  • 數據可持久化
  • 容錯性
  • 支持在線水平擴展
  • 消息自動平衡

第四章:場景應用

4-1 應用場景

Kafka應用場景

  • 消息隊列
  • 行爲跟蹤
  • 元信息監控
  • 日誌收集
  • 流處理
  • 事件源
  • 持久性日誌(commit log)

4-2 應用案例

Kafka簡單案例

  • 部署啓動
  • 簡單生產者
  • 簡單消費者

學習筆記

1.下載與安裝
Zookeeper下載:https://zookeeper.apache.org/releases.html#download
Kafka下載:http://kafka.apache.org/downloads
安裝:解壓、配置環境變量

2.Zookeeper啓動
解壓:tar -zxf zookeeper-3.4.12.tar.gz
目錄:cd zookeeper-3.4.12/bin
啓動:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties

3.Kafka啓動
解壓:tar -zxf kafka_2.12-2.0.0.tgz
目錄:cd kafka_2.12-2.0.0
啓動:sudo bin/kafka-server-start.sh  config/server.properties

4.使用控制檯操做生產者與消費者
建立Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic
查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
啓動生產者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic
啓動消費者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning
生產消息:first message
生產消息:second message

4-3 代碼案例

建立49-kafka-example的maven工程pom以下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>49-kafka</artifactId>
        <groupId>com.myimooc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>49-kafka-example</artifactId>

    <properties>
        <spring.boot.version>2.0.4.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.編寫MessageEntity

package com.myimooc.kafka.example.common;

import java.util.Objects;

/**
 * <br>
 * 標題: 消息實體<br>
 * 描述: 消息實體<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
public class MessageEntity {
    /**
     * 標題
     */
    private String title;
    /**
     * 內容
     */
    private String body;

    @Override
    public String toString() {
        return "MessageEntity{" +
                "title='" + title + '\'' +
                ", body='" + body + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MessageEntity that = (MessageEntity) o;
        return Objects.equals(title, that.title) &&
                Objects.equals(body, that.body);
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, body);
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

2.編寫SimpleProducer

package com.myimooc.kafka.example.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * <br>
 * 標題: 生產者<br>
 * 描述: 生產者<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
@Component
public class SimpleProducer<T> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, Object entity) {
        logger.info("發送消息入參:{}", entity);
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                topic,
                key,
                JSON.toJSONString(entity)
        );

        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, Object>> future = this.kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("消息發送失敗:{}", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                long elapsedTime = System.currentTimeMillis() - startTime;

                RecordMetadata metadata = result.getRecordMetadata();
                StringBuilder record = new StringBuilder(128);
                record.append("message(")
                        .append("key = ").append(key).append(",")
                        .append("message = ").append(entity).append(")")
                        .append("send to partition(").append(metadata.partition()).append(")")
                        .append("with offset(").append(metadata.offset()).append(")")
                        .append("in ").append(elapsedTime).append(" ms");
                logger.info("消息發送成功:{}", record.toString());
            }
        });
    }
}

3.編寫SimpleConsumer

package com.myimooc.kafka.example.consumer;

import com.alibaba.fastjson.JSONObject;
import com.myimooc.kafka.example.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <br>
 * 標題: 消費者<br>
 * 描述: 消費者<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
@Component
public class SimpleConsumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = "${kafka.topic.default}")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判斷是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            //獲取消息
            Object message = kafkaMessage.get();

            MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class);

            logger.info("接收消息Topic:{}", topic);
            logger.info("接收消息Record:{}", record);
            logger.info("接收消息Message:{}", messageEntity);
        }
    }

}

4.編寫Response

package com.myimooc.kafka.example.common;

import java.io.Serializable;

/**
 * <br>
 * 標題: REST請求統一響應對象<br>
 * 描述: REST請求統一響應對象<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
public class Response implements Serializable {

    private static final long serialVersionUID = -972246069648445912L;
    /**
     * 響應編碼
     */
    private int code;
    /**
     * 響應消息
     */
    private String message;

    public Response() {
    }

    public Response(int code, String message) {
        this.code = code;
        this.message = message;
    }

    @Override
    public String toString() {
        return "Response{" +
                "code=" + code +
                ", message='" + message + '\'' +
                '}';
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

5.編寫ErrorCode

package com.myimooc.kafka.example.common;

/**
 * <br>
 * 標題: 錯誤編碼<br>
 * 描述: 錯誤編碼<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
public class ErrorCode {
    /**
     * 成功
     */
    public final static int SUCCESS = 200;
    /**
     * 失敗
     */
    public final static int EXCEPTION = 500;

}

6.編寫ProducerController

package com.myimooc.kafka.example.controller;

import com.alibaba.fastjson.JSON;
import com.myimooc.kafka.example.common.ErrorCode;
import com.myimooc.kafka.example.common.MessageEntity;
import com.myimooc.kafka.example.common.Response;
import com.myimooc.kafka.example.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * <br>
 * 標題: 生產者Controller<br>
 * 描述: 生產者Controller<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${kafka.topic.default}")
    private String topic;

    private static final String KEY = "key";

    @PostMapping("/send")
    public Response sendKafka(@RequestBody MessageEntity message) {
        try {
            logger.info("kafka的消息:{}", JSON.toJSONString(message));
            this.simpleProducer.send(topic, KEY, message);
            logger.info("kafka消息發送成功!");
            return new Response(ErrorCode.SUCCESS,"kafka消息發送成功");
        } catch (Exception ex) {
            logger.error("kafka消息發送失敗:", ex);
            return new Response(ErrorCode.EXCEPTION,"kafka消息發送失敗");
        }
    }
}

7.編寫application.properties

##----------kafka配置
## TOPIC
kafka.topic.default=myimooc-kafka-topic
# kafka地址
spring.kafka.bootstrap-servers=192.168.0.105:9092
# 生產者配置
spring.kafka.producer.retries=0
# 批量發送消息的數量
spring.kafka.producer.batch-size=4096
# 緩存容量
spring.kafka.producer.buffer-memory=40960
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消費者配置
spring.kafka.consumer.group-id=myimooc
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的線程數,用於提升併發量
spring.kafka.listener.concurrency=3

8.編寫ExampleApplication

package com.myimooc.kafka.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * <br>
 * 標題: 啓動類<br>
 * 描述: 啓動類<br>
 * 時間: 2018/09/09<br>
 *
 * @author zc
 */
@SpringBootApplication
@EnableKafka
public class ExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}

第五章:高級特性

5-1 消息事務

爲何要支持事務

  • 知足「讀取-處理-寫入」模式
  • 流處理需求的不斷加強
  • 不許確的數據處理的容忍度不斷下降

數據傳輸的事務定義

  • 最多一次:消息不會被重複發送,最多被傳輸一次,但也有可能一次不傳輸
  • 最少一次:消息不會被漏發送,最少被傳輸一次,但也有可能被重複傳輸
  • 精確的一次(Exactly once):不會漏傳輸也不會重複傳輸,每一個消息都被傳輸一次且僅僅被傳輸一次,這是你們所指望的

事務保證

  • 內部重試問題:Procedure冪等處理
  • 多分區原子寫入
  • 避免殭屍實例

    每一個事務Procedure分配一個 transactionl. id,在進程從新啓動時可以識別相同的Procedure實例
    Kafka增長了一個與transactionl.id相關的epoch,存儲每一個transactionl.id內部元數據
    一旦epoch被觸發,任務具備相同的transactionl.id和更舊的epoch的Producer被視爲殭屍,Kafka會拒絕來自這些Producer的後續事務性寫入

5-2 零拷貝

零拷貝簡介

  • 網絡傳輸持久性日誌塊
  • Java Nio channel.transforTo()方法
  • Linux sendfile系統調用

文件傳輸到網絡的公共數據路徑

  • 第一次拷貝:操做系統將數據從磁盤讀入到內核空間的頁緩存
  • 第二次拷貝:應用程序將數據從內核空間讀入到用戶空間緩存中
  • 第三次拷貝:應用程序將數據寫回到內核空間到socket緩存中
  • 第四次拷貝:操做系統將數據從socket緩衝區複製到網卡緩衝區,以便將數據經網絡發出

零拷貝過程(指內核空間和用戶空間的交互拷貝次數爲零)

  • 第一次拷貝:操做系統將數據從磁盤讀入到內核空間的頁緩存
  • 將數據的位置和長度的信息的描述符增長至內核空間(socket緩存區)
  • 第二次拷貝:操做系統將數據從內核拷貝到網卡緩衝區,以便將數據經網絡發出

文件傳輸到網絡的公共數據路徑演變

clipboard.png

第六章:課程總結

6-1 課程總結

課程總結

  • Kafka基礎概念與結構
  • Kafka的特色
  • Kafka應用場景
  • Kafka應用案例
  • Kafka高級特性
相關文章
相關標籤/搜索