There are plenty of log-collection solutions out there, covering all kinds of log filtering, cleansing, analysis, and statistics, and they all look impressive. This article covers just one piece: shipping logs into Kafka.
Pipeline: app -> kafka -> logstash -> es -> kibana
The business application writes its logs directly into Kafka; Logstash then consumes them and the data lands in ES.
Independently of that, the application still writes ordinary log files on the server.
First, let's rough out an implementation of this scheme. Setting up ELK itself is out of scope (just take special care that the component versions are compatible); here we walk through how the code side evolved.
To write log data into Kafka, depending on the official kafka client should be all we need. A look around GitHub turns up a ready-made project: link. It is not much code; read it through once and build the changes on top of it.
The code below assumes a Spring Boot application.
The core appender:
public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

    /**
     * Kafka clients use this prefix for their slf4j logging.
     * This appender defers appends of any Kafka logs since they could cause harmful infinite recursion/self-feeding effects.
     */
    private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

    public static final Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private LazyProducer lazyProducer = null;
    private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
    private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
        @Override
        public void onFailedDelivery(E evt, Throwable throwable) {
            aai.appendLoopOnAppenders(evt);
        }
    };

    public KafkaAppender() {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    @Override
    public void start() {
        // only error-free appenders should be activated
        if (!checkPrerequisites()) return;
        lazyProducer = new LazyProducer();
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (lazyProducer != null && lazyProducer.isInitialized()) {
            try {
                lazyProducer.get().close();
            } catch (KafkaException e) {
                this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
            }
            lazyProducer = null;
        }
    }

    @Override
    public void addAppender(Appender<E> newAppender) {
        aai.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<E>> iteratorForAppenders() {
        return aai.iteratorForAppenders();
    }

    @Override
    public Appender<E> getAppender(String name) {
        return aai.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<E> appender) {
        return aai.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        aai.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<E> appender) {
        return aai.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return aai.detachAppender(name);
    }

    @Override
    protected void append(E e) {
        // encode the event into the payload bytes
        final byte[] payload = encoder.doEncode(e);
        final byte[] key = keyingStrategy.createKey(e);
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, key, payload);
        Producer<byte[], byte[]> producer = lazyProducer.get();
        if (producer == null) {
            logger.error("kafka producer is null");
            return;
        }
        // the actual send, routed through the delivery strategy
        deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
    }

    protected Producer<byte[], byte[]> createProducer() {
        return new KafkaProducer<byte[], byte[]>(new HashMap<String, Object>(producerConfig));
    }

    private void deferAppend(E event) {
        queue.add(event);
    }

    // drains queue events to super
    private void ensureDeferredAppends() {
        E event;
        while ((event = queue.poll()) != null) {
            super.doAppend(event);
        }
    }

    /**
     * Lazy initializer for producer, patterned after commons-lang.
     *
     * @see <a href="https://commons.apache.org/proper/commons-lang/javadocs/api-3.4/org/apache/commons/lang3/concurrent/LazyInitializer.html">LazyInitializer</a>
     */
    private class LazyProducer {

        private volatile Producer<byte[], byte[]> producer;
        private boolean initialized;

        public Producer<byte[], byte[]> get() {
            Producer<byte[], byte[]> result = this.producer;
            if (result == null) {
                synchronized (this) {
                    if (!initialized) {
                        result = this.producer;
                        if (result == null) {
                            // Note: initialize() can fail (e.g. the servers config contains an illegal
                            // address) and return null, so the initialized flag alone decides whether to
                            // initialize, preventing endless failing re-initialization.
                            this.producer = result = this.initialize();
                            initialized = true;
                        }
                    }
                }
            }
            return result;
        }

        protected Producer<byte[], byte[]> initialize() {
            Producer<byte[], byte[]> producer = null;
            try {
                producer = createProducer();
            } catch (Exception e) {
                addError("error creating producer", e);
            }
            return producer;
        }

        public boolean isInitialized() {
            return producer != null;
        }
    }
}
The code above guards producer creation with an initialized flag, ensuring that in failure scenarios creation is attempted exactly once. The original project decided whether to call initialize() by checking whether producer was null; in a real scenario where the servers config contains something that is not a valid address, initialize() returns null, so every log call re-enters the failing initialization, and the application fails to start. The flag avoids that loop.
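To make the difference concrete, here is a minimal generic sketch of the two guards with the Kafka specifics stripped out (the class names are illustrative, not from the project):

// Guard from the original project: a failed (null) result is retried on every call.
class NullCheckLazyInit<T> {
    private volatile T value;

    T get(java.util.function.Supplier<T> init) {
        if (value == null) {
            synchronized (this) {
                if (value == null) {
                    value = init.get(); // null on bad config -> re-attempted forever
                }
            }
        }
        return value;
    }
}

// Patched guard: the flag records that an attempt happened, successful or not.
class OneShotLazyInit<T> {
    private volatile T value;
    private boolean initialized;

    T get(java.util.function.Supplier<T> init) {
        T result = value;
        if (result == null) {
            synchronized (this) {
                if (!initialized) {
                    value = result = init.get(); // may still be null
                    initialized = true;          // but never tried again
                }
            }
        }
        return result;
    }
}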
Configure logback-spring.xml:
<springProperty scope="context" name="LOG_KAFKA_SERVERS" source="application.log.kafka.bootstrap.servers"/>
<springProperty scope="context" name="LOG_KAFKA_TOPIC" source="application.log.kafka.topic"/>

<appender name="KafkaAppender" class="com.framework.common.log.kafka.KafkaAppender">
    <topic>${LOG_KAFKA_TOPIC}</topic>
    <producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>
</appender>
And the bootstrap.properties configuration:
application.log.kafka.bootstrap.servers=10.0.11.55:9092
application.log.kafka.topic=prod-java
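On the Logstash side, a pipeline roughly like the following would complete the kafka -> logstash -> es leg of the flow. This is only a sketch: the ES host and index name are assumptions, and the exact plugin options vary across Logstash versions.

input {
  kafka {
    bootstrap_servers => "10.0.11.55:9092"
    topics => ["prod-java"]
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["http://10.0.11.56:9200"]
    index => "prod-java-%{+YYYY.MM.dd}"
  }
}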
To customize the JSON that goes into Kafka, extend the encoder.doEncode(e) call used above:
public class FormatKafkaMessageEncoder<E> extends KafkaMessageEncoderBase<E> {

    protected static final int BUILDER_CAPACITY = 2048;
    protected static final int LENGTH_OPTION = 2048;
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";
    public static final char TAB = '\t';

    public byte[] encode(ILoggingEvent event) {
        Map<String, String> formatMap = new HashMap<>();
        formatMap.put("timestamp", event.getTimeStamp() != 0 ? String.valueOf(new Date(event.getTimeStamp())) : "");
        formatMap.put("span", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-SpanId") : "");
        formatMap.put("trace", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-TraceId") : "");
        formatMap.put("class", event.getLoggerName());
        formatMap.put("level", event.getLevel() != null ? event.getLevel().toString() : "");
        formatMap.put("message", event.getMessage());
        formatMap.put("stacktrace", event.getThrowableProxy() != null ? convertStackTrace(event.getThrowableProxy()) : "");
        formatMap.put("thread", event.getThreadName());
        formatMap.put("ip", IpUtil.getLocalIP());
        formatMap.put("application", event.getLoggerContextVO() != null && event.getLoggerContextVO().getPropertyMap() != null
                ? event.getLoggerContextVO().getPropertyMap().get("springAppName") : "");
        String formatJson = JSONObject.toJSONString(formatMap);
        return formatJson.getBytes();
    }

    @Override
    public byte[] doEncode(E event) {
        return encode((ILoggingEvent) event);
    }

    public String convertStackTrace(IThrowableProxy tp) {
        StringBuilder sb = new StringBuilder(BUILDER_CAPACITY);
        recursiveAppend(sb, tp, null);
        return sb.toString();
    }

    private void recursiveAppend(StringBuilder sb, IThrowableProxy tp, String prefix) {
        if (tp == null) {
            return;
        }
        if (prefix != null) {
            sb.append(prefix);
        }
        sb.append(tp.getClassName()).append(": ").append(tp.getMessage());
        sb.append(CoreConstants.LINE_SEPARATOR);
        StackTraceElementProxy[] stepArray = tp.getStackTraceElementProxyArray();
        boolean unrestrictedPrinting = LENGTH_OPTION > stepArray.length;
        int maxIndex = unrestrictedPrinting ? stepArray.length : LENGTH_OPTION;
        for (int i = 0; i < maxIndex; i++) {
            sb.append(TAB);
            StackTraceElementProxy element = stepArray[i];
            sb.append(element);
            sb.append(CoreConstants.LINE_SEPARATOR);
        }
        IThrowableProxy[] suppressed = tp.getSuppressed();
        if (suppressed != null) {
            for (IThrowableProxy current : suppressed) {
                recursiveAppend(sb, current, SUPPRESSED);
            }
        }
        recursiveAppend(sb, tp.getCause(), CAUSED_BY);
    }
}
The recursiveAppend method mimics ch.qos.logback.classic.spi.ThrowableProxyUtil and is used to print an exception's complete stack, including its causes and suppressed exceptions.
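For a sense of what ends up in Kafka, an event encoded this way produces a JSON document along the following lines (all field values here are made up for illustration):

{
    "timestamp": "Fri Aug 16 10:15:30 CST 2019",
    "span": "bd2ee5d4c9e1b3a7",
    "trace": "f3f9b3a1c8d24e5f",
    "class": "com.framework.demo.OrderService",
    "level": "ERROR",
    "message": "order create failed",
    "stacktrace": "java.lang.IllegalStateException: boom\n\tat com.framework.demo.OrderService.create(OrderService.java:42)\n",
    "thread": "http-nio-8080-exec-1",
    "ip": "10.0.11.23",
    "application": "demo-app"
}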
There is also the problem of obtaining the local IP: InetAddress.getLocalHost().getHostAddress() alone does not solve it, since it can simply return 127.0.0.1. The detailed code:
public class IpUtil {

    public static final String DEFAULT_IP = "127.0.0.1";
    public static String cacheLocalIp = null;
    private static Logger logger = LoggerFactory.getLogger(IpUtil.class);

    /**
     * Take the first network interface address as the intranet IPv4 address,
     * to avoid returning 127.0.0.1.
     */
    private static String getLocalIpByNetworkCard() {
        String ip = null;
        try {
            for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements(); ) {
                NetworkInterface item = e.nextElement();
                for (InterfaceAddress address : item.getInterfaceAddresses()) {
                    if (item.isLoopback() || !item.isUp()) {
                        continue;
                    }
                    if (address.getAddress() instanceof Inet4Address) {
                        Inet4Address inet4Address = (Inet4Address) address.getAddress();
                        ip = inet4Address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("getLocalIpByNetworkCard error", e);
            try {
                // fall back to the hostname-based lookup
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e1) {
                logger.error("InetAddress.getLocalHost().getHostAddress() error", e1);
                ip = DEFAULT_IP;
            }
        }
        return ip == null ? DEFAULT_IP : ip;
    }

    public synchronized static String getLocalIP() {
        if (cacheLocalIp == null) {
            cacheLocalIp = getLocalIpByNetworkCard();
        }
        return cacheLocalIp;
    }
}
A local file appender is also configured in logback-spring.xml:
<!-- roll over to a new log file every day -->
<appender name="filelog" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
        <!-- rollover daily -->
        <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
        <!-- each file should be at most 300MB -->
        <maxFileSize>300MB</maxFileSize>
        <!-- number of rolled-over files to keep -->
        <maxHistory>5</maxHistory>
    </rollingPolicy>
    <encoder>
        <!-- output format: %d is the date, %thread the thread name, %-5level the level padded to 5 characters, %msg the log message, %n a newline -->
        <pattern>${CONSOLE_LOG_PATTERN}</pattern>
    </encoder>
</appender>
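Neither appender takes effect until it is attached to a logger. A typical root-logger wiring for the two appenders above would be along these lines (the level here is an assumption):

<root level="INFO">
    <appender-ref ref="KafkaAppender"/>
    <appender-ref ref="filelog"/>
</root>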
Note the use of SizeAndTimeBasedRollingPolicy rather than TimeBasedRollingPolicy combined with SizeBasedTriggeringPolicy: with the latter combination the size trigger takes priority, and new log files are no longer started automatically at the date boundary.
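For comparison, the combination being avoided would look roughly like this; once the size-based triggering policy is in play, the daily rollover no longer fires as expected:

<!-- the problematic combination: the size trigger overrides the daily rollover -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.log</fileNamePattern>
    <maxHistory>5</maxHistory>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
    <maxFileSize>300MB</maxFileSize>
</triggeringPolicy>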
At this point the code for shipping logs into Kafka is done: the feature is complete and runs correctly.
Now think it through: what happens if, at startup or at runtime, Kafka cannot accept messages, say the broker is down? How does this logging feature behave?
Naturally, every log write will still try to reach Kafka and fail, which is bound to affect the application's health.
That suggests circuit breaking: if Kafka goes down, a circuit breaker can limit the impact on the application.
So we implemented the circuit-breaker logic here.
The circuit breaker:
/**
 * @desc Circuit breaker.
 * 1. failureCount and consecutiveSuccessCount drive the state transitions; both use AtomicInteger so the counts stay accurate under concurrency.
 * 2. successCount does not use AtomicInteger and is not guaranteed to be accurate.
 * 3. Illegal failureThreshold, consecutiveSuccessThreshold and timeout arguments fall back to defaults.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;
    /** current breaker state */
    private CircuitBreakerState state;
    /** failure-count threshold */
    private int failureThreshold;
    /** how long the breaker stays open */
    private long timeout;
    /** failure count */
    private AtomicInteger failureCount;
    /** success count (not accurate under concurrency) */
    private int successCount;
    /** consecutive successes within the half-open window */
    private AtomicInteger consecutiveSuccessCount;
    /** threshold of consecutive successes within the half-open window */
    private int consecutiveSuccessThreshold;

    public CircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        if (failureThreshold <= 0) {
            failureThreshold = 1;
        }
        if (consecutiveSuccessThreshold <= 0) {
            consecutiveSuccessThreshold = 1;
        }
        if (timeout <= 0) {
            timeout = 10000;
        }
        this.name = name;
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
        this.timeout = timeout;
        this.failureCount = new AtomicInteger(0);
        this.consecutiveSuccessCount = new AtomicInteger(0);
        state = new CloseCircuitBreakerState(this);
    }

    public void increaseFailureCount() {
        failureCount.addAndGet(1);
    }

    public void increaseSuccessCount() {
        successCount++;
    }

    public void increaseConsecutiveSuccessCount() {
        consecutiveSuccessCount.addAndGet(1);
    }

    public boolean increaseFailureCountAndThresholdReached() {
        return failureCount.addAndGet(1) >= failureThreshold;
    }

    public boolean increaseConsecutiveSuccessCountAndThresholdReached() {
        return consecutiveSuccessCount.addAndGet(1) >= consecutiveSuccessThreshold;
    }

    public boolean isNotOpen() {
        return !isOpen();
    }

    /** breaker open: calls to the protected method are blocked */
    public boolean isOpen() {
        return state instanceof OpenCircuitBreakerState;
    }

    /** breaker closed: the protected method executes normally */
    public boolean isClose() {
        return state instanceof CloseCircuitBreakerState;
    }

    /** breaker half-open: trial calls to the protected method are allowed */
    public boolean isHalfClose() {
        return state instanceof HalfOpenCircuitBreakerState;
    }

    public void transformToCloseState() {
        state = new CloseCircuitBreakerState(this);
    }

    public void transformToHalfOpenState() {
        state = new HalfOpenCircuitBreakerState(this);
    }

    public void transformToOpenState() {
        state = new OpenCircuitBreakerState(this);
    }

    /** reset the failure count */
    public void resetFailureCount() {
        failureCount.set(0);
    }

    /** reset the consecutive-success count */
    public void resetConsecutiveSuccessCount() {
        consecutiveSuccessCount.set(0);
    }

    public long getTimeout() {
        return timeout;
    }

    /** whether the failure threshold has been reached */
    protected boolean failureThresholdReached() {
        return failureCount.get() >= failureThreshold;
    }

    /** whether the consecutive-success threshold has been reached */
    protected boolean consecutiveSuccessThresholdReached() {
        return consecutiveSuccessCount.get() >= consecutiveSuccessThreshold;
    }

    /** called after the protected method fails */
    public void actFailed() {
        state.actFailed();
    }

    /** called after the protected method succeeds */
    public void actSuccess() {
        state.actSuccess();
    }

    public interface Executor {
        /** the task to execute */
        void execute();
    }

    public void execute(Executor executor) {
        if (!isOpen()) {
            try {
                executor.execute();
                this.actSuccess();
            } catch (Exception e) {
                this.actFailed();
                logger.error("CircuitBreaker executor error", e);
            }
        } else {
            logger.error("CircuitBreaker named {} is open", this.name);
        }
    }

    public String show() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("state", isClose());
        map.put("failureThreshold", failureThreshold);
        map.put("failureCount", failureCount);
        map.put("consecutiveSuccessThreshold", consecutiveSuccessThreshold);
        map.put("consecutiveSuccessCount", consecutiveSuccessCount);
        map.put("successCount", successCount);
        map.put("timeout", timeout);
        map.put("state class", state.getClass());
        return JSONObject.toJSONString(map);
    }
}
The state machine:
public interface CircuitBreakerState {
    /** called after the protected method fails */
    void actFailed();
    /** called after the protected method succeeds */
    void actSuccess();
}

public abstract class AbstractCircuitBreakerState implements CircuitBreakerState {

    protected CircuitBreaker circuitBreaker;

    public AbstractCircuitBreakerState(CircuitBreaker circuitBreaker) {
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void actFailed() {
        circuitBreaker.increaseFailureCount();
    }

    @Override
    public void actSuccess() {
        circuitBreaker.increaseSuccessCount();
    }
}

public class CloseCircuitBreakerState extends AbstractCircuitBreakerState {

    public CloseCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetFailureCount();
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        // failure threshold reached: trip to the open state
        if (circuitBreaker.increaseFailureCountAndThresholdReached()) {
            circuitBreaker.transformToOpenState();
        }
    }
}

public class HalfOpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public HalfOpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        super.actFailed();
        circuitBreaker.transformToOpenState();
    }

    @Override
    public void actSuccess() {
        super.actSuccess();
        // consecutive-success threshold reached: close the breaker
        if (circuitBreaker.increaseConsecutiveSuccessCountAndThresholdReached()) {
            circuitBreaker.transformToCloseState();
        }
    }
}

public class OpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public OpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        // after the timeout window elapses, move to half-open to allow trial calls
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                circuitBreaker.transformToHalfOpenState();
                timer.cancel();
            }
        }, circuitBreaker.getTimeout());
    }
}
/**
 * @desc Circuit-breaker factory; keeps the application's CircuitBreaker instances in one place.
 * Note: once built, a breaker lives as long as the application and is never removed.
 */
public class CircuitBreakerFactory {

    private static ConcurrentHashMap<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();

    public static CircuitBreaker getCircuitBreaker(String name) {
        return circuitBreakerMap.get(name);
    }

    /**
     * @param name unique name
     * @param failureThreshold failure-count threshold
     * @param consecutiveSuccessThreshold threshold of consecutive successes within the window
     * @param timeout length of the open window
     * 1. In the closed state, once failures >= failureThreshold the breaker opens.
     * 2. In the open state, after timeout elapses the breaker moves to half-open.
     * 3. In the half-open state, consecutiveSuccessThreshold consecutive successes
     *    close the breaker; any failure reopens it.
     */
    public static CircuitBreaker buildCircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        CircuitBreaker circuitBreaker = new CircuitBreaker(name, failureThreshold, consecutiveSuccessThreshold, timeout);
        circuitBreakerMap.put(name, circuitBreaker);
        return circuitBreaker;
    }
}
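As a quick usage sketch (the breaker name and the simulated failure below are made up), any fragile call can be wrapped through execute():

public class CircuitBreakerDemo {
    public static void main(String[] args) {
        // 3 failures open the breaker; after 20s it half-opens; 2 consecutive successes close it
        CircuitBreaker breaker = CircuitBreakerFactory.buildCircuitBreaker("demo", 3, 2, 20000);
        for (int i = 0; i < 10; i++) {
            breaker.execute(() -> {
                throw new RuntimeException("downstream unavailable"); // simulate a dead dependency
            });
        }
        // after the third failure the remaining calls are skipped and only logged
        System.out.println(breaker.show());
    }
}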
Sending Kafka messages through the circuit breaker:
/**
 * Logging is not a core business service. To keep an unstable Kafka from affecting
 * application health, sends go through a circuit breaker: 3 failures open it,
 * every 20 seconds it goes half-open, and 2 consecutive successes close it again.
 */
CircuitBreaker circuitBreaker = CircuitBreakerFactory.buildCircuitBreaker("KafkaAppender-c", 3, 2, 20000);

public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) {
    if (circuitBreaker.isNotOpen()) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    circuitBreaker.actFailed();
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                    logger.error("kafka producer send log error", exception);
                } else {
                    circuitBreaker.actSuccess();
                }
            });
            return true;
        } catch (KafkaException e) {
            circuitBreaker.actFailed();
            failedDeliveryCallback.onFailedDelivery(event, e);
            logger.error("kafka send log error", e);
            return false;
        }
    } else {
        logger.error("kafka log circuitBreaker open");
        return false;
    }
}
To sum up:
1. When building out ELK, pay special attention to version compatibility between components; in particular, the kafka client version needs to match the kafka broker version.
2. The design accepts that Kafka delivery may fail, with the local log files as the more reliable record, so the circuit breaker is there for the just-in-case; the same pattern works equally well for calls to other third parties.