Setting Up an ELK Environment (Part 2): Adding Kafka

Introduction

Yesterday I set up the ELK logging environment. From what I have read online, Logstash can become a performance bottleneck, and the usual remedies are to put a message queue in front of it or to run a Logstash cluster. So today I added Kafka to the existing setup as a message queue between the application and Logstash.

Installation

Kafka:

Download the tar package from the official site and extract it; nothing more is needed.

Configuration

Kafka can be run with its default configuration.

Fix for the startup error: "Could not find or load main class"

After configuring Kafka's server.properties, I opened a cmd window and ran .\bin\windows\kafka-server-start.bat config\server.properties, which failed with: Error: Could not find or load main class Files\Java\jdk1.7.0_80\lib;C:\Program. The cause is the space in C:\Program Files: the unquoted classpath gets split at it. The fix is to open kafka-run-class.bat in the bin\windows directory of the Kafka installation and, at line 142, wrap %CLASSPATH% in double quotes.

Before:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %* 
After:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

Update yesterday's Logstash configuration so that its input reads from Kafka:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "applog"
  }
}

Update the Logback configuration so that logs go to Kafka instead of Logstash.

Add the logback-kafka-appender dependency:

<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>

Modify logback-spring.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">

    <springProperty scope="context" name="springAppName"
                    source="${spring.application.name}" />

    <property name="CONSOLE_LOG_PATTERN"
              value="%date [%thread] %-5level %logger{36} - %msg%n" />

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <withJansi>true</withJansi>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!--<appender name="logstash"
              class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity":"%level",
                        "service": "${springAppName:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>-->

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <!-- This is the default encoder that encodes every log message to an utf8-encoded string  -->
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </layout>
        </encoder>
        <topic>logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="stdout" />
    </appender>


    <appender name="dailyRollingFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <File>main.log</File>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>main.%d{yyyy-MM-dd}.log</FileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{35} - %msg %n</Pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>DEBUG</level>
        </filter>
    </appender>

    <springProfile name="!production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>

    <springProfile name="production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>
</configuration>
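
With this configuration in place, the application code does not change at all: anything logged through SLF4J is routed by kafkaAppender to the logs topic (and to stdout and the rolling file). As a minimal illustration, a test endpoint like the one below is enough to generate messages; the class name and URL are hypothetical stand-ins for yesterday's controller.

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller used only to generate log traffic.
@RestController
public class LogDemoController {

    private static final Logger log = LoggerFactory.getLogger(LogDemoController.class);

    // Each request writes one log line, which the appenders configured above
    // send to stdout, the rolling file, and the Kafka "logs" topic.
    @GetMapping("/hello")
    public String hello() {
        log.info("hello from LogDemoController, this should show up in Kafka and Elasticsearch");
        return "ok";
    }
}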

Startup

Start ZooKeeper

Go into the bin\windows directory under the Kafka installation and run:

.\zookeeper-server-start.bat C:\work\kafka_2.11-1.1.0\config\zookeeper.properties

Start the Kafka server

Run:

.\kafka-server-start.bat C:\work\kafka_2.11-1.1.0\config\server.properties
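
To watch the raw log messages arriving in Kafka (optional, but handy when something in the chain does not work), you can attach a console consumer to the logs topic from the same bin\windows directory; the topic name matches the one used above.

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic logs --from-beginning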

The rest is the same as yesterday: start Elasticsearch, Logstash, and Kibana.

Hit the controller, and you can see that the logs are written to Elasticsearch as well.
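
Besides checking in Kibana, you can also query the applog index directly to confirm that documents have been created (assuming curl is available; the same URL works in a browser):

curl "http://localhost:9200/applog/_search?pretty"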
