kafka 入門詳解

Kafka

Kafka 核心概念

什麼是 Kafka

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是爲處理實時數據提供一個統1、高吞吐、低延遲的平臺。其持久化層本質上是一個「按照分佈式事務日誌架構的大規模發佈/訂閱消息隊列」,這使它做爲企業級基礎設施來處理流式數據很是有價值。此外,Kafka能夠經過Kafka Connect鏈接到外部系統(用於數據輸入/輸出),並提供了Kafka Streams——一個Java流式處理庫。該設計受事務日誌的影響較大。java

基本概念

Kafka是一個分佈式數據流平臺,能夠運行在單臺服務器上,也能夠在多臺服務器上部署造成集羣。它提供了發佈和訂閱功能,使用者能夠發送數據到Kafka中,也能夠從Kafka中讀取數據(以便進行後續的處理)。Kafka具備高吞吐、低延遲、高容錯等特色。下面介紹一下Kafka中經常使用的基本概念:node

  • Broker

消息隊列中經常使用的概念,在Kafka中指部署了Kafka實例的服務器節點。linux

  • Topic

用來區分不一樣類型信息的主題。好比應用程序A訂閱了主題t1,應用程序B訂閱了主題t2而沒有訂閱t1,那麼發送到主題t1中的數據將只能被應用程序A讀到,而不會被應用程序B讀到。git

  • Partition

每一個topic能夠有一個或多個partition(分區)。分區是在物理層面上的,不一樣的分區對應着不一樣的數據文件。Kafka使用分區支持物理上的併發寫入和讀取,從而大大提升了吞吐量。github

  • Record

實際寫入Kafka中並能夠被讀取的消息記錄。每一個record包含了key、value和timestamp。web

  • Producer

生產者,用來向Kafka中發送數據(record)。spring

  • Consumer

消費者,用來讀取Kafka中的數據(record)。apache

  • Consumer Group

一個消費者組能夠包含一個或多個消費者。使用多分區+多消費者方式能夠極大提升數據下游的處理速度。bootstrap

kafka 核心名詞解釋

  • Topic(主題): 每一條發送到kafka集羣的消息均可以有一個類別,這個類別叫作topic,不一樣的消息會進行分開存儲,若是topic很大,能夠分佈到多個broker上,也能夠這樣理解:topic被認爲是一個隊列,每一條消息都必須指定它的topic,能夠說咱們須要明確把消息放入哪個隊列。對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際上也不必),所以Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。
  • Broker(代理): 一臺kafka服務器就能夠稱之爲broker.一個集羣由多個broker組成,一個broker能夠有多個topic
  • Partition(分區): 爲了使得kafka吞吐量線性提升,物理上把topic分紅一個或者多個分區,每個分區是一個有序的隊列。且每個分區在物理上都對應着一個文件夾,該文件夾下存儲這個分區全部消息和索引文件。

分區的表示: topic名字-分區的id每一個日誌文件都是一個Log Entry序列,每一個Log Entry包含一個4字節整型數值(值爲M+5),1個字節的"magic value",4個字節的CRC校驗碼,而後跟M個字節的消息這個log entries並不是由一個文件構成,而是分紅多個segment,每一個segment以該segment第一條消息的offset命名並以「.kafka」爲後綴。另外會有一個索引文件,它標明瞭每一個segment下包含的log entry的offset範圍分區中每條消息都有一個當前Partition下惟一的64字節的offset,它指明瞭這條消息的起始位置,Kafka只保證一個分區的數據順序發送給消費者,而不保證整個topic裏多個分區之間的順序vim

  • Replicas(副本): 試想:一旦某一個Broker宕機,則其上全部的Partition數據都不可被消費,因此須要對分區備份。其中一個宕機後其它Replica必需要能繼續服務而且即不能形成數據重複也不能形成數據丟失。

若是沒有一個Leader,全部Replica均可同時讀/寫數據,那就須要保證多個Replica之間互相(N×N條通路)同步數據,數據的一致性和有序性很是難保證,大大增長了Replication實現的複雜性,同時也增長了出現異常的概率。而引入Leader後,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。
每個分區,根據複製因子N,會有N個副本,好比在broker1上有一個topic,分區爲topic-1, 複製因子爲2,那麼在兩個broker的數據目錄裏,就都有一個topic-1,其中一個是leader,一個replicas同一個Partition可能會有多個Replica,而這時須要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica做爲Follower從Leader中複製數據

  • Producer: Producer將消息發佈到指定的topic中,同時,producer還須要指定該消息屬於哪一個partition
  • Consumer: 本質上kafka只支持topic,每個consumer屬於一個consumer group,每一個consumer group能夠包含多個consumer。發送到topic的消息只會被訂閱該topic的每一個group中的一個consumer消費。若是全部的consumer都具備相同的group,這種狀況和queue很類似,消息將會在consumer之間均衡分配;若是全部的consumer都在不一樣的group中,這種狀況就是廣播模式,消息會被髮送到全部訂閱該topic的group中,那麼全部的consumer都會消費到該消息。kafka的設計原理決定,對於同一個topic,同一個group中consumer的數量不能多於partition的數量,不然就會有consumer沒法獲取到消息。
  • Offset: Offset專指Partition以及User Group而言,記錄某個user group在某個partiton中當前已經消費到達的位置。

kafka使用場景

目前主流使用場景基本以下:

  • 消息隊列(MQ)

在系統架構設計中,常常會使用消息隊列(Message Queue)——MQ。MQ是一種跨進程的通訊機制,用於上下游的消息傳遞,使用MQ可使上下游解耦,消息發送上游只須要依賴MQ,邏輯上和物理上都不須要依賴其餘下游服務。MQ的常見使用場景如流量削峯、數據驅動的任務依賴等等。在MQ領域,除了Kafka外還有傳統的消息隊列如ActiveMQ和RabbitMQ等。

  • 追蹤網站活動

Kafka最出就是被設計用來進行網站活動(好比PV、UV、搜索記錄等)的追蹤。能夠將不一樣的活動放入不一樣的主題,供後續的實時計算、實時監控等程序使用,也能夠將數據導入到數據倉庫中進行後續的離線處理和生成報表等。

  • Metrics

Kafka常常被用來傳輸監控數據。主要用來聚合分佈式應用程序的統計數據,將數據集中後進行統一的分析和展現等。

  • 日誌聚合

不少人使用Kafka做爲日誌聚合的解決方案。日誌聚合一般指將不一樣服務器上的日誌收集起來並放入一個日誌中心,好比一臺文件服務器或者HDFS中的一個目錄,供後續進行分析處理。相比於Flume和Scribe等日誌聚合工具,Kafka具備更出色的性能。

kafka 集羣搭建

安裝kefka集羣

因爲kafka依賴zookeeper環境因此先安裝zookeeper,zk安裝

安裝環境

linux: CentSO-7.5_x64
java: jdk1.8.0_191
zookeeper: zookeeper3.4.10
kafka: kafka_2.11-2.0.1
# 下載
$ wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

# 解壓
$ tar -zxvf kafka_2.11-2.1.0.tgz

# 編輯配置文件修改一下幾個配置
$ vim $KAFKA_HOME/config/server.properties

# 每臺服務器的broker.id都不能相同只能是數字
broker.id=1

# 修改成你的服務器的ip或主機名
advertised.listeners=PLAINTEXT://node-1:9092

# 設置zookeeper的鏈接端口,將下面的ip修改成你的IP稱或主機名
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181

啓動Kafka集羣並測試

$ cd $KAFKA_HOME

# 分別在每一個節點啓動kafka服務(-daemon表示在後臺運行)
$ bin/kafka-server-start.sh -daemon config/server.properties

# 建立一個名詞爲 test-topic 的 Topic,partitions 表示分區數量爲3 --replication-factor 表示副本數量爲2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic

# 查看topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看topic狀態
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 查看topic詳細信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# 修改topic信息
$ bin/kafka-topics.sh --alter --topic test-topic --zookeeper localhost:2181 --partitions 5

# 刪除topic(簡單的刪除,只是標記刪除)
$ bin/kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181

# 在一臺服務器上建立一個 producer (生產者)
$ bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic test-topic

# 在一臺服務器上建立一個 consumer (消費者)
$ bin/kafka-console-consumer.sh --bootstrap-server node-2:9092,node-3:9092,node-4:9092 --topic test-topic --from-beginning

# 如今能夠在生產者的控制檯輸入任意字符就能夠看到消費者端有消費消息。

java 客戶端鏈接kafka

普通java形式

  • pom.xml
<dependencies>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>
  • JavaKafkaConsumer.java 消費者
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaConsumer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
    }

    public static void main(String[] args) {

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // 將偏移設置到最開始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
            }
        }
    }

}
  • JavaKafkaProducer.java 生產者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
    }

    public static void main(String[] args) {

        Producer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 200; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String uuid = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
            logger.info("send message success key: {}, value: {}", i, uuid);
        }
        producer.close();
    }

}
  • KafkaClient.java
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.*;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class KafkaClient {

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties = new Properties();

    static {
        properties.put("bootstrap.servers", KAFKA_BROKER);
    }

    /**
     * 建立topic
     */
    @Test
    public void createTopic() {
        AdminClient adminClient = AdminClient.create(properties);
        List<NewTopic> newTopics = Arrays.asList(new NewTopic(TOPIC, 1, (short) 1));
        CreateTopicsResult result = adminClient.createTopics(newTopics);
        try {
            result.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 建立topic
     */
    @Test
    public void create() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 建立一個3個分區2個副本名爲t1的topic
        AdminUtils.createTopic(zkUtils, "t1", 3, 2, new Properties(), RackAwareMode.Enforced$.MODULE$);
        zkUtils.close();
    }

    /**
     * 查詢topic
     */
    @Test
    public void listTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // 獲取 topic 全部屬性
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "streaming-topic");

        Iterator it = props.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            System.err.println(entry.getKey() + " = " + entry.getValue());
        }
        zkUtils.close();
    }

    /**
     * 修改topic
     */
    @Test
    public void updateTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "log-test");
        // 增長topic級別屬性
        props.put("min.cleanable.dirty.ratio", "0.4");
        // 刪除topic級別屬性
        props.remove("max.message.bytes");
        // 修改topic 'test'的屬性
        AdminUtils.changeTopicConfig(zkUtils, "log-test", props);
        zkUtils.close();

    }

    /**
     * 刪除topic 't1'
     */
    @Test
    public void deleteTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils, "t1");
        zkUtils.close();
    }


}
  • log4j.properties 日誌配置
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

基於spirngboot整合kafka

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>spring-boot-kafka</artifactId>
    <groupId>com.andy</groupId>
    <version>1.0.7.RELEASE</version>
    
    <packaging>jar</packaging>
    <modelVersion>4.0.0</modelVersion>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.3.RELEASE</version>
                <configuration>
                    <!--<mainClass>${start-class}</mainClass>-->
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • application.yml
spring:
  application:
    name: spring-jms
  kafka:
    bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
    producer:
      retries:
      batch-size: 16384
      buffer-memory: 33554432
      compressionType: snappy
      acks: all
    consumer:
      group-id: 0
      auto-offset-reset: earliest
      enable-auto-commit: true
  • Message.java 消息
/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@ToString
public class Message<T> {

    private Long id;

    private T message;

    private Date time;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
}
  • KafkaController.java 控制器
import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/kafka/{topic}")
    public String send(@PathVariable("topic") String topic, @RequestParam String message) {
        kafkaSender.send(topic, message);
        return "success";
    }

}
  • KafkaReceiver.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaReceiver {


    @KafkaListener(topics = {"order"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("record:{}", record);
            log.info("message:{}", message);
        }
    }
}
  • KafkaSender.java
import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     *
     * @param topic
     * @param body
     */
    public void send(String topic, Object body) {
        Message<String> message = new Message<>();
        message.setId(System.currentTimeMillis());
        message.setMessage(body.toString());
        message.setTime(new Date());
        String content = null;
        try {
            content = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, content);
        log.info("send {} to {} success!", message, topic);
    }
}
  • 啓動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Leone
 * @since 2018-04-10
 **/
@SpringBootApplication
public class JmsApplication {
    public static void main(String[] args) {
        SpringApplication.run(JmsApplication.class, args);
    }
}

github地址

相關文章
相關標籤/搜索