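Using Flume (1): getting-started demo
Using Flume (2): collecting remote log data into a MySQL database
Using Flume (3): sending real-time log4j logs through Flume to a MySQL database
Using Flume (4): taildirSource, real-time collection with multi-file monitoring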
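(1) Flume re-reads a renamed file as if it were a new file because of the regular expression: the renamed file's name still matches it. So, first, the renamed file is still monitored by Flume. Second, Flume decides whether it is looking at the same file by combining two conditions: the file's inode together with its absolute path, and whether the cached file entry is null together with the absolute path. You can check this in the source code: download the source, put it into a Maven project (make sure the package paths match), and find the taildirsource package.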
The updateTailFiles method of the ReliableTaildirEventReader class:

  public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
    updateTime = System.currentTimeMillis();
    List<Long> updatedInodes = Lists.newArrayList();
    for (TaildirMatcher taildir : taildirCache) {
      Map<String, String> headers = headerTable.row(taildir.getFileGroup());
      for (File f : taildir.getMatchingFiles()) {
        long inode = getInode(f);
        TailFile tf = tailFiles.get(inode);
        // Unknown inode, or known inode under a different path: treat as a new file.
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
          long startPos = skipToEnd ? f.length() : 0;
          tf = openFile(f, headers, inode, startPos);
        } else {
          boolean updated = tf.getLastUpdated() < f.lastModified();
          if (updated) {
            if (tf.getRaf() == null) {
              tf = openFile(f, headers, inode, tf.getPos());
            }
            if (f.length() < tf.getPos()) {
              logger.info("Pos " + tf.getPos() + " is larger than file size! "
                  + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
              tf.updatePos(tf.getPath(), inode, 0);
            }
          }
        }
        tailFiles.put(inode, tf);
      }
    }
    return updatedInodes;
  }
The updatePos method of the TailFile class:

  public boolean updatePos(String path, long inode, long pos) throws IOException {
    // The position is only updated when both the inode and the path match.
    if (this.inode == inode && this.path.equals(path)) {
      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
      return true;
    }
    return false;
  }
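To see why the path comparison matters, it helps to confirm that a rename keeps the inode and changes only the path. The following is a minimal sketch, assuming a Unix-like file system where the JDK exposes the "unix:ino" file attribute; the class name InodeRenameDemo and the temporary file names are made up for illustration and are not part of Flume.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not part of Flume.
final class InodeRenameDemo {

    // Reads the inode number via the "unix:ino" attribute (Unix-like systems only).
    static long inodeOf(Path p) throws IOException {
        return ((Number) Files.getAttribute(p, "unix:ino")).longValue();
    }

    // Creates a temp file, renames it the way a log roll would,
    // and returns {inode before rename, inode after rename}.
    static long[] inodesAcrossRename() {
        try {
            Path dir = Files.createTempDirectory("taildir-demo");
            Path original = Files.createFile(dir.resolve(".ac.log"));
            long before = inodeOf(original);
            Path rolled = Files.move(original, dir.resolve(".ac.log.1"));
            long after = inodeOf(rolled);
            Files.delete(rolled);
            Files.delete(dir);
            return new long[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] inodes = inodesAcrossRename();
        System.out.println("inode before rename: " + inodes[0]);
        System.out.println("inode after rename:  " + inodes[1]);
        System.out.println("same inode: " + (inodes[0] == inodes[1]));
    }
}
```

Because the inode is unchanged while the absolute path differs, a check of the form "inode matches and path matches" fails after the rename, which is the situation the two methods above treat as a new file.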
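For example, with a regular expression such as 【.*.log.*】, renaming a file from .ac.log to .ac.log.1 naturally causes it to be read again, because the new name still matches.
With the regular expression 【.*.log】, once .ac.log is renamed to .ac.log.1 it is no longer monitored by Flume, so the duplicate read does not occur.
Of course, if patterns like 【.*.log.*】 turn out to be really necessary in production, the Flume team will presumably decide whether to fix this in the project depending on how much demand there is in the issues on GitHub.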
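The difference between the two patterns can be checked with plain java.util.regex. This is only a sketch: it assumes the pattern must match the whole file name, and the class and method names (RollRegexDemo, monitored) are hypothetical rather than taken from Flume.

```java
import java.util.regex.Pattern;

// Hypothetical demo class, not part of Flume.
final class RollRegexDemo {

    // True when the whole file name matches the pattern (full match, not a partial find).
    static boolean monitored(String regex, String fileName) {
        return Pattern.matches(regex, fileName);
    }

    public static void main(String[] args) {
        String[] patterns = {".*.log.*", ".*.log"};
        String[] names = {".ac.log", ".ac.log.1"};
        for (String pattern : patterns) {
            for (String name : names) {
                System.out.println(pattern + " vs " + name + " -> " + monitored(pattern, name));
            }
        }
    }
}
```

Only the combination of .*.log with .ac.log.1 fails to match, which is why the narrower pattern drops the rolled file from monitoring.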
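1. Modify ReliableTaildirEventReader
Modify the updateTailFiles method of the ReliableTaildirEventReader class.
Remove the tf.getPath().equals(f.getAbsolutePath()) condition. It is enough to check whether the cached file entry is null; the file name does not need to be compared, because log4j renames the file when it rolls the log.
Before:
  if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
After:
  if (tf == null) {
Modify the updatePos method of the TailFile class.
The inode already identifies a file uniquely (within one file system), so path does not need to be part of the condition.
Before:
  if (this.inode == inode && this.path.equals(path)) {
After:
  if (this.inode == inode) {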
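The effect of dropping the path comparison can be illustrated with a small stand-alone model of the inode-keyed cache. This is a simplified sketch, not Flume's code: TailCacheSketch, Tracked, markRead and startPos are hypothetical names, and the inode value 42 is arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of an inode-keyed tail cache, not part of Flume.
final class TailCacheSketch {

    // Simplified stand-in for a tracked file: its path and the offset read so far.
    static final class Tracked {
        final String path;
        final long pos;

        Tracked(String path, long pos) {
            this.path = path;
            this.pos = pos;
        }
    }

    private final Map<Long, Tracked> cache = new HashMap<>();
    private final boolean comparePath;

    // comparePath = true models the original check, false models the modified one.
    TailCacheSketch(boolean comparePath) {
        this.comparePath = comparePath;
    }

    // Records that the file with this inode has been read up to pos.
    void markRead(long inode, String path, long pos) {
        cache.put(inode, new Tracked(path, pos));
    }

    // Returns the offset reading would start from when the inode is seen under this path.
    long startPos(long inode, String path) {
        Tracked tf = cache.get(inode);
        boolean treatAsNew = tf == null || (comparePath && !tf.path.equals(path));
        long pos = treatAsNew ? 0 : tf.pos;
        cache.put(inode, new Tracked(path, pos));
        return pos;
    }

    public static void main(String[] args) {
        for (boolean comparePath : new boolean[] {true, false}) {
            TailCacheSketch sketch = new TailCacheSketch(comparePath);
            sketch.markRead(42L, "/logs/.ac.log", 1000L);
            long pos = sketch.startPos(42L, "/logs/.ac.log.1");
            System.out.println("comparePath=" + comparePath + " -> start at " + pos);
        }
    }
}
```

With the path comparison the renamed file restarts at offset 0 and is read again; without it the cached offset is kept, which is the behaviour the two modifications aim for.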
This is not really a problem either: the resources are in fact released, but only after a certain waiting period.
In CDH 5.2 and higher, Apache Flume contains an Apache Kafka source and sink. Use these to stream data from Kafka to Hadoop or from any Flume source to Kafka.
In CDH 5.7 and higher, the Flume connector to Kafka only works with Kafka 2.0 and higher.
Important: Do not configure a Kafka source to send data to a Kafka sink. If you do, the Kafka source sets the topic in the event header, overriding the sink configuration and creating an infinite loop, sending messages back and forth between the source and sink. If you need to use both a source and a sink, use an interceptor to modify the event header and set a different topic.
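The header rewrite mentioned in the note can be sketched on a plain map of event headers. This is an illustration only: it assumes the header key is named topic, as the note describes, and it deliberately does not use Flume's classes so that it runs without Flume on the classpath. In a real agent the same logic would sit inside an interceptor; the names TopicHeaderRewrite and retarget are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the header rewrite, not a Flume interceptor.
final class TopicHeaderRewrite {

    // Header key that the note above says the Kafka source sets (assumed name).
    static final String TOPIC_HEADER = "topic";

    // Returns a copy of the headers whose topic points at the sink's topic,
    // so the event is not routed back to the topic it was read from.
    static Map<String, String> retarget(Map<String, String> headers, String sinkTopic) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.put(TOPIC_HEADER, sinkTopic);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> fromSource = new HashMap<>();
        fromSource.put(TOPIC_HEADER, "weblogs");
        Map<String, String> forSink = retarget(fromSource, "weblogs-out");
        System.out.println("source header: " + fromSource.get(TOPIC_HEADER));
        System.out.println("sink header:   " + forSink.get(TOPIC_HEADER));
    }
}
```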
For information on configuring Kafka to securely communicate with Flume, see Configuring Flume Security with Kafka.
Use the Kafka source to stream data in Kafka topics to Hadoop. The Kafka source can be combined with any Flume sink, making it easy to write Kafka data to HDFS, HBase, and Solr.
The following Flume configuration example uses a Kafka source to send data to an HDFS sink:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.kafka.bootstrap.servers = kafka-broker01.example.com:9092
tier1.sources.source1.kafka.topics = weblogs
tier1.sources.source1.kafka.consumer.group.id = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
For higher throughput, configure multiple Kafka sources to read from the same topic. If you configure all the sources with the same kafka.consumer.group.id, and the topic contains multiple partitions, each source reads data from a different set of partitions, improving the ingest rate.
For the list of Kafka Source properties, see Kafka Source Properties.
For the full list of Kafka consumer properties, see the Kafka documentation.
Tuning Notes
The Kafka source overrides two Kafka consumer parameters:
Use the Kafka sink to send data to Kafka from a Flume source. You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.
The following Flume configuration example uses a Kafka sink with an exec source:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = sink1
tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20
For the list of Kafka Sink properties, see Kafka Sink Properties.
For the full list of Kafka producer properties, see the Kafka documentation.
The Kafka sink uses the topic and key properties from the FlumeEvent headers to determine where to send events in Kafka. If the header contains the topic property, that event is sent to the designated topic, overriding the configured topic. If the header contains the key property, that key is used to partition events within the topic. Events with the same key are sent to the same partition. If the key parameter is not specified, events are distributed randomly to partitions. Use these properties to control the topics and partitions to which events are sent through the Flume source or interceptor.
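The routing rules in the paragraph above can be written out as a short sketch. It assumes the header keys are named topic and key as described, and it uses a simple hash-modulo rule to show that equal keys land on the same partition; Kafka's own partitioner uses a different hash function, so the concrete partition numbers here are not what a real cluster would produce. SinkRoutingSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the routing rules, not the Kafka sink's implementation.
final class SinkRoutingSketch {

    // A topic header, when present, overrides the topic configured on the sink.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        return headers.getOrDefault("topic", configuredTopic);
    }

    // Simplified stand-in for key-based partitioning: equal keys give equal partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        System.out.println("no header  -> " + resolveTopic(headers, "sink1"));
        headers.put("topic", "audit");
        System.out.println("with header -> " + resolveTopic(headers, "sink1"));
        System.out.println("same key, same partition: "
                + (partitionFor("host-a", 4) == partitionFor("host-a", 4)));
    }
}
```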
CDH 5.3 and higher includes a Kafka channel to Flume in addition to the existing memory and file channels. You can use the Kafka channel:
The following Flume configuration uses a Kafka channel with an exec source and hdfs sink:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
tier1.channels.channel1.parseAsFlumeEvent = false
tier1.channels.channel1.kafka.topic = channel2
tier1.channels.channel1.kafka.consumer.group.id = channel2-grp
tier1.channels.channel1.kafka.consumer.auto.offset.reset = earliest
tier1.channels.channel1.kafka.bootstrap.servers = kafka02.example.com:9092,kafka03.example.com:9092
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.kafka.consumer.max.partition.fetch.bytes = 2097152

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
For the list of Kafka Channel properties, see Kafka Channel Properties.
For the full list of Kafka producer properties, see the Kafka documentation.