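Background: we use Kafka + Flume + Morphline + Solr for real-time statistics.
Starting on December 23, Solr stopped receiving any data. The logs showed that a colleague had added a malformed tracking event, which caused a large number of errors.
Our inference is that the memory channel filled up: messages could not be processed fast enough, so newly arriving data was dropped.
We changed Flume to use a file channel: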
    kafka2solr.sources = source_from_kafka
    kafka2solr.channels = file_channel
    kafka2solr.sinks = solrSink

    # For each one of the sources, the type is defined
    kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
    kafka2solr.sources.source_from_kafka.channels = file_channel
    kafka2solr.sources.source_from_kafka.batchSize = 100
    kafka2solr.sources.source_from_kafka.useFlumeEventFormat = false
    kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers = kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
    kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
    kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
    kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset = latest

    # file channel
    kafka2solr.channels.file_channel.type = file
    kafka2solr.channels.file_channel.checkpointDir = /var/log/flume-ng/checkpoint
    kafka2solr.channels.file_channel.dataDirs = /var/log/flume-ng/data

    kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    kafka2solr.sinks.solrSink.channel = file_channel
    #kafka2solr.sinks.solrSink.batchSize = 1000
    #kafka2solr.sinks.solrSink.batchDurationMillis = 1000
    kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
    kafka2solr.sinks.solrSink.morphlineId = morphline1
    kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions = true
With the file channel, events are persisted to disk, so they are not lost when the sink falls behind.
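If the file channel itself needs tuning, its size and checkpoint behaviour can be set explicitly. The fragment below is only a sketch: the property names are standard Flume file channel options, but the values are illustrative (they match the documented defaults) and were not part of the original setup. The agent and channel names are reused from the config above.

```properties
# Assumed tuning values, not taken from the original configuration.
# Maximum number of events the channel can hold on disk.
kafka2solr.channels.file_channel.capacity = 1000000
# Maximum events per transaction; should be at least the source batchSize (100 here).
kafka2solr.channels.file_channel.transactionCapacity = 10000
# Milliseconds between checkpoints.
kafka2solr.channels.file_channel.checkpointInterval = 30000
```

A larger capacity gives the sink more room to catch up after a burst of errors, at the cost of more disk space under the configured dataDirs.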