Flume official documentation:
Flume 1.6.0 User Guide
Kafka official documentation:
Kafka Documentation
Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
Flume is a distributed log-collection system: it gathers data from many servers and delivers it to a destination.
Flume collects data from a data source (source), buffers it (channel), and then delivers it to the configured destination (sink); finally, Flume deletes its own buffered copy of the data.
Each unit of this data flow is an Event, defined as a unit of data flow consisting of a byte-array payload (the event body) and optional string attributes (the event headers).
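The Event structure described above can be sketched as a toy model (this is an illustrative Python sketch, not Flume's actual Java `Event` API):

```python
from dataclasses import dataclass, field

# Toy model of a Flume Event: a byte-array body plus optional
# string headers (not the real org.apache.flume.Event interface).
@dataclass
class Event:
    body: bytes
    headers: dict[str, str] = field(default_factory=dict)

evt = Event(body=b"2024-01-01 INFO app started",
            headers={"topic": "kafka-test"})
```

A header such as `topic` matters later: the Kafka sink uses it to override the statically configured topic.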
A Flume Agent is a (JVM) process that hosts the flow of Events from an external source to the next hop or destination server.
An Agent is built from three core components: source, channel, and sink.
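In a Flume properties file, these three components are declared on the agent and then wired together by name; a minimal skeleton (all names here are placeholders) looks like:

```properties
# Declare the components of agent "a1" (names are placeholders)
a1.sources  = r1
a1.channels = c1
a1.sinks    = k1

# Wire them: the source writes into the channel, the sink reads from it
a1.sources.r1.channels = c1
a1.sinks.k1.channel    = c1
```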
Kafka® is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka was originally developed at LinkedIn. It is a distributed, partitioned (partition), replicated (replica) messaging system coordinated through ZooKeeper; its defining feature is the ability to process large volumes of data in real time to serve a wide range of scenarios.
Broadly, messaging models fall into two kinds: queues and publish-subscribe. In the queue model, a pool of consumers reads from the server and each message is handled by exactly one of them. In the publish-subscribe model, messages are broadcast to all consumers, and every consumer that receives a message may process it. Kafka provides a single consumer abstraction that covers both: the consumer group. Each consumer labels itself with a consumer-group name.
A message published to a topic is delivered to one consumer within each subscribing consumer group. If all consumers are in the same group, this reduces to the queue model; if every consumer is in its own group, it becomes pure publish-subscribe. More generally, you can create consumer groups as logical subscribers, each containing any number of consumers; multiple consumers within a group provide scalability and fault tolerance.
Moreover, Kafka guarantees that messages a producer sends to a particular topic partition are appended in the order they are sent: if messages M1 and M2 are sent by the same producer with M1 first, then M1 gets a lower offset than M2 and appears earlier in the log, and consumers receive them in that order. If a topic is configured with a replication factor of N, up to N-1 servers can fail without losing any committed messages. This gives Kafka stronger ordering guarantees than traditional messaging systems. Note, however, that a consumer group cannot usefully contain more consumers than there are partitions; the extra consumers sit idle and never receive messages.
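The queue-vs-pub/sub behavior and the "no more consumers than partitions" caveat can be illustrated with a toy round-robin assignment (a simplified model for intuition only, not Kafka's actual partition assignor):

```python
# Toy model of consumer-group partition assignment
# (simplified round-robin; not Kafka's real assignor code).
def assign(partitions: int, consumers: list) -> dict:
    assignment = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# All consumers in one group -> each partition is handled by exactly
# one consumer: queue semantics within the group.
print(assign(4, ["c1", "c2"]))        # {'c1': [0, 2], 'c2': [1, 3]}

# More consumers than partitions -> the extras get nothing and idle.
print(assign(2, ["c1", "c2", "c3"]))  # {'c1': [0], 'c2': [1], 'c3': []}
```

Within each assigned partition, a consumer then reads messages strictly in offset order, which is where the per-partition ordering guarantee comes from.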
Goal: monitor a file and deliver newly appended data to Kafka in real time.
Flume setup: exec source + memory channel + Kafka sink.
A Flume agent's configuration lives in a local configuration file, which contains the properties of each source, sink, and channel in the agent and describes how they are wired together to form the data flow.
In what follows, $FLUME_HOME and $KAFKA_HOME refer to the Flume and Kafka installation directories.
Go into $FLUME_HOME/conf, create exec-memory-kafca.conf, and configure it.
type:
The component type name, needs to be exec
command:
The command to execute
type:
Must be set to org.apache.flume.sink.kafka.KafkaSink
brokerList:
List of brokers the Kafka sink will connect to in order to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port.
topic:
The topic in Kafka to which the messages will be published (default: default-flume-topic). If the event header contains a "topic" field, the event will be published to that topic, overriding the topic configured here.
batchSize:
How many messages to process in one batch. Larger batches improve throughput while adding latency.
requiredAcks:
How many replicas must acknowledge a message before it's considered successfully written. Accepted values are 0 (never wait for acknowledgement), 1 (wait for the leader only), and -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
See the official documentation for the full set of configuration options.
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Users/null/data/flume-test.log
a1.sources.r1.channels = c1

# Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka-test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1

# Configure the memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Start ZooKeeper
Kafka requires a running ZooKeeper.
bin/zkServer.sh start
Start Kafka
bin/kafka-server-start.sh config/server.properties
Create the topic
kafka-test is the topic name, matching a1.sinks.k1.topic in the Flume configuration.
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test
Start a consumer
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-test --from-beginning
Start Flume
--conf specifies the directory containing the configuration, --conf-file the configuration file itself, and --name the agent name.
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-kafca.conf --name a1 -Dflume.root.logger=INFO,console
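With all four processes running (ZooKeeper, Kafka, the console consumer, and the Flume agent), appending lines to the monitored file should make them show up in the consumer. A quick way to generate test input (the path matches the tail -F command in the config; the temp-file fallback is only so the snippet also runs standalone):

```shell
# File watched by the exec source (tail -F) in exec-memory-kafca.conf;
# fall back to a temp file so this snippet runs even without that path.
LOG_FILE="${LOG_FILE:-/Users/null/data/flume-test.log}"
[ -w "$(dirname "$LOG_FILE")" ] || LOG_FILE="$(mktemp)"

# Append a few lines; Flume tails them and publishes to kafka-test.
for i in 1 2 3; do
  echo "flume-kafka test message $i" >> "$LOG_FILE"
done
```

Each appended line should appear in the kafka-console-consumer terminal within a few seconds (the sink's batchSize of 2 means lines are flushed in pairs).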