Kafka實戰解惑

#1、Kafka簡介# Kafka是LinkedIn使用Scala開發的一個分佈式消息中間件,它以水平擴展能力和高吞吐率著稱,被普遍用於日誌處理、ETL等應用場景。Kafka具備如下主要特色:html

  • **消息的發佈、訂閱均具備高吞吐量:**據統計數字代表,Kafka每秒能夠生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
  • **消息可持久化:**消息可持久化到磁盤而且經過Replication機制防止數據丟失。
  • **分佈式系統,可水平擴展:**全部的生產者(Producer)、消費者(Consumer)、消息中間存儲(Broker Server)均可實現多實例分佈式部署,且在不停機狀況下實現水平擴展。
  • **高可靠性:**消息被處理的狀態由消費者同步到Zookeeper而非Broker Server中,當Broker Server失效時,經過副本切換機制選擇一個新的Broker Server,消費者在從Zookeeper中讀取以前消費消息的位置,不會引發消息丟失。
  • 支持Online和Offline的場景。

隨着Kafka開源後被業界成功且普遍的使用,LinkedIn開發Kafka的核心技術人員Jay Kreps離開LinkedIn成立了一個新公司Confluent,打造了一個基於Kafka且擁有更爲豐富的產品線,意圖構建一個基於Kafka的生態系統,與Kafka相比,Confluent包含了更多的組件:java

  • Confluent Control Center(閉源):,管理和監控Kafka最全面的GUI系統
  • Confluent Kafka Connectors(開源): 用於鏈接SQL數據庫、Hadoop、Hive。
  • Confluent Kafka Clients(開源): 爲C/C++、Python等語言提供客戶端。
  • Confluent Kafka REST Proxy(開源): 容許一些系統經過HTTP和Kafka之間發送和接收消息。
  • Confluent Schema Registry(開源): 幫助肯定每個應用使用正確的schema寫數據或者讀數據到Kafka中。

輸入圖片說明

#2、 Kafka架構方案# 從物理結構上看,整個Kafka系統由消息生產者(Producer)、消息消費者(Consumer)、消費存儲服務器(Broker Server)外加Zookeeper構成。整個Kafka的架構方案很是簡單,典型的無狀態水平擴展架構,經過水平增長Broker實例實現系統的高吞吐率,而有狀態的數據則存儲到Zookeeper中。
Alt text
Kafka採用Push-Pull模式,生產者發送消息時,可根據策略將消息存儲在Kafka集羣的任意一臺Broker上,消費者經過定時輪詢(非固定週期)的方式從Broker上取得消息。消息發送到哪一臺服務器上,又從哪臺服務器上獲取消息,則是由邏輯結構解決的,或者說邏輯結構創建在物理結構基礎上,對於生產者、消費者而言,只要瞭解邏輯結構就能夠了。
Alt text
從邏輯上講,一個Kafka集羣中包含若干個消息隊列,每一個消息隊列都有本身的名稱,在Kafka中消息隊列的名稱被稱爲Topic,爲了實現系統的高吞吐率,每一個消息隊列被拆分紅不一樣部分,即咱們所說的分區(Partition),分區存儲在不一樣的Broker中。生產者發送消息時可根據必定策略發送到不一樣的分區中,這相似於數據庫的分庫分表操做,一樣消費者拉取消息時,也能夠根據必定策略從某個分區中讀取消息。就物理結構而言,每一個分區就是Broker上的一組文件,試想一下併發的對多個分佈在不一樣Broker上的文件進行讀寫,性能固然顯著優於對單臺Broker上的文件進行讀寫,咱們所說的Kafka具備高吞吐率就是這個道理。linux

Kafak每一個Topic的消息都存儲在日誌文件中,Kafka消息日誌文件由一個索引文件和若干個具體的消息文件構成。每一個消息文件都由起始消息編號構成,經過索引能夠快速定位消息文件進行讀寫,因爲消息是順序寫入文件中,因此讀寫效率很是高。在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差很少是600MB/s,可是隨即寫的速度倒是100k/s,差了差很少6000倍。現代的操做系統都對讀寫作了大量的優化,好比使用read-ahead和write-behind技巧,讀取時候成塊的預讀取數據,寫入時將各類微小瑣碎的邏輯組織合併成一次較大的物理寫入,不少時候線性讀寫磁盤比隨機讀取內存都快。數據庫

與其餘常見的消息隊列不一樣,Kafka有一個叫作消費組的概念,多個消費者被邏輯上合併在一塊兒叫作消費組。一個消息隊列理論上可擁有無限個消費組,消費組是Kafka有別於其餘消息隊列的一個重要概念,同一個分區的消息只能被一個消費組內的某個消費者讀取,但其餘消費組內的消費者仍然可讀取這個分區的消費。以下圖所示整個Kafka消息隊列由兩個broker server構成,server1上包含兩個分區p0、p3,server2上包含兩個分區p一、p2。如今有兩個消費組A、B,消費組A中包含兩個消費者C一、C2,消費組B中包含4個消費者C三、C四、C五、C6。那麼假定P0分區上有一條消息。Consumer Group A中的C一、C2其中之一會消費這條消息,Consumer Group B中的C三、C四、C五、C6其中之一也會消費這條消息,也就是說兩個消費組A、B中的消費者都會同時消費這條消息,而組內只能有一個消費者消費這條消息。
Alt text
咱們所說的C一、C2只是一個邏輯上的劃分就具體實現而言,C一、C2能夠是一個進程內部的兩個線程,也能夠是兩個獨立的進程,對於C三、C四、C五、C6也是一樣的道理。咱們知道Kafka每一個分區中的消息都是以順序結構保存到文件中的,那麼消費者每次從什麼位置讀取消息呢,奧祕就是每一個消費者都保存Offset到Zookeeper中。
Alt text
如前所述,Kafka是一個Push-Pull模式的消息隊列,而且能夠有多個生產者、多個消費者,那麼這些生產者和消費者是如何協同工做的呢?首先咱們來看生產者怎麼肯定把消費發送到哪一個分區上。默認狀況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。apache

def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

這就保證了相同key的消息必定會被路由到相同的分區。若是你沒有指定key,那麼Kafka是如何肯定這條消息去往哪一個分區的呢?咱們來看下面的代碼:bootstrap

if(key == null) {  // 若是沒有指定key
        val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有沒有緩存的現成的分區Id
        id match {
          case Some(partitionId) =>  
            partitionId  // 若是有的話直接使用這個分區Id就行了
          case None => // 若是沒有的話,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出全部可用分區的leader所在的broker
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 從中隨機挑一個
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
            partitionId
        }
      }

能夠看出,Kafka幾乎就是隨機找一個分區發送無key的消息,而後把這個分區號加入到緩存中以備後面直接使用——固然了,Kafka自己也會清空該緩存(默認每10分鐘或每次請求Topic元數據時清空緩存)。api

接下來咱們來看消費者如何獲取消息。對於消費者Kafka提供的兩種分配策略: range和roundrobin,由參數 partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那麼每一個線程都分配哪些分區呢?緩存

C0 消費分區 0, 1, 2, 3 C1 消費分區 4, 5, 6 C2 消費分區 7, 8, 9安全

爲了保證高可靠,Kafka每一個分區都有必定數量的副本,當故障發生時經過Zookeeper選擇其一做爲領導者,Kafka採用同步複製機制,寫Leader完成後在寫副本。若是某個副本寫失敗,則將這個副本從當前分區一致集合中摘除,後期根據必定策略在進行異步補償,將不一致狀態變爲一致狀態。極端狀況下若是全部副本寫入均失敗,變爲不一致狀態,若是在變成一致狀態前Leader崩潰,那麼消息纔可能真正丟失,但極端狀況很難出現,一旦出現這種極端狀況,任何系統都無能爲力了,因此咱們說Kafka仍是很是可靠的。bash

#3、 Kafka最小安裝# 咱們如今使用192.168.104.10一、192.168.104.102兩臺Centos 6服務器安裝Kafka。安裝Kafka以前首先須要安裝Zookeeper,爲了簡便起見咱們採用單機僞分佈式集羣安裝Zookeeper,將Zookeeper安裝在192.168.104.101這臺服務器上,並啓動三個實例,組成高可靠的Zookeeper集羣。

接下來咱們安裝Kafka_2.11-0.10.0.1,由於咱們有192.168.104.10一、192.168.104.102兩臺服務器,所以咱們能夠構建一個完整的Kafka集羣。在192.168.104.101服務器上修改Kafka安裝目錄下的config/server.properties文件,設置以下參數:
broker.id=0
listeners=PLAINTEXT://192.168.104.101:9092
advertised.listeners=PLAINTEXT://192.168.104.101:9092
zookeeper.connect=192.168.104.101:2181,192.168.104.101:2182,192.168.104.101:2183
對於192.168.104.102這臺機器,咱們將listeners、advertised.listeners中的ip地址改成192.168.104.102。通過上述設置咱們能夠在兩個服務器上分別使用bin/kafka-server-start.sh config/server.properties命令啓動Kafka集羣了。

#4、 Kafka Client API# 如前所述Kafka是一個消息隊列,生產者發送消息到Kafka,消費者從Kafka中拉取消息,所以Kafka提供生產者、消費者兩類API供程序開發使用。咱們先來看一個生產者、消費者的簡單例子,瞭解一下Kafka Client API的基本用法,然後在深刻了解Kafka Client API的細節。

##4.1 Producers API##

package com.Kafka.sample.newapi;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
	
import java.util.Properties;

public class Producer {
    public void run() throws InterruptedException {
        KafkaProducer<String, String> producer = getProducer();

        int i = 0;

        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(ClientConfig.TOPICS, String.valueOf(i), "This is message: " + i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                    }
                }
            });

            i++;

            Thread.sleep(1000);
        }
    }

    private KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);

        return kp;
    }

    public static void main(String[] args) {
        Producer producer = new Producer();

        try {
            producer.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Kafka 0.82版以後,提供新的API,對於生產者的API來說,使用邏輯比較簡單,推薦使用新API向Kafka發送消息。向Kafka發送消息時首先須要構建一個KafkaProducer對象,並設置發送消息的一些參數。Producer端的經常使用配置有:

  • bootstrap.servers:Kafka集羣鏈接串,能夠由多個host:port組成
  • acks:broker消息確認的模式,有三種: 0:不進行消息接收確認,即Client端發送完成後不會等待Broker的確認 1:由Leader確認,Leader接收到消息後會當即返回確認信息 all:集羣完整確認,Leader會等待全部in-sync的follower節點都確認收到消息後,再返回確認信息。 咱們能夠根據消息的重要程度,設置不一樣的確認模式。默認爲1
  • retries:發送失敗時Producer端的重試次數,默認爲0
  • batch.size:當同時有大量消息要向同一個分區發送時,Producer端會將消息打包後進行批量發送。若是設置爲0,則每條消息都獨立發送。默認爲16384字節。
  • linger.ms:發送消息前等待的毫秒數,與batch.size配合使用。在消息負載不高的狀況下,配置linger.ms可以讓Producer在發送消息前等待必定時間,以積累更多的消息打包發送,達到節省網絡資源的目的。默認爲0。
  • key.serializer/value.serializer:消息key/value的序列器Class,根據key和value的類型決定。
  • buffer.memory:消息緩衝池大小。還沒有被髮送的消息會保存在Producer的內存中,若是消息產生的速度大於消息發送的速度,那麼緩衝池滿後發送消息的請求會被阻塞。默認33554432字節(32MB)。

##4.2 Consumers API## 相比起Producers API的便宜使用,Consumer API的使用要複雜不少,核心問題就是如何高可靠的處理消息,保證消息不丟失。Kafka爲了保證消息不丟失能被消費者成功的處理,在消費者處理消息成功後須要向Kafka發送確認確認消息被成功的消費。

package com.Kafka.sample.newapi;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public void run() {
        KafkaConsumer<String, String> consumer = getConsumer();
        consumer.subscribe(Arrays.asList(ClientConfig.TOPICS));

        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
        }
    }

    private KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
        props.put("group.id", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props);

        return kc;
    }

    public static void main(String[] args) throws Exception{
        Consumer consumer = new Consumer();
        consumer.run();
    }
}

上面的代碼很容易看懂,但props.put("auto.commit.interval.ms", "1000")下文中會特殊說明一下,這涉及到Kafka消息的高可靠處理。

事實上,Kafka不只僅可以發送String類型的消息,也能夠發送其餘類型的消息,但這須要將消息轉換爲二進制格式。

##4.3 消息高可靠 At-Least-Once## 網上各類文章常常談到Kafka丟消息問題,那麼Kakfa真的不可靠,只能用在容許有必定錯誤的系統中嗎?這個問題還得從Kaka的設計初衷來看。

Kafka最初是被LinkedIn設計用來處理log的分佈式消息系統,所以它的着眼點不在數據的安全性(log偶爾丟幾條無所謂),換句話說Kafka並不能徹底保證數據不丟失。儘管Kafka官網聲稱可以保證at-least-once,但若是consumer進程數小於partition_num,這個結論不必定成立。考慮這樣一個case,partiton_num=2,啓動一個consumer進程訂閱這個topic,對應的,stream_num設爲2,也就是說啓兩個線程並行處理message。若是auto.commit.enable=true,當consumer fetch了一些數據但尚未徹底處理掉的時候,恰好到commit interval出發了提交offset操做,接着consumer crash掉了。這時已經fetch的數據尚未處理完成但已經被commit掉,所以沒有機會再次被處理,數據丟失。若是auto.commit.enable=false,假設consumer的兩個fetcher各自拿了一條數據,而且由兩個線程同時處理,這時線程t1處理完partition1的數據,手動提交offset,這裏須要着重說明的是,當手動執行commit的時候,其實是對這個consumer進程所佔有的全部partition進行commit,Kafka暫時尚未提供更細粒度的commit方式,也就是說,即便t2沒有處理完partition2的數據,offset也被t1提交掉了。若是這時consumer crash掉,t2正在處理的這條數據就丟失了。若是但願可以嚴格的不丟數據,解決辦法有兩個:

  1. 手動commit offset,並針對partition_num啓一樣數目的consumer進程,這樣就能保證一個consumer進程佔有一個partition,commit offset的時候不會影響別的partition的offset。但這個方法比較侷限,由於partition和consumer進程的數目必須嚴格對應。
  2. 另外一個方法一樣須要手動commit offset,另外在consumer端再將全部fetch到的數據緩存到queue裏,當把queue裏全部的數據處理完以後,再批量提交offset,這樣就能保證只有處理完的數據才被commit。固然這只是基本思路,實際上操做起來不是這麼簡單,具體作法之後我再另開一篇。

##4.4 消息高可靠Consumer##

public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
    
    public ManualOffsetConsumer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //設置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //設置consumer group name
        props.put("group.id","manual_g1");

        props.put("enable.auto.commit", "false");

        //設置使用最開始的offset偏移量爲該group.id的最先。若是不設置,則會是latest即該topic最新一個消息的offset
        //若是採用latest,消費者只能得道其啓動後,生產者生產的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        
        final int minBatchSize = 5;  //批量提交數量
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                LOG.info("now commit offset");
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

上面例子中咱們將自動提交改成手動提交,若是取得消息後,由於某種緣由沒有進行提交,那麼消息仍然保持在Kafka中,能夠重複拉取以前沒有確認的消息,保證消息不會丟失,但有可能重複處理相同的消息,消費者接收到重複消息後應該經過業務邏輯保證重複消息不會帶來額外影響,也就是所謂的冪等設計, 這種方法就是Kafka所說的At-Least-Once。上面的這種讀取消息的方法是單線程的,除此以外還能夠用多線程方法讀取消息,每一個線程從指定的分區中讀取消息。

public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //設置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //設置consumer group name
        props.put("group.id","manual_g2");

        props.put("enable.auto.commit", "false");

        //設置使用最開始的offset偏移量爲該group.id的最先。若是不設置,則會是latest即該topic最新一個消息的offset
        //若是採用latest,消費者只能得道其啓動後,生產者生產的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                LOG.info("now commit the partition[ "+partition.partition()+"] offset");
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

咱們還能夠進一步讓消費者消費某個分區的消息。

public static void main(String[] args) {
        Properties props = new Properties();
        //設置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //設置consumer group name
        props.put("group.id", "manual_g4");
        //設置自動提交偏移量(offset),由auto.commit.interval.ms控制提交頻率
        props.put("enable.auto.commit", "true");
        //偏移量(offset)提交頻率
        props.put("auto.commit.interval.ms", "1000");
        //設置使用最開始的offset偏移量爲該group.id的最先。若是不設置,則會是latest即該topic最新一個消息的offset
        //若是採用latest,消費者只能得道其啓動後,生產者生產的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
              for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());
              try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
}

##4.5 生產者、消費者總結##

  1. 若是consumer比partition多,是浪費,由於Kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數。
  2. 若是consumer比partition少,一個consumer會對應於多個partitions,這裏主要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻。最好partiton數目是consumer數目的整數倍,因此partition數目很重要,好比取24,就很容易設定consumer數目。
  3. 若是consumer從多個partition讀到數據,不保證數據間的順序性,Kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣。
  4. 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化
  5. High-level接口中獲取不到數據的時候是會block的。

#5、 Kafka運維# ##5.1 Broker故障切換## 咱們在192.168.104.10一、192.168.104.102兩臺服務器上啓動Kafka組成一個集羣,如今咱們觀察一下topic t1的狀況。

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1	PartitionCount:2	ReplicationFactor:2	Configs:
Topic: t1	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
Topic: t1	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 0,1

咱們看到t1由兩個分區組成,分佈於Leader 0、1兩個服務器上。下面咱們運行消費者程序,同時運行生產者程序,咱們向topic t1發送1十一、22二、33三、44四、55五、666的數據。

bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1
111
222
333
444
555
666

接下來咱們觀察一下消費者接收消息的狀況。

fetched from partition 0, offset: 3, message: 
fetched from partition 1, offset: 4, message: 111
fetched from partition 0, offset: 4, message: 222
fetched from partition 1, offset: 5, message: 333
fetched from partition 0, offset: 5, message: 444
fetched from partition 1, offset: 6, message: 555
fetched from partition 0, offset: 6, message: 666

能夠看到消息被很是均勻的發送到兩個分區,消費者從兩個分區中拉取了消息。爲了模擬故障,咱們手工kill 101上的Kafka進程。這時咱們在觀察Kafka的分區狀況,對照以前的結果,咱們發現兩個分區的Leader都變爲1了,說明Kafka啓用了副本機制進行故障切換。

./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1
Topic:t1	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: t1	Partition: 0	Leader: 1	Replicas: 0,1	Isr: 1
	Topic: t1	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1

咱們繼續向分區發送888, 999,消費者仍然可以接收到發送的消息,而不受故障進度的影響。邏輯上看消費者只是讀取分區上的消息,與具體的服務器不要緊。

fetched from partition 1, offset: 7, message: 999
fetched from partition 0, offset: 7, message: 888

##5.2 Broker動態擴容## ###5.2.1 增長分區### 咱們爲已經建立的包含兩個分區的Topic在添加一個分區。

Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3

咱們觀察一下增長分區後的結果:

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1

Topic:t1	PartitionCount:3	ReplicationFactor:2	Configs:
Topic: t1	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 1,0
Topic: t1	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
Topic: t1	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

接下來,咱們使用生產者程序發送數據,過了一段時間後發現生產者程序已經能夠向新增分區寫入數據了。說明分區的增減對正在運行的應用程序(生產者、消費者)沒有影響, 生產者、消費者都不須要從新啓動。

###5.2.2 增長Broker Server### 待補充

##5.3 Kafka配置優化## 待補充

##5.4 數據清理## ###5.4.1 數據刪除### ###5.4.2 數據壓縮###

##5.5 Kafka運行監控## 目前Kafka有三個經常使用的監控系統: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,這三個系統或多或少都有些問題,不是特別完善,推薦使用KafkaOffsetMonitor。

#6、Kafka其餘組件# ##6.1 Kafka Connect## Kafka 0.9+增長了一個新的特性 Kafka Connect ,能夠更方便的建立和管理數據流管道。它爲Kafka和其它系統建立規模可擴展的、可信賴的流數據提供了一個簡單的模型,經過 connectors能夠將大數據從其它系統導入到Kafka中,也能夠從Kafka中導出到其它系統。Kafka Connect能夠將完整的數據庫注入到Kafka的Topic中,或者將服務器的系統監控指標註入到Kafka,而後像正常的Kafka流處理機制同樣進行數據流處理。而導出工做則是將數據從Kafka Topic中導出到其它數據存儲系統、查詢系統或者離線分析系統等,好比數據庫、 Elastic Search 、 Apache Ignite 等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供統一的集成API
  • 同時支持分佈式模式和單機模式
  • REST 接口,用來查看和管理Kafka connectors
  • 自動化的offset管理,開發人員沒必要擔憂錯誤處理的影響
  • 分佈式、可擴展
  • 流/批處理集成

當前Kafka Connect支持兩種分發擔保:at least once (至少一次) 和 at most once(至多一次),exactly once將在將來支持,當前已有的Connectors包括:

Connector Name Owner Status
HDFS confluent-platform@googlegroups.com Confluentsupported
JDBC confluent-platform@googlegroups.com Confluentsupported
Debezium - CDC Sources debezium@gmail.com Community project
MongoDB Source a.patelli@reply.de a.topchyan@reply.de In progress
MQTT Source tomasz.pietrzak@evok.ly Community project
MySQL Binlog Source wushujames@gmail.com In progress
Twitter Source rollulus@xs4all.nl In progress
Cassandra Sink Cassandra Sink Community project
Elastic Search Sink ksenji@gmail.com Community project
Elastic Search Sink hannes.stockner@gmail.com In progress
Elastic Search Sink a.patelli@reply.de a.topchyan@reply.de In progress

Kafka Connect目前正在開發中,最新的組件請查看官網https://www.confluent.io/product/connectors/

咱們來看一個使用Kafka Connect從一個文件讀取數據在傳輸到另外一個文件的例子。

  • 首先在192.168.104.10一、192.168.104.102兩臺服務器上啓動Kafka。
  • 在192.168.104.102服務器的Kafka安裝目錄上,修改connect-standalone.properties文件:
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

修改connect-file-source.properties文件:
file=/root/data.txt
topic=t1

修改connect-file-sink.properties文件:
file=/root/output.txt
topics=t1
  • 在192.168.104.102服務器上啓動Kafka-connect bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 向/root/data.txt中寫入數據,echo 「Kafka connect」>> data.txt,能夠觀察」Kafka connect」被寫入到/root/output.txt文件中。

##6.2 Kafka Stream## Kafka Streams是一套類庫,嵌入到java應用程序中,它使得Apache Kafka能夠擁有流處理的能力,經過使用Kafka Stream API進行業務邏輯處理最後寫回Kakfa或者其餘系統中。Kafka Stream中有幾個重要的流處理概念:嚴格區分Event time和Process Time、支持窗口函數、應用狀態管理。開發者使用Kafka Stream的門檻很是低,好比單機進行一些小數據量的功能驗證而不須要在其餘機器上啓動一些服務(好比在Storm運行Topology須要啓動Nimbus和Supervisor,固然也支持Local Mode),Kafka Stream的併發模型能夠對單應用多實例進行負載均衡。有了Kafka Stream能夠在不少場景下代替Storm、Spark Streaming減小技術複雜度。目前Kafka Stream仍然處於開發階段,不建議生產環境使用,因此期待正式版發佈吧。

##6.3 Kafka Camus## Camus是Linkedin開源的一個從Kafka到HDFS的數據管道,本質上上Camus是一個運行在Hadoop中的MapReduce程序,調用一些Camus提供的API從Kafka中讀取數據而後寫入HDFS。Camus2015年已經中止維護了,gobblin是後續產品,camus功能是是Gobblin的一個子集,經過執行MapReduce任務實現從Kafka讀取數據到HDFS,而gobblin是一個通用的數據提取框架,能夠將各類來源的數據同步到HDFS上,包括數據庫、FTP、Kafka等。

#7、 Kafka典型應用場景# Kafka做爲一個消息中間件,最長應用的場景是將數據進行加工後從源系統移動到目的系統,也就是所謂的ETL過程,ETL是一個數據從源頭到目的地的移動過程,固然其中也伴隨數據清洗。一般數據源頭是應用程序所輸出的消息、日誌、生產數據庫數據。應用程序輸出消息一般由應用程序主動控制寫入Kafka的行爲,而從日誌、生產數據庫到Kafka一般由第三方獨立應用處理。從日誌到Kafka典型的技術方案如ELK,從生產數據庫到Kafka一般可採用以下三種方式:

  • 經過時間戳方式記錄數據變動並寫入Kafka,如使用kettle等ETL工具。
  • 經過觸發器方式記錄數據變動並寫入Kafka,如使用kettle等ETL工具。
  • 經過數據庫特有特性記錄數變動並寫入Kafka,如Oracle GoldenGate,MySQL Binlog,Postgre SQL Wal,MongoDB Oplog,CouchDB Changes Feed,值得一提的是PostgreSQL 9.4後的Bottled Water是一個很是好用的方案,將PostgreSQL數據同步到Kafka中。

數據經過Kafka移動到Hadoop一般有以下方案:

  • Kafka -> Flume -> Hadoop Hdfs
  • Kafka -> Gobblin -> Hadoop Hdfs
  • Kafka -> Kafka Hadoop Loader -> Hadoop Hdfs
  • Kafka -> KaBoom -> Hadoop Hdfs
  • Kafka -> Kafka Connect -> Hadoop Hdfs
  • Kafka -> Storm\Spark Streaming -> Hadoop Hdfs

從目前看這些方法都是經常使用的成熟方案,不少技術也在被一線互聯網公司所使用,好比京東內部在使用Gobblin將數據從Kafka同步到Hdfs中,但從長遠看Kafka Connect則是最佳方案,畢竟是官方標準出品並且Kafka Connect還在快速的發展。

#8、參考資料#

  1. Confluent JDBC Connector

  2. 如何肯定Kafka的分區數,key和consumer線程數

  3. Kafka使用入門教程

  4. 《Kafka Stream》調研:一種輕量級流計算模式

  5. 天天處理幾十億條消息:Yelp的實時數據管道


再一次感謝您花費時間閱讀這份文稿,祝您閱讀愉快!

做者 @量子世界的我  2016年11月8日

相關文章
相關標籤/搜索