log4j2輸出到kafka

maven

<!-- log4j2 kafka appender -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
            <exclusions> <!-- exclude掉過期的log依賴 -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <!-- log4j2 async -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.0</version>
        </dependency>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" ignoreExceptions="false">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L)  - %m%n" />
        </Console>
        <Kafka name="KafkaAppender" topic="error-log">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] (%F:%L)  - %m%n" />
            <Property name="bootstrap.servers">localhost:9092</Property>
            <Property name="timeout.ms">10000</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <AsyncLogger name="async">
            <AppenderRef ref="KafkaAppender" />
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="Console" />
            <AppenderRef ref="KafkaAppender" level="error" />
        </Root>
    </Loggers>
</Configuration>

test

@Test
    public void consumeErrorLog(){
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("auto.offset.reset","smallest");
        props.put("group.id", "testgroup6");
        props.put("enable.auto.commit", "true");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "2000"); // 從200修改爲2000 過短有rebalance錯誤
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        int localConsumerCount = 1;
        topicCountMap.put(TOPIC, localConsumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        streams.stream().forEach(stream -> {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        });
    }

docs

如何使用Kafka實時的收集與存儲這些Log4j產生的日誌呢?一種方案是使用其餘組件(好比Flume,或者本身開發程序)實時監控這些日誌文件,而後發送至Kafka。而另一種比較便捷的方案是使用Kafka自帶的Log4jAppender,在Log4j配置文件中進行相應的配置,便可完成將Log4j產生的日誌實時發送至Kafka中。git

相關文章
相關標籤/搜索