Kafka
概述Kafka
是一個分佈式的基於發佈/訂閱模式的消息隊列,主要應用於大數據實時處理領域。java
應用場景:算法
解耦shell
異步apache
削峯bootstrap
點對點模式:vim
消息生產者生產消息發送到Queue
中,而後消息消費者從Queue
中取出而且消費消息,消息被消費之後,Queue
中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息,Queue
支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費。bash
發佈訂閱模式:服務器
消息生產者將消息發佈到Topic
,同時有多個消息消費者該消息,和點對點不一樣的是,發佈到Topic
中的消息會被全部訂閱者消費。網絡
Producer
:消息生產者,就是向Kafka Broker
發消息的客戶端架構
Consumer
:消息消費者,向Kafka Broker
取消息的客戶端
Consumer Group (CG)
:消費者組,由多個Consumer
組成,消費者組內每一個消費者負責消費不一樣分區的數據,一個分區只能由一個消費者消費,消費者組之間互不影響,全部的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者
Broker
:一臺Kafka
服務器就是一個Broker
,一個集羣由多個Broker
組成,一個Broker
能夠容納多個Topic
Topic
:能夠理解爲一個隊列,生產者和消費者面向的都是一個Topic
Partition
:爲了實現擴展性,一個很是大的Topic
能夠分佈到多個Broker
(即服務器)上,一個Topic
能夠分爲多個Partition
,每一個Partition
是一個有序的隊列
Replica
:副本,爲保證集羣中的某個節點發生故障時,該節點上的Partition
數據不丟失,且Kafka
仍然可以繼續工做,Kafka
提供了副本機制,一個Topic
的每一個分區都有若干個副本,一個leader
和若干個follower
leader
:每一個分區多個副本的主,生產者發送數據的對象,以及消費者消費數據的對象都是leader
follower
:每一個分區多個副本中的從,實時從leader
中同步數據,保持和leader
數據的同步,leader
發生故障時,某個follower
會成爲新的follower
Kafka
快速入門一、解壓
[djm@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
二、修改解壓後的文件夾名稱
[djm@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
三、在/opt/module/kafka
目錄下建立logs
文件夾
[djm@hadoop102 kafka]$ mkdir logs
四、修改配置文件
[djm@hadoop102 kafka]$ vi conf/server.properties
修改如下內容
#broker的全局惟一編號,不能重複 broker.id=0 #刪除topic功能使能 delete.topic.enable=true #處理網絡請求的線程數量 num.network.threads=3 #用來處理磁盤IO的現成數量 num.io.threads=8 #發送套接字的緩衝區大小 socket.send.buffer.bytes=102400 #接收套接字的緩衝區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩衝區大小 socket.request.max.bytes=104857600 #kafka運行日誌存放的路徑 log.dirs=/opt/module/kafka/logs #topic在當前broker上的分區個數 num.partitions=1 #用來恢復和清理data下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment文件保留的最長時間,超時將被刪除 log.retention.hours=168 #配置鏈接Zookeeper集羣地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
五、分發
[djm@hadoop102 kafka]$ xsync kafka
六、修改其餘Broker
的broker.id
七、Kafka
羣起腳本
[djm@hadoop102 kafka]$ vim start-kafka
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves` do echo "========== $i ==========" ssh $i 'source /etc/profile&&/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties' echo $? done
[djm@hadoop102 kafka]$ chmod 777 start-kafka
[djm@hadoop102 kafka]$ sudo mv start-kafka /bin
八、啓動Kafka
集羣
[djm@hadoop102 kafka]$ start-kafka
一、查看全部Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
二、建立Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first #--topic 定義topic名 #--replication-factor 定義副本數 #--partitions 定義分區數
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數
三、刪除Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
四、發送消息
[djm@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
五、消費消息
[djm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning 會把topic中以往全部的消息消費出來
六、查看Topic
詳細信息
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
七、修改分區數
[djm@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
分區數只能增長,不能減小
Kafka
架構Kafka
工做流程及文件存儲機制Kafka
中消息是以Topic
進行分類的,生產者生產消息,消費者消費消息,都是面向Topic
的;
Topic
是邏輯上的概念,而Partition
是物理上的概念,每一個Partition
對應於一個log
文件,該log
文件中存儲的就是Producer
生產的數據;
Producer
生產的數據會被不斷追加到該log
文件末端,且每條數據都有本身的offset
,消費者組中的每一個消費者,都會實時記錄本身消費到了哪一個offset
,以便出錯恢復時,從上次的位置繼續消費。
因爲生產者生產的消息會不斷的追加到log
文件末尾,爲了防止文件過大而致使數據定位效率低下,Kafka
採起了分片和索引機制,將每一個Partiton
分爲多個segment
,每一個segment
對應兩個文件,分別是.log
和.index
,這些文件位於同一個文件夾下,文件夾的命名規則爲Topic
名稱+Partiton
序號,.log
和.index
文件以當前segment
的第一條消息的offset
命名,index
存儲索引信息,.log
存儲數據信息,索引文件中的元數據指向對應數據文件中message
的物理偏移地址。
Producer
爲何要進行分區?
Partition
能夠經過調整以適應它所在的機器,而一個Topic
又能夠有多個Partition
組成,所以整個集羣就能夠適應任意大小的數據了分區的原則是什麼?
咱們須要將Producer
發送的數據封裝成一個ProducerRecord
對象:
Partition
的狀況下,直接將指明的值直接做爲Partition
值Partition
值但有key
的狀況下,將key
的hash
值與Topic
的Partition
數進行取餘獲得 Partition
值Partition
值又沒有key
值的狀況下,第一次調用時隨機生成一個整數(後面每次調用在這個整數上自增),將這個值與Topic
可用的Partition
總數取餘獲得Partition
值,也就是常說的round-robin
算法爲保證Partition
發送的數據,能可靠的發送到指定的Topic
,Topic
的每一個Partition
收到Producer
發送的數據後,都須要向Producer
發送ack
(acknowledgement
確認收到),若是Producer
收到ack
,就會進行下一輪的發送,不然從新發送數據。
副本數據同步策略:
方案 | 優勢 | 缺點 |
---|---|---|
半數以上完成同步,就發送ack |
延遲低 | 選舉新的leader 時,容忍n 臺節點的故障,須要2n+1 個副本 |
所有完成同步,才發送ack |
選舉新的leader 時,容忍n 臺節點的故障,須要n+1 個副本 |
延遲高 |
Kafka
選擇了第二種方案,緣由以下:
一樣爲了容忍n
臺節點的故障,第一種方案須要2n+1
個副本,而第二種方案只須要n+1
個副本,而Kafka
的每一個Partition
存儲大量的數據,這樣會形成大量的數據冗餘;
雖然第二種方案的延遲會比較高,可是相比而言延遲對Kafka
的影響較小。
採用第二種方案後,leader
收到數據,全部的follower
都開始同步數據,可是有一個follower
,由於某種故障,遲遲不能與leader
同步,那leader
就要一直等下去,直到它同步完才能發送ack
,這個問題怎麼解決呢?
leader
維護了一個動態的in-syncreplica set (ISR)
,意爲和leader
保持同步的follower
集合,當ISR
中的follower
完成數據的同步以後,leader
就會給follower
發送ack
,若是follower
長時間未向leader
同步數據,則該follower
將被踢出ISR
,該時間閾值由replica.lag.time.max.ms
參數設定,leader
發生故障以後,就會從ISR
中選舉新的leader
。
ack
應答機制:
對於某些不重要的數據,可以容忍少許數據的丟失,因此不必等ISR
中的全部follower
所有同步完成
因此Kafka
提供了三種可靠性級別,根據對可靠性和延遲的要求權衡,分別是:
Producer
不等待Broker
的ack
,這一操做提供了最低的延遲,Broker
一接收到尚未落盤就已經返回,當Broker
故障時可能會丟失數據Producer
等待Broker
的ack
,Partition
的leader
落盤成功後返回ack
,若是在follower
同步成功以前leader
故障,那麼將會丟失數據Producer
等待Broker
的ack
,Partition
的leader
和follower
所有落盤成功後才返回ack
,可是若是在follower
同步完成後,Broker
發送ack
以前,leader
發生故障,那麼會形成數據重複故障處理:
follower
掛了被會暫時提出ISR
,等到follower
恢復後,follower
會讀取本地磁盤記錄上次的HW
,並將log
文件中高於HW
的部分截取掉,從HW
開始向leader
進行同步,等leader
的LEO
高於Partition
的HW
,就能夠被從新加入ISR
leader
發生故障以後,會從ISR
中選出一個新的leader
,爲保證多個副本之間的數據一致性,每一個leader
會將各自log
文件中高於HW
的數據切掉,而後重新的leader
同步數據
Exactly Once
語義對於某些比較重要的消息,咱們須要保證Exactly Once
語義,即保證每條消息被髮送且僅被髮送一次
在0.11
版本以後,Kafka
引入了冪等性機制(idempotent
),配合acks = -1
時的at least once
語義,實現了Producer
到Broker
的Exactly once
語義
idempotent + at least once = exactly once
使用時,只需將enable.idempotence
屬性設置爲true
,Kafka
自動將acks
屬性設爲-1
Consumer
Consumer
採起pull
的方式從Broker
中讀取數據
爲何採用pull
方式呢?
由於push
模式很難適應不一樣速率的Consumer
,所以發送速率是由Broker
決定的,它的目的就是儘量快的傳遞消息,可是這樣容易形成Consumer
來不及處理消息,典型的表現就是網絡擁堵以及拒絕服務,而poll
模式則能夠根據Consumer
的消費能力消費消息。
可是poll
也有不足,就是若是隊列中沒有消息,Consumer
可能陷入循環中,一直返回空數據,針對這個缺點,Consumer
在消費數據時會傳入一個timeout
,若是當前沒有消息可供消費,Consumer
會等待一段時間再返回,這段時間就是timeout
。
Kafka
有兩種分配策略,分別是:
offset
維護因爲Consumer
在消息過程當中可能會出現斷電宕機等故障,Consumer
恢復後,須要從故障的位置繼續消費,因此Consumer
須要實時記錄本身消費到了哪一個offset
0.9
之前,Consumer
默認將offset
保存在ZK
中
0.9
之後,Consumer
默認將offset
保存在Kafka
一個內置的Topic
,該Topic
爲__consumer_offsets
Kafka
高效讀取數據順序寫磁盤
Kafka
的Producer
生產數據,要寫入到log
文件中,寫的過程是一直追加到文件末端
零拷貝技術
Zookeeper
在Kafka
中的做用Kafka集羣中有一個broker會被選舉爲Controller,負責管理集羣broker的上下線,全部topic的分區副本分配和leader選舉等工做。
Controller的管理工做都是依賴於Zookeeper的。
如下爲partition的leader選舉過程:
Kafka API
Producer API
Kafka
的Producer
發送消息採用的是異步發送的方式,在消息發送的過程當中,涉及到了兩個線程——main
線程和Sender
線程,以及一個線程共享變量——RecordAccumulator
,main
線程將消息發送給RecordAccumulator
,Sender
線程不斷從RecordAccumulator
中拉取消息發送到Kafka broker
。
相關參數:
batch.size
:只有數據積累到batch.size
以後,sender
纔會發送數據
linger.ms
:若是數據遲遲未達到batch.size
,sender
等待linger.time
以後就會發送數據
相關類:
KafkaProducer
:須要建立一個生產者對象,用來發送數據
ProducerConfig
:獲取所需的一系列配置參數
ProducerRecord
:每條數據都要封裝成一個ProducerRecord
對象
導入依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
package com.djm.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("success -> " + metadata.offset()); } else { exception.printStackTrace(); } } }); } producer.close(); } }
package com.djm.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 100); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("success -> " + metadata.offset()); } else { exception.printStackTrace(); } } }).get(); } producer.close(); } }
Consumer API
Consumer
消費數據時的可靠性是很容易保證的,由於數據在Kafka
中是持久化的,故不用擔憂數據丟失問題。
因爲Consumer
在消費過程當中可能會出現斷電宕機等故障,Consumer
恢復後,須要從故障前的位置的繼續消費,因此Consumer
須要實時記錄本身消費到了哪一個offset
,以便故障恢復後繼續消費。
因此offset
的維護是Consumer
消費數據是必須考慮的問題。
相關類:
KafkaConsumer
:須要建立一個消費者對象,用來消費數據
ConsumerConfig
:獲取所需的一系列配置參數
ConsuemrRecord
:每條數據都要封裝成一個ConsumerRecord
對象
導入依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
offset
package com.djm.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class CustomConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("first")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitSync(); } } }
手動提交offset的方法有兩種:
commitSync
(同步提交):將本次poll
的一批數據最高的偏移量提交,失敗重試,一直到提交成功commitAsync
(異步提交):將本次poll
的一批數據最高的偏移量提交,沒有失敗重試機制,有可能提交失敗offset
自動提交offset
的相關參數:
enable.auto.commit
:是否開啓自動提交offset
功能
auto.commit.interval.ms
:自動提交offset
的時間間隔
package com.djm.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class CustomConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("first")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
Interceptor
Interceptor
是在Kafka
0.10
版本被引入的,主要用於實現Client
端的定製化控制邏輯。
對於Producer
而言,Interceptor
使得用戶在消息發送前以及Producer
回調邏輯前有機會對消息作一些定製化需求,好比修改消息等,同時,Producer
容許用戶指定多個Interceptor
按序做用於同一條消息從而造成一個Interceptorchain
。
Interceptor
的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
configure(configs)
:獲取配置信息和初始化數據時調用onSend(ProducerRecord)
:Producer
確保在消息被序列化以及計算分區前調用該方法,用戶能夠在該方法中對消息作任何操做,但最好保證不要修改消息所屬的Topic
和Partition
,不然會影響目標分區的計算onAcknowledgement(RecordMetadata, Exception)
:該方法會在消息從RecordAccumulator
成功發送到Kafka Broker
以後,或者在發送過程當中失敗時調用close
:關閉Interceptor
,主要用於執行一些資源清理工做攔截器案例
一、需求分析:
實現一個簡單的雙Interceptor
組成的攔截鏈,第一個Interceptor
會在消息發送前將時間戳信息加到消息value
的最前部,第二個Interceptor
會在消息發送後更新成功發送消息數或失敗發送消息數
二、編寫TimeInterceptor
package com.djm.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
三、編寫CounterInterceptor
package com.djm.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CounterInterceptor implements ProducerInterceptor<String, String> { private static long successCounter = 0L; private static long errorCounter = 0L; @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } @Override public void configure(Map<String, ?> configs) { } }
四、修改CustomProducer
package com.djm.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class CustomProducer { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); List<String> interceptors = new ArrayList<>(); interceptors.add("com.djm.kafka.interceptor.TimeInterceptor"); interceptors.add("com.djm.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("success -> " + metadata.offset()); } else { exception.printStackTrace(); } } }); } producer.close(); } }
Flume
對接Kafka
一、配置Flume
編寫flume-kafka.conf
[djm@hadoop102 job]$ vim flume-kafka.conf
輸入一下內容
# define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log a1.sources.r1.shell = /bin/bash -c # sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
二、啓動消費者
[djm@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
三、啓動Flume
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
四、向/opt/module/datas/flume.log
裏追加數據,查看Kafka
消費狀況
Kafka
監控Monitor
一、上傳jar
包KafkaOffsetMonitor-assembly-0.4.6.jar
到集羣
二、在/opt/module/
下建立kafka-offset-console
文件夾
三、將上傳的jar
包放入剛建立的目錄下
四、在/opt/module/kafka-offset-console
目錄下建立啓動腳本start.sh
,內容以下:
#!/bin/bash java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ --kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \ --kafkaSecurityProtocol PLAINTEXT \ --zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \ --port 8086 \ --refresh 10.seconds \ --retain 2.days \ --dbName offsetapp_kafka &
五、在/opt/module/kafka-offset-console
目錄下建立mobile-logs
文件夾
六、啓動Monitor
./start.sh
Manager
一、上傳壓縮包kafka-manager-1.3.3.15.zip
到集羣
二、解壓到/opt/module
三、修改配置文件conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改成:
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
四、啓動kafka-manager
[djm@hadoop102 kafka-manager-1.3.3.15]$ bin/kafka-manager
五、登陸hadoop102:9000
頁面查看詳細信息