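I'll skip the process and go straight to the result! I had a channel linked to an HdfsSink and, without thinking much about it, configured the pair as follows: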
...
agent.channels.common-channel.transactionCapacity=10
...
agent.sinks.hdfs-sink.hdfs.batchSize=20
After a quick test, Flume threw the following exception, which is fair enough...
[2015-12-17 11:42:09:694 ERROR][org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:467)]process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
[2015-12-17 11:42:09:696 ERROR][org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
... 3 more
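But... but then I noticed that after I stopped sending, the channel size never went down, and the data in HDFS kept growing!!! It never stops; the same data is replayed forever!!!
Here is the relevant HdfsSink code: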
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  List<BucketWriter> writers = Lists.newArrayList();
  transaction.begin();
  try {
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      Event event = channel.take();
      if (event == null) {
        break;
      }
      ......
      bucketWriter.append(event);
      ......
    transaction.commit();
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return Status.READY;
    }
  } catch (IOException eIO) {
    transaction.rollback();
    LOG.warn("HDFS IO error", eIO);
    return Status.BACKOFF;
  } catch (Throwable th) {
    transaction.rollback();
    LOG.error("process failed", th);
    if (th instanceof Error) {
      throw (Error) th;
    } else {
      throw new EventDeliveryException(th);
    }
  } finally {
    transaction.close();
  }
}
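See that?! The exception is thrown by channel.take() when it tries to fetch the 11th event, and the transaction is then rolled back. But, damn it, the 10 events taken before that have already gone through bucketWriter.append(event), which means they have already been written to HDFS.
That gives exactly the behavior I saw at the start: the transaction keeps getting rolled back, so the events return to the channel, yet the events taken in each attempt are still written to HDFS. What kind of transaction is that supposed to be...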
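To make the replay loop concrete, here is a toy model in plain Java. It is only a sketch under assumptions, not Flume's code: ToyChannel, process and simulate are names made up for illustration, and "HDFS" is just an in-memory list. The only behavior borrowed from the above is that the take list is bounded by transactionCapacity, and that a rollback puts the taken events back without undoing the appends.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the replay loop. Hypothetical classes, not Flume's real ones. */
class ReplayDemo {

    /** Simplified channel: an event queue plus a bounded take list per transaction. */
    static class ToyChannel {
        final Deque<String> queue = new ArrayDeque<>();
        final Deque<String> takeList = new ArrayDeque<>();
        final int transactionCapacity;

        ToyChannel(int transactionCapacity) {
            this.transactionCapacity = transactionCapacity;
        }

        /** Returns the next event, or null if the queue is empty. */
        String take() {
            if (takeList.size() >= transactionCapacity) {
                throw new IllegalStateException(
                        "Take list capacity " + transactionCapacity + " full");
            }
            String event = queue.pollFirst();
            if (event != null) {
                takeList.addLast(event);
            }
            return event;
        }

        void commit() {
            takeList.clear();
        }

        /** Puts the taken events back at the head of the queue in their original order. */
        void rollback() {
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.pollLast());
            }
        }
    }

    /** One sink pass, shaped like the process() excerpt; returns true if it committed. */
    static boolean process(ToyChannel channel, int batchSize, List<String> hdfs) {
        try {
            for (int i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) {
                    break;
                }
                hdfs.add(event); // side effect that a rollback does not undo
            }
            channel.commit();
            return true;
        } catch (IllegalStateException e) {
            channel.rollback();
            return false;
        }
    }

    /** Runs several sink passes; returns {remaining channel size, events written}. */
    static int[] simulate(int transactionCapacity, int batchSize, int events, int passes) {
        ToyChannel channel = new ToyChannel(transactionCapacity);
        for (int i = 0; i < events; i++) {
            channel.queue.addLast("event-" + i);
        }
        List<String> hdfs = new ArrayList<>();
        for (int p = 0; p < passes; p++) {
            process(channel, batchSize, hdfs);
        }
        return new int[] {channel.queue.size(), hdfs.size()};
    }

    public static void main(String[] args) {
        int[] result = simulate(10, 20, 15, 3);
        System.out.println("channel size = " + result[0] + ", events written = " + result[1]);
    }
}
```

With transactionCapacity=10, batchSize=20 and 15 queued events, three passes leave the toy channel at 15 events while 30 events have been written. Swapping the values (capacity 20, batch 10) drains the channel with exactly 15 writes.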
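Granted, these are unreasonable config values, but Flume runs a big pile of parameter-checking code at startup and none of it catches this. It could at least report an error or a WARN!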
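For what it's worth, the kind of check I have in mind could look roughly like the sketch below. BatchSizeCheck and its methods are hypothetical names of my own, not part of Flume. It simply reads the two values from a properties-style config and complains when the sink's batch size exceeds the channel's transaction capacity. The caller passes both keys explicitly, and if either is missing the check stays silent rather than guessing a default.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical startup check; not an existing Flume class. */
class BatchSizeCheck {

    /** Parses properties-style config text such as the snippet at the top of the post. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /**
     * Returns a warning message when the sink batch size exceeds the channel's
     * transaction capacity, or empty when the pair is fine or a key is missing.
     */
    static Optional<String> check(Properties props, String capacityKey, String batchKey) {
        String capacityText = props.getProperty(capacityKey);
        String batchText = props.getProperty(batchKey);
        if (capacityText == null || batchText == null) {
            return Optional.empty();
        }
        int capacity = Integer.parseInt(capacityText.trim());
        int batchSize = Integer.parseInt(batchText.trim());
        if (batchSize > capacity) {
            return Optional.of("WARN: " + batchKey + "=" + batchSize
                    + " exceeds " + capacityKey + "=" + capacity);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Properties props = parse(
                "agent.channels.common-channel.transactionCapacity=10\n"
                + "agent.sinks.hdfs-sink.hdfs.batchSize=20\n");
        check(props,
                "agent.channels.common-channel.transactionCapacity",
                "agent.sinks.hdfs-sink.hdfs.batchSize")
                .ifPresent(System.out::println);
    }
}
```

Run against the two settings from this post, the check returns a warning. With the values swapped it returns nothing.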