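Our company uses flume-ng to collect data. The source is a set of log files; a regular expression filters out the log lines we care about, and the result is sent to Kafka. The version in use is apache-flume-ng 1.6.0.
Initially, resource consumption looked like this:
After my changes, it looked like this:
CPU usage dropped by a factor of tens, in some cases around a hundred.
There were two changes: first, a custom keyword selector; second, KafkaSink.java rewritten as a separate class with a configurable sleep for when the channel has no events.
Our log format looks roughly like this:
XXXXXXXX`XXXXXXX`XXXXXX`…`op=0x38(OP_REPORT_ACTION)`seq=3424`xxx=xxxx`xxx=xxxx`xxx=xxxx……
The keyword selector is fm.lizhi.flume.channel.OpChannelSelector, configured as follows: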
agent2.sources.Usersource2.selector.type = fm.lizhi.flume.channel.OpChannelSelector
agent2.sources.Usersource2.selector.mapping.0x3a(OP_ADD_COLLECT_PROGRAM) = Userchannel2
agent2.sources.Usersource2.selector.mapping.0x38(OP_REPORT_ACTION) = Userchannel2
agent2.sources.Usersource2.selector.skip = 90
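The skip setting above tells the selector to skip the first 90 characters of each line without inspecting them. In our logs, op= always appears after the 90th character, so skipping that prefix saves work.
The selector looks at the value that follows op=. If it matches one of the mapping keys, the event is sent to the corresponding channel; otherwise the event is dropped. That is how the filtering is done.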
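The source of OpChannelSelector is not shown here, so the following is only a minimal sketch of the matching logic described above, written without any Flume dependency. The class name OpSelectorSketch, its method names, and the assumption that the op value ends at the next backtick are all hypothetical and chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the op= matching step; not the actual OpChannelSelector. */
final class OpSelectorSketch {
    private static final String OP_KEY = "op=";
    private static final char DELIMITER = '`'; // assumed field separator, as in the sample log line

    private final Map<String, String> mapping;
    private final int skip;

    OpSelectorSketch(Map<String, String> mapping, int skip) {
        this.mapping = new HashMap<>(mapping);
        this.skip = skip;
    }

    /** Returns the text between "op=" and the next delimiter, searching only from index skip onward. */
    static Optional<String> extractOp(String line, int skip) {
        if (line == null || skip < 0 || skip >= line.length()) {
            return Optional.empty();
        }
        // Plain substring search: simple and cheap, but it would also match a key that merely ends in "op=".
        int start = line.indexOf(OP_KEY, skip);
        if (start < 0) {
            return Optional.empty();
        }
        start += OP_KEY.length();
        int end = line.indexOf(DELIMITER, start);
        if (end < 0) {
            end = line.length(); // op= is the last field on the line
        }
        return Optional.of(line.substring(start, end));
    }

    /** Returns the mapped channel name, or empty when the event should be dropped. */
    Optional<String> selectChannel(String line) {
        // Optional.map yields empty when the mapping has no entry for the extracted value.
        return extractOp(line, skip).map(mapping::get);
    }
}
```

A single indexOf starting at the skip offset avoids running a regular expression over the whole line, which is presumably where most of the CPU saving comes from.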
The change to KafkaSink is as follows:
package fm.lizhi.flume.sink;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Near-real-time KafkaSink. Compared with KafkaSink it adds the backoffSleepMillisecond property.<br>
 * Adapted from the 1.6.0 version of KafkaSink.
 * @author zhenghaofeng
 */
public class NRTKafkaSink extends AbstractSink implements Configurable {

    private int backoffSleepMillisecond;
    private static final String BACKOFF_SLEEP_MILLISECOND = "backoffSleepMillisecond";
    private static final int DEFAULT_BACKOFF_SLEEP_MILLISECOND = 0;

    public Status process() throws EventDeliveryException {
        // Set when the channel runs dry before the batch is full.
        boolean isNoEventMore = false;
        try {
            ……………………
            for (; processedEvents < batchSize; processedEvents += 1) {
                event = channel.take();
                if (event == null) {
                    // no events available in channel
                    isNoEventMore = true;
                    break;
                }
                ……………………
            }
            ……………………
            transaction.commit();
            if (isNoEventMore) {
                // Sleep briefly here instead of returning BACKOFF to the runner.
                try {
                    Thread.sleep(backoffSleepMillisecond);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the runner can still shut down.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (Exception ex) {
            ……………………
        } finally {
            ……………………
        }
        return result;
    }

    public void configure(Context context) {
        ………………
        backoffSleepMillisecond = context.getInteger(BACKOFF_SLEEP_MILLISECOND, DEFAULT_BACKOFF_SLEEP_MILLISECOND);
        …………………………
    }
}
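Why not return the BACKOFF status when event == null, instead of setting a flag and sleeping? Because when a sink returns BACKOFF, Flume by default sleeps for 1 to 5 seconds, and we usually do not want that much latency. To avoid touching Flume's own code, the sleep is added directly inside the sink.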
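To make the 1 to 5 second figure concrete, here is a small sketch of how the runner's delay grows with consecutive BACKOFF results. It assumes, based on my reading of the 1.6.0 SinkRunner, a 1000 ms increment per consecutive backoff capped at 5000 ms; the class and method names are hypothetical and this is not Flume's actual code.

```java
/** Hypothetical model of the runner-side backoff delay; constants are assumptions, not read from Flume. */
final class BackoffSketch {
    static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L; // assumed increment per consecutive BACKOFF
    static final long MAX_BACKOFF_SLEEP_MS = 5000L;       // assumed upper bound

    /** Delay the runner would apply after the given number of consecutive BACKOFF results. */
    static long runnerDelayMillis(long consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT_MS, MAX_BACKOFF_SLEEP_MS);
    }
}
```

Under these assumptions an idle sink waits at least one second before polling again, whereas with backoffSleepMillisecond=100 the sink above polls again after about 100 ms.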
In the configuration it is written like this:
agent2.sinks.Usersink2.type = fm.lizhi.flume.sink.NRTKafkaSink
agent2.sinks.Usersink2.backoffSleepMillisecond = 100