# Purpose: receive log events over Avro, strip the non-JSON prefix from each
# event body with a search_replace interceptor, and store the result in Kafka.

# Component names for this agent.
agent.sources = s1
agent.channels = c1
agent.sinks = k1

# Avro source: listens on centos:4444 and feeds channel c1.
agent.sources.s1.type = avro
agent.sources.s1.channels = c1
agent.sources.s1.bind = centos
agent.sources.s1.port = 4444

# Interceptor: replace everything before the first '{' with the empty string.
# The pattern is anchored with '^' so only the leading prefix is removed; an
# unanchored pattern would also delete text between braces in nested JSON
# (e.g. {"a":{"b":1}}), because the interceptor replaces every match.
agent.sources.s1.interceptors = search-replace
agent.sources.s1.interceptors.search-replace.type = search_replace
agent.sources.s1.interceptors.search-replace.searchPattern = ^[^{]*(?=\\{)
agent.sources.s1.interceptors.search-replace.replaceString =

# In-memory channel.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100
agent.channels.c1.transactionCapacity = 10

# Kafka sink: reads from channel c1 and publishes to topic "flume" on centos:9092.
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = centos:9092
agent.sinks.k1.channel = c1
agent.sinks.k1.kafka.topic = flume
# NOTE(review): two batch-size keys are set with different values (1 and 5);
# confirm which one this Flume version honours and drop the other.
agent.sinks.k1.kafka.flumeBatchSize = 1
agent.sinks.k1.batchSize = 5
agent.sinks.k1.kafka.producer.acks = 1
解釋:search_replace 攔截器會把每條事件中第一個「{」之前的內容(日誌的時間、級別、類名等前綴)替換爲空字符串,只保留 JSON 部分,再由 KafkaSink 寫入 Kafka。
注意:調試時最好把日誌級別設爲 debug,方便定位問題;環境穩定後再改成 info。
啓動 kafka:kafka-server-start.sh server.properties
監聽主題 Topic:kafka-console-consumer.sh --bootstrap-server centos:9092 --topic flume
pom:
<!-- Flume client SDK -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>

<!-- Log4j appender that forwards log events to a Flume agent -->
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.9.0</version>
</dependency>

<!-- SLF4J binding for log4j 1.2 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.24</version>
</dependency>
log4j.properties:
# Root logger: debug level, routed to the console appender and the Flume appender.
log4j.rootLogger=debug,stdout,flume

### Send log output to the Flume agent ###
# Hostname/Port must point at the agent's Avro source (port 4444 in the agent config).
# NOTE(review): 192.168.72.129 is presumably the address of host "centos" - confirm.
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.72.129
log4j.appender.flume.Port=4444
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m

### Print log output to the console ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{hh:mm:ss,SSS} [%t] %-5p [%c:%L] %x - %m%n
// Emit the same sample record 100 times at INFO level through the logger.
for (int attempt = 0; attempt < 100; attempt++) {
    log.info("{'name':'Adam', 'age':'26', 'skill':'reading'}");
}