Sending log4j2 messages to Kafka

title: Sending logs to Kafka with a custom log4j2 appender

tags: log4j2, kafka

The goal was to feed every project team's logs into the company's big-data platform without the teams having to change anything on their side. After some research I found that log4j2 ships with built-in support for sending logs to Kafka, and I happily dug into the log4j source to see how it is implemented. It turned out that the default implementation sends synchronously: if the Kafka service goes down, it blocks the application's normal log output. For that reason I made some modifications based on the original source.

The log4j logging workflow

log4j2 is a significant performance improvement over log4j; the official site already documents and benchmarks this clearly, so I won't dwell on it. To use it well, though, it is worth understanding its internal workflow. The official site provides a class diagram illustrating it (not reproduced here).

Applications using the Log4j 2 API will request a Logger with a specific name from the LogManager. The LogManager will locate the appropriate LoggerContext and then obtain the Logger from it. If the Logger must be created it will be associated with the LoggerConfig that contains either a) the same name as the Logger, b) the name of a parent package, or c) the root LoggerConfig. LoggerConfig objects are created from Logger declarations in the configuration. The LoggerConfig is associated with the Appenders that actually deliver the LogEvents.
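The name-resolution rule described in that paragraph (exact logger name, then parent package names, then the root config) can be sketched in plain Java. The table of configured names below is a hypothetical stand-in for the LoggerConfig entries, not Log4j's actual data structure:

```java
import java.util.HashMap;
import java.util.Map;

public class LoggerConfigLookup {
    // Hypothetical config table: logger name -> level, mirroring LoggerConfig entries.
    static final Map<String, String> CONFIGS = new HashMap<>();
    static {
        CONFIGS.put("", "DEBUG");                 // the root LoggerConfig
        CONFIGS.put("org.apache.kafka", "INFO");  // a parent-package config
    }

    // Walk from the exact name up through parent packages to the root.
    static String resolve(String loggerName) {
        String name = loggerName;
        while (true) {
            if (CONFIGS.containsKey(name)) {
                return name.isEmpty() ? "root=" + CONFIGS.get(name)
                                      : name + "=" + CONFIGS.get(name);
            }
            int dot = name.lastIndexOf('.');
            name = (dot < 0) ? "" : name.substring(0, dot);
        }
    }

    public static void main(String[] args) {
        // A Kafka-client logger resolves to the org.apache.kafka config;
        // anything unconfigured falls through to root.
        System.out.println(resolve("org.apache.kafka.clients.Metadata")); // org.apache.kafka=INFO
        System.out.println(resolve("com.example.Service"));               // root=DEBUG
    }
}
```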

The official documentation already explains how these classes relate, so I won't describe the role of each one here. Today's focus is the Appender class, because it decides where the logs are delivered.

  • Appender

The ability to selectively enable or disable logging requests based on their logger is only part of the picture. Log4j allows logging requests to print to multiple destinations. In log4j speak, an output destination is called an Appender. Currently, appenders exist for the console, files, remote socket servers, Apache Flume, JMS, remote UNIX Syslog daemons, and various database APIs. See the section on Appenders for more details on the various types available. More than one Appender can be attached to a Logger.

Core configuration

(Class diagram: KafkaAppender and the supporting Kafka connection classes)

These are the core classes log4j2 uses to send logs to Kafka. The most important is KafkaAppender; the remaining classes handle the connection to the Kafka service.

  • KafkaAppender core implementation
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {

    private static final long serialVersionUID = 1L;

    @PluginFactory
    public static KafkaAppender createAppender(
            @PluginElement("Layout") final Layout<? extends Serializable> layout,
            @PluginElement("Filter") final Filter filter,
            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
            @PluginElement("Properties") final Property[] properties) {
        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
    }

    private final KafkaManager manager;

    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
            final boolean ignoreExceptions, final KafkaManager manager) {
        super(name, filter, layout, ignoreExceptions);
        this.manager = manager;
    }

    @Override
    public void append(final LogEvent event) {
        // Guard against recursion: the Kafka client itself logs through log4j2.
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        } else {
            try {
                if (getLayout() != null) {
                    manager.send(getLayout().toByteArray(event));
                } else {
                    manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
                }
            } catch (final Exception e) {
                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void start() {
        super.start();
        manager.startup();
    }

    @Override
    public void stop() {
        super.stop();
        manager.release();
    }
}
  • Minimal log4j2.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
  ...
  <Appenders>
    <Kafka name="Kafka" topic="log-test">
      <PatternLayout pattern="%date %message"/>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>

  <Loggers>
    <Root level="DEBUG">
      <AppenderRef ref="Kafka"/>
    </Root>
    <Logger name="org.apache.kafka" level="INFO" /> <!-- avoid recursive logging -->
  </Loggers>

The name attribute of @Plugin corresponds to the Kafka tag in the xml configuration file, and it can of course be customized. If you rename the tag, the name attribute of @Plugin must be changed to match, e.g. to MyKafka:

<MyKafka name="Kafka" topic="log-test">

Custom configuration

Sometimes the properties we need are not supported by the default KafkaAppender, so a certain amount of rewriting is required. The change is straightforward: just read the values from the Properties kafkaProps passed to the constructor. To meet our project's requirements, I defined two extra properties, platform and serviceName.
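Separating such custom keys from the ordinary producer settings can be sketched with plain java.util.Properties. The platform/serviceName keys mirror the ones mentioned above; the "unknown" defaults are my own assumption:

```java
import java.util.Properties;

public class CustomProps {
    // Pull out the custom keys and strip them, leaving only real Kafka producer settings.
    static String[] splitCustom(Properties kafkaProps) {
        String platform = kafkaProps.getProperty("platform", "unknown");
        String service  = kafkaProps.getProperty("serviceName", "unknown");
        // Remove them so they are not passed on to the KafkaProducer config,
        // which would otherwise warn about unknown properties.
        kafkaProps.remove("platform");
        kafkaProps.remove("serviceName");
        return new String[] { platform, service };
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("platform", "bigdata");
        p.setProperty("serviceName", "order-service");
        String[] custom = splitCustom(p);
        System.out.println(custom[0] + "/" + custom[1]);  // bigdata/order-service
        System.out.println(p.containsKey("platform"));    // false
    }
}
```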

As the KafkaAppender source shows, it sends messages synchronously, blocking the caller. In my tests, once the Kafka service went down, the application's normal log output was affected, which is not what I wanted to see, so I modified it to a certain extent.
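The core idea of the modification (a sketch of the approach, not the article's exact code) is to hand each record to a bounded queue and let a background worker perform the actual Kafka call, so the logging thread never blocks. Here the real producer is replaced by a hypothetical Consumer callback so the sketch runs standalone:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncSender {
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(10_000);
    private final Thread worker;

    AsyncSender(Consumer<byte[]> realSend) {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) realSend.accept(msg); // the actual (possibly slow) Kafka call
                }
            } catch (InterruptedException ignored) { }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Called from the appender's append(): never blocks the logging thread.
    boolean send(byte[] msg) {
        return queue.offer(msg); // when the queue is full, drop or spill to a local file
    }

    public static void main(String[] args) {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        AsyncSender sender = new AsyncSender(b -> delivered.add(new String(b)));
        sender.send("kafka may be slow, logging is not blocked".getBytes());
        while (delivered.isEmpty()) { } // wait for the background worker
        System.out.println(delivered.get(0));
    }
}
```

A real implementation would call KafkaProducer.send with a callback inside the worker; the queue bound is what prevents a dead broker from exhausting the heap.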

Behavior under the different Kafka states:

  • The Kafka service stays healthy
    This is the ideal case: messages flow continuously to the Kafka broker.
  • The Kafka service goes down, then recovers after a while
    From the moment Kafka goes down, every subsequent message is pushed onto a ConcurrentLinkedQueue. The queue is consumed continuously, and its messages are written to a local file. Once the heartbeat check detects that the Kafka broker has recovered, the local file is read back and its contents are sent to the broker. Note that at this point a large number of messages are instantiated as ProducerRecord objects and heap usage climbs sharply, so I throttle the replay by briefly blocking the sending thread.
  • The Kafka service stays down
    All messages are written to the local file.
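The three scenarios boil down to a small failover loop. This is a simplified, self-contained sketch of that loop: an in-memory deque stands in for the local log file, a list stands in for the Kafka broker, and the brokerUp flag simulates the heartbeat result:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FailoverBuffer {
    volatile boolean brokerUp = true;                             // set by a heartbeat check
    final Queue<String> pending = new ConcurrentLinkedQueue<>();  // overflow while broker is down
    final Deque<String> localFile = new ArrayDeque<>();           // stands in for the fallback file
    final List<String> broker = new ArrayList<>();                // stands in for Kafka

    void send(String msg) {
        if (brokerUp) broker.add(msg);
        else pending.add(msg);
    }

    // Background job: drain the queue into the local file while the broker is down.
    void drainQueueToFile() {
        String msg;
        while ((msg = pending.poll()) != null) localFile.addLast(msg);
    }

    // On heartbeat recovery: replay the file in small batches to limit heap usage
    // (the article throttles here because every message becomes a ProducerRecord).
    void replayFile(int batchSize) {
        while (!localFile.isEmpty()) {
            for (int i = 0; i < batchSize && !localFile.isEmpty(); i++) {
                broker.add(localFile.pollFirst());
            }
            // a real implementation would briefly block between batches here
        }
    }
}
```

The batch size and the names of the helpers are assumptions for illustration; the original code's throttling detail is only described, not published.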

