While learning Storm real-time streaming over the past couple of days, I needed to write logback logs into Kafka. I hit quite a few pitfalls along the way; this post records the problems I ran into and how I solved them, for anyone facing the same.
Since this was my first time using Kafka, going by past experience I assumed the client dependency was the one to pull in, so I added:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
Then, for the producer, I imported org.apache.kafka.clients.producer.Producer. The result: calls to producer.send blocked indefinitely without ever throwing an error, and properties.put("serializer.class", "kafka.serializer.StringEncoder") kept failing because the class could not be found.
After a fair amount of digging, it turned out the wrong artifact was being imported; it should be:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>
Also, because Kafka is written in Scala, the Scala library dependency needs to be on the classpath as well:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
And the producer to import is kafka.javaapi.producer.Producer. With the imports fixed, running the program then threw:
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:66)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:270)
at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657)
at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:173)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
... 8 more
A quick search turned up the cause: log4j-over-slf4j and slf4j-log4j12 are two jars tied to the Java logging system, and when both appear on the classpath they can produce a StackOverflowError (SLF4J detects the delegation loop and fails fast instead).
The fix is simple: exclude slf4j-log4j12 when declaring the Kafka dependency:
<exclusions>
    <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
</exclusions>
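Put together, the Kafka dependency with the exclusion applied looks like this (assuming `${kafka.version}` is defined in the pom's `<properties>` section):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```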
That solved the logging conflict. Next comes the logback configuration, which wires in a custom appender that writes to Kafka:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <!-- Configure the custom Kafka appender -->
    <appender name="KAFKA" class="com.gwf.log.KafkaAppender">
        <topic>mytopic</topic>
        <zookeeperHost>localhost:2181</zookeeperHost>
        <brokerList>localhost:9092</brokerList>
        <formatter class="com.gwf.log.formatter.JsonFormatter">
            <expectJson>false</expectJson>
        </formatter>
    </appender>
    <!-- Route debug-level (and above) logs to the KAFKA appender -->
    <root level="debug">
        <appender-ref ref="KAFKA"/>
    </root>
</configuration>
The JsonFormatter turns each logging event into a single JSON object:

package com.gwf.log.formatter;

import ch.qos.logback.classic.spi.ILoggingEvent;

public class JsonFormatter implements Formatter {
    private static final String QUOTE = "\"";
    private static final String COLON = ":";
    private static final String COMMA = ",";

    private boolean expectJson = false;

    @Override
    public String format(ILoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        fieldName("level", sb);
        quote(event.getLevel().levelStr, sb);
        sb.append(COMMA);
        fieldName("logger", sb);
        quote(event.getLoggerName(), sb);
        sb.append(COMMA);
        fieldName("timestamp", sb);
        sb.append(event.getTimeStamp());
        sb.append(COMMA);
        fieldName("message", sb);
        if (this.expectJson) {
            // the message itself is already JSON; embed it unquoted
            sb.append(event.getFormattedMessage());
        } else {
            quote(event.getFormattedMessage(), sb);
        }
        sb.append("}");
        return sb.toString();
    }

    // writes "name":
    private static void fieldName(String name, StringBuilder sb) {
        quote(name, sb);
        sb.append(COLON);
    }

    // wraps a value in double quotes (note: embedded quotes are not escaped)
    private static void quote(String value, StringBuilder sb) {
        sb.append(QUOTE);
        sb.append(value);
        sb.append(QUOTE);
    }

    public boolean isExpectJson() {
        return expectJson;
    }

    public void setExpectJson(boolean expectJson) {
        this.expectJson = expectJson;
    }
}
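One caveat in the formatter above: quoting writes the raw value between double quotes without escaping, so a log message that itself contains a quote or a newline produces invalid JSON. A minimal escape helper (a hypothetical addition, not part of the original code) that could be applied to the message before quoting:

```java
public class JsonEscape {
    // Escape the characters that would break a JSON string literal.
    // (Hypothetical helper; the original formatter does no escaping.)
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // a message containing quotes and a newline survives as valid JSON content
        System.out.println(escapeJson("path=\"C:\\tmp\"\nnext line"));
    }
}
```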
The KafkaAppender itself extends logback's AppenderBase and sends every formatted event to Kafka:

package com.gwf.log;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.gwf.log.formatter.Formatter;
import com.gwf.log.formatter.MessageFormatter;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import lombok.Data;

import java.util.Properties;

@Data
public class KafkaAppender extends AppenderBase<ILoggingEvent> {
    private String topic;
    private String zookeeperHost;
    private String brokerList;
    private Producer<String, String> producer;
    private Formatter formatter;

    @Override
    public void start() {
        // fall back to a plain message formatter if none was configured
        if (null == this.formatter) {
            this.formatter = new MessageFormatter();
        }
        super.start();
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        this.producer = new Producer<String, String>(config);
    }

    @Override
    public void stop() {
        super.stop();
        if (this.producer != null) {
            this.producer.close();
        }
    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        // convert the log event to JSON and send it to the topic
        String payload = this.formatter.format(iLoggingEvent);
        producer.send(new KeyedMessage<String, String>(topic, payload));
    }
}
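For reference, if you do want the newer org.apache.kafka.clients producer API (the one from kafka-clients that caused the confusion at the start), the configuration keys differ from the old Scala producer used above. A sketch of the equivalent settings, assuming the same localhost broker; only the Properties are shown, since constructing a KafkaProducer requires a running broker:

```java
import java.util.Properties;

public class NewApiConfig {
    // Configuration for org.apache.kafka.clients.producer.KafkaProducer.
    // Key differences from the old Scala producer used in the appender above:
    //   metadata.broker.list   -> bootstrap.servers
    //   request.required.acks  -> acks
    //   serializer.class       -> key.serializer / value.serializer, pointing at
    //                             org.apache.kafka.common.serialization classes
    static Properties newProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(newProducerProps());
    }
}
```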
Finally, a small test application that alternates between a slow and a rapid logging rate:

package com.gwf.log;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RogueApplication {
    private static final Logger LOG = LoggerFactory.getLogger(RogueApplication.class);

    public static void main(String[] args) throws InterruptedException {
        int slowCount = 6;
        int fastCount = 15;
        // slow state: one warning every 5 seconds
        for (int i = 0; i < slowCount; i++) {
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }
        // rapid state: one warning per second
        for (int i = 0; i < fastCount; i++) {
            LOG.warn("This is a warning (rapid state)");
            Thread.sleep(1000);
        }
        // return to slow state
        for (int i = 0; i < slowCount; i++) {
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }
    }
}