初步認識Kafka

1.什麼是Kafkajava

Apache Kafka是一個 開源的分佈式消息隊列 (生產者消費者模式) Apache Kafka 目標:構建企業中統一的. 高通量的,低延時的消息平臺 ; 大多的消息隊列是基於JMS標準實現的Apache Kafka l相似於JMS的實現.node

2.Kafka的特色算法

做爲緩衝(流量消減),來異構,解耦系統apache

3.基本架構bootstrap

Kafka Cluster:由多個服務器組成。每一個服務器單獨的名字broker(掮客)。緩存

kafka broker:kafka集羣中包含的服務器服務器

Kafka Producer:消息生產者、發佈消息到 kafka 集羣的終端或服務。架構

Kafka consumer:消息消費者、負責消費數據。負載均衡

Kafka Topic: 主題,一類消息的名稱。存儲數據時將一類數據存放在某個topic下,消費數據也是消費一類數據。異步

​ 訂單系統:建立一個topic,叫作order。

​ 用戶系統:建立一個topic,叫作user。

​ 商品系統:建立一個topic,叫作product。

 

注意:Kafka的元數據都是存放在zookeeper中。

4.Kafka的基本使用

4.1使用腳本操做Kafka

1) 建立一個topic

./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic order

2) 使用Kafka 自帶一個命令客戶端啓動一個生產者,生產數據
./kafka-console-producer.sh --broker-list node01:9092 --topic order

3) 使用Kafka自帶一個命令客戶端啓動一個消費者,消費數據

./kafka-console-consumer.sh --bootstrap-server node01:9092  --topic order
該消費語句,只能獲取最新的數據,要想歷史數據,須要添加選項-

4)查看有哪些topic

./kafka-topics.sh --list --zookeeper node01:2181

5) 查看某一個具體的Topic的詳細信息

./kafka-topics.sh --describe --topic order --zookeeper node01:2181

6) 刪除topic

./kafka-topics.sh --delete --topic order --zookeeper node01:2181

4.2 使用java API 操做kafka

第一步: 添加kafka相關的依賴

<dependency>     
    <groupId>org.apache.kafka</groupId>     
    <artifactId>kafka-clients</artifactId>     
    <version>0.11.0.1</version>   
</dependency>

4.2.1 編寫生產者 :

// kakfa的生產者
public class KafkaProducerTest {


    public static void main(String[] args) {

        //1. 建立 kafka的生產者的對象

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
        props.put("acks", "all");  // 消息確認機制 : all 最高級別, 保證數據不會丟失
        props.put("retries", 0); // 重試 : 0 表示發送失敗, 不會重試
        props.put("batch.size", 16384);  // 發送數據時候 一批數據的大小    默認值: 16384字節(16)
        props.put("linger.ms", 1);    // 每次發送數據間隔時間
        props.put("buffer.memory", 33554432); //  緩存池的大小: 默認值:  33554432   32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 效率, java的序列化慢
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
        //2. 發送消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("order", "我是JavaAPI發過來...");
        producer.send(record);
        //3. 關閉資源

        producer.close();
    }
}

4.2.2 編寫消費者

package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

// kafka的消費者
public class KafkaConsumerTest {


    public static void main(String[] args) {


        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
        props.put("group.id", "test"); // 組id號
        props.put("enable.auto.commit", "true");  // 開啓自動提交
        props.put("auto.commit.interval.ms", "1000"); //  每隔多長時間自動提交一次
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //1. 建立 kafka的消費者的對象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //2.  讓消費者訂閱一個topic
        consumer.subscribe(Arrays.asList("order"));

        //3. 獲取消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // 取出元素,
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            }
        }

    }
}

5 Apache Kafka原理

5.1 分片與副本機制 :

​ 分片機制:主要解決了單臺服務器存儲容量有限的問題

​ 當數據量很是大的時候,一個服務器存放不了,就將數據分紅兩個或者多個部分,存放在多臺服務器上。每一個服務器上的數據,叫作一個分片

副本:副本備份機制解決了數據存儲的高可用問題

​ 當數據只保存一份的時候,有丟失的風險。爲了更好的容錯和容災,將數據拷貝幾份,保存到不一樣的機器上。在放置副本的時候:

​ 假設有三個分片, 三個副本, 共計9個節點

​ 在三臺服務器上 各放置一個分片的副本

​ 第二個副本放置在和這臺服務器同機架上

​ 第三個副本放置在不一樣的機架的服務器上

5.2 Kafka 保證數據不丟失機制

5.2.1保證生產端數據不丟失機制

1)消息生產分爲同步方式和異步方式

2)消息確認分爲三個狀態

a)0:生產者只負責發送數據

b)1:某個分片的leader收到數據給出響應

C)-1:某個分片的副本都收到數據後給出響應

3在同步方式下

生產者等待10秒,若是broke沒有給出ack響應,就認爲失敗

生產者主動將進行重試3次若是尚未響應就報錯

在異步模式下

a)先將數據保存在生產端的buffer中 Buffer大小是2萬條

b)知足數據閥值或者數量(時間)閥值其中的一個條件就能夠發送數據

c 發送一批數據的大小是500條

5,2,2 broker端消息不丟失

broker端的信息不丟失其實就是用partition和副本機制(高可用)來保證

producer ack -1(all)可以保證全部的副本都同步好了數據,其中一臺機器了並不影響數據的完整性

5.2.3 消費端消息不丟失

offSet:偏移量

經過offset commit 來保證數據的不丟失,kafka本身記錄了每次消費的offset數值,下次繼續消費的時候,會接着上次的offset進行消費。

5.3 消息存儲及查詢機制

5.3.1 文件存儲機制

數據都是順序存儲在磁盤中

segment段中有兩個核心的文件一個是log,一個是index。 當log文件等於1G時,新的消息會寫入到下一個segment中。

5.3.2 文件查詢機制

需求: 讀取 offset=368776 的message消息數據

5.4 生產者數據分發策略

1) 若是是用戶制定了parttion ,數據分發策略的時候能夠指定數據發往哪一個parttion

當producerRecord 的構造參數中有partition的時候就能夠發送到對應的parttion上

2) 當用戶指定key, 默認使用hash算法  能夠定義分發策略的算法

3) 當用戶既沒有指定partition 也沒有Key 使用輪詢的方式發送數據

5.5 消費者負載均衡機制

一個parttion能夠被一個組中的某一個成員消費

因此若是消費組中有多於partition數量的消費者,那麼必定會有消費者沒法消費數據。 Kafka 消費者在消費數據的時候 若是數據量過大 如何解決消費者慢的問題

1)在儘量硬件知足的狀況下 多加分片 並將同組中消費者數量和分片數量保持一致

2) 提升消費者處理數據的能力優化代碼

相關文章
相關標籤/搜索