Flume NG 學習筆記(十) Transaction、Sink、Source和Channel開發

目錄(?)[+]java

1、Transaction interface

Transaction接口是基於flume的穩定性考慮的。全部主要的組件(sources、sinks、channels)都必須使用Flume Transaction。咱們也能夠理解Transaction接口就是flume的事務,sources和sinks的發送數據與接受數據都是在一個Transaction裏完成的。apache


從上圖中能夠看出,一個Transaction在Channel實現內實現。每個鏈接到channel的source和sink都要獲取一個Transaction對象。這Sources實際上使用了一個ChannelSelector接口來封裝Transaction。存放事件到channel和從channel中提取事件的操做是在一個活躍的Transaction內執行的。緩存

下面是官網例子框架


[java] view plain copytcp

  1. Channel ch = new MemoryChannel();  ide

  2. Transaction txn = ch.getTransaction();  測試

  3. txn.begin();  ui

  4. try {  this

  5.   // This try clause includes whatever Channel operations you want to do  

  6.   

  7.   Event eventToStage = EventBuilder.withBody("Hello Flume!",  

  8.                        Charset.forName("UTF-8"));  

  9.   ch.put(eventToStage);  

  10.   // Event takenEvent = ch.take();  

  11.   // ...  

  12.   txn.commit();  

  13. catch (Throwable t) {  

  14.   txn.rollback();  

  15.   

  16.   // Log exception, handle individual exceptions as needed  

  17.   

  18.   // re-throw all Errors  

  19.   if (t instanceof Error) {  

  20.     throw (Error)t;  

  21.   }  

  22. finally {  

  23.   txn.close();  

  24. }  


上面的代碼是一個很簡單的Transaction示例,在自定義Source與自定義Sink中都要使用。

2、自定義Sink開發

Sink提取event數據從channel中,而後直接將數據發送到下一個flume agent中或者存儲到外部庫中。

Sink和channel的關聯關係能夠在配置文件中配置。有一個SinkRunner實例與每個已配置的Sink關聯,當Flume框架調用SinkRunner.start()方法時候,將建立一個新的線程來驅動這Sink。

這個線程將管理這個Sink的生命週期。Sink須要實現LifecycleAware接口的start()和stop()方法。start()方法用於初始化數據;stop()用於釋放資源;process()是從channel中提取event數據和轉發數據的核心方法。

這Sink須要實現Configurable接口以便操做配置文件。

下面是官網例子:

[java] view plain copy

  1. public class MySink extends AbstractSink implements Configurable {  

  2.   private String myProp;  

  3.   

  4.   @Override  

  5.   public void configure(Context context) {  

  6.     String myProp = context.getString("myProp""defaultValue");  

  7.   

  8.     // Process the myProp value (e.g. validation)  

  9.   

  10.     // Store myProp for later retrieval by process() method  

  11.     this.myProp = myProp;  

  12.   }  

  13.   

  14.   @Override  

  15.   public void start() {  

  16.     // Initialize the connection to the external repository (e.g. HDFS) that  

  17.     // this Sink will forward Events to ..  

  18.   }  

  19.   

  20.   @Override  

  21.   public void stop () {  

  22.     // Disconnect from the external respository and do any  

  23.     // additional cleanup (e.g. releasing resources or nulling-out  

  24.     // field values) ..  

  25.   }  

  26.   

  27.   @Override  

  28.   public Status process() throws EventDeliveryException {  

  29.     Status status = null;  

  30.   

  31.     // Start transaction  

  32.     Channel ch = getChannel();  

  33.     Transaction txn = ch.getTransaction();  

  34.     txn.begin();  

  35.     try {  

  36.       // This try clause includes whatever Channel operations you want to do  

  37.   

  38.       Event event = ch.take();  

  39.   

  40.       // Send the Event to the external repository.  

  41.       // storeSomeData(e);  

  42.   

  43.       txn.commit();  

  44.       status = Status.READY;  

  45.     } catch (Throwable t) {  

  46.       txn.rollback();  

  47.   

  48.       // Log exception, handle individual exceptions as needed  

  49.   

  50.       status = Status.BACKOFF;  

  51.   

  52.       // re-throw all Errors  

  53.       if (t instanceof Error) {  

  54.         throw (Error)t;  

  55.       }  

  56.     } finally {  

  57.       txn.close();  

  58.     }  

  59.     return status;  

  60.   }  

  61. }  

下面是測試例子:

[java] view plain copy

  1. import org.apache.flume.Channel;  

  2. import org.apache.flume.Context;  

  3. import org.apache.flume.Event;  

  4. import org.apache.flume.EventDeliveryException;  

  5. import org.apache.flume.Transaction;  

  6. import org.apache.flume.conf.Configurable;  

  7.   

  8. import org.apache.flume.sink.AbstractSink;  

  9.   

  10.   

  11. public class Custom_Sink extends AbstractSink implements Configurable {  

  12.       private String myProp;  

  13.      @Override  

  14.       public void configure(Context context) {  

  15.         String myProp = context.getString("myProp""defaultValue");  

  16.   

  17.         // Process the myProp value (e.g. validation)  

  18.   

  19.         // Store myProp for later retrieval by process() method  

  20.         this.myProp = myProp;  

  21.       }  

  22.   

  23.       @Override  

  24.       public void start() {  

  25.         // Initialize the connection to the external repository (e.g. HDFS) that  

  26.         // this Sink will forward Events to ..  

  27.       }  

  28.   

  29.       @Override  

  30.       public void stop () {  

  31.         // Disconnect from the external respository and do any  

  32.         // additional cleanup (e.g. releasing resources or nulling-out  

  33.         // field values) ..  

  34.       }  

  35.   

  36.       @Override  

  37.       public Status process() throws EventDeliveryException {  

  38.         Status status = null;  

  39.   

  40.         // Start transaction  

  41.         Channel ch = getChannel();  

  42.         Transaction txn = ch.getTransaction();  

  43.         txn.begin();  

  44.         try {  

  45.           // This try clause includes whatever Channel operations you want to do  

  46.             

  47.           Event event = ch.take();  

  48.           String out = new String(event.getBody());   

  49.           // Send the Event to the external repository.  

  50.           // storeSomeData(e);  

  51.           System.out.println(out);  

  52.             

  53.           txn.commit();  

  54.           status = Status.READY;  

  55.         } catch (Throwable t) {  

  56.           txn.rollback();  

  57.   

  58.           // Log exception, handle individual exceptions as needed  

  59.   

  60.           status = Status.BACKOFF;  

  61.   

  62.           // re-throw all Errors  

  63.           if (t instanceof Error) {  

  64.             throw (Error)t;  

  65.           }  

  66.         } finally {  

  67.           txn.close();  

  68.         }  

  69.         return status;  

  70.       }  

  71.   

  72. }  

上面的測試例子只輸出事件的BODY信息,這裏說明下直接用代碼event.getBody().tostring() 輸出是亂碼。由於全部sink都是在Transaction裏完成的,所以自定義開發sink是須要加上Transaction相關設置。

 

而後是測試配置,這裏是自定義的jar 包是flumedev.Custom_Sink。注意,打包以後請放在目錄$FLUME_HOME/lib下

[html] view plain copy

  1. #配置文件:custom_sink_case23.conf  

  2. # Name the components on this agent  

  3. a1.sources = r1  

  4. a1.sinks = k1  

  5. a1.channels = c1  

  6.   

  7. # Describe/configure the source  

  8. a1.sources.r1.type = syslogtcp  

  9. a1.sources.r1.port = 50000  

  10. a1.sources.r1.bind = 192.168.233.128  

  11. a1.sources.r1.channels = c1  

  12.   

  13. # Describe the sink  

  14. a1.sinks.k1.channel = c1  

  15. a1.sinks.k1.type = flumedev.Custom_Sink  

  16. #a1.sinks.k1.type =logger  

  17.   

  18. # Use a channel which buffers events in memory  

  19. a1.channels.c1.type = memory  

  20. a1.channels.c1.capacity = 1000  

  21. a1.channels.c1.transactionCapacity = 100  

#敲命令

flume-ng agent -cconf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console

啓動成功後

打開另外一個終端輸入,往偵聽端口送數據

echo "testcustom_sink" | nc 192.168.233.128 50000

#在啓動的終端查看console輸出


能夠看到數據正常輸出。


3、自定義Source開發

Source從外面接收數據並把數據存入Channel中。不多有人用。

下面是官網的例子

[java] view plain copy

  1. public class MySource extends AbstractSource implements Configurable, PollableSource {  

  2.   private String myProp;  

  3.   

  4.   @Override  

  5.   public void configure(Context context) {  

  6.     String myProp = context.getString("myProp""defaultValue");  

  7.   

  8.     // Process the myProp value (e.g. validation, convert to another type, ...)  

  9.   

  10.     // Store myProp for later retrieval by process() method  

  11.     this.myProp = myProp;  

  12.   }  

  13.   

  14.   @Override  

  15.   public void start() {  

  16.     // Initialize the connection to the external client  

  17.   }  

  18.   

  19.   @Override  

  20.   public void stop () {  

  21.     // Disconnect from external client and do any additional cleanup  

  22.     // (e.g. releasing resources or nulling-out field values) ..  

  23.   }  

  24.   

  25.   @Override  

  26.   public Status process() throws EventDeliveryException {  

  27.     Status status = null;  

  28.   

  29.     // Start transaction  

  30.     Channel ch = getChannel();  

  31.     Transaction txn = ch.getTransaction();  

  32.     txn.begin();  

  33.     try {  

  34.       // This try clause includes whatever Channel operations you want to do  

  35.   

  36.       // Receive new data  

  37.       Event e = getSomeData();  

  38.   

  39.       // Store the Event into this Source's associated Channel(s)  

  40.       getChannelProcessor().processEvent(e)  

  41.   

  42.       txn.commit();  

  43.       status = Status.READY;  

  44.     } catch (Throwable t) {  

  45.       txn.rollback();  

  46.   

  47.       // Log exception, handle individual exceptions as needed  

  48.   

  49.       status = Status.BACKOFF;  

  50.   

  51.       // re-throw all Errors  

  52.       if (t instanceof Error) {  

  53.         throw (Error)t;  

  54.       }  

  55.     } finally {  

  56.       txn.close();  

  57.     }  

  58.     return status;  

  59.   }  

  60. }  


測試的話,主要針對Event e 這裏進行傳輸數據,這裏就不測試了。

 

4、自定義Channel開發

官網說待定。

下面是美團網的自定義Channel 開發,下面是連接

http://tech.meituan.com/mt-log-system-optimization.html

……

Flume自己提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則恰好相反。咱們但願利用二者的優點,在Sink處理速度夠快,Channel沒有緩存過多日誌的時候,就使用MemoryChannel,當Sink處理速度跟不上,又須要Channel可以緩存下應用端發送過來的日誌時,就使用FileChannel,由此咱們開發了DualChannel,可以智能的在兩個Channel之間切換。

其具體的邏輯以下:

[java] view plain copy

  1. /*** 

  2.  * putToMemChannel indicate put event to memChannel or fileChannel 

  3.  * takeFromMemChannel indicate take event from memChannel or fileChannel 

  4.  * */  

  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);  

  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);  

  7.   

  8. void doPut(Event event) {  

  9.         if (switchon && putToMemChannel.get()) {  

  10.               //往memChannel中寫數據  

  11.               memTransaction.put(event);  

  12.   

  13.               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {  

  14.                 putToMemChannel.set(false);  

  15.               }  

  16.         } else {  

  17.               //往fileChannel中寫數據  

  18.               fileTransaction.put(event);  

  19.         }  

  20.   }  

  21.   

  22. Event doTake() {  

  23.     Event event = null;  

  24.     if ( takeFromMemChannel.get() ) {  

  25.         //從memChannel中取數據  

  26.         event = memTransaction.take();  

  27.         if (event == null) {  

  28.             takeFromMemChannel.set(false);  

  29.         }   

  30.     } else {  

  31.         //從fileChannel中取數據  

  32.         event = fileTransaction.take();  

  33.         if (event == null) {  

  34.             takeFromMemChannel.set(true);  

  35.   

  36.             putToMemChannel.set(true);  

  37.         }   

  38.     }  

  39.     return event;  

  40. }  

這裏要說明下,官網是建議使用file channel,雖然它的效率比較低,可是它能保證數據完整性,而memory channel效率高,可是隻能對數據丟失和重複不太敏感的業務使用

相關文章
相關標籤/搜索