Flume is a distributed, reliable, and highly available system for aggregating massive volumes of log data. It supports customizable data senders for collecting data, and it also provides the ability to do simple processing on the data and write it out to a variety of (customizable) data receivers.
Flume is purpose-built to push data from a large number of sources into the various storage systems of the Hadoop ecosystem, such as HDFS and HBase.
Guide: http://flume.apache.org/FlumeUserGuide.html
Flume's data flow is carried end to end by events. An Event is Flume's basic unit of data; it carries the log data (as a byte array) together with header information. Events are generated by a Source from data outside the Agent. When the Source captures an event it applies source-specific formatting and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds events until a Sink has processed them. The Sink is responsible for persisting the log data or pushing the event on to another Source.
Flume's smallest independently running unit is the Flume Agent; one Agent is one JVM. A single agent is built from three major components: Source, Sink, and Channel. A Flume Agent can connect to one or more other Flume Agents, and it can also receive data from one or more Flume Agents.
Note: if an unexpected error or timeout occurs in a Flume pipeline and a retry is performed, Flume may end up writing duplicate data, and this redundant data has to be dealt with downstream.
For more detail, see the article Flume教程(一) Flume入門教程 (an introductory Flume tutorial).
Source: a Source is the active component that receives data from other data-producing applications. A Source can listen on one or more network ports to receive data, or read data from the local file system. Every Source must be connected to at least one Channel. Based on configurable criteria, a Source can write to several Channels, replicating an event to all of them or only to some.
A Source writes into multiple Channels by routing events through its channel processor, interceptors, and channel selector.
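For example, a multiplexing channel selector routes each event according to the value of one of its headers. The sketch below is only an illustration: the header name state, the mapping values, and the channel names are placeholders, not part of the setup described in this article.

agent.sources.r1.channels = c1 c2 c3
agent.sources.r1.selector.type = multiplexing
# route by the value of the "state" header
agent.sources.r1.selector.header = state
agent.sources.r1.selector.mapping.CZ = c1
agent.sources.r1.selector.mapping.US = c2
# events with any other header value go here
agent.sources.r1.selector.default = c3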
Channel: a Channel behaves like a queue; Sources write into it and Sinks read from it. Multiple Sources can safely write to the same Channel, and multiple Sinks can read from the same Channel.
However, each Sink can only read from one Channel. If multiple Sinks read from the same Channel, it is guaranteed that exactly one Sink will read any given event from that Channel.
Flume ships with two main Channel types: Memory Channel and File Channel. Data in a Memory Channel is lost when the JVM or the machine restarts; data in a File Channel is not.
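A minimal File Channel definition would look roughly like the sketch below; the checkpoint and data directories are placeholders, not paths used elsewhere in this article.

# durable channel: events survive a JVM or machine restart
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /data/flume/checkpoint
agent.channels.c1.dataDirs = /data/flume/data
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000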
Sink: a Sink continuously polls its own Channel to read events and remove them from the Channel.
Interceptors: every time a Source writes data to its Channels, it delegates that task to its channel processor, which passes the events through the one or more interceptors configured for that Source.
An interceptor is a piece of code that, based on some criterion such as a regular expression, can drop events, add new headers to events, or remove existing headers. Each Source can be configured with multiple interceptors; they are invoked in the order defined in the configuration, each passing its result on to the next element of the chain. Once the interceptors have processed the events, the event list returned by the interceptor chain is passed to the list of Channels, i.e. the Channels chosen for each event by the channel selector.
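As a concrete illustration using Flume's built-in interceptors rather than a custom class, the sketch below chains two interceptors: the first drops events whose body matches a regular expression, the second adds a fixed header to every surviving event. The regex, header key, and value are made up for this example.

agent.sources.r1.interceptors = i1 i2
# drop events whose body matches the regex
agent.sources.r1.interceptors.i1.type = regex_filter
agent.sources.r1.interceptors.i1.regex = ^DEBUG.*
agent.sources.r1.interceptors.i1.excludeEvents = true
# add a fixed header to every remaining event
agent.sources.r1.interceptors.i2.type = static
agent.sources.r1.interceptors.i2.key = datacenter
agent.sources.r1.interceptors.i2.value = dc1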
Component | Function
Agent | Runs Flume in a JVM. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
Client | Produces the data; runs in an independent thread.
Source | Collects data from the Client and passes it to the Channel.
Sink | Collects data from the Channel; runs in an independent thread.
Channel | Connects the sources and the sinks; behaves somewhat like a queue.
Events | Can be log records, Avro objects, etc.
A Flume Agent is configured through a plain-text configuration file in the Java properties format: a file of newline-separated key-value pairs, e.g. key1 = value1; with multiple values: agent.sources = r1 r2
Reference: flume配置介紹 (an introduction to Flume configuration).
1. Configuration file: file source to file sink
# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = Inteceptor.DemoInterceptor$Builder
# ========= Describe the source =============
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /home/lintong/桌面/data/input
# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
# ========= Describe the sink =============
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s1.sink.rollInterval = 0
# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
2. Configuration file: kafka source to file sink. This example configures the Kafka source through ZooKeeper, but using the Kafka bootstrap servers is recommended instead; a bootstrap-server variant is sketched after the config.
# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = Inteceptor.DemoInterceptor$Builder
# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.zookeeperConnect = 127.0.0.1:2181
# note: the property is "topic", not "topics"
agent.sources.r1.topic = test
#agent.sources.r1.groupId = flume
agent.sources.r1.kafka.consumer.timeout.ms = 100
# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
# ========= Describe the sink =============
agent.sinks.s1.type = file_roll
agent.sinks.s1.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s1.sink.rollInterval = 0
# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
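As recommended above, on Flume 1.7+ the Kafka source can be pointed at the brokers directly instead of ZooKeeper. A sketch of the equivalent source section follows; the broker address and consumer group id are examples, not values from the original setup.

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# bootstrap-server style configuration (Flume 1.7+ property names)
agent.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092
agent.sources.r1.kafka.topics = test
agent.sources.r1.kafka.consumer.group.id = flume
agent.sources.r1.batchSize = 1000
agent.sources.r1.batchDurationMillis = 1000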
3. Configuration file: kafka source to kafka sink
# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1 s2
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = com.XXX.interceptor.XXXInterceptor$Builder
# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = c1
agent.sources.r1.zookeeperConnect = localhost:2181
agent.sources.r1.topic = input
# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
# ========= Describe the sink =============
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = test
agent.sinks.s1.brokerList = localhost:9092
# avoid an infinite loop: do not let the event's topic header override the sink topic
agent.sinks.s1.allowTopicOverride = false
agent.sinks.s2.type = file_roll
agent.sinks.s2.sink.directory = /home/lintong/桌面/data/output
agent.sinks.s2.sink.rollInterval = 0
# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
#agent.sinks.s2.channel = c1
4. Configuration file: file source to hbase sink
Read real-time messages from a file and store them directly into HBase without any processing.
# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
# ========= Describe the source =============
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /home/lintong/桌面/test.log
agent.sources.r1.checkperiodic = 50
# ========= Describe the channel =============
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
# agent.channels.file-channel.type = file
# agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
# agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data
# ========= Describe the sink =============
agent.sinks.s1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.s1.zookeeperQuorum = master:2183
# HBase table name
agent.sinks.s1.table = mikeal-hbase-table
# HBase column family name
agent.sinks.s1.columnFamily = familyclom1
agent.sinks.s1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# column (within the column family) that the payload is written to
agent.sinks.s1.serializer.payloadColumn = cloumn-1
# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
If you encounter the following error when starting Flume:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/***
Fix: add the following line to ~/software/apache/hadoop-2.9.1/etc/hadoop/hadoop-env.sh
HADOOP_CLASSPATH=/home/lintong/software/apache/hbase-1.2.6/lib/*
5. Configuration file: kafka source to hdfs sink
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# ========= Name the components on this agent =========
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = Util.HdfsInterceptor$Builder
# ========= Describe the source =============
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = c1
agent.sources.r1.zookeeperConnect = localhost:2181
agent.sources.r1.topic = topicB
#agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 409600000
# ========= Describe the channel =============
# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 1000
#agent.channels.c1.keep-alive = 60
# ========= Describe the sink =============
agent.sinks.s1.type = hdfs
agent.sinks.s1.hdfs.path = /user/lintong/logs/nsh/json/%{filepath}/ds=%{ds}
agent.sinks.s1.hdfs.filePrefix = test
agent.sinks.s1.hdfs.fileSuffix = .log
agent.sinks.s1.hdfs.fileType = DataStream
agent.sinks.s1.hdfs.useLocalTimeStamp = true
agent.sinks.s1.hdfs.writeFormat = Text
agent.sinks.s1.hdfs.rollCount = 0
agent.sinks.s1.hdfs.rollSize = 10240
agent.sinks.s1.hdfs.rollInterval = 600
agent.sinks.s1.hdfs.batchSize = 500
agent.sinks.s1.hdfs.threadsPoolSize = 10
agent.sinks.s1.hdfs.idleTimeout = 0
agent.sinks.s1.hdfs.minBlockReplicas = 1
# ========= Bind the source and sink to the channel =============
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1
For the HDFS sink configuration parameters, see: Flume中的HDFS Sink配置參數說明 (a reference on HDFS Sink configuration parameters).
Because writes to HDFS are slow, the following problem appears when the data volume is large:
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 1000 full, consider committing more frequently, increasing capacity, or increasing thread count
You can replace the memory channel with a file channel or a Kafka channel.
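A sketch of a Kafka channel definition that could replace the memory channel section above (Flume 1.7+ property names; the broker address and topic are placeholders):

agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.kafka.bootstrap.servers = 127.0.0.1:9092
agent.channels.c1.kafka.topic = flume-channel
agent.channels.c1.kafka.consumer.group.id = flume
# keep the default (true) so that Flume event headers survive the channel
agent.channels.c1.parseAsFlumeEvent = true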
Even after switching to a Kafka channel, problems still show up under a large data volume:
16:07:48.615 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error ILLEGAL_GENERATION occurred while committing offsets for group flume
16:07:48.617 ERROR org.apache.flume.source.kafka.KafkaSource:317 - KafkaSource EXCEPTION, {}
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
or
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group flume
Reference: flume1.7使用KafkaSource採集大量數據 (using KafkaSource to collect large volumes of data with Flume 1.7).
Increase the following two parameters:
agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 409600000
agent.sources.r1.kafka.consumer.timeout.ms = 100
If the Kafka channel itself blows up with:
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550 - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group flume
add the following parameters:
agent.channels.c1.kafka.consumer.session.timeout.ms = 100000
agent.channels.c1.kafka.consumer.request.timeout.ms = 110000
agent.channels.c1.kafka.consumer.fetch.max.wait.ms = 1000
Startup (-c points at Flume's conf directory, -f at the agent configuration file, -n must match the agent name used as the property prefix):
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console