Copyright notice: this is the blogger's original article; please do not repost without the blogger's permission.
Sink groups allow you to organize multiple sinks into a single entity. Sink processors provide the ability to load-balance across all sinks in the group, and to fail over from one sink to another when a sink fails.
Simply put, one source maps to one sink group, i.e., multiple sinks. This is much like the replicating/multiplexing case in section 6, except that here the concern is reliability and performance: the failover and load-balancing settings.
Below is the official configuration:
| Property Name | Default | Description |
| --- | --- | --- |
| sinks | – | Space-separated list of sinks that are participating in the group |
| processor.type | default | The component type name, needs to be default, failover or load_balance |
From the processor.type parameter you can see that there are three processor types: default, failover, and load_balance. Note that, per the official documentation, custom processors are not yet supported.
Below is the official example:
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
The Default Sink Processor accepts only a single sink, and the user is not forced to create a processor (sink group) for it. Plenty of earlier examples used it, so there is not much more to say about it here.
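For reference, a minimal sketch of that default case, with no sinkgroups section declared at all; the netcat source and logger sink here are illustrative choices, and the component names (a1, r1, c1, k1) are arbitrary:
# Default sink processor: a single sink needs no sink group
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1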
The Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that every event will be processed as long as one sink is available.
Failover works by relegating failed sinks to a pool, where they are assigned a cool-down period during which they are not tried. Once a sink successfully sends an event, it is restored to the live pool.
To configure this, set the sink group's processor type to failover and assign a priority to every sink. All priority numbers must be unique; pay particular attention to this. In addition, the upper bound on the failover time can be set via the maxpenalty property.
Below is the official configuration:
| Property Name | Default | Description |
| --- | --- | --- |
| sinks | – | Space-separated list of sinks that are participating in the group |
| processor.type | default | The component type name, needs to be failover |
| processor.priority.<sinkName> | – | <sinkName> must be one of the sink instances associated with the current sink group |
| processor.maxpenalty | 30000 | Maximum backoff period for the failed sink (in millis) |
Below is the official example:
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.maxpenalty=10000
Here we first declare a sink group and then add the two sinks, k1 and k2, with priorities 5 and 10 respectively; the sink with the higher priority value (k2) is tried first. The processor's maxpenalty is set to 10 seconds, while the default is 30 seconds.
Below is a test example:
# Configuration file: failover_sink_case13.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1 k2
a1.channels= c1 c2
a1.sinkgroups= g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= failover
a1.sinkgroups.g1.processor.priority.k1= 5
a1.sinkgroups.g1.processor.priority.k2= 10
a1.sinkgroups.g1.processor.maxpenalty= 10000
#Describe/configure the source
a1.sources.r1.type= syslogtcp
a1.sources.r1.port= 50000
a1.sources.r1.host= 192.168.233.128
a1.sources.r1.channels= c1 c2
#Describe the sink
a1.sinks.k1.type= avro
a1.sinks.k1.channel= c1
a1.sinks.k1.hostname= 192.168.233.129
a1.sinks.k1.port= 50000
a1.sinks.k2.type= avro
a1.sinks.k2.channel= c2
a1.sinks.k2.hostname= 192.168.233.130
a1.sinks.k2.port= 50000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# c2 is declared above as well, so it also needs a definition
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
Here two channels and two sinks are configured, with the failover settings copied straight from the official example. We also need agents for the two sinks to send to; for those two receiving agents we reuse the two sink agent configurations from the replication example in section 6.
Below is the configuration for the first receiving agent:
# Configuration file: replicate_sink1_case11.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.233.129
a2.sources.r1.port = 50000
# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
Below is the configuration for the second receiving agent:
# Configuration file: replicate_sink2_case11.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.233.130
a3.sources.r1.port = 50000
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Run the commands
First start the two receiving agents. If you start the sending agent first, it will complain that it cannot bind its sinks, because the two receiving agents are not up yet.
flume-ng agent -c conf -f conf/replicate_sink1_case11.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/replicate_sink2_case11.conf -n a3 -Dflume.root.logger=INFO,console
Then start the sending agent:
flume-ng agent -c conf -f conf/failover_sink_case13.conf -n a1 -Dflume.root.logger=INFO,console
Once everything is up, open another terminal and send data to the listening port:
echo "hello failoversink" | nc 192.168.233.128 50000
# Check the console output on the receiving agents' terminals
Because k1's priority is 5 and k2's is 10, events are sent to k2 while k2 is running normally, and the data is printed on its console as expected.
Next we kill k2's agent process and try sending data to the listening port again:
echo "hello close k2"| nc 192.168.233.128 50000
The sending agent fails to deliver the event to k2, so it puts k2 on the failover list. Since k1 is still running, it receives the data instead.
Then we bring k2's agent process back up and send data to the listening port once more:
echo " hello open k2 again" | nc192.168.233.128 50000
The data is delivered normally again; the Failover Sink Processor test is complete.
The Load balancing Sink Processor provides the ability to load-balance flow over multiple sinks. The implementation supports distributing the load with either the round_robin or the random selection mechanism; round_robin is used by default, but this can be overridden in the configuration. A custom selection mechanism can also be implemented by extending the AbstractSinkSelector class.
When invoked, the selector picks the next sink to dispatch to according to the configured selection rule, as sketched below.
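As an illustration, here is a minimal sketch of such a custom selector, assuming the Flume 1.x org.apache.flume.sink.AbstractSinkSelector base class with its setSinks() and createSinkIterator() methods; the package and class name (com.example.FixedOrderSelector) are made up for this example:
package com.example;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flume.Sink;
import org.apache.flume.sink.AbstractSinkSelector;

// Hypothetical selector: always tries the sinks in their declared order,
// so the first live sink receives all of the traffic.
public class FixedOrderSelector extends AbstractSinkSelector {

    private List<Sink> ordered = new ArrayList<>();

    @Override
    public void setSinks(List<Sink> sinks) {
        super.setSinks(sinks);  // let the base class keep its own view of the sinks
        this.ordered = new ArrayList<>(sinks);
    }

    @Override
    public Iterator<Sink> createSinkIterator() {
        // The load-balancing processor walks this iterator, falling through
        // to the next sink whenever the current one fails.
        return ordered.iterator();
    }
}
Such a class would then be wired in through its fully qualified class name:
a1.sinkgroups.g1.processor.selector = com.example.FixedOrderSelector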
Below is the official configuration:
| Property Name | Default | Description |
| --- | --- | --- |
| processor.sinks | – | Space-separated list of sinks that are participating in the group |
| processor.type | default | The component type name, needs to be load_balance |
| processor.backoff | false | Should failed sinks be backed off exponentially. |
| processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector |
| processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds) |
Below is the official example:
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=random
This is much the same as the failover setup.
Below is a test example:
# Configuration file: load_sink_case14.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.233.129
a1.sinks.k1.port = 50000
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 192.168.233.130
a1.sinks.k2.port = 50000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Note that because this example tests load balancing, a single channel is used as the data transport. For the receiving agents behind the two sinks, we reuse the receiving-agent configurations from the failover test.
# Run the commands
First start the two receiving agents. If you start the sending agent first, it will complain that it cannot bind its sinks, because the two receiving agents are not up yet.
flume-ng agent -c conf -f conf/replicate_sink1_case11.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/replicate_sink2_case11.conf -n a3 -Dflume.root.logger=INFO,console
Then start the sending agent:
flume-ng agent -c conf -f conf/load_sink_case14.conf -n a1 -Dflume.root.logger=INFO,console
Once everything is up, open another terminal and send data to the listening port:
echo "loadbanlancetest1" | nc 192.168.233.128 50000
echo "loadbantest2" | nc 192.168.233.128 50000
echo "loadban test3"| nc 192.168.233.128 50000
echo "loadbantest4" | nc 192.168.233.128 50000
echo "loadbantest5" | nc 192.168.233.128 50000
# Check the console output on the receiving agents' terminals
k1 received 3 of the events, and k2 received the other 2.
Because the load-balancing selector we chose is round_robin, you can see the agent rotating between the two sinks as it sends batches of events.
The Sink Processors test is complete.