Flume NG 學習筆記(七)Sink Processors(故障轉移與負載均衡)測試

目錄(?)[+]負載均衡

Sink groups容許組織多個sink到一個實體上。 Sink processors可以提供在組內全部Sink之間實現負載均衡的能力,並且在失敗的狀況下可以進行故障轉移從一個Sink到另外一個Sink。dom

簡單的說就是一個source 對應一個Sinkgroups,即多個sink,這裏實際上與第六節的複用/複製狀況差很少,只是這裏考慮的是可靠性與性能,即故障轉移與負載均衡的設置。tcp

下面是官方配置:ide

Property Name性能

Default測試

Descriptionthis

sinksurl

spa

Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be defaultfailover or load_balance

從參數類型上能夠看出有3種Processors類型:defaultfailover(故障轉移)和 load_balance(負載均衡),固然,官網上說目前自定義processors 還不支持。

下面是官網例子

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=load_balance

 

1、Default Sink Processor

DefaultSink Processor 接收單一的Sink,不強制用戶爲Sink建立Processor,前面舉了不少例子。因此這個就很少說了。

 

2、Failover Sink Processor(故障轉移)

         FailoverSink Processor會經過配置維護了一個優先級列表。保證每個有效的事件都會被處理。

         故障轉移的工做原理是將連續失敗sink分配到一個池中,在那裏被分配一個冷凍期,在這個冷凍期裏,這個sink不會作任何事。一旦sink成功發送一個event,sink將被還原到live 池中。

         在這配置中,要設置sinkgroups processor爲failover,須要爲全部的sink分配優先級,全部的優先級數字必須是惟一的,這個得格外注意。此外,failover time的上限能夠經過maxpenalty 屬性來進行設置。

下面是官網配置:

Property Name

Default

Description

sinks

Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be failover

processor.priority.<sinkName>

<sinkName> must be one of the sink instances associated with the current sink group

processor.maxpenalty

30000

(in millis)

下面是官網例子

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=failover

a1.sinkgroups.g1.processor.priority.k1=5

a1.sinkgroups.g1.processor.priority.k2=10

a1.sinkgroups.g1.processor.maxpenalty=10000

這裏首先要申明一個sinkgroups,而後再設置2個sink ,k1與k2,其中2個優先級是5和10,而processor的maxpenalty被設置爲10秒,默認是30秒。‘

下面是測試例子


[html] view plain copy

  1. #配置文件:failover_sink_case13.conf  

  2. #Name the components on this agent  

  3. a1.sourcesr1  

  4. a1.sinksk1 k2  

  5. a1.channelsc1 c2  

  6.    

  7. a1.sinkgroupsg1  

  8. a1.sinkgroups.g1.sinksk1 k2  

  9. a1.sinkgroups.g1.processor.typefailover  

  10. a1.sinkgroups.g1.processor.priority.k15  

  11. a1.sinkgroups.g1.processor.priority.k210  

  12. a1.sinkgroups.g1.processor.maxpenalty10000  

  13.    

  14. #Describe/configure the source  

  15. a1.sources.r1.typesyslogtcp  

  16. a1.sources.r1.port50000  

  17. a1.sources.r1.host192.168.233.128  

  18. a1.sources.r1.channelsc1 c2  

  19.    

  20. #Describe the sink  

  21. a1.sinks.k1.typeavro  

  22. a1.sinks.k1.channelc1  

  23. a1.sinks.k1.hostname192.168.233.129  

  24. a1.sinks.k1.port50000  

  25.    

  26. a1.sinks.k2.typeavro  

  27. a1.sinks.k2.channelc2  

  28. a1.sinks.k2.hostname192.168.233.130  

  29. a1.sinks.k2.port50000  

  30. # Usea channel which buffers events in memory  

  31. a1.channels.c1.typememory  

  32. a1.channels.c1.capacity1000  

  33. a1.channels.c1.transactionCapacity100  



這裏設置了2個channels與2個sinks ,關於故障轉移的設置直接複製官網的例子。咱們還要配置2個sinks對於的代理。這裏的2個接受代理咱們沿用以前第六章複製的2個sink代理配置。

下面是第一個接受複製事件代理配置


[html] view plain copy

  1. #配置文件:replicate_sink1_case11.conf  

  2. # Name the components on this agent  

  3. a2.sources = r1  

  4. a2.sinks = k1  

  5. a2.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a2.sources.r1.type = avro  

  9. a2.sources.r1.channels = c1  

  10. a2.sources.r1.bind = 192.168.233.129  

  11. a2.sources.r1.port = 50000  

  12.    

  13. # Describe the sink  

  14. a2.sinks.k1.type = logger  

  15. a2.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a2.channels.c1.type = memory  

  19. a2.channels.c1.capacity = 1000  

  20. a2.channels.c1.transactionCapacity = 100  



下面是第二個接受複製事件代理配置:


[html] view plain copy

  1. #配置文件:replicate_sink2_case11.conf  

  2. # Name the components on this agent  

  3. a3.sources = r1  

  4. a3.sinks = k1  

  5. a3.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a3.sources.r1.type = avro  

  9. a3.sources.r1.channels = c1  

  10. a3.sources.r1.bind = 192.168.233.130  

  11. a3.sources.r1.port = 50000  

  12.    

  13. # Describe the sink  

  14. a3.sinks.k1.type = logger  

  15. a3.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a3.channels.c1.type = memory  

  19. a3.channels.c1.capacity = 1000  

  20. a3.channels.c1.transactionCapacity = 100  



#敲命令

首先先啓動2個接受複製事件代理,若是先啓動源發送的代理,會報他找不到sinks的綁定,由於2個接事件的代理還未起來。

flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console

在啓動源發送的代理

flume-ng agent -cconf -f conf/failover_sink_case13.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "hello failoversink" | nc 192.168.233.128 50000

#在啓動源發送的代理終端查看console輸出

由於k1的優先級是5,K2是10所以當K2正常運行的時候,是發送到K2的。下面數據正常輸出。



而後咱們中斷K2的代理進程。



再嘗試往偵聽端口送數據

echo "hello close k2"| nc 192.168.233.128 50000

咱們發現源代理髮生事件到K2失敗,而後他將K2放入到failover list(故障列表)



由於K1仍是正常運行的,所以這個時候他會接收到數據。



而後咱們再打開K2的大理進程,咱們繼續往偵聽端口送數據

echo " hello open k2 again" | nc192.168.233.128 50000



數據正常發生,Failover SinkProcessor測試完畢。

3、Load balancing SinkProcessor

負載均衡片處理器提供在多個Sink之間負載平衡的能力。實現支持經過round_robin(輪詢)或者random(隨機)參數來實現負載分發,默認狀況下使用round_robin,但能夠經過配置覆蓋這個默認值。還能夠經過集成AbstractSinkSelector類來實現用戶本身的選擇機制。

當被調用的時候,這選擇器經過配置的選擇規則選擇下一個sink來調用。

下面是官網配置

 

Property Name

Default

Description

processor.sinks

Space-separated list of sinks that are participating in the group

processor.type

default

The component type name, needs to be load_balance

processor.backoff

false

Should failed sinks be backed off exponentially.

processor.selector

round_robin

Selection mechanism. Must be either round_robinrandom or FQCN of custom class that inherits from AbstractSinkSelector

processor.selector.maxTimeOut

30000

Used by backoff selectors to limit exponential backoff (in milliseconds)

下面是官網的例子

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1 k2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.backoff=true

a1.sinkgroups.g1.processor.selector=random

這個與故障轉移的設置差很少。

下面是測試例子


[html] view plain copy

  1. #配置文件:load_sink_case14.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1 k2  

  5. a1.channels = c1  

  6.    

  7. a1.sinkgroups = g1  

  8. a1.sinkgroups.g1.sinks = k1 k2  

  9. a1.sinkgroups.g1.processor.type =load_balance  

  10. a1.sinkgroups.g1.processor.backoff = true  

  11. a1.sinkgroups.g1.processor.selector =round_robin  

  12.    

  13. # Describe/configure the source  

  14. a1.sources.r1.type = syslogtcp  

  15. a1.sources.r1.port = 50000  

  16. a1.sources.r1.host = 192.168.233.128  

  17. a1.sources.r1.channels = c1  

  18.    

  19. # Describe the sink  

  20. a1.sinks.k1.type = avro  

  21. a1.sinks.k1.channel = c1  

  22. a1.sinks.k1.hostname = 192.168.233.129  

  23. a1.sinks.k1.port = 50000  

  24.    

  25. a1.sinks.k2.type = avro  

  26. a1.sinks.k2.channel = c1  

  27. a1.sinks.k2.hostname = 192.168.233.130  

  28. a1.sinks.k2.port = 50000  

  29. # Use a channel which buffers events inmemory  

  30. a1.channels.c1.type = memory  

  31. a1.channels.c1.capacity = 1000  

  32. a1.channels.c1.transactionCapacity = 100  


這裏要說明的是,所以測試的是負載均衡的例子,所以這邊使用一個channel來做爲數據傳輸通道。這裏sinks的對應的接收數據的代理配置,咱們沿用故障轉移的接收代理配置。

#敲命令

首先先啓動2個接受複製事件代理,若是先啓動源發送的代理,會報他找不到sinks的綁定,由於2個接事件的代理還未起來。

flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1

-Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1

-Dflume.root.logger=INFO,console

在啓動源發送的代理

flume-ng agent -cconf -f conf/load_sink_case14.conf -n a1

-Dflume.root.logger=INFO,console

 

啓動成功後

 

打開另外一個終端輸入,往偵聽端口送數據

echo "loadbanlancetest1" | nc 192.168.233.128 50000

echo "loadbantest2" | nc 192.168.233.128 50000

echo "loadban test3"| nc 192.168.233.128 50000

echo "loadbantest4" | nc 192.168.233.128 50000

echo "loadbantest5" | nc 192.168.233.128 50000

#在啓動源發送的代理終端查看console輸出

其中K1收到3條數據



其中K1收到2條數據



由於咱們負載均衡選擇的類型是輪詢,所以能夠看出flume 讓代理每次向一個sink發送2次事件數據後就換另外一個sinks 發送。

Sink Processors測試完畢

相關文章
相關標籤/搜索