The content of this article is again based on the official documentation: http://flume.apache.org/FlumeDeveloperGuide.html
In practice, data is produced in highly diverse ways. Although Flume ships with a number of built-in mechanisms for collecting data, users often want their applications to talk to Flume directly. Flume therefore allows user-written applications to connect to it over IPC or RPC and send data to it.
Flume's RpcClient implements Flume's RPC mechanism. A user application can simply call the Flume Client SDK's append(Event) or appendBatch(List&lt;Event&gt;) method to send data, without worrying about the details of the underlying message exchange. The application can supply the required Event either by implementing the Event interface directly, for example by using the convenient SimpleEvent class, or by using EventBuilder's withBody() static helper method.
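As a quick illustration, here is a minimal sketch (the class name EventConstructionSketch is just for illustration; it assumes the Flume client SDK is on the classpath) of the two ways of building an Event mentioned above:

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;

public class EventConstructionSketch {
    public static void main(String[] args) {
        // Option 1: the EventBuilder.withBody() static helper
        Event e1 = EventBuilder.withBody("hello", Charset.forName("UTF-8"));

        // Option 2: SimpleEvent, a ready-made implementation of the Event interface
        SimpleEvent e2 = new SimpleEvent();
        e2.setBody("hello".getBytes(Charset.forName("UTF-8")));

        // Both objects can be passed to RpcClient.append() / appendBatch()
        System.out.println(new String(e1.getBody()) + " / " + new String(e2.getBody()));
    }
}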
Since Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. To use them we need the host and port of the target Flume agent in order to create a client instance, and then use the RpcClient to send data to that agent.
The official site provides an example of an Avro RPC client, which we use directly as the test case here.
Here we change client.init("host.example.org", 41414); to client.init("192.168.233.128", 50000); so that it connects to our own host.
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        //client.init("host.example.org", 41414);
        client.init("192.168.233.128", 50000);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
The code is not explained in detail here; it simply sends "Hello Flume!" to Flume ten times. Remember to add the jars under the lib directory of the Flume installation to the project, otherwise the program will not run.
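Alternatively, if the project is built with Maven, the client SDK can be pulled in as a dependency instead of copying jars. A sketch, with the version to be adjusted to match your Flume installation:

<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-sdk</artifactId>
  <version>1.4.0</version> <!-- assumption: adjust to your Flume version -->
</dependency>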
Below is the agent configuration:
# Configuration file: avro_client_case20.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Note here, as mentioned earlier, that the receiving end needs an Avro Source or Thrift Source listening on the port, so a1.sources.r1.type must be set to avro or thrift when configuring the agent.
# Start the agent
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts successfully, run the Java program in Eclipse; you can of course also package it and run it on the server.
# Check the console output in the terminal of the receiving agent
You can see that all 10 events were delivered.
Note that client.append(event) in the code sends one event at a time; the client can also send a whole List&lt;Event&gt; in a single call via appendBatch(List&lt;Event&gt;), i.e. batch sending. That is not run against the agent here, but a minimal sketch is shown below.
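A sketch of batch sending with appendBatch(List&lt;Event&gt;), assuming the same agent address (192.168.233.128:50000) as in the example above; the class name BatchSendSketch is just for illustration:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class BatchSendSketch {
    public static void main(String[] args) {
        // Connect to the same Avro source as in the example above (assumed address)
        RpcClient client = RpcClientFactory.getDefaultInstance("192.168.233.128", 50000);
        try {
            // Build a small batch of events
            List<Event> batch = new ArrayList<Event>();
            for (int i = 0; i < 10; i++) {
                batch.add(EventBuilder.withBody("Hello Flume! #" + i, Charset.forName("UTF-8")));
            }
            // One RPC call delivers the whole batch
            client.appendBatch(batch);
        } catch (EventDeliveryException e) {
            // The batch could not be delivered; a real client would recreate the client and retry
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}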
The Flume Client SDK also provides a failover client, which wraps the default Avro RPC client to add failover handling. It takes a space-separated list of host:port entries, each representing a Flume agent, which together form a failover group. The Failover RPC Client currently does not support Thrift. If there is a problem with the currently selected host (agent), the failover client automatically fails over to the next host in the group.
Below is the example from the official documentation:
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
Below is the code used for this test:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.Properties;

public class Failover_Client {
    public static void main(String[] args) {
        MyRpcClientFacade2 client = new MyRpcClientFacade2();
        // Initialize client with the failover properties
        client.init();

        // Send 10 events to the remote Flume agents. Those agents should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade2 {
    private RpcClient client;
    private Properties props;

    public void init() {
        // Setup properties for the failover
        props = new Properties();
        props.put("client.type", "default_failover");

        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");

        // host/port pair for each host alias
        String host1 = "192.168.233.128:50000";
        String host2 = "192.168.233.128:50001";
        String host3 = "192.168.233.128:50002";
        props.put("hosts.h1", host1);
        props.put("hosts.h2", host2);
        props.put("hosts.h3", host3);

        // Create the client with failover properties
        client = RpcClientFactory.getInstance(props);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // Clean up and recreate the client with the same failover properties
            client.close();
            client = RpcClientFactory.getInstance(props);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
The code configures three hosts for failover; to keep things simple, three ports on the same machine are used to simulate three agents. The program again sends "Hello Flume!" ten times to the first Flume agent; if the first agent fails, events go to the second agent, and so on, failing over in sequential order.
The agent configuration reuses the earlier one; make two copies of the configuration file:
cp avro_client_case20.conf avro_client_case21.conf
cp avro_client_case20.conf avro_client_case22.conf
Then change a1.sources.r1.port to 50001 in avro_client_case21.conf and to 50002 in avro_client_case22.conf.
# Start the three agents
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
After they start successfully, run the Java program Failover_Client.java in Eclipse; you can of course also package it and run it on the server.
# Check the console output in the three agent terminals
We can see that the first agent received the data while the other two received nothing. Then kill the first agent's process and run the client again; this time the data arrives at the second agent. When the second agent is shut down as well and data is sent again, it goes to the last agent. This shows that failover moves through the hosts in sequential order.
The Flume Client SDK also supports an RPC client that load-balances among multiple hosts. This type of client takes a space-separated list of host:port entries that form a load-balancing group. A load-balancing strategy can be specified: either select one of the configured hosts at random, or select hosts in round-robin order. You can also write your own class implementing the LoadBalancingRpcClient$HostSelector interface so that a user-defined selection order is used; in that case, the fully qualified class name of the custom class must be specified as the value of the host-selector property. The Load Balancing RPC Client currently does not support Thrift.
If backoff is enabled, a host that fails is placed on a blacklist, and only after a specified timeout has elapsed is it removed from the blacklist and considered for selection again. If the host is still unresponsive when it is tried again, this is treated as a sequential failure and the timeout is increased exponentially, to avoid getting stuck in long waits on an unresponsive host.
The maximum backoff timeout can be configured via the maxBackoff property, in milliseconds. The default maxBackoff value is 30 seconds (specified in the OrderSelector class).
Below is the example from the official documentation:
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
Below is the code used for this test:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.Properties;

public class Load_Client {
    public static void main(String[] args) {
        MyRpcClientFacade3 client = new MyRpcClientFacade3();
        // Initialize client with the load-balancing properties
        client.init();

        // Send 10 events to the remote Flume agents. Those agents should be
        // configured to listen with an AvroSource.
        String sampleData = "Flume Load_Client";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade3 {
    private RpcClient client;
    private Properties props;

    public void init() {
        // Setup properties for the load balancing
        props = new Properties();
        props.put("client.type", "default_loadbalance");

        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");

        // host/port pair for each host alias
        String host1 = "192.168.233.128:50000";
        String host2 = "192.168.233.128:50001";
        String host3 = "192.168.233.128:50002";
        props.put("hosts.h1", host1);
        props.put("hosts.h2", host2);
        props.put("hosts.h3", host3);

        props.put("host-selector", "random");        // For random host selection
        // props.put("host-selector", "round_robin"); // For round-robin host selection

        props.put("backoff", "true");     // Disabled by default.
        props.put("maxBackoff", "10000"); // Defaults to 0, which effectively becomes 30000 ms

        // Create the client with load balancing properties
        client = RpcClientFactory.getInstance(props);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // Clean up and recreate the client with the same load-balancing properties
            client.close();
            client = RpcClientFactory.getInstance(props);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
Random load balancing is used here: props.put("host-selector", "random"). For the test, the three receiving agent configurations from before, avro_client_case20.conf, avro_client_case21.conf and avro_client_case22.conf, are reused and started.
# Start the three agents
flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console
After they start successfully, run the Java program Load_Client.java in Eclipse; you can also package it and run it on the server.
# Check the console output in the three agent terminals
Host1 received 2 events, Host2 received 2 events, and Host3 received 6 events.
Since the example uses random host selection, the events are distributed randomly across the hosts. Next we test the round_robin option. Change the following lines in the program:
//props.put("host-selector","random"); // For random host selection
props.put("host-selector", "round_robin");// Forround-robin host
Then run the Java program again.
This time Host1 received 4 events, Host2 received 3 events, and Host3 likewise received 3 events (no screenshot for this one). With round-robin selection the events are simply sent to the hosts in order.