Kafka流處理平臺

1. Kafka簡介

Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。java

Kafka具備如下特性:

  • 高吞吐量、低延遲:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒,每一個topic能夠分多個partition, consumer group 對partition進行consume操做。
  • 可擴展性:kafka集羣支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失
  • 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗)
  • 高併發:支持數千個客戶端同時讀寫

Kafka的使用場景:

  • 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生產者和消費者、緩存消息等。
  • 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
  • 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
  • 流式處理:好比spark streaming和storm
  • 事件源

經過上面的介紹也能夠看出:Kafka給自身的定位並不只僅是一個消息系統,而是經過發佈訂閱消息機制實現的分佈式流平臺。nginx

流平臺有三個關鍵的能力:

  • 發佈訂閱記錄流,和消息隊列或者企業新消息系統相似。
  • 以可容錯、持久的方式保存記錄流
  • 當記錄流產生時就進行處理

Kafka一般用於應用中的兩種廣播類型:

  • 在系統和應用間創建實時的數據管道,可以可信賴的獲取數據。
  • 創建實時的流應用,能夠處理或者響應數據流。

2. Kafka基本概念及延伸

2.1 基本概念

Producer:數據生產者web

  • 消息和數據的生產者
  • 向Kafka的一個topic發佈消息的進程或代碼或服務

Consumer:數據消費者spring

  • 消息和數據的消費者
  • 向Kafka訂閱數據(topic)而且處理其發佈的消息的進程或代碼或服務

Consumer Group:消費者組apache

  • 對於同一個topic,會廣播給不一樣的Group
  • 一個Group中,只有一個Consumer能夠消費該消息

Broker:服務節點json

  • Kafka集羣中的每一個Kafka節點

Topic:主題bootstrap

  • Kafka消息的類別
  • 對數據進行區分、隔離

Partition:分區緩存

  • Kafka中數據存儲的基本單元
  • 一個topic數據,會被分散存儲到多個Partition
  • 一個Partition只會存在一個Broker上
  • 每一個Partition是有序的

Replication:分區的副本服務器

  • 同一個Partition可能會有多個Replication
  • 多個Replication之間數據是同樣的

Replication Leader:副本的老大網絡

  • 一個Partition的多個Replication上
  • 須要一個Leader負責該Partition上與Producer和Consumer交互

Replication Manager:副本的管理者

  • 負責管理當前Broker全部分區和副本的信息
  • 處理KafkaController發起的一些請求
  • 副本狀態的切換
  • 添加、讀取消息等

2.2 概念延伸

Partition:分區

  • 每個Topic被切分爲多個Partition
  • 消費者數目少於或等於Partition的數目
  • Broker Group中的每個Broker保存Topic的一個或多個Partition
  • Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partition,而且是唯一的Consumer

Replication:分區的副本

  • 當集羣中有Broker掛掉的狀況,系統能夠主動地使Replication提供服務
  • 系統默認設置每個Topic的Replication係數爲1,能夠在建立Topic時單獨設置
  • Replication的基本單位是Topic的Partition
  • 全部的讀和寫都從Replication Leader進行,Replication Followers只是做爲備份
  • Replication Followers必須可以及時複製Replication Leader的數據
  • 增長容錯性與可擴展性

3. 基本結構

Kafka功能結構

 

Kafka數據流勢

 

Kafka消息結構

  • Offset:當前消息所處於的偏移
  • Length:消息的長度
  • CRC32:校驗字段,用於校驗當前信息的完整性
  • Magic:不少分佈式系統都會設計該字段,固定的數字,用於快速斷定當前信息是否爲Kafka消息
  • attributes:可選字段,消息的屬性
  • Timestamp:時間戳
  • Key Length:Key的長度
  • Key:Key
  • Value Length:Value的長度
  • Value:Value

 4. Kafka安裝部署

Kafka依賴於zookeeper實現分佈式系統的協調,因此須要同時安裝zookeeper。兩個的安裝包到官網下載。

4.1 zookeeper安裝配置

在zookeeper解壓後的目錄下找到conf文件夾,進入後,複製文件zoo_sample.cfg,並命名爲zoo.cfg。zoo.cfg中一共五個配置項,可使用默認配置。

 

4.2 Kafka安裝配置

進入kafka根目錄下的config文件夾下,打開server.properties,修改以下配置項(通常默認即爲以下,無需修改)

zookeeper.connect=localhost:2181
broker.id=0
log.dirs=/tmp/kafka-logs

另外,config文件夾下也包含有zookeeper的配置文件,能夠在其中設置配置項,啓動zookeeper時引用這個配置文件,實現定製化。

Kafka的bin目錄包含了大多數功能的啓動腳本,能夠經過它們控制Kafka的功能開啓。

 啓動Kafka

4.3 使用控制檯操做生產者和消費者

建立Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-kafka-topic
查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
啓動生產者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic
啓動消費者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning
生產消息:first message
生產消息:second message

 5. 代碼示例

引入依賴pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zang</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

相應實體

package com.zang.kafka.common;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;


/**
 * 〈消息實體〉<br>
 */
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class MessageEntity {
    /**
     * 標題
     */
    private String title;
    /**
     * 內容
     */
    private String body;

}
package com.zang.kafka.common;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;


/**
 * 〈REST請求統一響應對象〉<br>
 */
@Getter
@Setter
public class Response implements Serializable{

    private static final long serialVersionUID = -1523637783561030117L;
    /**
     * 響應編碼
     */
    private int code;
    /**
     * 響應消息
     */
    private String message;

    public Response(int code, String message) {
        this.code = code;
        this.message = message;
    }
}
package com.zang.kafka.common;

/**
 * 〈錯誤編碼〉<br>
 */
public class ErrorCode {

    /**
     * 成功
     */
    public final static int SUCCESS = 200;
    /**
     * 失敗
     */
    public final static int EXCEPTION = 500;

}

生產者

package com.zang.kafka.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 〈生產者〉
 */
@Component
public class SimpleProducer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, Object entity) {
        logger.info("發送消息入參:{}", entity);
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                topic,
                key,
                JSON.toJSONString(entity)
        );

        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, Object>> future = this.kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("消息發送失敗:{}", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                long elapsedTime = System.currentTimeMillis() - startTime;

                RecordMetadata metadata = result.getRecordMetadata();
                StringBuilder record = new StringBuilder(128);
                record.append("message(")
                        .append("key = ").append(key).append(",")
                        .append("message = ").append(entity).append(")")
                        .append("send to partition(").append(metadata.partition()).append(")")
                        .append("with offset(").append(metadata.offset()).append(")")
                        .append("in ").append(elapsedTime).append(" ms");
                logger.info("消息發送成功:{}", record.toString());
            }
        });
    }

}

消費者

package com.zang.kafka.consumer;

import com.alibaba.fastjson.JSONObject;
import com.zang.kafka.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 〈消費者〉<br>
 */
@Component
public class SimpleConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = "${kafka.topic.default}")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判斷是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            //獲取消息
            Object message = kafkaMessage.get();

            MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class);

            logger.info("接收消息Topic:{}", topic);
            logger.info("接收消息Record:{}", record);
            logger.info("接收消息Message:{}", messageEntity);
        }
    }

}

控制器

package com.zang.kafka.controller;

import com.alibaba.fastjson.JSON;
import com.zang.kafka.common.ErrorCode;
import com.zang.kafka.common.MessageEntity;
import com.zang.kafka.common.Response;
import com.zang.kafka.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * 〈生產者〉<br>
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${kafka.topic.default}")
    private String topic;
    private static final String KEY = "key";/**
     * 消息發送
     * @param message
     * @return
     */
    @PostMapping("/send")
    public Response sendKafka(@RequestBody MessageEntity message) {
        try {
            logger.info("kafka的消息:{}", JSON.toJSONString(message));
            this.simpleProducer.send(topic, KEY, message);
            logger.info("kafka消息發送成功!");
            return new Response(ErrorCode.SUCCESS,"kafka消息發送成功");
        } catch (Exception ex) {
            logger.error("kafka消息發送失敗:", ex);
            return new Response(ErrorCode.EXCEPTION,"kafka消息發送失敗");
        }
    }
}

配置application.properties

##----------kafka配置
## TOPIC
kafka.topic.default=my-kafka-topic
# kafka地址
spring.kafka.bootstrap-servers=47.88.156.142:9092
# 生產者配置
spring.kafka.producer.retries=0
# 批量發送消息的數量
spring.kafka.producer.batch-size=4096
# 緩存容量
spring.kafka.producer.buffer-memory=40960
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消費者配置
spring.kafka.consumer.group-id=my
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的線程數,用於提升併發量
spring.kafka.listener.concurrency=3

啓動類

package com.zang.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

}

6. Kafka的高級特性

6.1 消息事務

爲何要支持事務

  • 知足「讀取-處理-寫入」模式
  • 流處理需求的不斷加強
  • 不許確的數據處理的容忍度不斷下降

數據傳輸的事務定義

  • 最多一次:消息不會被重複發送,最多被傳輸一次,但也有可能一次不傳輸
  • 最少一次:消息不會被漏發送,最少被傳輸一次,但也有可能被重複傳輸
  • 精確的一次(Exactly once):不會漏傳輸也不會重複傳輸,每一個消息都被傳輸一次且僅僅被傳輸一次,這是你們所指望的

事務保證

  • 內部重試問題:Procedure冪等處理
  • 多分區原子寫入
  • 避免殭屍實例

    •  每一個事務Procedure分配一個 transactionl. id,在進程從新啓動時可以識別相同的Procedure實例
    •  Kafka增長了一個與transactionl.id相關的epoch,存儲每一個transactionl.id內部元數據
    •  一旦epoch被觸發,任務具備相同的transactionl.id和更舊的epoch的Producer被視爲殭屍,Kafka會拒絕來自這些Producer的後續事務性寫入

6.2 零拷貝

零拷貝簡介

  • 經過網絡傳輸持久性日誌塊
  • 使用Java Nio channel.transforTo()方法實現
  • 底層使用Linux sendfile系統調用

文件傳輸到網絡的公共數據路徑

  • 第一次拷貝:操做系統將數據從磁盤讀入到內核空間的頁緩存
  • 第二次拷貝:應用程序將數據從內核空間讀入到用戶空間緩存中
  • 第三次拷貝:應用程序將數據寫回到內核空間到socket緩存中
  • 第四次拷貝:操做系統將數據從socket緩衝區複製到網卡緩衝區,以便將數據經網絡發出

零拷貝過程(指內核空間和用戶空間的交互拷貝次數爲零)

  • 第一次拷貝:操做系統將數據從磁盤讀入到內核空間的頁緩存
  • 將數據的位置和長度的信息的描述符增長至內核空間(socket緩存區)
  • 第二次拷貝:操做系統將數據從內核拷貝到網卡緩衝區,以便將數據經網絡發出

 

 

 

來源:

慕課網課程:https://www.imooc.com/learn/1043

參考:

https://blog.csdn.net/liyiming2017/article/details/82790574

https://blog.csdn.net/YChenFeng/article/details/74980531

相關文章
相關標籤/搜索