Consuming Kafka Data into HDFS with Flume

1. Overview

Kafka is a good choice for forwarding data: it loads data into a message queue and lets other business applications consume that data later. Kafka's API surface is rich, and its ecosystem supports a variety of storage backends, such as HDFS and HBase. If you do not want to write consumer code against the Kafka API yourself, there are components that can integrate with Kafka and do the consuming for you. Below I will show how to use Flume to quickly consume data from a Kafka topic and forward the consumed data to HDFS.

2. Content

Before implementing this solution, let's first look at how the data flows end to end: business data is written to the Kafka cluster in real time; the Flume Source component consumes the business topic from Kafka; and the consumed data is delivered to HDFS for storage by the Flume Sink component.

2.1 Prepare the Base Environment

Following the data-flow plan above, you need Kafka, Flume, and Hadoop (with HDFS available) ready.

2.1.1 Start the Kafka Cluster and Create a Topic

Kafka does not currently ship a script for managing a whole cluster in one shot, but we can wrap the kafka-server-start.sh and kafka-server-stop.sh scripts ourselves. The code is shown below:

#!/bin/bash

# Kafka broker node addresses; with many nodes, store them in a file instead
hosts=(dn1 dn2 dn3)

# Print a startup message for this distributed script
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`

echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.

# Start the brokers across the cluster
function start()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh -daemon \$KAFKA_HOME/config/server.properties" &
            sleep 1
        done
}    

# Stop the brokers across the cluster
function stop()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
            sleep 1
        done
}

# Check the status of each Kafka broker node
function status()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
            sleep 1
        done
}

# Validate the command-line argument
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    status)
        status
        ;;
    *)
        echo "Usage: $0 {start|stop|status}"
        exit 1
esac
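A side note on the timestamp trick used above: the script truncates GNU date's %N (nanoseconds) to three digits to get a log4j-style millisecond timestamp. The snippet below shows just that piece in isolation (it assumes GNU date, so it works on Linux but not with BSD date):

```shell
# Build a log4j-style "YYYY-MM-DD HH:MM:SS,mmm" timestamp:
# %N prints nanoseconds; the first three digits are the milliseconds
mill=$(date "+%N")
tdate=$(date "+%Y-%m-%d %H:%M:%S,${mill:0:3}")
echo "[$tdate] INFO [Kafka Cluster] demo message"
```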

After starting the Kafka cluster and confirming it is available, create a business topic with the following command:

# Create a topic named flume_collector_data
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_collector_data
# Optionally verify that the topic was created
kafka-topics.sh --describe --zookeeper dn1:2181,dn2:2181,dn3:2181 --topic flume_collector_data

2.2 Configure the Flume Agent

Next, configure the Flume agent so that Flume reads data from the flume_collector_data topic in the Kafka cluster and sends what it reads to HDFS for storage. The configuration is as follows:

# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka  
# channels alias
agent.channels = mem_channel  
# sink alias
agent.sinks = hdfs_sink  


# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource  
agent.sources.source_from_kafka.channels = mem_channel  
agent.sources.source_from_kafka.batchSize = 5000  

# set kafka broker address  
agent.sources.source_from_kafka.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092

# set kafka topic
agent.sources.source_from_kafka.kafka.topics = flume_collector_data

# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id

# define hdfs sink
agent.sinks.hdfs_sink.type = hdfs 

# specify the channel the sink should use  
agent.sinks.hdfs_sink.channel = mem_channel

# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d  

# set file size to trigger roll
agent.sinks.hdfs_sink.hdfs.rollSize = 0  
agent.sinks.hdfs_sink.hdfs.rollCount = 0  
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text

# define channel from kafka source to hdfs sink 
agent.channels.mem_channel.type = memory  

# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000
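One caveat about this configuration: the sink path uses the %Y%m%d escape, which Flume resolves from a timestamp header on each event. The Kafka source normally sets this header, but if the agent complains about a missing timestamp, the sink can fall back to the agent's local clock. The extra property below is my addition, not part of the original config:

```properties
# fall back to the agent's local clock when resolving %Y%m%d in hdfs.path
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
```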

Then, start the Flume agent with the following command:

# Run in the background on Linux
flume-ng agent -n agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf/kafka2hdfs.properties &
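Once the agent is running, the HDFS sink writes into a date-partitioned directory. You can preview the directory name that today's events will land in, and then list that path on the cluster with hdfs dfs -ls:

```shell
# The %Y%m%d escape in hdfs.path expands to today's date, so today's
# partition directory is:
echo "/data/flume/kafka/$(date +%Y%m%d)"
# On the cluster: hdfs dfs -ls /data/flume/kafka/$(date +%Y%m%d)
```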

2.3 Send Data to the Kafka Topic

Start the Kafka Eagle monitoring system (run the ke.sh start command) and use it to send test data to the topic.

Then, query the topic to confirm that the data was written.

Finally, check the corresponding path on HDFS to inspect the data delivered by Flume.

3. Transferring Kafka Data to HBase via Flume

3.1 Create a New Topic

Create a new topic by running the following command:

# Create a topic named flume_kafka_to_hbase
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_kafka_to_hbase

3.2 Configure the Flume Agent

Then, configure the Flume agent as follows:

# ------------------- define data source ----------------------
# source alias
agent.sources = kafkaSource
# channels alias
agent.channels = kafkaChannel
# sink alias
agent.sinks = hbaseSink


# set kafka channel
agent.sources.kafkaSource.channels = kafkaChannel

# set hbase channel
agent.sinks.hbaseSink.channel = kafkaChannel

# set kafka source
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource

# set kafka broker address  
agent.sources.kafkaSource.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092

# set kafka topic
agent.sources.kafkaSource.kafka.topics = flume_kafka_to_hbase

# set kafka groupid
agent.sources.kafkaSource.kafka.consumer.group.id = flume_test_id



# set channel (a Kafka-backed channel, which requires the broker list)
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092
# channel queue size
agent.channels.kafkaChannel.capacity = 10000
# transaction size
agent.channels.kafkaChannel.transactionCapacity = 1000



# set hbase sink
agent.sinks.hbaseSink.type = asynchbase
# hbase table name
agent.sinks.hbaseSink.table = flume_data
# hbase column family
agent.sinks.hbaseSink.columnFamily = info
# event serializer for the sink
agent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

# set hbase zk
agent.sinks.hbaseSink.zookeeperQuorum = dn1:2181,dn2:2181,dn3:2181
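As a side note, SimpleAsyncHbaseEventSerializer stores each event body in a single column and can optionally bump a counter column per event. The two properties below are optional additions I am sketching for illustration; the config above works without them, since they default to pCol and iCol:

```properties
# column qualifier that receives the event body (default: pCol)
agent.sinks.hbaseSink.serializer.payloadColumn = pCol
# counter column incremented once per event (default: iCol)
agent.sinks.hbaseSink.serializer.incrementColumn = iCol
```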

3.3 Create the HBase Table

Log in to the HBase shell and run the table-creation command, as shown below:

hbase(main):002:0> create 'flume_data','info'

3.4 Start the Flume Agent

Next, start the Flume agent instance with the following command:

# Run in the background on Linux
flume-ng agent -n agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf/kafka2hbase.properties &

3.5 Write Data to the Topic from Kafka Eagle

Then, write test data to the flume_kafka_to_hbase topic from Kafka Eagle.

3.6 Query the Transferred Data in HBase

Finally, scan the flume_data table in HBase to verify that the transfer succeeded:

hbase(main):003:0> scan 'flume_data'


4. Summary

At this point, data from the business topic in Kafka has been consumed by the Flume Source component and written out to HDFS by the Flume Sink component, and the whole pipeline required no application code. If your scenario involves no complex business logic and you simply need to forward Kafka data, this approach is well worth trying.

5. Conclusion

That is all I want to share in this post. If you have questions while studying this material, you can join the discussion group or send me an email; I will do my best to answer. Good luck!

In addition, I have published a book, 《Kafka並不難學》 (Kafka Isn't Hard to Learn). If you are interested, you can buy it through the purchase link in the announcement bar. Thank you all for your support.
