lume NG 學習筆記(九)Flune Client 開發

文章內容仍是來自官網http://flume.apache.org/FlumeDeveloperGuide.htmlhtml

因爲在實際工做中,數據的生產方式極具多樣性,Flume 雖然包含了一些內置的機制來採集數據,可是更多的時候用戶更但願能將應用程序和flume直接相通。因此這邊運行用戶開發應用程序,經過IPC或者RPC鏈接flume並往flume發送數據。java

1、RPC client interface

Flume的RpcClient實現了Flume的RPC機制。用戶的應用程序能夠很簡單的調用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法發送數據,不用擔憂底層信息交換的細節。用戶能夠提供所需的event經過直接實現Event接口,例如可使用簡單的方便的實現SimpleEvent類或者使用EventBuilder的writeBody()靜態輔助方法。apache

自Flume 1.4.0起,Avro是默認的RPC協議。NettyAvroRpcClient和ThriftRpcClient實現了RpcClient接口。實現中咱們須要知道咱們將要鏈接的目標flume agent的host和port用於建立client實例,而後使用RpcClient發送數據到flume agent。api

官網給了一個Avro RPCclients的例子,這邊直接拿來作實際測試例子。服務器

這裏咱們把client.init("host.example.org",41414);app

改爲 client.init("192.168.233.128",50000);  與咱們的主機對接負載均衡


[java] view plain copydom

  1. import org.apache.flume.Event;  eclipse

  2. import org.apache.flume.EventDeliveryException;  ide

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6. import java.nio.charset.Charset;  

  7.    

  8. public class MyApp {  

  9.   public static voidmain(String[] args) {  

  10.    MyRpcClientFacade client = new MyRpcClientFacade();  

  11.    // Initializeclient with the remote Flume agent's host and port  

  12. //client.init("host.example.org",41414);  

  13. client.init("192.168.233.128",50000);  

  14.    

  15.    // Send 10events to the remote Flume agent. That agent should be  

  16.    // configured tolisten with an AvroSource.  

  17.    String sampleData = "Hello Flume!";  

  18.    for (int i =0; i < 10; i++) {  

  19.      client.sendDataToFlume(sampleData);  

  20.    }  

  21.    

  22.    client.cleanUp();  

  23.   }  

  24. }  

  25.    

  26. class MyRpcClientFacade {  

  27.   private RpcClient client;  

  28.   private String hostname;  

  29.   private int port;  

  30.    

  31.   public void init(String hostname, int port) {  

  32.    // Setup the RPCconnection  

  33.    this.hostname = hostname;  

  34.    this.port = port;  

  35.    this.client = RpcClientFactory.getDefaultInstance(hostname, port);  

  36.    // Use thefollowing method to create a thrift client (instead of the above line):  

  37.     // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  38.   }  

  39.    

  40.   public void sendDataToFlume(String data) {  

  41.    // Create aFlume Event object that encapsulates the sample data  

  42.    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  43.    

  44.    // Send theevent  

  45.    try {  

  46.      client.append(event);  

  47.    } catch (EventDeliveryException e) {  

  48.      // clean up andrecreate the client  

  49.      client.close();  

  50.      client = null;  

  51.      client = RpcClientFactory.getDefaultInstance(hostname, port);  

  52.      // Use thefollowing method to create a thrift client (instead of the above line):  

  53.      // this.client =RpcClientFactory.getThriftInstance(hostname, port);  

  54.    }  

  55.   }  

  56.    

  57.   public void cleanUp() {  

  58.    // Close the RPCconnection  

  59.    client.close();  

  60.   }  

  61.    

  62. }  

這邊代碼不解釋了,主要是將HelloFlume 發送10遍給flume,同時記得將flume 安裝主目錄下的lib 文件都添加進項目,才能正常運行程序。


 

下面是代理配置:


[html] view plain copy

  1. #配置文件:avro_client_case20.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.    

  7. # Describe/configure the source  

  8. a1.sources.r1.type = avro  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.host = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.    

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = logger  

  16.    

  17. # Use a channel which buffers events inmemory  

  18. a1.channels.c1.type = memory  

  19. a1.channels.c1.capacity = 1000  

  20. a1.channels.c1.transactionCapacity = 100  




這裏要注意下,以前說了,在接收端須要AvroSource或者Thrift Source來監聽接口。因此配置代理的時候要把a1.sources.r1.type 寫成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

啓動成功後

 

在eclipse 裏運行Java程序,固然也能夠打包後在服務器上運行JAVA程序。

 

#在啓動源發送的代理終端查看console輸出



能夠看到10條數據正常發送。

這裏要說明下,開發代碼中client.append(event)不只僅能夠發送一條數據,也能夠發送一個List(string) 的數據信息,也就是批量發送。這邊就不作演示了。

2、Failover Client

這個類包封裝了Avro RPCclient的類默認提供故障處理能力。hosts採用空格分開host:port所表明的flume agent,構成一個故障處理組。這Failover RPC Client目前不支持thrift。若是當前選擇的host agent有問題,這個failover client會自動負載到組中下一個host中。

下面是官網開發例子:

[java] view plain copy

  1. // Setup properties for the failover  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_failover");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h1 h2 h3");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h1", host1);  

  13. props.put("hosts.h2", host2);  

  14. props.put("hosts.h3", host3);  

  15.   

  16. // create the client with failover properties  

  17. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測試的開發例子

[java] view plain copy

  1. import org.apache.flume.Event;  

  2. import org.apache.flume.EventDeliveryException;  

  3. import org.apache.flume.api.RpcClient;  

  4. import org.apache.flume.api.RpcClientFactory;  

  5. import org.apache.flume.event.EventBuilder;  

  6.   

  7. import java.nio.charset.Charset;  

  8. import java.util.Properties;  

  9.   

  10. public class Failover_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade2 client = new MyRpcClientFacade2();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Hello Flume!";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade2 {  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.         // Setup the RPC connection  

  34.         // Use the following method to create a thrift client (instead of the above line):  

  35.         // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  36.      // Setup properties for the failover  

  37.         Properties props = new Properties();  

  38.         props.put("client.type""default_failover");  

  39.   

  40.         // List of hosts (space-separated list of user-chosen host aliases)  

  41.         props.put("hosts""h1 h2 h3");  

  42.   

  43.         // host/port pair for each host alias  

  44.         String host1 = "192.168.233.128:50000";  

  45.         String host2 = "192.168.233.128:50001";  

  46.         String host3 = "192.168.233.128:50002";  

  47.         props.put("hosts.h1", host1);  

  48.         props.put("hosts.h2", host2);  

  49.         props.put("hosts.h3", host3);  

  50.   

  51.         // create the client with failover properties  

  52.         client = RpcClientFactory.getInstance(props);  

  53.       }  

  54.   

  55.       public void sendDataToFlume(String data) {  

  56.         // Create a Flume Event object that encapsulates the sample data  

  57.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  58.   

  59.         // Send the event  

  60.         try {  

  61.           client.append(event);  

  62.         } catch (EventDeliveryException e) {  

  63.           // clean up and recreate the client  

  64.           client.close();  

  65.           client = null;  

  66.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  67.           // Use the following method to create a thrift client (instead of the above line):  

  68.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  69.         }  

  70.       }  

  71.   

  72.       public void cleanUp() {  

  73.         // Close the RPC connection  

  74.         client.close();  

  75.       }  

  76. }  

這邊代碼設三個host用於故障轉移,這裏偷懶,用同一個主機的3個端口模擬。代碼仍是將Hello Flume 發送10遍給第一個flume代理,當第一個代理故障的時候,則發送給第二個代理,以順序進行故障轉移。

下面是代理配置沿用以前的那個,並對配置文件進行拷貝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分別修改avro_client_case21.conf與avro_client_case22.conf中的

a1.sources.r1.port= 50001 與a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

 

啓動成功後

 

在eclipse 裏運行JAVA程序Failover_Client.java,固然也能夠打包後在服務器上運行JAVA程序。

#在啓動源發送的3個代理終端查看console輸出

咱們能夠看到第一個代理終端收到了,數據而其餘2個終端沒有數據。



而後咱們把第一個終端的進程關掉,再運行一遍client程序,而後會發現這個時候是發生到第二個終端中。當第二個終端也關閉的時候,再發送數據,則是發送到最後一個終端。這裏咱們能夠看到,故障轉移的代理主機轉移是採用順序序列的。

 

3、LoadBalancing RPC client

Flume Client SDK也支持在多個host之間使用負載均衡的Rpc Client。這種類型的client帶有一個經過空格分隔的host:port主機列表並構成了一個負載均衡組。這個client能夠指定一個負載均衡的策略,既能夠隨機的選擇一個配置的host,也能夠循環選擇一個host。固然你也能夠本身編寫一個類實現LoadBalancingRpcClient$HostSelector接口以致於用戶可使用本身編寫的選擇順序。在這種狀況下,用戶自定義的類須要被指定爲host-selector屬性的值。LoadBalancing RPC Client當前不支持thrift。

若是開啓了backoff,那麼client失敗將被放入黑名單中,只有過了被指定的超時之間以後這個被選擇的失敗的主機纔會從黑名單中被排除。當超時到了,若是主機仍是沒有反應,那麼這被認爲是一個連續的失敗而且超時時間會成倍的增加,以免可能陷入對反應遲鈍主機的長時間等待中。

這backoff的最大超時時間能夠經過maxBackoff屬性來配置,單位是毫秒。在默認狀況下maxBackoff的值是30秒(在orderSelector類裏面指定)。

下面是官網例子

[java] view plain copy

  1. // Setup properties for the load balancing  

  2. Properties props = new Properties();  

  3. props.put("client.type""default_loadbalance");  

  4.   

  5. // List of hosts (space-separated list of user-chosen host aliases)  

  6. props.put("hosts""h1 h2 h3");  

  7.   

  8. // host/port pair for each host alias  

  9. String host1 = "host1.example.org:41414";  

  10. String host2 = "host2.example.org:41414";  

  11. String host3 = "host3.example.org:41414";  

  12. props.put("hosts.h1", host1);  

  13. props.put("hosts.h2", host2);  

  14. props.put("hosts.h3", host3);  

  15.   

  16. props.put("host-selector""random"); // For random host selection  

  17. // props.put("host-selector", "round_robin"); // For round-robin host  

  18. //                                            // selection  

  19. props.put("backoff""true"); // Disabled by default.  

  20.   

  21. props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  22.                                   // becomes 30000 ms  

  23.   

  24. // Create the client with load balancing properties  

  25. RpcClient client = RpcClientFactory.getInstance(props);  

下面是測試的開發例子

[java] view plain copy

  1. import java.nio.charset.Charset;  

  2.   

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.api.RpcClient;  

  6. import org.apache.flume.api.RpcClientFactory;  

  7. import org.apache.flume.event.EventBuilder;  

  8. import java.util.Properties;  

  9.   

  10. public class Load_Client {  

  11.     public static void main(String[] args) {  

  12.         MyRpcClientFacade3 client = new MyRpcClientFacade3();  

  13.         // Initialize client with the remote Flume agent's host and port  

  14.         client.init();  

  15.   

  16.         // Send 10 events to the remote Flume agent. That agent should be  

  17.         // configured to listen with an AvroSource.  

  18.         String sampleData = "Flume Load_Client";  

  19.         for (int i = 0; i < 10; i++) {  

  20.           client.sendDataToFlume(sampleData);  

  21.         }  

  22.   

  23.         client.cleanUp();  

  24.       }  

  25.     }  

  26.   

  27.     class MyRpcClientFacade3{  

  28.       private RpcClient client;  

  29.       private String hostname;  

  30.       private int port;  

  31.   

  32.       public void init() {  

  33.           Properties props = new Properties();  

  34.           props.put("client.type""default_loadbalance");  

  35.   

  36.           // List of hosts (space-separated list of user-chosen host aliases)  

  37.           props.put("hosts""h1 h2 h3");  

  38.   

  39.           // host/port pair for each host alias  

  40.           String host1 = "192.168.233.128:50000";  

  41.           String host2 = "192.168.233.128:50001";  

  42.           String host3 = "192.168.233.128:50002";  

  43.           props.put("hosts.h1", host1);  

  44.           props.put("hosts.h2", host2);  

  45.           props.put("hosts.h3", host3);  

  46.   

  47.           props.put("host-selector""random"); // For random host selection  

  48.           // props.put("host-selector", "round_robin"); // For round-robin host  

  49. //                                                    // selection  

  50.           props.put("backoff""true"); // Disabled by default.  

  51.   

  52.           props.put("maxBackoff""10000"); // Defaults 0, which effectively  

  53.                                             // becomes 30000 ms  

  54.   

  55.           // Create the client with load balancing properties  

  56.           client = RpcClientFactory.getInstance(props);  

  57.       }  

  58.   

  59.       public void sendDataToFlume(String data) {  

  60.         // Create a Flume Event object that encapsulates the sample data  

  61.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));  

  62.   

  63.         // Send the event  

  64.         try {  

  65.           client.append(event);  

  66.         } catch (EventDeliveryException e) {  

  67.           // clean up and recreate the client  

  68.           client.close();  

  69.           client = null;  

  70.           client = RpcClientFactory.getDefaultInstance(hostname, port);  

  71.           // Use the following method to create a thrift client (instead of the above line):  

  72.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);  

  73.         }  

  74.       }  

  75.   

  76.       public void cleanUp() {  

  77.         // Close the RPC connection  

  78.         client.close();  

  79.       }  

  80. }  

這裏採用隨機的負載均衡props.put("host-selector","random") 。測試的時候沿用以前的3個接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,並將他們起起來。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

 

啓動成功後

 

在eclipse 裏運行JAVA程序Failover_Client.java,固然也能夠打包後在服務器上運行JAVA程序。

#在啓動源發送的3個代理終端查看console輸出

下面是Host1,收到了2條數據


下面是Host2,收到了2條數據


下面是Host3,收到了6條數據。



能夠看到咱們開發例子中,host-selector選擇的是隨機,所以程序也是隨機發送數據。下面咱們測試輪詢round_robin選項。

程序裏咱們修改這句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再運行Java 程序

下面是Host1,收到了4條數據



下面是Host2,收到了3條數據



一樣Host3,收到了3條數據,這邊就不放圖了。輪詢就是按照順序放圖。

相關文章
相關標籤/搜索