[Flume] - Collecting Log4j Logs and Sending Them to Kafka

Using Flume to collect Log4j logs and send them to Kafka for storage


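Environment Setup

  1. Download Flume: http://flume.apache.org/
  2. Install: extract the downloaded archive to a directory of your choice
  3. Configure the agent
# Purpose: an interceptor strips everything before the JSON payload, then the event is stored in Kafka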
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = avro
agent.sources.s1.channels = c1
agent.sources.s1.bind = centos
agent.sources.s1.port = 4444
agent.sources.s1.interceptors = search-replace
agent.sources.s1.interceptors.search-replace.type = search_replace
agent.sources.s1.interceptors.search-replace.searchPattern = [^{]*(?=\\{)
agent.sources.s1.interceptors.search-replace.replaceString =

agent.channels.c1.type = memory
agent.channels.c1.capacity = 100
agent.channels.c1.transactionCapacity = 10

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.bootstrap.servers = centos:9092
agent.sinks.k1.channel = c1
agent.sinks.k1.kafka.topic = flume
# flumeBatchSize is a sink-level key (no kafka. prefix); batchSize is its deprecated alias
agent.sinks.k1.flumeBatchSize = 5
agent.sinks.k1.kafka.producer.acks = 1

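Explanation:

  • sources: avro, receives the Log4j log events
  • interceptors: search-replace, searches each event body for the given pattern and rewrites the data
  • channels: memory, buffers events in memory to improve log-processing throughput
  • sink: sends the events to the Kafka topic flume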
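The effect of the search-replace rule can be checked outside Flume with plain java.util.regex. The sketch below is illustrative only: the class name SearchReplaceDemo, the methods loadPattern and stripPrefix, and the sample log line are assumptions, and it imitates the interceptor by calling replaceAll with an empty replacement instead of running Flume itself. It also reads the pattern through java.util.Properties to show that the `\\{` written in the config file reaches the regex engine as `\{`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

public class SearchReplaceDemo {

    // Values as written in the agent config (keys shortened for this demo).
    // In Java source each backslash has to be doubled once more.
    static final String CONFIG =
            "searchPattern = [^{]*(?=\\\\{)\n"
          + "replaceString =\n";

    // Parse the text as a .properties file: "\\{" in the file becomes "\{".
    static String loadPattern() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(CONFIG));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props.getProperty("searchPattern");
    }

    // Imitate the interceptor: replace every match of the pattern with "".
    static String stripPrefix(String body) {
        return Pattern.compile(loadPattern()).matcher(body).replaceAll("");
    }

    public static void main(String[] args) {
        // Hypothetical line in the shape of "%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m".
        String line = "2020-01-01 12:00:00 INFO [com.example.Test:10] - "
                + "{'name':'Adam', 'age':'26', 'skill':'reading'}";
        System.out.println(loadPattern());
        System.out.println(stripPrefix(line));
    }
}
```

Because the pattern is not anchored, replaceAll also removes any text that sits between one `{` and the next, so a nested body such as `x {"a":{"b":1}}` comes out as `{{"b":1}}`. If nested JSON is expected, anchoring the pattern with a leading `^` is one possible way to limit the match to the prefix.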
  4. Start the Flume agent: flume-ng agent -c conf -f ./conf/flume-kafka.conf -n agent -Dflume.root.logger=debug,console

Note: while debugging, keep the log level at debug so problems are easier to locate; once the environment is stable, change it to info.

  5. Start Kafka and consume the topic
Start Kafka: kafka-server-start.sh server.properties
Consume the topic: kafka-console-consumer.sh --bootstrap-server centos:9092 --topic flume

Test Code

pom.xml:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.24</version>
</dependency>

log4j.properties:

log4j.rootLogger=debug,stdout,flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.72.129
log4j.appender.flume.Port=4444
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m

### Output to the console ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{hh:mm:ss,SSS} [%t] %-5p [%c:%L] %x - %m%n
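
// Java test loop; assumes an SLF4J logger field, e.g.
// private static final Logger log = LoggerFactory.getLogger(Demo.class);
for (int i = 0; i < 100; i++) {
    log.info("{'name':'Adam', 'age':'26', 'skill':'reading'}");
}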
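The test message uses single quotes, which strict JSON parsers reject. If downstream consumers are expected to parse the Kafka messages as JSON, a double-quoted payload may be safer. The sketch below shows one way to build such a string with only the JDK; the class JsonPayload and the methods escape and toJson are hypothetical helpers, and it handles only flat string-to-string maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonPayload {

    // Escape backslashes, double quotes, and control characters for a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Serialize a flat map of strings as a single-line JSON object.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("name", "Adam");
        fields.put("age", "26");
        fields.put("skill", "reading");
        // Prints {"name":"Adam","age":"26","skill":"reading"}
        System.out.println(toJson(fields));
    }
}
```

The resulting string could then be passed to log.info(...) in place of the hard-coded message; a JSON library would be the more robust choice for anything beyond flat string fields.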

Result
