Flume Study Notes: Installation and Usage

Flume is a distributed, reliable, and highly available system for aggregating massive amounts of log data. It supports customizable data senders for collecting data,
and it can apply simple processing to the data and write it to various (customizable) data receivers.

Flume is designed specifically to push data from a large number of sources into the various storage systems of the Hadoop ecosystem, such as HDFS and HBase.

Guide: http://flume.apache.org/FlumeUserGuide.html

 

Architecture

Flume's data flow is carried from end to end by events. An event is Flume's basic unit of data: it carries the log data (as a byte array) together with header information. Events are generated by a Source from data outside the Agent; when a Source captures an event it applies a specific format and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds an event until a Sink has processed it. The Sink is responsible for persisting the log data or forwarding the event to another Source.

The smallest independently running unit of Flume is the Flume Agent; one Agent is one JVM. A single agent consists of three main components: Source, Sink, and Channel. A Flume Agent can connect to one or more other Flume Agents, and it can also receive data from one or more other Flume Agents.
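
As a rough illustration of the event model (not code from the original article), the following sketch uses Flume's Java API to build an event that carries a byte-array body and a couple of headers; the header names here are made up for the example:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // Headers are plain string key/value pairs attached to the event.
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "web-01");        // hypothetical header
        headers.put("logtype", "access");     // hypothetical header

        // The body is an opaque byte array; here it holds one log line.
        Event event = EventBuilder.withBody(
                "127.0.0.1 GET /index.html".getBytes(StandardCharsets.UTF_8),
                headers);

        System.out.println(event.getHeaders() + ", body bytes: " + event.getBody().length);
    }
}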

Note: if an unexpected error or timeout in a Flume pipeline triggers a retry, Flume may end up writing duplicate data; this redundant data has to be dealt with downstream.

For more details, see the article: Flume教程(一) Flume入門教程

 

Components

Source: a Source is the active component that receives data from other, data-producing applications. A Source can listen on one or more network ports to receive data, or it can read data from the local file system. Every Source must be connected to at least one Channel. Based on configurable criteria, a Source can write to several Channels, replicating an event to all of them or only to some.

A Source routes events into multiple Channels through the chain of channel processor, interceptors, and channel selector.

Channel: a Channel behaves like a queue: Sources write into it and Sinks read from it. Multiple Sources can safely write to the same Channel, and multiple Sinks can read from the same Channel.

However, a Sink can only read from a single Channel. If multiple Sinks read from the same Channel, it is guaranteed that any given event is read from the Channel by exactly one Sink.

Flume ships with two kinds of Channel: the Memory Channel and the File Channel. Data held in a Memory Channel is lost when the JVM or the machine restarts; data in a File Channel is not.
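
For example, a File Channel that survives restarts can be configured roughly as follows (the checkpoint and data directories are placeholder paths, not from the original article):

agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /data/flume/checkpoint
agent.channels.c1.dataDirs = /data/flume/data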

Sink: a Sink continuously polls its Channel to read events and remove them from the Channel.

Interceptors: every time a Source writes data to its Channels, it delegates that task to its channel processor, which first passes the events through the interceptors (one or more) configured on the Source.

An interceptor is a piece of code that, based on some criterion such as a regular expression, can drop events, add new headers to them, or remove existing headers. Each Source can be configured with several interceptors; they are invoked in the order defined in the configuration, and each passes its result to the next element of the chain. Once the interceptors are done, the event list returned by the interceptor chain is passed on to the Channels, i.e. to the Channel chosen for each event by the Channel selector.
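
The configurations below reference custom interceptor classes (for example Inteceptor.DemoInterceptor$Builder). As a rough sketch of what such a class looks like, and not the author's actual implementation, a minimal interceptor that adds one header to every event could be written like this:

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Minimal custom interceptor: adds a fixed header to every event.
public class DemoInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup needed for this sketch.
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        headers.put("ds", "2018-01-01");   // hypothetical header, e.g. used for partitioning
        return event;                      // return null here to drop the event instead
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    // Flume instantiates interceptors through the Builder named in the configuration.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new DemoInterceptor();
        }

        @Override
        public void configure(Context context) {
            // Read interceptor-specific properties here if needed.
        }
    }
}

The class is then wired in through its Builder, e.g. agent.sources.r1.interceptors.i1.type = DemoInterceptor$Builder (with the fully qualified package name).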

 

Component summary:

Agent: runs Flume inside a JVM. Each machine runs one agent, but a single agent can contain multiple sources and sinks.

Client: produces the data; runs in its own thread.

Source: collects data from the Client and passes it to a Channel.

Sink: collects data from the Channel; runs in its own thread.

Channel: connects the sources and sinks, somewhat like a queue.

Events: can be log records, Avro objects, and so on.

 

Configuration Files

A Flume Agent is configured with a plain-text configuration file. Flume configuration uses the Java properties format: a plain text file of newline-separated key-value pairs such as key1 = value1. A key may list several values, for example: agent.sources = r1 r2

For reference: flume配置介紹

 

1. Configuration file: file source to file sink

# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1

agent.sources.r1.interceptors.i1.type = Inteceptor.DemoInterceptor$Builder

# ========= Describe the source =============
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /home/lintong/桌面/data/input

# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000

# ========= Describe the sink =============
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s1.sink.rollInterval = 0

# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

 

2. Configuration file: Kafka source to file sink. This Kafka source is configured through ZooKeeper, but pointing it at the brokers (bootstrap servers) is the recommended approach; see the snippet after this example.

# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1

agent.sources.r1.interceptors.i1.type = Inteceptor.DemoInterceptor$Builder

# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.zookeeperConnect = 127.0.0.1:2181
# The property must be 'topic', not 'topics', for this ZooKeeper-based source
agent.sources.r1.topic = test
#agent.sources.r1.groupId = flume
agent.sources.r1.kafka.consumer.timeout.ms = 100

# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000

# ========= Describe the sink =============
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s1.sink.rollInterval = 0

# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
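
As noted above, pointing the Kafka source directly at the brokers is preferred over ZooKeeper in Flume 1.7+; a rough equivalent of the source section above, assuming a broker at localhost:9092, would be:

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.kafka.bootstrap.servers = localhost:9092
agent.sources.r1.kafka.topics = test
agent.sources.r1.kafka.consumer.group.id = flume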

 

3. Configuration file: Kafka source to Kafka sink

# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1 s2
agent.sources.r1.interceptors = i1

agent.sources.r1.interceptors.i1.type = com.XXX.interceptor.XXXInterceptor$Builder

# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = c1
agent.sources.r1.zookeeperConnect = localhost:2181
agent.sources.r1.topic = input

# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000

# ========= Describe the sink =============
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
agent.sinks.s1.brokerList = localhost:9092
# Avoid an infinite loop: ignore the event's topic header so events are not written back to the source topic
agent.sinks.s1.allowTopicOverride = false

agent.sinks.s2.type = file_roll
agent.sinks.s2.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s2.sink.rollInterval = 0

# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
#agent.sinks.s2.channel = c1

 

4. Configuration file: file source to HBase sink

Read messages from a file in real time and store them directly into HBase without any processing.

# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1

# ========= Describe the source =============
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /home/lintong/桌面/test.log
agent.sources.r1.checkperiodic = 50


# ========= Describe the channel =============
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000

# agent.channels.file-channel.type = file 
# agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint 
# agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# ========= Describe the sink =============
agent.sinks.s1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.s1.zookeeperQuorum=master:2183
# HBase table name
agent.sinks.s1.table=mikeal-hbase-table
# Column family of the HBase table
agent.sinks.s1.columnFamily=familyclom1
agent.sinks.s1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# A column name within that column family
agent.sinks.s1.serializer.payloadColumn=cloumn-1


# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel=c1

If you run into the following error when starting Flume:

java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/***

The fix is to add the following line to ~/software/apache/hadoop-2.9.1/etc/hadoop/hadoop-env.sh:

HADOOP_CLASSPATH=/home/lintong/software/apache/hbase-1.2.6/lib/*

 

5. Configuration file: Kafka source to HDFS sink

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1

agent.sources.r1.interceptors.i1.type = Util.HdfsInterceptor$Builder

# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = c1
agent.sources.r1.zookeeperConnect = localhost:2181
agent.sources.r1.topic = topicB
#agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 409600000

# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
#agent.channels.c1.keep-alive = 60

# ========= Describe the sink =============
agent.sinks.s1.type = hdfs
agent.sinks.s1.hdfs.path = /user/lintong/logs/nsh/json/%{filepath}/ds=%{ds}
agent.sinks.s1.hdfs.filePrefix = test
agent.sinks.s1.hdfs.fileSuffix = .log
agent.sinks.s1.hdfs.fileType = DataStream
agent.sinks.s1.hdfs.useLocalTimeStamp = true
agent.sinks.s1.hdfs.writeFormat = Text
agent.sinks.s1.hdfs.rollCount = 0
agent.sinks.s1.hdfs.rollSize = 10240
agent.sinks.s1.hdfs.rollInterval = 600
agent.sinks.s1.hdfs.batchSize = 500
agent.sinks.s1.hdfs.threadsPoolSize = 10
agent.sinks.s1.hdfs.idleTimeout = 0
agent.sinks.s1.hdfs.minBlockReplicas = 1
#agent.sinks.s1.channel = fileChannel   (superseded by the c1 binding below)


# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

For the HDFS sink configuration parameters, see: Flume中的HDFS Sink配置參數說明

Because writing to HDFS is slow, the following problem appears when the data volume is large:

org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 1000 full, consider committing more frequently, increasing capacity, or increasing thread count

The memory channel can be replaced with a file channel or with a Kafka channel, as sketched below.
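
A minimal Kafka channel configuration, assuming a broker at localhost:9092 and a placeholder topic, might look like this:

agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.kafka.bootstrap.servers = localhost:9092
agent.channels.c1.kafka.topic = flume-channel
agent.channels.c1.kafka.consumer.group.id = flume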

Even after switching to a Kafka channel, problems still show up when the data volume is large:

16:07:48.615 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error ILLEGAL_GENERATION occurred while committing offsets for group flume
16:07:48.617 ERROR org.apache.flume.source.kafka.KafkaSource:317 - KafkaSource EXCEPTION, {}
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)

or

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group flume

Reference: flume1.7使用KafkaSource採集大量數據

See also the official Flume demo of using a Kafka channel.

Increase the following two parameters:

agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 409600000
agent.sources.r1.kafka.consumer.timeout.ms = 100

If the Kafka channel itself then overflows with

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group flume

add the following parameters:

agent.channels.c1.kafka.consumer.session.timeout.ms=100000
agent.channels.c1.kafka.consumer.request.timeout.ms=110000
agent.channels.c1.kafka.consumer.fetch.max.wait.ms=1000

 

Commands

To start an agent (-n names the agent defined in the configuration file, -f points to that configuration file, -c to the configuration directory, and -Dflume.root.logger=INFO,console sends log output to the console):

bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console