大數據系列之Flume+kafka 整合

相關文章:html

大數據系列之Kafka安裝 apache

大數據系列之Flume--幾種不一樣的Sourcesbootstrap

大數據系列之Flume+HDFSpost

關於Flume 的 一些核心概念:大數據

組件名稱     功能介紹
Agent代理 使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。
Client客戶端 生產數據,運行在一個獨立的線程。
Source源 從Client收集數據,傳遞給Channel。
Sink接收器 從Channel收集數據,進行相關操做,運行在一個獨立線程。
Channel通道 鏈接 sources 和 sinks ,這個有點像一個隊列。
Events事件 傳輸的基本數據負載。

 

 

 

 

 

 

 

 

 

 文章和 大數據系列之Flume+HDFS 很是類似,不一樣的在於flume安裝目錄conf下新建了kafka.properties文件,啓動時也應當用此配置文件做爲參數啓動。下面看具體內容:

1. kafka.properties:url

agent.sources = s1                                                                                                                   
agent.channels = c1                                                                                                                  
agent.sinks = k1                                                                                                                     
                                                                                                                                     
agent.sources.s1.type=exec                                                                                                           
agent.sources.s1.command=tail -F /tmp/logs/kafka.log                                                                                 
agent.sources.s1.channels=c1                                                                                                         
agent.channels.c1.type=memory                                                                                                        
agent.channels.c1.capacity=10000                                                                                                     
agent.channels.c1.transactionCapacity=100                                                                                            
                                                                                                                                     
#設置Kafka接收器                                                                                                                     
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink                                                                           
#設置Kafka的broker地址和端口號                                                                                                       
agent.sinks.k1.brokerList=master:9092                                                                                                
#設置Kafka的Topic                                                                                                                    
agent.sinks.k1.topic=kafkatest                                                                                                       
#設置序列化方式                                                                                                                      
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder                                                                       
                                                                                                                                     
agent.sinks.k1.channel=c1     

  關於配置文件中注意3點:線程

  a.  agent.sources.s1.command=tail -F /tmp/logs/kafka.log   3d

  b.  agent.sinks.k1.brokerList=master:9092代理

  c . agent.sinks.k1.topic=kafkatest code

2.很明顯,由配置文件能夠了解到:

  a.咱們須要在/tmp/logs下建一個kafka.log的文件,且向文件中輸出內容(下面會說到);

  b.flume鏈接到kafka的地址是 master:9092,注意不要配置出錯了;

  c.flume會將採集後的內容輸出到Kafka topic 爲kafkatest上,因此咱們啓動zk,kafka後須要打開一個終端消費topic kafkatest的內容。這樣就能夠看到flume與kafka之間玩起來了~~

 

3.具體操做:

  a.在/tmp/logs下創建空文件kafka.log。在mfz 用戶目錄下新建腳本kafkaoutput.sh(必定要給予可執行權限),用來向kafka.log輸入內容: kafka_test***

  

for((i=0;i<=1000;i++));
do echo "kafka_test-"+$i>>/tmp/logs/kafka.log;
done

  b. 在kafka安裝目錄下執行以下命令,啓動zk,kafka 。(不明白此處可參照 大數據系列之Flume+HDFS)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
bin/kafka-server-start.sh -daemon config/server.properties &

    c.新增Topic kafkatest

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest  

   d.打開新終端,在kafka安裝目錄下執行以下命令,生成對topic kafkatest 的消費

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning --zookeeper master

   e.啓動flume

bin/flume-ng agent --conf-file  conf/kafka.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console

   d.執行kafkaoutput.sh腳本(注意觀察kafka.log內容及消費終端接收到的內容)

  e.查看新終端消費信息

總體流程如圖:

完~~

後續將介紹Java代碼對於Flume+HDFS ,Flume+Kafka的實現。敬請期待~

相關文章
相關標籤/搜索