Kafka實戰-Flume到Kafka (轉)

原文連接:Kafka實戰-Flume到Kafka

1.概述

  前面給你們介紹了整個Kafka項目的開發流程,今天給你們分享Kafka如何獲取數據源,即Kafka生產數據。下面是今天要分享的目錄:html

  • 數據來源
  • Flume到Kafka
  • 數據源加載
  • 預覽

  下面開始今天的分享內容。java

2.數據來源

  Kafka生產的數據,是由Flume的Sink提供的,這裏咱們須要用到Flume集羣,經過Flume集羣將Agent的日誌收集分發到 Kafka(供實時計算處理)和HDFS(離線計算處理)。關於Flume集羣的Agent部署,這裏就很少作贅述了,不清楚的同窗能夠參考《高可用Hadoop平臺-Flume NG實戰圖解篇》一文中的介紹,下面給你們介紹數據來源的流程圖,以下圖所示:apache

  這裏,咱們使用Flume做爲日誌收集系統,將收集到的數據輸送到Kafka中間件,以供Storm去實時消費計算,整個流程從各個Web節點 上,經過Flume的Agent代理收集日誌,而後彙總到Flume集羣,在由Flume的Sink將日誌輸送到Kafka集羣,完成數據的生產流程。app

3.Flume到Kafka

  從圖,咱們已經清楚了數據生產的流程,下面咱們來看看如何實現Flume到Kafka的輸送過程,下面我用一個簡要的圖來講明,以下圖所示:工具

  這個表達了從Flume到Kafka的輸送工程,下面咱們來看看如何實現這部分。oop

  首先,在咱們完成這部分流程時,須要咱們將Flume集羣和Kafka集羣都部署完成,在完成部署相關集羣后,咱們來配置Flume的Sink數據流向,配置信息以下所示:post

  • 首先是配置spooldir方式,內容以下所示:
producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /home/hadoop/dir/logdfs
  • 固然,Flume的數據發送方類型也是多種類型的,有:Console、Text、HDFS、RPC等,這裏咱們系統所使用的是Kafka中間件來接收,配置內容以下所示:
複製代碼
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test
複製代碼

  這樣,咱們就在Flume的Sink端配置好了數據流向接受方。ui

4.數據加載

  在完成配置後,接下來咱們開始加載數據,首先咱們在Flume的spooldir端生產日誌,以供Flume去收集這些日誌。而後,咱們經過Kafka的KafkaOffsetMonitor監控工具,去監控數據生產的狀況,下面咱們開始加載。編碼

  • 啓動ZK集羣,內容以下所示:
zkServer.sh start

  注意:分別在ZK的節點上啓動。url

  • 啓動Kafka集羣
kafka-server-start.sh config/server.properties &

  在其餘的Kafka節點輸入一樣的命令,完成啓動。

  • 啓動Kafka監控工具
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk dn1:2181,dn2:2181,dn3:2181 \ --port 8089 \ --refresh 10.seconds \ --retain 1.days
  • 啓動Flume集羣
flume-ng agent -n producer -c conf -f flume-kafka-sink.properties -Dflume.root.logger=ERROR,console

  而後,我在/home/hadoop/dir/logdfs目錄下上傳log日誌,這裏我只抽取了一少部分日誌進行上傳,以下圖所示,表示日誌上傳成功。

5.預覽

  下面,咱們經過Kafka的監控工具,來預覽咱們上傳的日誌記錄,有沒有在Kafka中產生消息數據,以下所示:

  • 啓動Kafka集羣,爲生產消息截圖預覽

  • 經過Flume上傳日誌,在Kafka中產生消息數據

6.總結

  本篇文章給你們講述了Kafka的消息產生流程,後續會在Kafka實戰系列中爲你們講述Kafka的消息消費流程等一整套流程,這裏只是爲後續的Kafka實戰編碼打下一個基礎,讓你們先對Kafka的消息生產有個總體的認識。

相關文章
相關標籤/搜索