Kafka學習

其餘更多java基礎文章:
java基礎學習(目錄)java


學習資料:kafka數據可靠性深度解讀apache

Kafka概述

  • Kafka是一個分佈式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。
  • 不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。
  • 在流式計算中,Kafka通常用來緩存數據,Spark經過消費Kafka的數據進行計算。

Kafka架構

  • Producer :消息生產者,就是向kafka broker發消息的客戶端。
  • Consumer :消息消費者,向kafka broker取消息的客戶端
  • Topic :能夠理解爲一個隊列。
  • Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。
  • Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。
  • Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序。

分區

消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:bootstrap

咱們能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。緩存

  • 分區的緣由
    1. 方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;
    2. 能夠提升併發,由於能夠以Partition爲單位讀寫了。
  • 分區的原則
    1. 指定了patition,則直接使用;
    2. 未指定patition但指定key,經過對key的value進行hash出一個patition
    3. patition和key都未指定,使用輪詢選出一個patition。

副本(Replication)

同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker 宕機,其上全部 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。bash

寫入流程

  1. producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
  2. producer將消息發送給該leader
  3. leader將消息寫入本地log
  4. followers從leader pull消息,寫入本地log後向leader發送ACK
  5. leader收到全部ISR中的replication的ACK後,增長HW(high watermark,最後commit 的offset)並向producer發送ACK

ACK,HW,ISR等能夠閱讀kafka數據可靠性深度解讀學習
簡單來講:服務器

  • HW是HighWatermark的縮寫,是指consumer可以看到的此partition的位置
  • ISR (In-Sync Replicas),這個是指副本同步隊列。全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
  • ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的可以被收到。

Kafka消費過程分析

kafka提供了兩套consumer API:高級Consumer API和低級API。架構

高級API

  • 高級API優勢
    • 高級API 寫起來簡單
    • 不須要去自行去管理offset,系統經過zookeeper自行管理
    • 不須要管理分區,副本等狀況,系統自動管理
    • 消費者斷線會自動根據上一次記錄在zookeeper中的offset去接着獲取數據(默認設置1分鐘更新一下zookeeper中存的的offset)
    • 可使用group來區分對同一個topic的不一樣程序訪問分離開來(不一樣的group記錄不一樣的offset,這樣不一樣程序讀取同一個topic纔不會由於offset互相影響)
  • 高級API缺點
    • 不能自行控制offset(對於某些特殊需求來講)
    • 不能細化控制如分區、副本、zk等

低級API

  • 低級 API 優勢
    • 可以開發者本身控制offset,想從哪裏讀取就從哪裏讀取。
    • 自行控制鏈接分區,對分區自定義進行負載均衡
    • 對zookeeper的依賴性下降(如:offset不必定非要靠zk存儲,自行存儲offset便可,好比存在文件或者內存中)
  • 低級API缺點
    • 太過複雜,須要自行控制offset,鏈接哪一個分區,找到分區leader 等。

消費者組

消費者是以consumer group消費者組的方式工做,由一個或者多個消費者組成一個組,共同消費一個topic。每一個分區在同一時間只能由group中的一個消費者讀取,可是多個group能夠同時消費這個partition。併發

在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也能夠叫作某個消費者是某個分區的擁有者。 在這種狀況下,消費者能夠經過水平擴展的方式同時讀取大量的消息。另外,若是一個消費者失敗了,那麼其餘的group成員會自動負載均衡讀取以前失敗的消費者讀取的分區。

JAVA中使用Kafka

生產者

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaProducerDemo {

    public static void main(String[] args){
        //test();
        test2();
    }

    public static void test(){
        Properties props= new Properties();
        props.put("bootstrap.servers", "172.26.40.181:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer(props);
        for(int i = 0; i < 10; i++){
            producer.send(new ProducerRecord("first",Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }

    /**
     * 帶回調函數
     */
    public static void test2(){
        Properties props = new Properties();
        // Kafka服務端的主機名和端口號
        props.put("bootstrap.servers", "172.26.40.181:9092");
        // 等待全部副本節點的應答
        props.put("acks", "all");
        // 消息發送最大嘗試次數
        props.put("retries", 0);
        // 一批消息處理大小
        props.put("batch.size", 16384);
        // 增長服務端請求延時
        props.put("linger.ms", 1);
// 發送緩存區內存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //攔截器
        List<String> interceptor = new ArrayList<>();
        interceptor.add("com.hiway.practice.kafka.TimeInterceptor");
        interceptor.add("com.hiway.practice.kafka.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptor);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {

            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {

                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }

}
複製代碼

消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // 定義kakfa 服務的地址,不須要將全部broker指定上
        props.put("bootstrap.servers", "172.26.40.181:9092");
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自動確認offset
        props.put("enable.auto.commit", "true");
        // 自動確認offset的時間間隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 定義consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消費者訂閱的topic, 可同時訂閱多個
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            // 讀取數據,讀取超時時間爲100ms
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
複製代碼

攔截器

public class TimeInterceptor implements ProducerInterceptor<String,String> {


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 統計成功和失敗的次數
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 保存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
複製代碼

錯誤總結

1. Uncaught error in kafka producer I/O thread錯誤
這個問題主要是服務器上的kafka版本和IDEA中的kafka版本不一致致使的。負載均衡

2.producer發送數據到集羣上無反應分佈式

將kafka/config/server.properties文件中advertised.listeners改成以下屬性。172.26.40.181是我虛擬機的IP。改完後重啓,OK了。Java端的代碼終於能通訊了 advertised.listeners=PLAINTEXT://172.26.40.181:9092 advertised.listeners上的註釋是這樣的:

#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
複製代碼

意思就是說:hostname、port都會廣播給producer、consumer。若是你沒有配置了這個屬性的話,則使用listeners的值,若是listeners的值也沒有配置的話,則使用 java.net.InetAddress.getCanonicalHostName()返回值(這裏也就是返回localhost了)。

相關文章
相關標籤/搜索