Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission.
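The Transaction interface exists to make Flume reliable. All of the main components (sources, sinks, channels) must use a Flume Transaction. You can think of the Transaction interface as Flume's transaction mechanism: a source writing data into a channel and a sink reading data out of it both happen inside a Transaction.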
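As the figure above shows, a Transaction is implemented inside a Channel implementation. Every source and sink connected to a channel must obtain a Transaction object. Sources actually use a ChannelSelector interface to encapsulate the Transaction. Putting events into the channel and taking events out of it are always performed inside an active Transaction.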
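To make the "put/take inside an active Transaction" idea concrete, here is a toy model in plain Java. It is not Flume's implementation: `ToyChannel` and `ToyTxn` are hypothetical names, events are plain strings, and there is no capacity limit or thread safety. It only illustrates the semantics described above. Puts are staged and become visible on commit, and a rollback returns taken events to the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of channel transaction semantics (hypothetical, NOT Flume code).
public class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();

    public ToyTxn getTransaction() {
        return new ToyTxn();
    }

    public int size() {
        return queue.size();
    }

    public class ToyTxn {
        private final List<String> putList = new ArrayList<>();
        private final List<String> takeList = new ArrayList<>();

        public void put(String event) {
            // Staged only: not visible to takers until commit()
            putList.add(event);
        }

        public String take() {
            String e = queue.pollFirst();
            if (e != null) {
                takeList.add(e); // remembered so rollback() can give it back
            }
            return e;
        }

        public void commit() {
            queue.addAll(putList); // publish staged puts
            putList.clear();
            takeList.clear();      // taken events are now gone for good
        }

        public void rollback() {
            putList.clear(); // discard staged puts
            // Return taken events to the head of the queue in their original order
            for (int i = takeList.size() - 1; i >= 0; i--) {
                queue.addFirst(takeList.get(i));
            }
            takeList.clear();
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ToyChannel.ToyTxn t1 = ch.getTransaction();
        t1.put("a");
        t1.put("b");
        t1.commit();
        ToyChannel.ToyTxn t2 = ch.getTransaction();
        t2.take();
        t2.rollback();
        System.out.println(ch.size()); // 2: the rolled-back take was restored
    }
}
```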
Below is the example from the official documentation:
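import java.nio.charset.StandardCharsets;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

Channel ch = new MemoryChannel();
// A MemoryChannel must be configured (and started) before it can hand out transactions
ch.setName("c1");
Configurables.configure(ch, new Context());
ch.start();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event eventToStage = EventBuilder.withBody("Hello Flume!", StandardCharsets.UTF_8);
ch.put(eventToStage);
// Event takenEvent = ch.take();
// ...
txn.commit();
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}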
The code above is a very simple Transaction example. The same pattern is needed in both custom Sources and custom Sinks.
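A Sink takes events from a channel and then either sends them directly to the next Flume agent or stores them in an external repository.
Which channel a Sink reads from is set in the configuration file. Each configured Sink has an associated SinkRunner instance; when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink.
That thread manages the Sink's lifecycle. A Sink must implement the start() and stop() methods of the LifecycleAware interface: start() performs initialization, stop() releases resources, and process() is the core method that takes events from the channel and forwards them.
The Sink also needs to implement the Configurable interface so that it can read its settings from the configuration file.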
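The runner/thread relationship can be pictured with a simplified sketch. This is not Flume's `SinkRunner`. `ToyRunner`, `ToySink` and the fixed 10 ms back-off are assumptions made for illustration. One thread per sink calls `process()` in a loop and sleeps whenever the sink reports `BACKOFF`. Meanwhile, `start()` and `stop()` bracket the sink's lifecycle.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a sink runner (NOT Flume's SinkRunner).
public class ToyRunner {
    public enum Status { READY, BACKOFF }

    public interface ToySink {
        void start();      // initialize connections
        void stop();       // release resources
        Status process();  // take one batch from the channel and forward it
    }

    private final ToySink sink;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread thread;

    public ToyRunner(ToySink sink) {
        this.sink = sink;
    }

    public void start() {
        sink.start();
        running.set(true);
        // One dedicated thread drives the sink
        thread = new Thread(() -> {
            while (running.get()) {
                if (sink.process() == Status.BACKOFF) {
                    try {
                        Thread.sleep(10); // nothing delivered: back off briefly
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        thread.start();
    }

    public void stop() throws InterruptedException {
        running.set(false);
        thread.interrupt();
        thread.join(); // wait for the driving thread to finish
        sink.stop();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        ToyRunner runner = new ToyRunner(new ToySink() {
            public void start() { System.out.println("sink started"); }
            public void stop() { System.out.println("sink stopped"); }
            public Status process() {
                calls.incrementAndGet();
                return Status.BACKOFF; // pretend the channel is empty
            }
        });
        runner.start();
        Thread.sleep(50);
        runner.stop();
        System.out.println("process() was called " + calls.get() + " times");
    }
}
```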
Below is the example from the official documentation:
public class MySink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external repository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(event);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
Below is my test implementation:
package flumedev;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
public class Custom_Sink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external repository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
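Event event = ch.take();
if (event == null) {
// Channel is empty: nothing to deliver, so back off instead of hitting a NullPointerException
txn.commit();
return Status.BACKOFF;
}
// Decode the body bytes explicitly as UTF-8
String out = new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);
// Send the Event to the external repository; this test just prints it
System.out.println(out);
txn.commit();
status = Status.READY;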
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
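The test example above only prints the event BODY. Note that calling event.getBody().toString() directly produces garbage: getBody() returns a byte[], and byte[].toString() prints the array's type tag and identity hash code (something like "[B@" followed by hex digits) rather than its contents, so the bytes have to be decoded with new String(...). And because all sink work happens inside a Transaction, a custom sink must include the Transaction handling shown above.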
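A small stdlib-only demonstration of the difference. Here a `byte[]` built from the test string stands in for `event.getBody()`, an assumption that lets the snippet run without Flume on the classpath. `BodyDecodeDemo` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecodeDemo {
    // Decode an event body the way the test sink does: bytes -> UTF-8 String
    public static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "testcustom_sink".getBytes(StandardCharsets.UTF_8);
        // Prints the array's type tag and identity hash, not the text
        System.out.println(body.toString());
        // Prints the actual text: testcustom_sink
        System.out.println(decodeBody(body));
    }
}
```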
Next is the test configuration. The custom sink class here is flumedev.Custom_Sink. Note: after packaging it into a jar, put the jar in $FLUME_HOME/lib.
# Configuration file: custom_sink_case23.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = flumedev.Custom_Sink
#a1.sinks.k1.type =logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Run the command
flume-ng agent -c conf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console
Once the agent has started successfully,
open another terminal and send data to the listening port:
echo "testcustom_sink" | nc 192.168.233.128 50000
# Check the console output in the terminal where the agent was started
You can see that the data is printed correctly.
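If `nc` is not available on the test machine, the same line can be sent from Java. This is a minimal sketch using only `java.net.Socket`. `SyslogSender` is a hypothetical name, and the host and port are the ones from `custom_sink_case23.conf`. Like `echo`, it terminates the message with a newline.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class SyslogSender {
    // Send one newline-terminated line over TCP, like `echo "..." | nc host port`
    public static void sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Address and port of the syslogtcp source in custom_sink_case23.conf
        sendLine("192.168.233.128", 50000, "testcustom_sink");
    }
}
```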
A Source receives data from outside and stores it in a Channel. In practice, few people write custom Sources.
Below is the example from the official documentation:
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation, convert to another type, ...)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external client
}
@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// Receive new data (getSomeData() is a placeholder you must implement)
Event e = getSomeData();
// Store the Event into this Source's associated Channel(s).
// AbstractSource has no getChannel(); the ChannelProcessor opens,
// commits or rolls back a Transaction on each channel for us.
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}
Testing it mainly comes down to producing the data carried in Event e, so I will not test it here.
As for custom Channels, the official documentation simply says "TBD".
Below is Meituan's custom Channel development. Here is the link:
http://tech.meituan.com/mt-log-system-optimization.html
……
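Flume itself provides MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is limited in size and it has no persistence; FileChannel is the opposite. We wanted the advantages of both: when the Sink is fast enough and the Channel is not buffering many logs, use MemoryChannel; when the Sink cannot keep up and the Channel has to buffer the logs sent by applications, use FileChannel. So we developed DualChannel, which switches intelligently between the two Channels.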
Its logic is as follows:
/***
* putToMemChannel indicate put event to memChannel or fileChannel
* takeFromMemChannel indicate take event from memChannel or fileChannel
* */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
void doPut(Event event) {
if (switchon && putToMemChannel.get()) {
// write to memChannel
memTransaction.put(event);
if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
putToMemChannel.set(false);
}
} else {
// write to fileChannel
fileTransaction.put(event);
}
}
Event doTake() {
Event event = null;
if ( takeFromMemChannel.get() ) {
// take from memChannel
event = memTransaction.take();
if (event == null) {
takeFromMemChannel.set(false);
}
} else {
// take from fileChannel
event = fileTransaction.take();
if (event == null) {
takeFromMemChannel.set(true);
putToMemChannel.set(true);
}
}
return event;
}
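The switching rules above can be exercised in a single-threaded simulation. This is only a sketch under strong assumptions. Two in-memory `ArrayDeque`s stand in for the MemoryChannel and FileChannel, so nothing is persisted. There are no transactions. `DualQueue`, `memCapacity` and `fileBacklogThreshold` are hypothetical names. The flags and the order of the checks follow the Meituan code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of DualChannel's switching flags (no persistence, no transactions).
public class DualQueue {
    private final Deque<String> mem = new ArrayDeque<>();  // stands in for memChannel
    private final Deque<String> file = new ArrayDeque<>(); // stands in for fileChannel
    private final int memCapacity;
    private final int fileBacklogThreshold;
    private boolean putToMem = true;
    private boolean takeFromMem = true;

    public DualQueue(int memCapacity, int fileBacklogThreshold) {
        this.memCapacity = memCapacity;
        this.fileBacklogThreshold = fileBacklogThreshold;
    }

    public void put(String event) {
        // Extra capacity guard so the simulated memory side can never overflow
        if (putToMem && mem.size() < memCapacity) {
            mem.addLast(event);
            if (mem.size() >= memCapacity || file.size() > fileBacklogThreshold) {
                putToMem = false; // memory full (or file backlog): divert writers to "file"
            }
        } else {
            file.addLast(event);
        }
    }

    public String take() {
        if (takeFromMem) {
            String e = mem.pollFirst();
            if (e == null) {
                takeFromMem = false; // memory drained: switch readers to "file"
            }
            return e;
        }
        String e = file.pollFirst();
        if (e == null) {
            takeFromMem = true; // "file" drained: back to memory for both sides
            putToMem = true;
        }
        return e;
    }

    public static void main(String[] args) {
        DualQueue q = new DualQueue(2, 100);
        q.put("a");
        q.put("b");
        q.put("c"); // memory side is full, so "c" goes to the file side
        System.out.println(q.take()); // a
        System.out.println(q.take()); // b
        System.out.println(q.take()); // null (memory drained, switch to file)
        System.out.println(q.take()); // c
        System.out.println(q.take()); // null (file drained, switch back to memory)
    }
}
```

As in the original logic, the `take()` call that finds one side empty returns `null` and only flips the flags; the caller simply polls again on its next round.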
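One more note: the official documentation recommends the file channel. Although it is slower, it guarantees data integrity. The memory channel is fast, but it should only be used for workloads that are not very sensitive to data loss or duplication.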