II. Flume Sources

1 Common Flume Source Types

1.1 Avro Source

1.1.1 Overview

Definition from the documentation:
Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies.

In short, the Avro Source receives events from Avro clients. This makes it the receiving end for the Avro Sink of an upstream agent when you chain agents into tiers.

Required properties: channels, type (avro), bind, port.

1.1.2 Example

Create the configuration file:
cd jobs
vim avro_demo1.conf

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = avro
# An IP address also works here. A hostname like this one must resolve (e.g. via /etc/hosts).
a1.sources.s1.bind = hadoop10
a1.sources.s1.port = 33333

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
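Flume's properties-file configuration provider loads the agent file with java.util.Properties. That format has no inline comments. In a line such as `a1.sources.s1.bind = hadoop10 # ...`, everything after the `=` becomes the value, comment included. The sketch below checks this with the standard library alone. The class name InlineCommentDemo is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Shows how java.util.Properties treats "# ..." at the end of a value line.
final class InlineCommentDemo {

    // Parses properties text exactly as Properties.load(Reader) does.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with a StringReader
        }
        return props;
    }

    public static void main(String[] args) {
        String inline = "a1.sources.s1.bind = hadoop10 # IP or hostname\n";
        String separate = "# IP or hostname\na1.sources.s1.bind = hadoop10\n";

        // The trailing "comment" is kept as part of the value.
        System.out.println("[" + parse(inline).getProperty("a1.sources.s1.bind") + "]");
        // A comment on its own line is ignored.
        System.out.println("[" + parse(separate).getProperty("a1.sources.s1.bind") + "]");
    }
}
```

Running main prints `[hadoop10 # IP or hostname]` and then `[hadoop10]`. For this reason, comments in Flume configs belong on their own lines.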

Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/avro_demo1.conf -Dflume.root.logger=INFO,console

In another terminal, use the Avro client that ships with Flume to send a log file to the given host and port:

echo "hello world" > avro_log.txt
flume-ng avro-client -c conf/ -H hadoop10 -p 33333 -F avro_log.txt

Watch the output in the listening terminal:

2020-12-26 10:07:58,494 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] OPEN
2020-12-26 10:07:58,495 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] BOUND: /192.168.122.10:33333
2020-12-26 10:07:58,495 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 => /192.168.122.10:33333] CONNECTED: /192.168.122.10:51714
2020-12-26 10:07:58,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64                   hello word }
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] DISCONNECTED
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] UNBOUND
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x9c055f8e, /192.168.122.10:51714 :> /192.168.122.10:33333] CLOSED
2020-12-26 10:07:58,760 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.122.10:51714 disconnected.
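The logger sink prints each event body as hex bytes followed by a readable rendering. By default it logs at most 16 bytes of the body (its maxBytesToLog property), so longer bodies are cut off in this output. The small decoder below converts the hex back to text. It is a hypothetical helper and assumes a UTF-8 body. Decoding the body above gives "hello word" (10 bytes), not the 11-byte "hello world" written by the echo command. This suggests the log was captured from a slightly different input.

```java
import java.nio.charset.StandardCharsets;

// Decodes the hex part of a LoggerSink "body:" field back into text.
final class BodyHexDecoder {

    // Turns space-separated hex bytes such as "68 65 6C" into a UTF-8 string.
    static String decode(String hex) {
        String[] parts = hex.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body bytes copied from the Avro Source log.
        System.out.println(decode("68 65 6C 6C 6F 20 77 6F 72 64"));
    }
}
```

`decode("68 65 6C 6C 6F 20 77 6F 72 64")` returns "hello word".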

1.2 Exec Source

1.2.1 Overview

Definition from the documentation:
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results where as date will probably not - the former two commands produce streams of data where as the latter produces a single event and exits.

In short, the Exec Source runs a configured Unix (Linux) command and keeps turning its output into data. If the process exits, the Exec Source exits with it and produces no more data.
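The sketch below is a minimal Java model of that contract, not Flume's ExecSource. Each stdout line becomes one event. Stderr is discarded, as with the default logStdErr=false. Reading stops once the process exits. The names are hypothetical, and the sketch assumes Java 9+ and a Unix `sh`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the Exec Source contract (not Flume's implementation).
final class ExecSourceSketch {

    static List<String> collectEvents(List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Mirrors logStdErr=false: stderr is thrown away.
        pb.redirectError(ProcessBuilder.Redirect.DISCARD);
        List<String> events = new ArrayList<>();
        try {
            Process process = pb.start();
            try (BufferedReader out = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                // readLine() blocks while the command is alive (e.g. tail -F)
                // and returns null only after the process closes stdout.
                while ((line = out.readLine()) != null) {
                    events.add(line); // one line -> one event body
                }
            }
            process.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        // A short-lived command yields two events, then nothing more.
        System.out.println(collectEvents(List.of(
                "sh", "-c", "echo hello world; echo ignored >&2; echo this is my demo")));
    }
}
```

With a short-lived command the loop ends as soon as the command finishes. With `tail -F` it never does. This is the difference the documentation draws between `date` and `tail -F`.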

1.2.2 Example

1) Create the configuration file:
touch exec_demo.conf
vim exec_demo.conf

# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/v2admin/exec_demo.log

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2) Create the log file under /home/v2admin:
touch exec_demo.log
3) Start Flume:
flume-ng agent -n a1 -c conf/ -f jobs/exec_demo.conf -Dflume.root.logger=INFO,console
4) In another terminal, append lines to exec_demo.log:

[v2admin@hadoop10 ~]$ echo "hello world" >> exec_demo.log 
[v2admin@hadoop10 ~]$ echo "this is my demo" >> exec_demo.log

5) Watch the log in the listening terminal:

2020-12-26 10:30:12,616 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
2020-12-26 10:30:37,329 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 69 73 20 69 73 20 6D 79 20 64 65 6D 6F    this is my demo }

1.3 Spooling Directory Source

1.3.1 Overview

From the official documentation:

This source lets you ingest data by placing files to be ingested into a "spooling" directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion by default is indicated by renaming the file or it can be deleted or the trackerDir is used to keep track of processed files.
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
1. If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
2. If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.

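The Spooling Directory Source watches the configured directory for new files and reads the data out of them. It is not suitable for files that are still being appended to in real time.
Required properties: channels, type (spooldir), spoolDir. All others are optional.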
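The Java toy below models that behaviour. It is not Flume's ReliableSpoolingFileEventReader. Each pass lists the directory and reads every regular file whose name does not yet end in .COMPLETED, the default fileSuffix that also appears in the log later in this section. It then renames each file. If the .COMPLETED name is already taken, it fails the same way Flume does when a file name is reused. The class and method names are hypothetical. The sketch reads whole files and ignores options such as consumeOrder, deserializers and trackerDir.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model of the Spooling Directory Source contract (not Flume's code).
final class SpoolDirSketch {

    static final String SUFFIX = ".COMPLETED"; // Flume's default fileSuffix

    // Reads every not-yet-completed regular file in dir and returns its lines as events.
    static List<String> pollOnce(Path dir) {
        List<String> events = new ArrayList<>();
        try {
            List<Path> pending;
            // Snapshot the listing first so renames do not disturb the iteration.
            try (Stream<Path> listing = Files.list(dir)) {
                pending = listing
                        .filter(Files::isRegularFile)
                        .filter(p -> !p.getFileName().toString().endsWith(SUFFIX))
                        .sorted()
                        .collect(Collectors.toList());
            }
            for (Path file : pending) {
                events.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
                Path done = file.resolveSibling(file.getFileName() + SUFFIX);
                if (Files.exists(done)) {
                    // Same condition as Flume's "File name has been re-used" error.
                    throw new IllegalStateException("File name has been re-used: " + done);
                }
                Files.move(file, done);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }
}
```

A first pass over a directory containing spooldir.log returns its lines and leaves spooldir.log.COMPLETED behind. Creating spooldir.log again makes the next pass throw.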

1.3.2 Example

1) Create the configuration file:
touch spooldir_demo.conf
vim spooldir_demo.conf

# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /home/v2admin/demo
# Add the path of the source file to each event as the "file" header
a1.sources.s1.fileHeader = true

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2) Create the demo directory under /home/v2admin:
mkdir demo
3) Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/spooldir_demo.conf -Dflume.root.logger=INFO,console

4) In another terminal, create a file in the spool directory:

cd /home/v2admin/demo
echo "hello world" >> spooldir.log

5) Watch the listening terminal:

2020-12-26 11:28:40,815 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2020-12-26 11:28:40,815 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /home/v2admin/demo/spooldir.log to /home/v2admin/demo/spooldir.log.COMPLETED
2020-12-26 11:28:40,817 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/v2admin/demo/spooldir.log} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

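6) Do not reuse file names in the spool directory. After the first run, spooldir.log has been renamed to spooldir.log.COMPLETED, so running the same command again creates a new file with the old name:

# Run the same command again
echo "hello world" >> spooldir.log

When Flume tries to mark the new file as completed, spooldir.log.COMPLETED already exists, and the source stops with: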

[ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:296)] FATAL: Spool Directory source s1: { spoolDir: /home/v2admin/demo }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/v2admin/demo/spooldir.log.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:528)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:475)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:386)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:263)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
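The documentation quoted above suggests adding a unique identifier, such as a timestamp, to file names when they are moved into the spooling directory. The Java helper below is a hypothetical sketch of that idea. It assumes the file is written completely somewhere else first, then moved in with ATOMIC_MOVE, so Flume never sees a file that is still growing. The timestamp-plus-counter naming scheme is just one possible choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: moves a finished file into a spooling directory
// under a name that has not been used there before.
final class SpoolHandoff {

    static Path handOff(Path finishedFile, Path spoolDir) {
        String base = finishedFile.getFileName().toString();
        long stamp = System.currentTimeMillis();
        Path target = spoolDir.resolve(base + "." + stamp);
        // Skip names already in use, whether pending or renamed to ".COMPLETED".
        for (int n = 1; Files.exists(target)
                || Files.exists(spoolDir.resolve(target.getFileName() + ".COMPLETED")); n++) {
            target = spoolDir.resolve(base + "." + stamp + "-" + n);
        }
        try {
            // ATOMIC_MOVE only works when both directories are on the same file system.
            return Files.move(finishedFile, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Handing off app.log twice in a row produces two distinct names such as app.log.<millis> and app.log.<millis>-1. The re-use check therefore never fires.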

1.4 Taildir Source

1.4.1 Overview

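The Taildir Source is suited to monitoring multiple files that are appended to in real time. It records its read offsets in a position file, so it can resume where it left off after a restart.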

1.4.2 Example

1) Create the configuration file:
vim taildir_demo.conf

# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /home/v2admin/demo/pos.log
a1.sources.s1.filegroups = f
a1.sources.s1.filegroups.f = /home/v2admin/demo/f/.*.log.*

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
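In filegroups.f, the directory part (/home/v2admin/demo/f) is fixed. According to the Flume user guide, a regular expression can be used for the file name only. The Java check below shows which names the pattern `.*.log.*` accepts. It assumes the regex must match the whole file name, and the class name is hypothetical. Because `.` is unescaped, it matches any character, so a name like "catalog" also matches.

```java
import java.util.List;
import java.util.regex.Pattern;

// Tests file names against the file-name part of the filegroup pattern.
final class FilegroupRegexDemo {

    static final Pattern FILE_NAME = Pattern.compile(".*.log.*");

    // True if the whole file name matches the pattern.
    static boolean matches(String fileName) {
        return FILE_NAME.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        for (String name : List.of("helloworld.log", "app.log.1", "catalog", "notes.txt")) {
            System.out.println(name + " -> " + matches(name));
        }
    }
}
```

Only notes.txt is rejected. pos.log would match too, so it matters that the position file sits in /home/v2admin/demo rather than in the watched f directory.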

2) Create the f directory inside /home/v2admin/demo:
mkdir f
3) Start Flume:

flume-ng agent -n a1 -c conf/ -f jobs/taildir_demo.conf -Dflume.root.logger=INFO,console

4) In another terminal, go to /home/v2admin/demo/f and write to a log file:

echo "hello world" > helloworld.log
echo "this is a demo" >> helloworld.log

5) Watch the listening terminal:

2020-12-26 11:55:45,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
2020-12-26 11:57:03,089 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 69 73 20 69 73 20 61 20 64 65 6D 6F       this is a demo }

6) Check pos.log:

[{"inode":73056560,"pos":27,"file":"/home/v2admin/demo/f/helloworld.log"}]
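The pos value is a byte offset into the file. "hello world\n" is 12 bytes and "this is a demo\n" is 15 bytes, so after both events the source has consumed 12 + 15 = 27 bytes. The Java toy below shows resuming from such an offset. It is not Flume's tailing code. It assumes one event per newline-terminated line, a UTF-8 file that only grows, and hypothetical class and method names.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of resuming a tailed file from a saved byte offset (not Flume's code).
final class TailFromPosition {

    // Lines read in this pass plus the offset to write to the position file.
    static final class Result {
        final List<String> lines;
        final long pos;

        Result(List<String> lines, long pos) {
            this.lines = lines;
            this.pos = pos;
        }
    }

    // Assumes the file only grows (no truncation or rotation) and fits in memory.
    static Result readFrom(String path, long pos) {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (pos > length) {
                throw new IllegalArgumentException("saved position is past end of file");
            }
            byte[] rest = new byte[(int) (length - pos)];
            file.seek(pos);
            file.readFully(rest);

            List<String> lines = new ArrayList<>();
            int start = 0;
            for (int i = 0; i < rest.length; i++) {
                if (rest[i] == '\n') { // one event per newline-terminated line
                    lines.add(new String(rest, start, i - start, StandardCharsets.UTF_8));
                    start = i + 1;
                }
            }
            // A trailing partial line is not consumed; the next pass picks it up.
            return new Result(lines, pos + start);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading a file containing "hello world\n" from offset 0 returns that line and pos 12. After "this is a demo\n" is appended, reading from 12 returns the new line and pos 27, which matches pos.log.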

For the other source types and more options, see the documentation: http://flume.apache.org/FlumeUserGuide.html
