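Yesterday I set up the ELK logging environment. From what I read online, Logstash can become a performance bottleneck, and the usual fixes are to put a message queue in front of it or to run a Logstash cluster. So today I added Kafka as a message queue to the existing setup.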
Kafka:
Download the tar package from the official website and extract it.
The default Kafka configuration is sufficient.
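Fixing the startup error "Could not find or load main class"
After configuring Kafka's server.properties file, I opened a cmd window and ran:
.\bin\windows\kafka-server-start.bat config\server.properties
It failed with:
Error: Could not find or load main class Files\Java\jdk1.7.0_80\lib;C:\Program
The CLASSPATH contains paths with spaces (C:\Program Files\...), so the unquoted value is split into several arguments and java takes a fragment of it as the main class. To fix this, open kafka-run-class.bat in the bin\windows directory of the Kafka installation, go to line 142, and wrap %CLASSPATH% in double quotes.
Before: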
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
After:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
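To see why the quotes matter, here is a minimal sketch of the argument splitting. The tokenizer is a simplified stand-in for real Windows command-line parsing, and the classpath value is a hypothetical one that merely contains "C:\Program Files" entries; only the main class name kafka.Kafka and the error fragment come from the actual setup.

```java
import java.util.ArrayList;
import java.util.List;

public class ClasspathQuotingDemo {

    // Simplified tokenizer: splits on spaces, except inside double quotes.
    // This only approximates how a command line is turned into arguments.
    static List<String> splitArgs(String commandLine) {
        List<String> args = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : commandLine.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes; // quotes group text and are not part of the argument
            } else if (c == ' ' && !inQuotes) {
                if (current.length() > 0) {
                    args.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            args.add(current.toString());
        }
        return args;
    }

    // The argument right after "-cp <value>" is what java would treat as the main class
    // (other JVM options are ignored in this sketch).
    static String mainClassAfterCp(List<String> args) {
        int i = args.indexOf("-cp");
        return (i >= 0 && i + 2 < args.size()) ? args.get(i + 2) : null;
    }

    public static void main(String[] args) {
        // Hypothetical classpath with "C:\Program Files" entries.
        String classpath = "C:\\kafka\\libs\\*;C:\\Program Files\\Java\\jdk1.7.0_80\\lib;"
                + "C:\\Program Files\\Java\\jdk1.7.0_80\\lib\\tools.jar";

        String unquoted = "java -cp " + classpath + " kafka.Kafka config\\server.properties";
        String quoted = "java -cp \"" + classpath + "\" kafka.Kafka config\\server.properties";

        System.out.println("unquoted main class: " + mainClassAfterCp(splitArgs(unquoted)));
        System.out.println("quoted main class:   " + mainClassAfterCp(splitArgs(quoted)));
    }
}
```

With the unquoted form, the classpath is cut at the first space, and the next fragment lands in the position where java expects the main class. With the quoted form, the whole classpath stays one argument and kafka.Kafka is picked up as intended.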
Change yesterday's Logstash input configuration to read from Kafka:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "applog"
  }
}
Change the logback configuration to send logs to Kafka instead of Logstash.
Add the logback-kafka-appender dependency:
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>
Modify logback-spring.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">
    <springProperty scope="context" name="springAppName" source="spring.application.name" />
    <property name="CONSOLE_LOG_PATTERN" value="%date [%thread] %-5level %logger{36} - %msg%n" />

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <withJansi>true</withJansi>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!--<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity":"%level",
                        "service": "${springAppName:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>-->

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <!-- This is the default encoder that encodes every log message to an utf8-encoded string -->
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </layout>
        </encoder>
        <topic>logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="stdout" />
    </appender>

    <appender name="dailyRollingFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <File>main.log</File>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>main.%d{yyyy-MM-dd}.log</FileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{35} - %msg %n</Pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>DEBUG</level>
        </filter>
    </appender>

    <springProfile name="!production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>

    <springProfile name="production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>
</configuration>
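The Kafka appender's layout pattern decides what each record on the logs topic looks like. The sketch below is a rough stand-in for that pattern, written with plain String.format rather than logback itself; the logger name com.example.DemoController and the message text are hypothetical, and logger-name abbreviation is not modelled.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class PatternSketch {

    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    // Rough stand-in for "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n".
    // %-5level left-aligns the level in a 5-character column; %n is the platform line separator.
    // %logger{36} only shortens names longer than 36 characters, which is not modelled here.
    static String render(LocalTime time, String thread, String level, String logger, String message) {
        return String.format("%s [%s] %-5s %s - %s%n", TIME.format(time), thread, level, logger, message);
    }

    public static void main(String[] args) {
        // Hypothetical logger name and message, for illustration only.
        System.out.print(render(LocalTime.now(), "main", "INFO", "com.example.DemoController", "hello kafka"));
    }
}
```

Each record is therefore a single line of plain text, not JSON. With no codec set on the Logstash kafka input, that line should arrive in Elasticsearch as the message field without being split into structured fields.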
Start ZooKeeper
Go to the bin\windows directory under the Kafka installation and run:
.\zookeeper-server-start.bat C:\work\kafka_2.11-1.1.0\config\zookeeper.properties
Start the Kafka server
Run:
.\kafka-server-start.bat C:\work\kafka_2.11-1.1.0\config\server.properties
The rest is the same as yesterday: start Elasticsearch, Logstash, and Kibana.
Call the Controller, and the logs show up in Elasticsearch as well.
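If no logs appear, a quick first check is whether every service is actually listening. The following is a minimal sketch using only the JDK. Ports 9092 and 9200 come from the configuration above; 2181 for ZooKeeper and 5601 for Kibana are assumed defaults and may differ in your setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> services = new LinkedHashMap<>();
        services.put("ZooKeeper", 2181);     // assumed default clientPort
        services.put("Kafka", 9092);         // from bootstrap.servers above
        services.put("Elasticsearch", 9200); // from the Logstash output above
        services.put("Kibana", 5601);        // assumed default

        for (Map.Entry<String, Integer> service : services.entrySet()) {
            boolean up = isListening("localhost", service.getValue(), 1000);
            System.out.println(service.getKey() + " (" + service.getValue() + "): "
                    + (up ? "listening" : "not reachable"));
        }
    }
}
```

An open port only shows that the process is up. It does not confirm that the logs topic exists or that Logstash is consuming from it.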