在互聯網設計架構過程當中,日誌異步落庫,儼然已是高併發環節中不可缺乏的一環。爲何說是高併發環節中不可缺乏的呢? 緣由在於,若是直接用mq進行日誌落庫的時候,低併發下,生產端生產數據,而後由消費端異步落庫,是沒有什麼問題的,並且性能也都是異常的好,估計tp99應該都在1ms之內。可是一旦併發增加起來,慢慢的你就發現生產端的tp99一直在增加,從1ms,變爲2ms,4ms,直至send timeout。尤爲在大促的時候,我司的系統就經歷過這個狀況,當時mq的發送耗時超過200ms,甚至一度有很多timeout產生。html
考慮到這種狀況在高併發的狀況下才出現,因此今天咱們就來探索更加可靠的方法來進行異步日誌落庫,保證所使用的方式不會由於太高的併發而出現接口ops持續降低甚至到不可用的狀況。apache
方案一: 基於log4j的異步appender實現架構
此種方案,依賴於log4j。在log4j的異步appender中,經過mq進行生產消費入庫。至關於在接口和mq之間創建了一個緩衝區,使得接口和mq的依賴分離,從而不讓mq的操做影響接口的ops。併發
此種方案因爲使用了異步方式,且因爲異步的discard policy策略,當大量數據過來,緩衝區滿了以後,會拋棄部分數據。此種方案適用於可以容忍數據丟失的業務場景,不適用於對數據完整有嚴格要求的業務場景。app
來看看具體的實現方式:less
首先,咱們須要自定義一個Appender,繼承自log4j的AppenderSkeleton類,實現方式以下:dom
public class AsyncJmqAppender extends AppenderSkeleton { @Resource(name = "messageProducer") private MessageProducer messageProducer; @Override protected void append(LoggingEvent loggingEvent) { asyncPushMessage(loggingEvent.getMessage()); } /** * 異步調用jmq輸出日誌 * @param message */ private void asyncPushMessage(Object message) { CompletableFuture.runAsync(() -> { Message messageConverted = (Message) message; try { messageProducer.send(messageConverted); } catch (JMQException e) { e.printStackTrace(); } }); } @Override public boolean requiresLayout() { return false; } @Override public void close() { } }
而後在log4j.xml中,爲此類進行配置:異步
<!--異步JMQ appender--> <appender name="async_mq_appender" class="com.jd.limitbuy.common.util.AsyncJmqAppender"> <!-- 設置File參數:日誌輸出文件名 --> <param name="File" value="D:/export/Instances/order/server1/logs/order.async.jmq" /> <!-- 設置是否在從新啓動服務時,在原有日誌的基礎添加新日誌 --> <param name="Append" value="true" /> <!-- 設置文件大小 --> <param name="MaxFileSize" value="10KB" /> <!-- 設置文件備份 --> <param name="MaxBackupIndex" value="10000" /> <!-- 設置輸出文件項目和格式 --> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%m%n" /> </layout> </appender> <logger name="async_mq_appender_logger"> <appender-ref ref="async_mq_appender"/> </logger>
最後就能夠按照以下的方式進行正常使用了:async
private static Logger logger = LoggerFactory.getLogger("filelog_appender_logger");
注意: 此處須要注意log4j的一個性能問題。在log4j的conversionPattern中,匹配符最好不要出現 C% L%通配符,壓測實踐代表,這兩個通配符會致使log4j打日誌的效率下降10倍。ide
方案一很簡便,且剝離了接口直接依賴mq致使的性能問題。可是沒法解決數據丟失的問題(可是咱們其實能夠在本地搞個策略落盤來不及處理的數據,能夠大大的減小數據丟失的概率)。可是不少的業務場景,是須要數據不丟失的,因此這就衍生出咱們的另外一套方案來。
方案二:增量消費log4j日誌
此種方式,是開啓worker在後臺增量消費log4j的日誌信息,和接口徹底脫離。此種方式相比方案一,能夠保證數據的不丟失,且能夠作到徹底不影響接口的ops。可是此種方式,因爲是後臺worker在後臺啓動進行掃描,會致使落庫的數據慢一些,好比一分鐘以後才落庫完畢。因此適用於對落庫數據實時性不高的場景。
具體的實現步驟以下:
首先,將須要進行增量消費的日誌統一打到一個文件夾,以天爲單位,天天生成一個帶時間戳日誌文件。因爲log4j不支持直接帶時間戳的日誌文件生成,因此這裏須要引入log4j.extras組件,而後配置log4j.xml以下:
以後在代碼中的申明方式以下:
private static Logger businessLogger = LoggerFactory.getLogger("file_rolling_logger");
最後在須要記錄日誌的地方使用方式以下:
businessLogger.error(JsonUtils.toJSONString(myMessage))
這樣就能夠將日誌打印到一個單獨的文件中,且按照日期,天天生成一個。
而後,當日志文件生成完畢後,咱們就能夠開啓咱們的worker進行增量消費了,這裏的增量消費方式,咱們選擇RandomAccessFile這個類來進行,因爲其獨特的位點讀取方式,可使得咱們很是方便的根據位點的位置來消費增量文件,從而避免了逐行讀取這種低效率的實現方式。
注意,爲每一個日誌文件都單首創建了一個位點文件,裏面存儲了對應的文件的位點讀取信息。當worker掃描開始的時候,會首先讀取位點文件裏面的位點信息,而後找到相應的日誌文件,從位點信息位置開始進行消費。這就是整個增量消費worker的核心。具體代碼實現以下(代碼太長,作了摺疊):
/** * @Description: 增量日誌掃描worker * @Detail: 此worker主要用來掃描增量日誌,日誌自己會在不停的插入中,此worker會不停的掃描此日誌來將數據上傳到kafka集羣 * @date 2018-04-08 10:30 */ public class LimitBuyScanWorker { /** * 日誌和位點文件保存的目錄 */ private static final String FILE_DIRECTORY = "D:\\export\\Instances\\order\\server1\\logs\\"; /** * 每次步進的長度,此處爲1000行 */ private static final int SCAN_STEP = 1000; /** * 日誌文件名前綴 */ private static final String LOG_FILE_PREFIX = "limitbuy.soa.order."; /** * 位點文件名後綴 */ private static final String OFT_FILE_APPENDIX = ".offset"; public void logScanner() { //當前時間 Date currentDate = new Date(); //今日 String currentDay = DateUtil.formatDate("yyyy-MM-dd", currentDate); //今日日誌文件路徑 String currentLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay; logger.error("今日的日誌文件路徑:" + currentLogFilePath); //今日位點文件路徑 String currentOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay + OFT_FILE_APPENDIX; //昨日 String yesterDay = DateUtil.formatDate("yyyy-MM-dd", DateUtil.queryPlusDay(currentDate, -1)); //昨日日誌文件路徑 String yesterdayLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay; logger.error("昨日的日誌文件路徑:" + yesterdayLogFilePath); //昨日位點文件路徑 String yesterdayOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay + OFT_FILE_APPENDIX; //先檢測昨日位點和文件體積是否一致,不一致則表明未消費完畢 boolean yesterdayConsumedOK = checkIfConsumeOK(yesterdayLogFilePath, yesterdayOffsetFilePath); logger.error("昨日的日誌文件已被消費完畢:" + yesterdayConsumedOK); //昨日的文件已掃描完畢 if (yesterdayConsumedOK) { //掃描並消費今日增量日誌 scanAndConsumeLog(currentLogFilePath, currentOffsetFilePath); } //昨日的文件未掃描完畢 else { //掃描並消費昨日增量日誌 scanAndConsumeLog(yesterdayLogFilePath, yesterdayOffsetFilePath); } } /** * 檢測日誌是否被掃描消費完畢,true:消費完畢;false:未消費完畢 * @Description 此舉主要防止log4j在零點大促開始的時候,忽然的滾動文件形成的部分增量日誌不會被消費的問題 * @param logFilePath * @param offsetFilePath */ private boolean checkIfConsumeOK(String logFilePath, String offsetFilePath) { try { //打開文件 RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r"); //獲得當前位點 long currentOffset = checkOffset(offsetFilePath); //獲得文件總長 long currentFileLength = randomAccessFile.length(); //比對 if (currentOffset >= currentFileLength) { return true; } return false; } catch (FileNotFoundException e) { logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(FileNotFoundException):", e); AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage()); return false; } catch (IOException e) { logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(IOException):", e); AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage()); return false; } } /** * 掃描並消費增量日誌 * @param logFilePath * @param offsetFilePath */ private void scanAndConsumeLog(String logFilePath, String offsetFilePath) { try { RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r"); //獲得當前位點 long currentOffset = checkOffset(offsetFilePath); logger.error("開始位點==>" + currentOffset); //重置位點到當前位點 if (currentOffset <= randomAccessFile.length()) { randomAccessFile.seek(currentOffset); } //讀取@SCAN_STEP行 for (long i = currentOffset; i < currentOffset + SCAN_STEP; i++) { //獲得行 String result = randomAccessFile.readLine(); //若是內容不爲空 if (StringUtil.isNotBlank(result)) { //TODO 邏輯實現 } } //讀取@SCAN_STEP行以後的位點 logger.error("讀取" + SCAN_STEP + "行以後位點==>" + randomAccessFile.getFilePointer()); //若是update不成功,能夠不處理,後面掃描進來從新過一遍便可 updateOffset(randomAccessFile.getFilePointer(), offsetFilePath); logger.error("文件總長==>" + randomAccessFile.length()); } catch (FileNotFoundException e) { logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(FileNotFoundException):", e); AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage()); } catch (IOException e) { logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(IOException):", e); AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage()); } } /** * 校驗位點 * 不存在則建立並賦值爲0 * 已存在則更新位點 * @param offsetFilePath * @return * @throws IOException */ private long checkOffset(String offsetFilePath) throws IOException { File offsetFile = new File(offsetFilePath); //若是位點文件不存在,則建立位點文件並返回0 if (!offsetFile.exists()) { updateOffset(0, offsetFilePath); return 0; } //若是位點文件存在,則返回位點文件內容 else { FileReader fileReader = new FileReader(offsetFilePath); StringBuilder stringBuilder = new StringBuilder(); char[] bytesChar = new char[50]; fileReader.read(bytesChar); fileReader.close(); for (char c : bytesChar) { stringBuilder.append(c); } String filteredOffset = stringBuilder.toString().trim(); if (StringUtil.isNotBlank(filteredOffset)) { return Long.parseLong(filteredOffset); } else { return 0; } } } /** * 更新位點信息 * @param offset * @param offsetFilePath */ private void updateOffset(long offset, String offsetFilePath) throws IOException { FileWriter fileWriter = new FileWriter(offsetFilePath); fileWriter.write(offset + ""); fileWriter.flush(); fileWriter.close(); } }
此種方式因爲worker掃描是每隔一段時間啓動一次進行消費,因此致使數據從產生到入庫,可能經歷時間超過一分鐘以上,可是在一些對數據延遲要求比較高的業務場景,好比庫存扣減,是不能容忍的,因此這裏咱們就引伸出第三種作法,基於內存文件隊列的異步日誌消費。
方案三:基於內存文件隊列的異步日誌消費
因爲方案一和方案二都嚴重依賴log4j,且方案自己都存在着要麼丟數據,要麼入庫時間長的缺點,因此都並非那麼盡如人意。可是本方案的作法,既解決了數據丟失的問題,又解決了數據入庫時間被拉長的尷尬,因此是終極解決之道。並且在大促銷過程當中,此種方式經歷了實戰檢驗,能夠大面積的推廣使用。
此方案中提到的內存文件隊列,是我司自研的一款基於RandomAccessFile和MappedByteBuffer實現的內存文件隊列。隊列核心使用了ArrayBlockingQueue,並提供了produce方法,進行數據入管道操做,提供了consume方法,進行數據出管道操做。並且後臺有一個worker一直啓動着,每隔5ms或者遍歷了100條數據以後,就將數據落盤一次,以防數據丟失。具體的設計,就這麼多,感興趣的能夠根據我提供的信息,本身實踐一下。
因爲有此中間件的加持,數據生產的時候,只須要入壓入管道,而後消費端進行消費便可。未被消費的數據,會進行落盤操做,謹防數據丟失。當大促的時候,大量數據涌來的時候,管道滿了的狀況下會阻塞接口,數據不會被拋棄。雖然可能會致使接口在那一瞬間無響應,可是因爲有落盤操做和消費操做(此操做操控的是JVM堆外內存數據,不受GC的影響,因此不會出現操做暫停的狀況,爲何呢?由於用了MappedByteBuffer),此種阻塞並未影響到接口總體的ops。
在實際使用的時候,ArrayBlockingQueue做爲核心隊列,顯然是全局加鎖的,後續咱們考慮升級爲無鎖隊列,因此將會參考Netty中的有界無鎖隊列:MpscArrayQueue。預計性能將會再好一些。
受限於公司政策,我僅提供大體思路,可是不會提供具體代碼,有問題評論區交流吧。
上面就是在進行異步日誌消費的時候,我所經歷的三個階段,而且一步一步的優化到目前的方式。雖然過程曲折,可是結果使人歡欣鼓舞。若是喜歡就給個推薦,後續我將會持續更新你所不知道的系列,以期達到拋磚引玉的效果。