Kafka+Log4j實現日誌集中管理

引言

前段時間寫的《Spring+Log4j+ActiveMQ實現遠程記錄日誌——實戰+分析》獲得了許多同窗的承認,在承認的同時,也有同窗提出可使用Kafka來集中管理日誌,因而今天就來學習一下。html

特別說明,因爲網絡上關於Kafka+Log4j的完整例子並很少,我也是一邊學習一邊使用,所以若是有解釋得很差或者錯誤的地方,歡迎批評指正,若是你有好的想法,也歡迎留言探討。java

第一部分 搭建Kafka環境

安裝Kafkashell

下載:http://kafka.apache.org/downloads.htmlapache

tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>

啓動Zookeeperapi

啓動Zookeeper前須要配置一下config/zookeeper.properties:網絡

接下來啓動Zookeeperapp

bin/zookeeper-server-start.sh config/zookeeper.properties

啓動Kafka Server學習

啓動Kafka Server前須要配置一下config/server.properties。主要配置如下幾項,內容就不說了,註釋裏都很詳細:ui

而後啓動Kafka Servergoogle

bin/kafka-server-start.sh config/server.properties

 建立Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看建立的Topic

>bin/kafka-topics.sh --list --zookeeper localhost:2181

啓動控制檯Producer,向Kafka發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
^C

啓動控制檯Consumer,消費剛剛發送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有當delete.topic.enable=true時,該操做纔有效

配置Kafka集羣(單臺機器上)

首先拷貝server.properties文件爲多份(這裏演示4個節點Kafka集羣,所以還須要拷貝3份配置文件):

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties

修改server1.properties的如下內容:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

同理修改server2.propertiesserver3.properties的這些內容,並保持全部配置文件的zookeeper.connect屬性都指向運行在本機的zookeeper地址localhost:2181。注意,因爲這幾個Kafka節點都將運行在同一臺機器上,所以須要保證這幾個值不一樣,這裏以累加的方式處理。例如在server2.properties上:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

server3.properties也配置好之後,依次啓動這些節點:

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在邏輯上能夠被認爲是一個queue,每條消費都必須指定它的Topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠線性提升,物理上把Topic分紅一個或多個Partition,每一個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的全部消息和索引文件

如今在Kafka集羣上建立備份因子爲3,分區數爲4Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

說明:備份因子replication-factor越大,則說明集羣容錯性越強,就是當集羣down掉後,數據恢復的可能性越大。全部的分區數裏的內容共同組成了一份數據,分區數partions越大,則該topic的消息就越分散,集羣中的消息分佈就越均勻。

而後使用kafka-topics.sh--describe參數查看一下Topickafka的詳情:


輸出的第一行是全部分區的概要,接下來的每一行是一個分區的描述。能夠看到Topickafka的消息,PartionCount=4ReplicationFactor=3正是咱們建立時指定的分區數和備份因子。

另外:Leader是指負責這個分區全部讀寫的節點;Replicas是指這個分區所在的全部節點(不論它是否活着);ISRReplicas的子集,表明存有這個分區信息並且當前活着的節點。

partition:0這個分區來講,該分區的Leaderserver0,分佈在id012這三個節點上,並且這三個節點都活着。

再來看下Kafka集羣的日誌:


其中kafka-logs-0表明server0的日誌,kafka-logs-1表明server1的日誌,以此類推。

從上面的配置可知,id0123的節點分別對應server0, server1, server2, server3。而上例中的partition:0分佈在id0, 1, 2這三個節點上,所以能夠在server0, server1, server2這三個節點上看到有kafka-0這個文件夾。這個kafka-0就表明Topickafkapartion0

第二部分 Kafka+Log4j項目整合

先來看下Maven項目結構圖:


做爲Demo,文件很少。先看看pom.xml引入了哪些jar包:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

重要的內容是log4j.properties:

log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.
log4j.logger.com.demo.kafka=DEBUG,kafka

# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kafka
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
 
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.java裏面就很簡單啦,主要是經過log4j輸出日誌:

package com.demo.kafka;
import org.apache.log4j.Logger;
public class App {
    private static final Logger LOGGER = Logger.getLogger(App.class);
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            LOGGER.info("Info [" + i + "]");
            Thread.sleep(1000);
        }
    }
}

MyConsumer.java用於消費kafka中的信息:

package com.demo.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MyConsumer {
    private static final String ZOOKEEPER = "localhost:2181";
    //groupName能夠隨意給,由於對於kafka裏的每條消息,每一個group都會完整的處理一遍
    private static final String GROUP_NAME = "test_group";
    private static final String TOPIC_NAME = "kafka";
    private static final int CONSUMER_NUM = 4;
    private static final int PARTITION_NUM = 4;

    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", ZOOKEEPER);
        props.put("zookeeper.connectiontimeout.ms", "1000000");
        props.put("group.id", GROUP_NAME);

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = 
            Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic 「test」, to allow 4
        // threads to consume
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = 
            consumerConnector.createMessageStreams(
                ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);

        // consume the messages in the threads
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                        // process message (msgAndMetadata.message())
                        System.out.println(new String(msgAndMetadata.message()));
                    }
                }
            });
        }
    }
}

MyProducer.java用於向Kafka發送消息,但不經過log4jappender發送。此案例中能夠不要。可是我仍是放在這裏:

package com.demo.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {
    private static final String TOPIC = "kafka";
    private static final String CONTENT = "This is a single message";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", SERIALIZER_CLASS);
        props.put("metadata.broker.list", BROKER_LIST);
        
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        //Send one message.
        KeyedMessage<String, String> message = 
            new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);
        
        //Send multiple messages.
        List<KeyedMessage<String,String>> messages = 
            new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 5; i++) {
            messages.add(new KeyedMessage<String, String>
                (TOPIC, "Multiple message at a time. " + i));
        }
        producer.send(messages);
    }
}

到這裏,代碼就結束了。

第三部分 運行與驗證

先運行MyConsumer,使其處於監聽狀態。同時,還能夠啓動Kafka自帶的ConsoleConsumer來驗證是否跟MyConsumer的結果一致。最後運行App.java

先來看看MyConsumer的輸出:

再來看看ConsoleConsumer的輸出:

能夠看到,儘管發往Kafka的消息去往了不一樣的地方,可是內容是同樣的,並且一條也很多。最後再來看看Kafka的日誌。

咱們知道,Topickafka的消息有4partion,從以前的截圖可知這4partion均勻分佈在4kafka節點上,因而我對每個partion隨機選取一個節點查看了日誌內容。

上圖中黃色選中部分依次表明在server0上查看partion0,在server1上查看partion1,以此類推。

紅色部分是日誌內容,因爲在建立Topic時準備將20條日誌分紅4個區存儲,能夠很清楚的看到,這20條日誌確實是很均勻的存儲在了幾個partion上。

摘一點Infoq上的話:每一個日誌文件都是一個log entrie序列,每一個log entrie包含一個4字節整型數值(值爲N+5),1個字節的"magic value"4個字節的CRC校驗碼,其後跟N個字節的消息體。每條消息都有一個當前Partition下惟一的64字節的offset,它指明瞭這條消息的起始位置。磁盤上存儲的消息格式以下:

message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte 
crc : 4 bytes 
payload : n bytes

這裏咱們看到的日誌文件的每一行,就是一個log entrie,每一行前面沒法顯示的字符(藍色選中部分),就是(message length + magic value + crc)了。而log entrie的後部分,則是消息體的內容了


問題:

1. 若是要使用此種方式,有一種場景是提取某天或者某小時的日誌,那麼如何設計Topic呢?是否是要在Topic上帶入日期或者小時數?還有更好的設計方案嗎?

2. 假設按每小時設計Topic,那麼如何在使用諸如logger.info()這樣的方法時,自動根據時間去改變Topic呢?有相似的例子嗎?

----歡迎交流,共同進步。


樣例下載:百度網盤

連接: http://pan.baidu.com/s/1i400DZv 密碼: f25c


參考頁面:

http://kafka.apache.org/07/quickstart.html

http://kafka.apache.org/documentation.html#quickstart

http://www.infoq.com/cn/articles/kafka-analysis-part-1

相關文章
相關標籤/搜索