Flume NG 學習筆記(六)Selector(複用與複製)測試

目錄(?)[+]apache

學習心得(三)流配置中介紹多路複用流的時候,有說到Flume支持從一個源發送事件到多個通道中,這被稱爲事件流的複用。這裏須要在配置中定義事件流的複製/複用,選擇1個或者多個通道進行數據流向。app

而關於selector配置前面也講過:curl

<Agent>.sources.<Source1>.selector.type= replicatingtcp

這個源的選擇類型爲複製。這個參數不指定一個選擇的時候,默認狀況下它複製ide

複用則是麻煩一下,流的事情是被篩選的發生到不一樣的渠道,須要指定源和扇出通道的規則,感受與case when 相似。學習

複用的參數爲:測試

<Agent>.sources.<Source1>.selector.type= multiplexingthis


1、下面給出複製的測試例子:


這裏須要配置1個代理做爲源發送與2個代理做爲接受複製事件,共3個flume配置url

首先是做爲源發送的代理配置


[html] view plain copy

  1. #配置文件:replicate_source_case11.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1 k2  

  5. a1.channels = c1 c2  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type = syslogtcp  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.selector.type = replicating  

  12. a1.sources.r1.channels = c1 c2  

  13.    

  14. # Describe the sink  

  15. a1.sinks.k1.type = avro  

  16. a1.sinks.k1.channel = c1  

  17. a1.sinks.k1.hostname = 192.168.233.129  

  18. a1.sinks.k1.port = 50000  

  19.    

  20. a1.sinks.k2.type = avro  

  21. a1.sinks.k2.channel = c2  

  22. a1.sinks.k2.hostname = 192.168.233.130  

  23. a1.sinks.k2.port = 50000  

  24. # Use a channel which buffers events inmemory  

  25. a1.channels.c1.type = memory  

  26. a1.channels.c1.capacity = 1000  

  27. a1.channels.c1.transactionCapacity = 100  

  28.    

  29. a1.channels.c2.type = memory  

  30. a1.channels.c2.capacity = 1000  

  31. a1.channels.c2.transactionCapacity = 100  




這裏設置了2個channels與2個sinks,那麼咱們也要設置2個sinks對應的代理配置:

下面是第一個接受複製事件代理配置


[html] view plain copy

  1. #配置文件:replicate_sink1_case11.conf  

  2. # Name the components on this agent  

  3. a2.sources = r1  

  4. a2.sinks = k1  

  5. a2.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a2.sources.r1.type = avro  

  9. a2.sources.r1.channels = c1  

  10. a2.sources.r1.bind = 192.168.233.129  

  11. a2.sources.r1.port = 50000  

  12.    

  13. # Describe the sink  

  14. a2.sinks.k1.type = logger  

  15. a2.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a2.channels.c1.type = memory  

  19. a2.channels.c1.capacity = 1000  

  20. a2.channels.c1.transactionCapacity = 100  




 

下面是第二個接受複製事件代理配置:


[html] view plain copy

  1. #配置文件:replicate_sink2_case11.conf  

  2. # Name the components on this agent  

  3. a3.sources = r1  

  4. a3.sinks = k1  

  5. a3.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a3.sources.r1.type = avro  

  9. a3.sources.r1.channels = c1  

  10. a3.sources.r1.bind = 192.168.233.130  

  11. a3.sources.r1.port = 50000  

  12.    

  13. # Describe the sink  

  14. a3.sinks.k1.type = logger  

  15. a3.sinks.k1.channel = c1  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a3.channels.c1.type = memory  

  19. a3.channels.c1.capacity = 1000  

  20. a3.channels.c1.transactionCapacity = 100  



#敲命令

首先先啓動2個接受複製事件代理,若是先啓動源發送的代理,會報他找不到sinks的綁定,由於2個接事件的代理還未起來。

flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console

在啓動源發送的代理

flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console

 

啓動成功後

 

打開另外一個終端輸入,往偵聽端口送數據

echo "hello looklook5"| nc 192.168.233.128 50000

#在啓動源發送的代理終端查看console輸出



能夠看到他的正常啓動以及發送數據成功

#在啓動源第一個接事件的代理終端查看console輸出



能夠看到他的正常啓動,以及接受到源代理髮送的數據

#在啓動源第二個接事件的代理終端查看console輸出



一樣能夠能夠看到他的正常啓動,以及接受到源代理髮送的數據

Ok,成功

 


2、下面給出複用的測試例子:


由於複用的流的事件要聲明一個頭部,而後咱們檢查頭部對應的值,由於咱們這邊源類用http source

下面是源代理的配置


[html] view plain copy

  1. #配置文件:multi_source_case12.conf  

  2. a1.sourcesr1  

  3. a1.sinksk1 k2  

  4. a1.channelsc1 c2  

  5.    

  6. #Describe/configure the source  

  7. a1.sources.r1.typeorg.apache.flume.source.http.HTTPSource  

  8. a1.sources.r1.port50000  

  9. a1.sources.r1.host192.168.233.128  

  10. a1.sources.r1.selector.typemultiplexing  

  11. a1.sources.r1.channelsc1 c2  

  12.    

  13. a1.sources.r1.selector.headerstate  

  14. a1.sources.r1.selector.mapping.CZc1  

  15. a1.sources.r1.selector.mapping.USc2  

  16. a1.sources.r1.selector.defaultc1  

  17.    

  18. #Describe the sink  

  19. a1.sinks.k1.typeavro  

  20. a1.sinks.k1.channelc1  

  21. a1.sinks.k1.hostname192.168.233.129  

  22. a1.sinks.k1.port50000  

  23.    

  24. a1.sinks.k2.typeavro  

  25. a1.sinks.k2.channelc2  

  26. a1.sinks.k2.hostname192.168.233.130  

  27. a1.sinks.k2.port50000  

  28. # Usea channel which buffers events in memory  

  29. a1.channels.c1.typememory  

  30. a1.channels.c1.capacity1000  

  31. a1.channels.c1.transactionCapacity100  

  32.    

  33. a1.channels.c2.typememory  

  34. a1.channels.c2.capacity1000  

  35. a1.channels.c2.transactionCapacity100  



這裏設置了2個channels與2個sinks 同時判斷頭部屬性,當CZ的時,事件發送到sinks1,US時發送到sink2,其餘的都發送到sink2,所以咱們還有配置2個sinks對於的代理。這裏的2個接受代理咱們沿用以前複製的接受代理。

 

#敲命令

與以前複製的狀況同樣,首先先啓動2個接受複製事件代理,若是先啓動源發送的代理,會報他找不到sinks的綁定,由於2個接事件的代理還未起來。

flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console

在啓動源發送的代理

flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000

curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000

curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000

#在啓動源發送的代理終端查看console輸出



能夠看到他的正常啓動以及發送數據成功

#在啓動源第一個接事件的代理終端查看console輸出



這裏能夠清楚的看到,這個接事件代理只收到了2個事件,由於第二個事件由於咱們設置複用,將頭部信息對於的事件分流的關係,發送到另外一個接事件代理去了。

#在啓動源第二個接事件的代理終端查看console輸出


Ok,第二個接事件代理由於複用分流,果真只得到了第二個事件信息。

相關文章
相關標籤/搜索