Kafka 入門

1 Kafka概述

1.1 定義

Kafka是一個分佈式的基於發佈/訂閱模式的消息隊列,主要應用於大數據實時處理領域。java

應用場景:算法

解耦shell

異步apache

削峯bootstrap

1.2 消息隊列

1.2.1 傳統消息隊列的應用場景

Kafka 入門

Kafka 入門

1.2.2 消息隊列的兩種模式

點對點模式:vim

消息生產者生產消息發送到Queue中,而後消息消費者從Queue中取出而且消費消息,消息被消費之後,Queue中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息,Queue支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費。bash

Kafka 入門

發佈訂閱模式:服務器

消息生產者將消息發佈到Topic,同時有多個消息消費者該消息,和點對點不一樣的是,發佈到Topic中的消息會被全部訂閱者消費。網絡

Kafka 入門

1.3 基礎架構

Kafka 入門

Producer:消息生產者,就是向Kafka Broker發消息的客戶端架構

Consumer:消息消費者,向Kafka Broker取消息的客戶端

Consumer Group (CG):消費者組,由多個Consumer組成,消費者組內每一個消費者負責消費不一樣分區的數據,一個分區只能由一個消費者消費,消費者組之間互不影響,全部的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者

Broker:一臺Kafka服務器就是一個Broker,一個集羣由多個Broker組成,一個Broker能夠容納多個Topic

Topic:能夠理解爲一個隊列,生產者和消費者面向的都是一個Topic

Partition:爲了實現擴展性,一個很是大的Topic能夠分佈到多個Broker(即服務器)上,一個Topic能夠分爲多個Partition,每一個Partition是一個有序的隊列

Replica:副本,爲保證集羣中的某個節點發生故障時,該節點上的Partition數據不丟失,且Kafka仍然可以繼續工做,Kafka提供了副本機制,一個Topic的每一個分區都有若干個副本,一個leader和若干個follower

leader:每一個分區多個副本的主,生產者發送數據的對象,以及消費者消費數據的對象都是leader

follower:每一個分區多個副本中的從,實時從leader中同步數據,保持和leader數據的同步,leader發生故障時,某個follower會成爲新的follower

2 Kafka快速入門

2.1 安裝部署

一、解壓

[djm@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

二、修改解壓後的文件夾名稱

[djm@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

三、在/opt/module/kafka目錄下建立logs文件夾

[djm@hadoop102 kafka]$ mkdir logs

四、修改配置文件

[djm@hadoop102 kafka]$ vi conf/server.properties

修改如下內容

#broker的全局惟一編號,不能重複
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的現成數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka運行日誌存放的路徑 
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置鏈接Zookeeper集羣地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

五、分發

[djm@hadoop102 kafka]$ xsync kafka

六、修改其餘Brokerbroker.id

七、Kafka羣起腳本

[djm@hadoop102 kafka]$ vim start-kafka
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
    echo "========== $i =========="
    ssh $i 'source /etc/profile&&/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
    echo $?
done
[djm@hadoop102 kafka]$ chmod 777 start-kafka
[djm@hadoop102 kafka]$ sudo mv start-kafka /bin

八、啓動Kafka集羣

[djm@hadoop102 kafka]$ start-kafka

2.2 命令行操做

一、查看全部Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

二、建立Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
#--topic 定義topic名
#--replication-factor 定義副本數
#--partitions 定義分區數

--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數

三、刪除Topic

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

四、發送消息

[djm@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

五、消費消息

[djm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

--from-beginning 會把topic中以往全部的消息消費出來

六、查看Topic詳細信息

[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

七、修改分區數

[djm@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

分區數只能增長,不能減小

3 Kafka架構

3.1 Kafka工做流程及文件存儲機制

Kafka 入門

Kafka中消息是以Topic進行分類的,生產者生產消息,消費者消費消息,都是面向Topic的;

Topic是邏輯上的概念,而Partition是物理上的概念,每一個Partition對應於一個log文件,該log文件中存儲的就是Producer生產的數據;

Producer生產的數據會被不斷追加到該log文件末端,且每條數據都有本身的offset,消費者組中的每一個消費者,都會實時記錄本身消費到了哪一個offset,以便出錯恢復時,從上次的位置繼續消費。

Kafka 入門

因爲生產者生產的消息會不斷的追加到log文件末尾,爲了防止文件過大而致使數據定位效率低下,Kafka採起了分片和索引機制,將每一個Partiton分爲多個segment,每一個segment對應兩個文件,分別是.log.index,這些文件位於同一個文件夾下,文件夾的命名規則爲Topic名稱+Partiton序號,.log.index文件以當前segment的第一條消息的offset命名,index存儲索引信息,.log存儲數據信息,索引文件中的元數據指向對應數據文件中message的物理偏移地址。

Kafka 入門

3.2 Producer

3.2.1 分區策略

爲何要進行分區?

  • 方便在羣集中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個Topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了
  • 能夠提升併發

分區的原則是什麼?

咱們須要將Producer發送的數據封裝成一個ProducerRecord對象:

  • 指明Partition的狀況下,直接將指明的值直接做爲Partition
  • 沒有指明Partition值但有key的狀況下,將keyhash值與TopicPartition數進行取餘獲得 Partition
  • 既沒有Partition值又沒有key值的狀況下,第一次調用時隨機生成一個整數(後面每次調用在這個整數上自增),將這個值與Topic可用的Partition總數取餘獲得Partition值,也就是常說的round-robin算法

3.2.2 數據可靠性保證

爲保證Partition發送的數據,能可靠的發送到指定的TopicTopic的每一個Partition收到Producer發送的數據後,都須要向Producer發送ackacknowledgement確認收到),若是Producer收到ack,就會進行下一輪的發送,不然從新發送數據。

Kafka 入門

副本數據同步策略:

方案 優勢 缺點
半數以上完成同步,就發送ack 延遲低 選舉新的leader時,容忍n臺節點的故障,須要2n+1個副本
所有完成同步,才發送ack 選舉新的leader時,容忍n臺節點的故障,須要n+1個副本 延遲高

Kafka選擇了第二種方案,緣由以下:

一樣爲了容忍n臺節點的故障,第一種方案須要2n+1個副本,而第二種方案只須要n+1個副本,而Kafka的每一個Partition存儲大量的數據,這樣會形成大量的數據冗餘;

雖然第二種方案的延遲會比較高,可是相比而言延遲對Kafka的影響較小。

採用第二種方案後,leader收到數據,全部的follower都開始同步數據,可是有一個follower,由於某種故障,遲遲不能與leader同步,那leader就要一直等下去,直到它同步完才能發送ack,這個問題怎麼解決呢?

leader維護了一個動態的in-syncreplica set (ISR),意爲和leader保持同步的follower集合,當ISR中的follower完成數據的同步以後,leader就會給follower發送ack,若是follower長時間未向leader同步數據,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數設定,leader發生故障以後,就會從ISR中選舉新的leader

ack應答機制:

對於某些不重要的數據,可以容忍少許數據的丟失,因此不必等ISR中的全部follower所有同步完成

因此Kafka提供了三種可靠性級別,根據對可靠性和延遲的要求權衡,分別是:

  • 0 Producer不等待Brokerack,這一操做提供了最低的延遲,Broker一接收到尚未落盤就已經返回,當Broker故障時可能會丟失數據
  • 1 Producer等待BrokerackPartitionleader落盤成功後返回ack,若是在follower同步成功以前leader故障,那麼將會丟失數據
  • -1 Producer等待BrokerackPartitionleaderfollower所有落盤成功後才返回ack,可是若是在follower同步完成後,Broker發送ack以前,leader發生故障,那麼會形成數據重複

故障處理:

Kafka 入門

follower掛了被會暫時提出ISR,等到follower恢復後,follower會讀取本地磁盤記錄上次的HW,並將log文件中高於HW的部分截取掉,從HW開始向leader進行同步,等leaderLEO高於PartitionHW,就能夠被從新加入ISR

leader發生故障以後,會從ISR中選出一個新的leader,爲保證多個副本之間的數據一致性,每一個leader會將各自log文件中高於HW的數據切掉,而後重新的leader同步數據

3.3.3 Exactly Once語義

對於某些比較重要的消息,咱們須要保證Exactly Once語義,即保證每條消息被髮送且僅被髮送一次

0.11版本以後,Kafka引入了冪等性機制(idempotent),配合acks = -1時的at least once語義,實現了ProducerBrokerExactly once語義

idempotent + at least once = exactly once

使用時,只需將enable.idempotence屬性設置爲trueKafka自動將acks屬性設爲-1

3.3 Consumer

3.3.1 消費方式

Consumer採起pull的方式從Broker中讀取數據

爲何採用pull方式呢?

由於push模式很難適應不一樣速率的Consumer,所以發送速率是由Broker決定的,它的目的就是儘量快的傳遞消息,可是這樣容易形成Consumer來不及處理消息,典型的表現就是網絡擁堵以及拒絕服務,而poll模式則能夠根據Consumer的消費能力消費消息。

可是poll也有不足,就是若是隊列中沒有消息,Consumer可能陷入循環中,一直返回空數據,針對這個缺點,Consumer在消費數據時會傳入一個timeout,若是當前沒有消息可供消費,Consumer會等待一段時間再返回,這段時間就是timeout

3.3.2 分區分配策略

Kafka有兩種分配策略,分別是:

Kafka 入門

Kafka 入門

3.3.3 offset維護

因爲Consumer在消息過程當中可能會出現斷電宕機等故障,Consumer恢復後,須要從故障的位置繼續消費,因此Consumer須要實時記錄本身消費到了哪一個offset

0.9之前,Consumer默認將offset保存在ZK

0.9之後,Consumer默認將offset保存在Kafka一個內置的Topic,該Topic__consumer_offsets

3.4 Kafka高效讀取數據

順序寫磁盤

KafkaProducer生產數據,要寫入到log文件中,寫的過程是一直追加到文件末端

零拷貝技術

Kafka 入門

3.5 ZookeeperKafka中的做用

Kafka集羣中有一個broker會被選舉爲Controller,負責管理集羣broker的上下線,全部topic的分區副本分配和leader選舉等工做。

Controller的管理工做都是依賴於Zookeeper的。

如下爲partition的leader選舉過程:

Kafka 入門

Kafka 入門

4 Kafka API

4.1 Producer API

4.1.1 消息發送流程

KafkaProducer發送消息採用的是異步發送的方式,在消息發送的過程當中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulatormain線程將消息發送給RecordAccumulatorSender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker

Kafka 入門

相關參數:

batch.size:只有數據積累到batch.size以後,sender纔會發送數據

linger.ms:若是數據遲遲未達到batch.sizesender等待linger.time以後就會發送數據

相關類:

KafkaProducer:須要建立一個生產者對象,用來發送數據

ProducerConfig:獲取所需的一系列配置參數

ProducerRecord:每條數據都要封裝成一個ProducerRecord對象

導入依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

4.1.2 異步發送

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

4.1.3 同步發送

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            }).get();
        }
        producer.close();
    }
}

4.2 Consumer API

Consumer消費數據時的可靠性是很容易保證的,由於數據在Kafka中是持久化的,故不用擔憂數據丟失問題。

因爲Consumer在消費過程當中可能會出現斷電宕機等故障,Consumer恢復後,須要從故障前的位置的繼續消費,因此Consumer須要實時記錄本身消費到了哪一個offset,以便故障恢復後繼續消費。

因此offset的維護是Consumer消費數據是必須考慮的問題。

相關類:

KafkaConsumer:須要建立一個消費者對象,用來消費數據

ConsumerConfig:獲取所需的一系列配置參數

ConsuemrRecord:每條數據都要封裝成一個ConsumerRecord對象

導入依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

4.2.1 手動提交offset

package com.djm.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}

手動提交offset的方法有兩種:

  • commitSync(同步提交):將本次poll的一批數據最高的偏移量提交,失敗重試,一直到提交成功
  • commitAsync(異步提交):將本次poll的一批數據最高的偏移量提交,沒有失敗重試機制,有可能提交失敗

4.2.2 自動提交offset

自動提交offset的相關參數:

enable.auto.commit:是否開啓自動提交offset功能

auto.commit.interval.ms:自動提交offset的時間間隔

package com.djm.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

4.3 自定義Interceptor

Interceptor是在Kafka 0.10版本被引入的,主要用於實現Client端的定製化控制邏輯。

對於Producer而言,Interceptor使得用戶在消息發送前以及Producer回調邏輯前有機會對消息作一些定製化需求,好比修改消息等,同時,Producer容許用戶指定多個Interceptor按序做用於同一條消息從而造成一個Interceptorchain

Interceptor的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • configure(configs):獲取配置信息和初始化數據時調用
  • onSend(ProducerRecord)Producer確保在消息被序列化以及計算分區前調用該方法,用戶能夠在該方法中對消息作任何操做,但最好保證不要修改消息所屬的TopicPartition,不然會影響目標分區的計算
  • onAcknowledgement(RecordMetadata, Exception):該方法會在消息從RecordAccumulator成功發送到Kafka Broker以後,或者在發送過程當中失敗時調用
  • close:關閉Interceptor,主要用於執行一些資源清理工做

攔截器案例

一、需求分析:

實現一個簡單的雙Interceptor組成的攔截鏈,第一個Interceptor會在消息發送前將時間戳信息加到消息value的最前部,第二個Interceptor會在消息發送後更新成功發送消息數或失敗發送消息數

二、編寫TimeInterceptor

package com.djm.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

三、編寫CounterInterceptor

package com.djm.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private static long successCounter = 0L;

    private static long errorCounter = 0L;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

四、修改CustomProducer

package com.djm.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.djm.kafka.interceptor.TimeInterceptor");  
        interceptors.add("com.djm.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success -> " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

5 Flume對接Kafka

一、配置Flume

編寫flume-kafka.conf

[djm@hadoop102 job]$ vim flume-kafka.conf

輸入一下內容

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

二、啓動消費者

[djm@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

三、啓動Flume

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

四、向/opt/module/datas/flume.log裏追加數據,查看Kafka消費狀況

6 Kafka監控

6.1 Monitor

一、上傳jarKafkaOffsetMonitor-assembly-0.4.6.jar到集羣

二、在/opt/module/下建立kafka-offset-console文件夾

三、將上傳的jar包放入剛建立的目錄下

四、在/opt/module/kafka-offset-console目錄下建立啓動腳本start.sh,內容以下:

#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &

五、在/opt/module/kafka-offset-console目錄下建立mobile-logs文件夾

六、啓動Monitor

./start.sh

6.2 Manager

一、上傳壓縮包kafka-manager-1.3.3.15.zip到集羣

二、解壓到/opt/module

三、修改配置文件conf/application.conf

kafka-manager.zkhosts="kafka-manager-zookeeper:2181"

修改成:

kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"

四、啓動kafka-manager

[djm@hadoop102 kafka-manager-1.3.3.15]$ bin/kafka-manager

五、登陸hadoop102:9000頁面查看詳細信息

相關文章
相關標籤/搜索