Flume 1.7 Installation and Usage

Installing Flume

System requirements:
JDK 1.7 or later must be installed.
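You can confirm that a suitable JDK is on the PATH before proceeding:

$ java -version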

1. Download the binary package
Download page: http://flume.apache.org/download.html
Download link for 1.7.0: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2. Extract the archive

$ cp ~/Downloads/apache-flume-1.7.0-bin.tar.gz ~
$ cd
$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
$ cd apache-flume-1.7.0-bin

3. Create the flume-env.sh file

$ cp conf/flume-env.sh.template conf/flume-env.sh
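If your JDK is not on the default path, you can point Flume at it in conf/flume-env.sh (the path below is an illustrative assumption; substitute your own JDK location):

export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64  # adjust to your JDK install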

Simple Example: Transferring a Specified File

Scenario: two machines, one acting as the client and the other as the agent; a specified file is transferred from the client to the agent machine.

1. Create the configuration file

Create the flume.conf configuration file from the template that ships with Flume.

$ cp conf/flume-conf.properties.template conf/flume.conf

Edit flume.conf:

$ vi conf/flume.conf

Append the following configuration at the end of the file:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

Save and exit.

2. Start the Flume server
On the machine acting as the agent, run:

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1

3. Start the client in a new window
On the machine acting as the client, run the following.
(Since this environment simulates both machines on a single host, just enter the command in a new terminal.)

$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console

4. Results
At this point you should see messages like the following:

2012-03-16 16:39:17,124 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:175)] Finished
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:178)] Closing reader
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:183)] Closing transceiver
2012-03-16 16:39:17,129 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:73)] Exiting

In the window where you started the Flume server earlier, you should see messages like the following:

2012-03-16 16:39:16,738 (New I/O server boss #1 ([id: 0x49e808ca, /0:0:0:0:0:0:0:0:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] OPEN
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:39577
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] DISCONNECTED
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] UNBOUND
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] CLOSED
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@5c1ae90c }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6aba4211 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6a47a0d4 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@48ff4cf }
...

Simple Example: Uploading Files from a Directory to HDFS

Scenario: upload the files under a given directory on the machine to HDFS.

1. Configure conf/flume.conf

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define a spooling-directory source called spooldir-source1 on agent1
# that watches /home/hadoop/flume-1.7.0/tmpData. Connect it to channel ch1.
agent1.sources.spooldir-source1.channels = ch1
agent1.sources.spooldir-source1.type = spooldir
agent1.sources.spooldir-source1.spoolDir = /home/hadoop/flume-1.7.0/tmpData

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://master:9000/test
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundValue = 10

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = spooldir-source1
agent1.sinks = hdfs-sink1

Here, /home/hadoop/flume-1.7.0/tmpData is the directory containing the files to upload; that is, every file in this folder will be uploaded to the hdfs://master:9000/test directory on HDFS.

Notes

  • This configuration produces many small files, because by default each file stores only 10 events. This is controlled by hdfs.rollCount (default 10); there is also hdfs.rollSize, which caps the size of a file: once a file exceeds that value, a new one is started. A sketch of roll settings that avoids this follows the list below.
  • With this configuration, the file names all begin with "events-". To keep the original file names instead, use the configuration below (basenameHeader is set on the source, filePrefix on the sink; with both in place, files uploaded to HDFS are named "original-filename.timestamp"):
agent1.sources.spooldir-source1.basenameHeader = true
agent1.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
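As a sketch of how to reduce the small-file problem (the values below are illustrative assumptions, not part of the original configuration), you can disable count-based and time-based rolling so that files roll only when they reach a target size:

# Roll only on file size; 0 disables the count- and time-based triggers.
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
# Start a new file once the current one reaches about 128 MB.
agent1.sinks.hdfs-sink1.hdfs.rollSize = 134217728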

2. Start the agent
Start the agent with the following command:

bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console

3. Check the results
Open the web GUI provided by Hadoop to verify that the files were uploaded successfully.
The GUI address is: http://master:50070/explorer.html#/test
where master is the hostname of the machine running the Hadoop NameNode.
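Alternatively (a command-line check not mentioned in the original), you can list the target directory with the HDFS shell:

$ hdfs dfs -ls hdfs://master:9000/test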

4. Summary
In this scenario, uploading files to HDFS requires several Hadoop jar files, namely:

${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar
${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar
${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar
${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar
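To copy them all at once, a single command might look like this (a sketch assuming the Hadoop 2.4.0 layout above; adjust the version numbers to your installation):

$ cp ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar \
     ${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar \
     ${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar \
     ${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar \
     ${FLUME_HOME}/lib/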

異常

java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

2016-11-03 14:49:35,278 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

Cause: a dependency jar is missing, namely:

${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar

Fix: locate this jar file and copy it into the lib directory under the Flume installation directory.
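For example, in the same style as the copy commands used for the other jars below:

cp ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar ${FLUME_HOME}/lib/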

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

2016-11-03 16:32:06,741 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:256)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:465)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)

Fix:
Edit conf/flume.conf, replacing agent1 and sink1 with your own agent and sink names:

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
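An alternative fix (a sketch, not from the original article) is to attach Flume's built-in timestamp interceptor to the source, so every event carries a timestamp header:

# Hypothetical interceptor name "ts1"; the "timestamp" type is Flume's TimestampInterceptor.
agent1.sources.spooldir-source1.interceptors = ts1
agent1.sources.spooldir-source1.interceptors.ts1.type = timestamp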

java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration

2016-11-03 16:32:55,594 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
    at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:38)
    at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:36)
    at org.apache.hadoop.security.UserGroupInformation$UgiMetrics.create(UserGroupInformation.java:106)
    at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:208)
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2554)
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2546)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2412)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 18 more

Fix:
The missing class lives in commons-configuration-1.6.jar, found under ${HADOOP_HOME}/share/hadoop/common/lib/; copy it into Flume's lib directory:

cp ${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar ${FLUME_HOME}/lib/

java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName

2016-11-03 16:41:54,629 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName

Fix:
The hadoop-auth-2.4.0.jar dependency is missing; copy it into Flume's lib directory as well:

cp ${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar ${FLUME_HOME}/lib/

HDFS IO error java.io.IOException: No FileSystem for scheme: hdfs

2016-11-03 16:49:26,638 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:443)] HDFS IO error java.io.IOException: No FileSystem for scheme: hdfs

Missing dependency: hadoop-hdfs-2.4.0.jar. Copy it into Flume's lib directory:

cp ${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar ${FLUME_HOME}/lib/