Flume NG 學習筆記(四)Source配置

首先、這節水的東西就比較少了,大部分是例子。html

1、Avro Source與Thrift Source

Avro端口監聽並接收來自外部的Avro客戶流的事件。當內置Avro 去Sinks另外一個配對Flume代理,它就能夠建立分層採集的拓撲結構。官網說的比較繞,固然個人翻譯也很弱,其實就是flume能夠多級代理,而後代理與代理之間用Avro去鏈接java

下面是官網給出的source的配置,加粗的參數是必選,描述就不解釋了。linux

Property Nameshell

Defaultapache

Descriptionapp

channelsdom

curl


typetcp

ide

The component type name, needs to be avro

bind

hostname or IP address to listen on

port

Port # to bind to

threads

Maximum number of worker threads to spawn

selector.type



selector.*



interceptors

Space-separated list of interceptors

interceptors.*



compression-type

none

This can be 「none」 or 「deflate」. The compression-type must match the compression-type of matching AvroSource

ssl

FALSE

Set this to true to enable SSL encryption. You must also specify a 「keystore」 and a 「keystore-password」.

keystore

This is the path to a Java keystore file. Required for SSL.

keystore-password

The password for the Java keystore. Required for SSL.

keystore-type

JKS

The type of the Java keystore. This can be 「JKS」 or 「PKCS12」.

ipFilter

FALSE

Set this to true to enable ipFiltering for netty

ipFilter.rules

Define N netty ipFilter pattern rules with this config.

 

官網的例子就不放了,這邊用實際例子顯示。


[html] view plain copy

  1. #配置文件avro_case2.conf 其實和第二節的pull.conf 如出一轍  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.channels = c1  

  10. a1.sources.r1.bind = 192.168.233.128  

  11. a1.sources.r1.port = 55555  

  12.    

  13. #Describe the sink  

  14. a1.sinks.k1.typelogger  

  15. a1.sinks.k1.channelc1  

  16.    

  17. #Use a channel which buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/avro_case2.conf -n a1 -Dflume.root.logger=INFO,console

成功與否就不說明,與第二節的pull.conf 同。。。

 

#而後在另外一個終端進行測試

flume-ng avro-client -cconf -H 192.168.233.128 -p 44444 -F /tmp/logs/test.log

這個就是模擬第二節push代理費pull代理髮數據,這裏不寫配置直接命令方式測試。



發送事件成功,這裏和push代理不同的是沒有用spool,因此日誌文件名不會被更名稱。

看接受終端顯示



ok數據發送成功。

ThriftSource 與Avro Source 基本一致。只要把source的類型改爲thrift便可,例如a1.sources.r1.type = thrift

比較簡單,不作贅述。

2、Exec Source

ExecSource的配置就是設定一個Unix(Linux)命令,而後經過這個命令不斷輸出數據。若是進程退出,Exec Source也一塊兒退出,不會產生進一步的數據。

下面是官網給出的source的配置,加粗的參數是必選,描述就不解釋了。

Property Name

Default

Description

channels


type

The component type name, needs to be exec

command

The command to execute

shell

A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.

restartThrottle

10000

Amount of time (in millis) to wait before attempting a restart

restart

FALSE

Whether the executed cmd should be restarted if it dies

logStdErr

FALSE

Whether the command’s stderr should be logged

batchSize

20

The max number of lines to read and send to the channel at a time

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*



 

下面是實際例子顯示。


[html] view plain copy

  1. #配置文件exec_case3.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = exec  

  9. a1.sources.r1.command = tail -F /tmp/logs/test.log  

  10. a1.sources.r1.channels = c1  

  11.    

  12. #Describe the sink  

  13. a1.sinks.k1.typelogger  

  14. a1.sinks.k1.channelc1  

  15.    

  16. #Use a channel which buffers events in memory  

  17. a1.channels.c1.typememory  

  18. a1.channels.c1.capacity1000  

  19. a1.channels.c1.transactionCapacity100  



這裏咱們用tail –F命令去一直都日誌的尾部。

#敲命令

flume-ng agent -cconf -f conf/exec_case3.conf -n a1 -Dflume.root.logger=INFO,console

這邊會顯示讀取日誌的全部數據





上圖是日誌,這邊咱們繼續往日誌裏添加數據

echo"looklook5" >>  test.log ,會發現終端也在輸出數據。

 

3、JMS Source

官網說:JMS Sourcereads messages from a JMS destination such as a queue or topic. Being a JMSapplication it should work with any JMS provider but has only been tested withActiveMQ.

簡單說的,官網JMSsource 就測試了ActiveMQ,其餘的尚未。下面是官網的例子:

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=jms

a1.sources.r1.channels=c1

a1.sources.r1.initialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory

a1.sources.r1.connectionFactory=GenericConnectionFactory

a1.sources.r1.providerURL=tcp://mqserver:61616

a1.sources.r1.destinationName=BUSINESS_DATA

a1.sources.r1.destinationType=QUEUE

下面是官網給出的source的配置,加粗的參數是必選,描述就不解釋了

Property Name

Default

Description

channels


type

The component type name, needs to be jms

initialContextFactory

Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory

The JNDI name the connection factory shoulld appear as

providerURL

The JMS provider URL

destinationName

Destination name

destinationType

Destination type (queue or topic)

messageSelector

Message selector to use when creating the consumer

userName

Username for the destination/provider

passwordFile

File containing the password for the destination/provider

batchSize

100

Number of messages to consume in one batch

converter.type

DEFAULT

Class to use to convert messages to flume events. See below.

converter.*

Converter properties.

converter.charset

UTF-8

Default converter only. Charset to use when converting JMS TextMessages to byte arrays.

 

介於這個源目前還不成熟,那咱們等他成熟了再來研究吧,這裏偷點懶。

 

4、Spooling Directory Source

Spooling Directory Source在第二節的時候已經講過,這裏複述一下:監測配置的目錄下新增的文件,並將文件中的數據讀取出來。其中,Spool Source有2個注意地方,第一個是拷貝到spool目錄下的文件不能夠再打開編輯,第二個是spool目錄下不可包含相應的子目錄。這個主要用途做爲對日誌的準實時監控。

下面是官網給出的source的配置,加粗的參數是必選。可選項太多,這邊就加一個fileSuffix,即文件讀取後添加的後綴名,這個是能夠更改。

Property Name

Default

Description

channels


type

The component type name, needs to be spooldir.

spoolDir

The directory from which to read files from.

fileSuffix

.COMPLETED

Suffix to append to completely ingested files

下面給出例子,這個與第二節的push.conf 類似


[html] view plain copy

  1. #配置文件:spool_case4.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type =spooldir  

  9. a1.sources.r1.spoolDir =/tmp/logs  

  10. a1.sources.r1.fileHeadertrue  

  11. a1.sources.r1.channels =c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.type = logger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  



這裏咱們監控日誌目錄/tmp/logs

#敲命令

flume-ng agent -cconf -f conf/spool_case4.conf -n a1 -Dflume.root.logger=INFO,console



終端將數據都顯示出來了。咱們查看監控日誌目錄/tmp/logs



被讀取的文件已經被加上後綴名,表示已經完成讀取。

 

5、NetCat Source

Netcat source 在某一端口上進行偵聽,它將每一行文字變成一個事件源,也就是數據是基於換行符分隔。它的工做就像命令nc -k -l [host] [port] 換句話說,它打開一個指定端口,偵聽數據將每一行文字變成Flume事件,並經過鏈接通道發送。

下面是官網給出的source的配置,加粗的參數是必選

Property Name

Default

Description

channels


type

The component type name, needs to be netcat

bind

Host name or IP address to bind to

port

Port # to bind to

max-line-length

512

Max line length per event body (in bytes)

ack-every-event

TRUE

Respond with an 「OK」 for every event received

selector.type

replicating

replicating or multiplexing

selector.*


Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*



實際例子話,第二節的第一個例子就是Netcat source,這裏不演示了。

 

6、Sequence Generator Source

一個簡單的序列發生器,不斷產成與事件計數器0和1的增量開始。主要用於測試(官網說),這裏也不作贅述。

 

7、Syslog Sources

讀取syslog數據,並生成Flume 事件。 這個Source分紅三類SyslogTCP Source、

Multiport Syslog TCP Source(多端口)與SyslogUDP Source。其中TCP Source爲每個用回車(\ n)來分隔的字符串建立一個新的事件。而UDP Source將整個消息做爲一個單一的事件。

 

7.一、Syslog TCPSource

這個是最初的Syslog Sources

下面是官網給出的source的配置,加粗的參數是必選,這裏可選我省略了。

Property Name

Default

Description

channels


type

The component type name, needs to be syslogtcp

host

Host name or IP address to bind to

port

Port # to bind to

官網案例

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogtcp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是實際的例子


[html] view plain copy

  1. #配置文件:syslog_case5.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type =syslogtcp  

  9. a1.sources.r1.port =50000  

  10. a1.sources.r1.host =192.168.233.128  

  11. a1.sources.r1.channels =c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.type = logger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  




這裏咱們設置的偵聽端口爲192.168.233.128 50000

#敲命令

flume-ng agent -cconf -f conf/syslog_case5.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hellolooklook5" | nc 192.168.233.128 50000

而後看以前的終端,將會有以下顯示:



數據已經發送過來了。

 

7.2 Multiport Syslog TCP Source

這是一個更新,更快,支持多端口版本的SyslogTCP Source。他不只僅監控一個端口,還能夠監控多個端口。官網配置基本差很少,就是可選配置比較多

Property Name

Default

Description

channels


type

The component type name, needs to be multiport_syslogtcp

host

Host name or IP address to bind to.

ports

Space-separated list (one or more) of ports to bind to.

portHeader

If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.

這裏說明下須要注意的是這裏ports設置已經取代tcp 的port,這個千萬注意。還有portHeader這個能夠與後面的interceptors 與 channel selectors自定義邏輯路由使用。

下面是官網例子:

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=multiport_syslogtcp

a1.sources.r1.channels=c1

a1.sources.r1.host=0.0.0.0

a1.sources.r1.ports=10001 10002 10003

a1.sources.r1.portHeader=port

下面是實際例子


[html] view plain copy

  1. #配置文件:syslog_case6.conf  

  2. # Name thecomponents on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = multiport_syslogtcp  

  9. a1.sources.r1.ports = 50000 60000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe thesink  

  14. a1.sinks.k1.typelogger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channelwhich buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  




這裏咱們偵探192.168.233.128的2個端口50000與60000

#敲命令

flume-ng agent -cconf -f conf/syslog_case6.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hellolooklook5" | nc 192.168.233.128 50000

echo "hello looklook6"| nc 192.168.233.128 60000

而後看以前的終端,將會有以下顯示:



2個端口的數據已經發送過來了。

 

7.3 Syslog UDP Source

關於這個官網都懶的介紹了,其實就是與TCP不一樣的協議而已。

官網配置與TCP一致,就不說了。下面是官網例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=syslogudp

a1.sources.r1.port=5140

a1.sources.r1.host=localhost

a1.sources.r1.channels=c1

下面是實際例子


[html] view plain copy

  1. #配置文件:syslog_case7.conf  

  2. # Name thecomponents on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.type = syslogudp  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe thesink  

  14. a1.sinks.k1.typelogger  

  15.  a1.sinks.k1.channel = c1  

  16.    

  17. # Use a channelwhich buffers events in memory  

  18. a1.channels.c1.typememory  

  19. a1.channels.c1.capacity1000  

  20. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/syslog_case7.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hellolooklook5" | nc –u 192.168.233.128 50000

#在啓動的終端查看console輸出



Ok,數據已經發送過來了

 

 

8、HTTP Source

HTTP Source是HTTP POST和GET來發送事件數據的,官網說GET應只用於實驗。Flume 事件使用一個可插拔的「handler」程序來實現轉換,它必須實現的HTTPSourceHandler接口。此處理程序須要一個HttpServletRequest和返回一個flume 事件列表。

全部在一個POST請求發送的事件被認爲是在一個事務裏,一個批量插入flume 通道的行爲。

下面是官網給出的source的配置,加粗的參數是必選

Property Name

Default

Description

type


The component type name, needs to be http

port

The port the source should bind to.

bind

0.0.0.0

The hostname or IP address to listen on

handler

org.apache.flume.source.http.JSONHandler

The FQCN of the handler class.

官網例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.type=http

a1.sources.r1.port=5140

a1.sources.r1.channels=c1

a1.sources.r1.handler=org.example.rest.RestHandler

a1.sources.r1.handler.nickname=random props

下面是實際用例:



[html] view plain copy

  1. #配置文件:http_case8.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1  

  5. a1.channelsc1  

  6.    

  7. #Describe/configure the source  

  8. a1.sources.r1.typehttp  

  9. a1.sources.r1.port50000  

  10. a1.sources.r1.channelsc1  

  11.    

  12. #Describe the sink  

  13. a1.sinks.k1.typelogger  

  14.  a1.sinks.k1.channel = c1  

  15.    

  16. #Use a channel which buffers events in memory  

  17. a1.channels.c1.typememory  

  18. a1.channels.c1.capacity1000  

  19. a1.channels.c1.transactionCapacity100  



#敲命令

flume-ng agent -cconf -f conf/http_case8.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

#咱們用生成JSON 格式的POSTrequest發數據

curl -X POST -d '[{"headers" :{"looklook1" : "looklook1 isheader","looklook2" : "looklook2 isheader"},"body" : "hello looklook5"}]' http://192.168.233.128:50000

#在啓動的終端查看console輸出



這裏headersbody都正常輸出。

 

9、Twitter 1%firehose Source(實驗的)

官網警告,慎用,說不定下個版本就木有了

這個實驗source 是經過時搜索服務,從Twitter1%樣本信息中獲取事件數據。須要Twitter開發者帳號。好吧,對於400網站,咱們迫不得已用不到,就很少解釋了。

 


10、自定義Source

一個自定義 Source實際上是對Source接口的實現。當咱們開始flume代理的時候必須將自定義 Source和相依賴的jar包放到代理的classpath下面。自定義 Sourcetype就是咱們實現Source接口對應的類全路徑。

這裏後面的內容裏會詳細介紹,這裏不作贅述。

相關文章
相關標籤/搜索