flume-ng的CPU高消耗定位及改進

公司用的flume-ng採集數據,數據源是日誌文件,而後經過正則表達式來過濾指定日誌,最後發送給kafka。使用的是apache-flume-ng1.6.0版本。java

最初消耗以下:正則表達式

通過我改進後,消耗以下:apache

cpu消耗整整下降了幾十甚至百倍。app


其中改進點有兩個:一是自定義一個關鍵字篩選器;二是重寫KafkaSink.java文件爲另外一個類,添加無事件處理睡眠配置。spa

咱們的日誌格式大概是這樣子:日誌

XXXXXXXX`XXXXXXX`XXXXXX`…`op=0x38(OP_REPORT_ACTION)`seq=3424`xxx=xxxx`xxx=xxxx`xxx=xxxx……code

關鍵字篩選器fm.lizhi.flume.channel.OpChannelSelector,配置以下:事件

agent2.sources.Usersource2.selector.type = fm.lizhi.flume.channel.OpChannelSelector
agent2.sources.Usersource2.selector.mapping.0x3a(OP_ADD_COLLECT_PROGRAM)=Userchannel2
agent2.sources.Usersource2.selector.mapping.0x38(OP_REPORT_ACTION) = Userchannel2
agent2.sources.Usersource2.selector.skip = 90

上面的skip意思是直接跳過90個字符不檢測(由於咱們打印的op=確定在90個字符之後,因此爲了節約資源設置)。ip

這個selector主要檢測op=後的值,匹配上mapping後的值,就發送給相應的channel,不然丟棄。從而達到篩選目的。資源


對KafkaSink的改進以下:

package fm.lizhi.flume.sink;


/**
 * 近實時的KafkaSink,相比KafkaSink多了backoffSleepMillisecond屬性。<br>
 * 改造自1.6.0版本的KafkaSink
 * @author zhenghaofeng
 */
public class NRTKafkaSink extends AbstractSink implements Configurable {

	private int bakoffSleepMillisecond;

	private static final String BACKOFF_SLEEP_MILLISECOND = "backoffSleepMillisecond";
	private static final int DEFUALT_BACKOFF_SLEEP_MILLISECOND = 0;

	public Status process() throws EventDeliveryException {
		
		boolean isNoEventMore = false;

		try {
			……………………
			for (; processedEvents < batchSize; processedEvents += 1) {
				event = channel.take();

				if (event == null) {
					// no events available in channel
					isNoEventMore = true;
					break;
				}
                                ……………………

			}
                        ……………………

			transaction.commit();

			if (isNoEventMore) {
				try {
					Thread.sleep(bakoffSleepMillisecond);
				} catch (InterruptedException e) {
					// TODO: handle exception
				}
			}

		} catch (Exception ex) {
			……………………
		} finally {
			……………………
		}

		return result;
	}


	public void configure(Context context) {

		………………
		bakoffSleepMillisecond = context.getInteger(BACKOFF_SLEEP_MILLISECOND, DEFUALT_BACKOFF_SLEEP_MILLISECOND);
		…………………………
	}
}

爲何event=null的時候不返回BACKOFF狀態,而採用標記睡眠呢?由於若是返回BACKOFF狀態,flume默認會睡眠1~5秒。而咱們一般不但願延時這麼久,因此爲了避免侵入flume原先的代碼,就直接在這裏添加睡眠中斷。

配置裏寫法以下:

agent2.sinks.Usersink2.type=fm.lizhi.flume.sink.NRTKafkaSink
agent2.sinks.Usersink2.backoffSleepMillisecond=100
相關文章
相關標籤/搜索