Flume + Kafka學習——整合Demo

Flume + Kafka學習——整合Demo

環境

  1. Flume 1.6.0-cdh5.7.0
  2. Kafka 0.9.0.0

FLume官方文檔
Flume1.6.0 User Guide
Kafka官網文檔
Kafka Documentationhtml

Flume

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

Flume是分佈式的日誌收集系統,它將各個服務器中的數據收集起來並送到目的地apache

Flume數據流模型

Flume就是將數據從數據源(source)收集過來,Flume會先緩存數據(channel),再將收集到的數據送到指定的目的地(sink),最後Flume在刪除本身緩存的數據數組

這樣就是一個Event ,被定義爲具備字節數組和可選字符串屬性的數據流單元。包括 event headers、event body、event信息
數據流模型緩存

Flume Agent 是一個(JVM)進程,用於承載Event從外部源流向下一個目標服務器

Agent 由三個核心組成架構

  1. Source source組件是專門用來收集數據的,相似生產者。消耗由外部源(如Web服務器)傳遞給它的Event。外部源以Flume源能識別的格式向Flume發送Event
  2. Channel source組件把數據收集來之後臨時存放在channel中,相似倉庫
  3. Sink sink組件是用於把數據發送到目的地,相似消費者

Kafka

Kafka® is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景分佈式

  1. producer:消息生產者,發佈消息到 kafka 集羣的終端或服務。
  2. broker:kafka 集羣中包含的服務器。
  3. topic:每條發佈到 kafka 集羣的消息屬於的類別,即 kafka 是面向 topic 的。
  4. partition:partition 是物理上的概念,每一個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
  5. consumer:從 kafka 集羣中消費消息的終端或服務。
  6. consumer group:high-level consumer API 中,每一個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但能夠被多個 consumer group 消費。
  7. replica:partition 的副本,保障 partition 的高可用。
  8. leader:replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
  9. follower:replica 中的一個角色,從 leader 中複製數據。
  10. controller:kafka 集羣中的其中一個服務器,用來進行 leader election 以及 各類 failover。
  11. zookeeper:kafka 經過 zookeeper 來存儲集羣的 meta 信息。

Kafka原理

一般來說,消息模型能夠分爲兩種:隊列和發佈-訂閱式。隊列的處理方式是一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組(consumer group)。消費者用一個消費者組名標記本身。ide

一個發佈在Topic上消息被分發給此消費者組中的一個消費者。假如全部的消費者都在一個組中,那麼這就變成了queue模型。假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。更通用的, 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者,一個組內多個消費者能夠用來擴展性能和容錯。 性能

而且,kafka可以保證生產者發送到一個特定的Topic的分區上,消息將會按照它們發送的順序依次加入,也就是說,若是一個消息M1和M2使用相同的producer發送,M1先發送,那麼M1將比M2的offset低,而且優先的出如今日誌中。消費者收到的消息也是此順序。若是一個Topic配置了複製因子(replication facto)爲N,那麼能夠容許N-1服務器宕機而不丟失任何已經提交(committed)的消息。此特性說明kafka有比傳統的消息系統更強的順序保證。可是,相同的消費者組中不能有比分區更多的消費者,不然多出的消費者一直處於空等待,不會收到消息。學習

Demo

監控一個文件實時採集新增的數據輸出到Kafka
FLume採用 exec source + memory channel+ kafka sink

![圖片描述

  1. 設置agent

Flume agent配置存儲在本地配置文件中。配置文件包含代理中每一個source,sink和channel的屬性以及它們如何鏈接在一塊兒以造成數據流

下文$FLUME_HOME,$KAFKA_HOME指FLUME,KAFKA安裝目錄

進入$FLUME_HOME/conf 建立exec-memory-kafca.conf並配置

Exec Source配置

type:

The component type name, needs to be exec

command:

The command to execute

Kafka Sink 配置

type:

Must be set to org.apache.flume.sink.kafka.KafkaSink

brokerList:

List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port

topic:

default-flume-topic    The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a 「topic」 field, the event will be published to that topic overriding the topic configured here.

batchSize:

How many messages to process in one batch. Larger batches improve throughput while adding latency.

requiredAcks:

How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.

詳細配置見官方文檔

# 給agent命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = exec  
a1.sources.r1.command = tail -F /Users/null/data/flume-test.log
a1.sources.r1.channels = c1

# 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka-test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1
# 配置memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  1. 啓動zookeeper
    Kafka須要zookeeper環境

    bin/zkServer.sh start
  2. 啓動Kafka

    bin/kafka-server-start.sh config/server.properties
  3. 建立話題

    kafka-test 爲topic 名稱與flume 配置中a1.sinks.k1.topic 對應

    kafka-topics.sh --list --zookeeper localhost:2181 kafka-test
  4. 啓動一個Consumer

    kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-test --from-beginning
  5. 啓動FLume

    --conf 指定配置文件所在位置 --conf-file爲指定配置文件 --name 爲agent的名稱

    flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-kafca.conf --name a1 -Dflume.root.logger=INFO,console
  6. 在監控的文件中添加內容,能夠查看到kafka發出並消費

參考:
1.Kafka基本知識整理
2.FLume架構以及應用

相關文章
相關標籤/搜索