<!-- log4j2 Kafka appender: the Kafka client it sends through -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
        <!-- exclude the outdated logging dependencies; this project logs through log4j2 -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
<!-- log4j2 async loggers require the LMAX disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.0</version>
</dependency>
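The POM above leaves one thing implicit: the Kafka appender configured in the next section is Log4j2's own KafkaAppender, which lives in log4j-core, so the Log4j2 artifacts themselves must also be on the classpath. A minimal sketch of those coordinates (the 2.5 version number is an assumption; any 2.x release that includes the Kafka appender will do):

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.5</version> <!-- assumed version -->
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.5</version> <!-- provides the KafkaAppender -->
</dependency>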
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" ignoreExceptions="false">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L) - %m%n" />
        </Console>
        <Kafka name="KafkaAppender" topic="error-log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L) - %m%n" />
            <Property name="bootstrap.servers">localhost:9092</Property>
            <Property name="timeout.ms">10000</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <AsyncLogger name="async">
            <AppenderRef ref="KafkaAppender" />
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="Console" />
            <AppenderRef ref="KafkaAppender" level="error" />
        </Root>
    </Loggers>
</Configuration>
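To see how this configuration routes events, here is a minimal sketch of application code against it (the class name MyApp is illustrative): info-level events reach only the Console appender, while error-level events also go to the error-log Kafka topic, because the KafkaAppender reference on the Root logger is filtered to level="error".

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MyApp {
    private static final Logger LOGGER = LogManager.getLogger(MyApp.class);

    public static void main(String[] args) {
        // Below the error threshold on the KafkaAppender ref: Console only
        LOGGER.info("service started");
        // At or above error: Console and the error-log Kafka topic
        LOGGER.error("something went wrong", new IllegalStateException("boom"));
    }
}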
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.junit.Test;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

@Test
public void consumeErrorLog() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("auto.offset.reset", "smallest");
    props.put("group.id", "testgroup6");
    props.put("enable.auto.commit", "true");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "2000"); // raised from 200 to 2000; too small a value causes rebalance errors
    props.put("auto.commit.interval.ms", "1000");

    ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
    ConsumerConnector consumerConnector =
            kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    // TOPIC is the topic the appender writes to, i.e. "error-log" in the configuration above
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    int localConsumerCount = 1;
    topicCountMap.put(TOPIC, localConsumerCount);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
    streams.stream().forEach(stream -> {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    });
}
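Note that the test above uses the old Scala high-level consumer (kafka.consumer.*), which ships with the kafka core artifact rather than with kafka-clients. Since kafka-clients 0.9.0.1 is already in the POM, the same topic can also be read with the new consumer API; a minimal sketch, assuming the broker and topic configured earlier:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ErrorLogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker from the appender config
        props.put("group.id", "testgroup6");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest"); // new-consumer equivalent of "smallest"
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("error-log"));
            while (true) {
                // poll(long timeout) is the 0.9.x signature
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}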
How can Kafka be used to collect and store the logs that Log4j produces in real time? One approach is to use another component (such as Flume, or a program you write yourself) to watch the log files and forward new entries to Kafka. A more convenient approach is Kafka's bundled Log4jAppender: add the corresponding settings to the Log4j configuration file, and the logs Log4j produces are sent to Kafka in real time.
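For the log4j 1.x route just described, a minimal log4j.properties sketch follows. It assumes the kafka-log4j-appender artifact is on the classpath (which provides org.apache.kafka.log4jappender.KafkaLog4jAppender) and reuses the broker and topic from the rest of this post; property names may differ slightly across Kafka versions:

log4j.rootLogger=INFO, stdout, kafka

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p - %m%n

# Kafka appender shipped with Kafka (module: kafka-log4j-appender)
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=error-log
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p - %m%n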
KafkaAppender
log4j2-kafka configuration example