假設場景:java
1. 針對一個高併發的應用,你是否會選擇打印訪問日誌?
2. 針對分佈式的應用,你是否會選擇將全部日誌打印到日誌中心?git
解決方案:github
1. 若是若是你選擇爲了性能,不打印日誌,那無可厚非。可是你得考慮清楚,出問題的時候是否可以作到快速排查?
2. 你以爲日誌分佈在各臺機器上很方便,那不用日誌中心也行!數據庫
若是,你仍是會選擇打印大量的訪問日誌,若是你仍是會選擇打印日誌到日誌中心,那麼本文對你有用!json
若是本身實現一個日誌中心,不說很難吧,也仍是要費很大力氣的,好比性能,好比容量大小!緩存
因此,本文選擇阿里雲的 loghub 做爲日誌中心,收集全部日誌!安全
loghub 常規操做:網絡
在提出本文主題以前,我們要看看loghub本身的方式,以及存在的問題!
在官方接入文檔裏,就建議我們使用 logProducer 接入。數據結構
其實 logProducer 已經作了太多的優化,好比當日志包數據達到必定數量,才統一進行發送,異步發送等等!併發
至於爲何還會存在本篇文章,則是因爲這些優化還不夠,好比 這些日誌發送仍然會影響業務性能,仍然會受到內存限制,仍然會搶佔大量cpu。。。
好吧,接入方式:
1. 引入maven依賴:
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log-logback-appender</artifactId> <version>0.1.13</version> </dependency>
2. logback中添加appender:
<appender name="LOGHUB-APPENDER" class="appender:com.aliyun.openservices.log.logback.LoghubAppender"> <endpoint>${loghub.endpoint}</endpoint> <accessKeyId>${loghub.accessKeyId}</accessKeyId> <accessKey>${loghub.accessKey}</accessKey> <projectName>${loghub.projectName}</projectName> <logstore>test-logstore</logstore> <topic>${loghub.topic}</topic> <packageTimeoutInMS>1500</packageTimeoutInMS> <logsCountPerPackage>4096</logsCountPerPackage> <!-- 4718592=4M, 3145728=3M, 2097152=2M --> <logsBytesPerPackage>3145728</logsBytesPerPackage> <!-- 17179869184=2G(溢出丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M--> <memPoolSizeInByte>536870912</memPoolSizeInByte> <retryTimes>1</retryTimes> <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <root level="${logging.level}"> <appender-ref ref="STDOUT"/> <appender-ref ref="LOGHUB-APPENDER" /> </root>
3. 在代碼中進行日誌打印:
private static Logger logger = LoggerFactory.getLogger(MyClass.class); logger.warn("give me five: {}", name);
看似高效接入,存在的問題:
1. loghub日誌的發送是異步的沒錯,可是當發送網絡很慢時,將會出現大量內存堆積;
2. 堆積也不怕,如上配置,當堆積內存達到必定限度時,就不會再大了。他是怎麼辦到的?其實就是經過一個鎖,將後續全部請求所有阻塞了,這想一想都以爲可怕;
3. 網絡慢咱們能夠多開幾個發送線程嘛,是的,這樣能在必定程度上緩解發送問題,可是基本也無補,另外,日誌發送線程開多以後,線程的調度將會更可怕,而這只是一個無關緊要的功能而已啊;
針對以上問題,咱們能作什麼?
1. 去除沒必要要的日誌打印,這不是廢話嘛,能這麼幹早幹了!
2. 在網絡慢的時候,減小日誌打印;這有點牽強,不過能夠試試!
3. 直接使用異步線程進行日誌接收和發送,從根本上解決問題!
4. 若是使用異步線程進行發送,那麼當日志大量堆積怎麼辦?
5. 使用本地文件存儲須要進行發送的日誌,解決大量日誌堆積問題,待網絡暢通後,快速發送!
考慮到使用異步線程發送日誌、使用本地磁盤存儲大量日誌堆積,問題應該基本都解決了!
可是具體怎麼作呢?
如何異步?
如何存儲磁盤?
這些都是很現實的問題!
若是看到這裏,以爲很low的同窗,基本能夠撤了!
下面咱們來看看具體實施方案:
1. 如何異步?
能想像到的,基本就是使用一個隊列來接收日誌寫請求,而後,開另外的消費線程進行消費便可!
可是,這樣會有什麼問題?由於外部請求訪問進來,都是併發的,這個隊列得線程安全吧!用 synchronized ? 用阻塞隊列?
總之,看起來都會有一個並行轉串行的問題,這會給應用併發能力帶去打擊的!
因此,咱們得減輕這鎖的做用。咱們可使用多個隊列來解決這個問題,相似於分段鎖!若是併發能力不夠,則增長鎖數量便可!
提及來仍是很抽象吧,現成的代碼擼去吧!
1. 覆蓋原來的 logProducer 的 appender, 使用本身實現的appender, 主要就是解決異步問題:
<appender name="LOGHUB-APPENDER" class="com.test.AsyncLoghubAppender"> <endpoint>${loghub.endpoint}</endpoint> <accessKeyId>${loghub.accessKeyId}</accessKeyId> <accessKey>${loghub.accessKey}</accessKey> <projectName>${loghub.projectName}</projectName> <logstore>apollo-alarm</logstore> <topic>${loghub.topic}</topic> <packageTimeoutInMS>1500</packageTimeoutInMS> <logsCountPerPackage>4096</logsCountPerPackage> <!-- 4718592=4M, 3145728=3M, 2097152=2M --> <logsBytesPerPackage>3145728</logsBytesPerPackage> <!-- 17179869184=2G(溢出丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M--> <memPoolSizeInByte>536870912</memPoolSizeInByte> <retryTimes>1</retryTimes> <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender>
2. 接下來就是核心的異步實現: AsyncLoghubAppender
import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.classic.spi.StackTraceElementProxy; import ch.qos.logback.classic.spi.ThrowableProxyUtil; import ch.qos.logback.core.CoreConstants; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.util.IOUtils; import com.aliyun.openservices.log.common.LogItem; import com.aliyun.openservices.log.logback.LoghubAppender; import com.aliyun.openservices.log.logback.LoghubAppenderCallback; import com.test.biz.cache.LocalDiskEnhancedQueueManager; import com.test.biz.cache.LocalDiskEnhancedQueueManagerFactory; import com.test.model.LoghubItemsWrapper; import com.taobao.notify.utils.threadpool.NamedThreadFactory; import org.joda.time.DateTime; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 異步寫loghub appender, 解決框架的appender 沒法承受高併發寫的問題 * */ public class AsyncLoghubAppender<E> extends LoghubAppender<E> { /** * put 線程,從業務線程接收消息過來 */ private ExecutorService puterExecutor; /** * 隊列搬運線程執行器 */ private ExecutorService takerExecutor; /** * mapdb 操做腳手架 */ private LocalDiskEnhancedQueueManager localDiskEnhancedQueueManager; /** * 日誌消息傳球手 */ private List<LinkedBlockingQueue<LoghubItemsWrapper>> distributeLogItemPoster; // puter 的線程數,與cpu核數保持一致 private final int puterThreadNum = 4; // taker 的線程數,能夠稍微少點 private final int takerThreadNum = 1; @Override public void start() { super.start(); // 開啓單個put 線程 doStart(); } private void doStart() { initMapDbQueue(); initPosterQueue(); startPutterThread(); startTakerThread(); } /** * 初始化 mapdb 數據庫 */ private void initMapDbQueue() { localDiskEnhancedQueueManager = LocalDiskEnhancedQueueManagerFactory.newMapDbQueue(); } /** * 初始化消息傳球手數據 */ private void initPosterQueue() { distributeLogItemPoster = new ArrayList<>(); for(int i = 0; i < puterThreadNum; i++) { distributeLogItemPoster.add(new LinkedBlockingQueue<>(10000000)); } } /** * 開啓 putter 線程組,此線程組不該慢於業務線程太多,不然致使內存溢出 */ private void startPutterThread() { puterExecutor = Executors.newFixedThreadPool(puterThreadNum, new NamedThreadFactory("Async-LoghubItemPoster")); for(int i = 0; i < puterThreadNum; i++) { puterExecutor.execute(new InnerQueuePuterThread(distributeLogItemPoster.get(i))); } } /** * 初始化取數線程組,此線程組能夠運行慢 */ private void startTakerThread() { takerExecutor = Executors.newFixedThreadPool(takerThreadNum, new NamedThreadFactory("Async-LoghubAppender")); for(int i = 0; i < takerThreadNum; i++) { takerExecutor.execute(new InnerQueueTakerThread()); } } @Override public void stop() { super.stop(); localDiskEnhancedQueueManager.close(); } // copy from parent @Override public void append(E eventObject) { try { appendEvent(eventObject); } catch (Exception e) { addError("Failed to append event.", e); } } /** * 優雅停機 */ public void shutdown() { puterExecutor.shutdown(); try { puterExecutor.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { addError("【日誌appender】loghub shutdown interupt", e); Thread.currentThread().interrupt(); } } // modify from parent private void appendEvent(E eventObject) { //init Event Object if (!(eventObject instanceof LoggingEvent)) { return; } LoggingEvent event = (LoggingEvent) eventObject; List<LogItem> logItems = new ArrayList<>(); LogItem item = new LogItem(); logItems.add(item); item.SetTime((int) (event.getTimeStamp() / 1000)); DateTime dateTime = new DateTime(event.getTimeStamp()); item.PushBack("time", dateTime.toString(formatter)); item.PushBack("level", event.getLevel().toString()); item.PushBack("thread", event.getThreadName()); StackTraceElement[] caller = event.getCallerData(); if (caller != null && caller.length > 0) { item.PushBack("location", caller[0].toString()); } String message = event.getFormattedMessage(); item.PushBack("message", message); IThrowableProxy iThrowableProxy = event.getThrowableProxy(); if (iThrowableProxy != null) { String throwable = getExceptionInfo(iThrowableProxy); throwable += fullDump(event.getThrowableProxy().getStackTraceElementProxyArray()); item.PushBack("throwable", throwable); } if (this.encoder != null) { // 框架也未處理好該問題,暫時忽略 // item.PushBack("log", new String(this.encoder.encode(eventObject))); } LoghubItemsWrapper itemWrapper = new LoghubItemsWrapper(); itemWrapper.setLogItemList(logItems); putItemToPoster(itemWrapper); } /** * 將隊列放入 poster 中 * * @param itemsWrapper 日誌信息 */ private void putItemToPoster(LoghubItemsWrapper itemsWrapper) { try { LinkedBlockingQueue<LoghubItemsWrapper> selectedQueue = getLoadBalancedQueue(); selectedQueue.put(itemsWrapper); } catch (InterruptedException e) { addError("【日誌appender】放入隊列中斷"); Thread.currentThread().interrupt(); } } /** * 選擇一個隊列進行日誌放入 * * @return 隊列容器 */ private LinkedBlockingQueue<LoghubItemsWrapper> getLoadBalancedQueue() { long selectQueueIndex = System.nanoTime() % distributeLogItemPoster.size(); return distributeLogItemPoster.get((int) selectQueueIndex); } // copy from parent private String fullDump(StackTraceElementProxy[] stackTraceElementProxyArray) { StringBuilder builder = new StringBuilder(); for (StackTraceElementProxy step : stackTraceElementProxyArray) { builder.append(CoreConstants.LINE_SEPARATOR); String string = step.toString(); builder.append(CoreConstants.TAB).append(string); ThrowableProxyUtil.subjoinPackagingData(builder, step); } return builder.toString(); } // copy from parent private String getExceptionInfo(IThrowableProxy iThrowableProxy) { String s = iThrowableProxy.getClassName(); String message = iThrowableProxy.getMessage(); return (message != null) ? (s + ": " + message) : s; } class InnerQueuePuterThread implements Runnable { private LinkedBlockingQueue<LoghubItemsWrapper> queue; public InnerQueuePuterThread(LinkedBlockingQueue<LoghubItemsWrapper> queue) { this.queue = queue; } @Override public void run() { // put the item to mapdb while (!Thread.interrupted()) { LoghubItemsWrapper itemsWrapper = null; try { itemsWrapper = queue.take(); } catch (InterruptedException e) { addError("【日誌appender】poster隊列中斷"); Thread.currentThread().interrupt(); } if(itemsWrapper != null) { flushLogItemToMapDb(itemsWrapper); } } } /** * 將內存隊列存儲到 mapdb 中, 由消費線程獲取 * * @param itemsWrapper 日誌信息 */ private void flushLogItemToMapDb(LoghubItemsWrapper itemsWrapper) { byte[] itemBytes = JSONObject.toJSONBytes(itemsWrapper.getLogItemList()); localDiskEnhancedQueueManager.push(itemBytes); } } /** * for debug, profiler for mapdb */ private static final AtomicLong takerCounter = new AtomicLong(0); class InnerQueueTakerThread implements Runnable { @Override public void run() { long startTime = System.currentTimeMillis(); while (!Thread.interrupted()) { //item = fullLogQueues.take(); // take items without lock try { while (localDiskEnhancedQueueManager.isEmpty()) { Thread.sleep(100L); } } catch (InterruptedException e) { addError("【日誌appender】中斷異常", e); Thread.currentThread().interrupt(); break; } byte[] itemBytes = localDiskEnhancedQueueManager.pollFirstItem(); try { if(itemBytes != null && itemBytes != localDiskEnhancedQueueManager.EMPTY_VALUE_BYTE_ARRAY) { List<LogItem> itemWrapper = JSONObject.parseArray( new String(itemBytes, IOUtils.UTF8), LogItem.class); if(itemWrapper != null) { doSend(itemWrapper); } } else { // 若是數據不爲空,且一直在循環,說明存在異常,暫時處理爲重置隊列,但應從根本上解決問題 localDiskEnhancedQueueManager.reset(); } } catch (Exception e) { addError("【日誌appender】json解析異常", e); } // for debug test, todo: 上線時去除該代碼 if(takerCounter.incrementAndGet() % 1000 == 0) { System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + ": per 1000 items took time: " + (System.currentTimeMillis() - startTime) + " ms."); startTime = System.currentTimeMillis(); } } } /** * 發送數據邏輯,主要爲 loghub * * @param item logItem */ private void doSend(List<LogItem> item) { AsyncLoghubAppender.this.doSendToLoghub(item); } } /** * 發送數據邏輯,loghub * * @param item logItem */ private void doSendToLoghub(List<LogItem> item) { producer.send(projectConfig.projectName, logstore, topic, source, item, new LoghubAppenderCallback<>(AsyncLoghubAppender.this, projectConfig.projectName, logstore, topic, source, item)); } }
如上實現,簡單說明下:
1. 開啓n個消費線程的 distributeLogItemPoster 阻塞隊列,用於接收業務線程發來的日誌請求;
2. 開啓n個消費線程, 將從業務線程接收過來的請求隊列,放入磁盤隊列中,從而避免可能內存溢出;
3. 開啓m個taker線程,從磁盤隊列中取出數據,進行loghub的發送任務;
如上,咱們已經徹底將日誌的發送任務轉移到異步來處理了!
可是,這樣真的就ok了嗎?磁盤隊列是什麼?可靠嗎?性能如何?線程安全嗎?
2. 如何存儲磁盤隊列?
好吧。我們這裏使用的是 mapdb 來實現的磁盤隊列, mapdb 的 github star數超3k, 應該仍是不錯了!
可是,它更多的是用來作磁盤緩存,隊列並無過多關注,無論怎麼樣,咱們仍是能夠選擇的!
mapdb項目地址: https://github.com/jankotek/mapdb
其實mapdb有幾個現成的隊列可用: IndexTreeList, TreeSet. 可是咱們仔細看下他的官宣,看到這些數據結構只支持少許數據時的存儲,在數據量巨大以後,性能徹底沒法保證,甚至 poll 一個元素須要1s+ 。
因此,還得拋棄其隊列實現,只是本身實現一個了,其 HashTree 是個不錯的選擇, 使用 HashTree 來實現隊列,惟一的難點在於,如何進行元素迭代;(你們不仿先自行思考下)
下面咱們來看下個人一個實現方式:
import com.test.biz.cache.LocalDiskEnhancedQueueManager; import com.taobao.notify.utils.threadpool.NamedThreadFactory; import org.mapdb.BTreeMap; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.NavigableSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * MapDb 實現的內存隊列工具類 * */ public class LocalDiskEnhancedQueueManagerMapDbImpl implements LocalDiskEnhancedQueueManager { private static final Logger logger = LoggerFactory.getLogger(LocalDiskEnhancedQueueManagerMapDbImpl.class); /** * 默認存儲文件 */ private final String DEFAULT_DB_FILE = "/opt/mapdb/logappender.db"; /** * 隊列名 */ private final String LOG_ITEM_LIST_TABLE = "hub_log_appender"; private final String LOG_ITEM_TREE_SET_TABLE = "hub_log_appender_tree_set"; private final String LOG_ITEM_HASH_MAP_TABLE = "hub_log_appender_hash_map"; private final String LOG_ITEM_BTREE_TABLE = "hub_log_appender_btree"; private final String QUEUE_OFFSET_HOLDER_BTREE_TABLE = "queue_offset_holder_btree_table"; /** * db 實例 */ private final DB mapDb; // private IndexTreeList<byte[]> indexTreeListQueue; /** * 僞裝是個隊列 */ private NavigableSet<byte[]> treeSetQueue; private BTreeMap<byte[], Byte> bTreeQueue; private ConcurrentMap<Long, byte[]> concurrentMapQueue; /** * 隊列偏移量持有器, 對於小容量的節點使用 btree 處理很好 */ private BTreeMap<String, Long> queueOffsetDiskHolder; /** * 讀隊列偏移器, jvm 運行時使用該值, 該值被定時刷新到 mapdb 中 * * 會有部分數據重複狀況 * */ private AtomicLong readerOfQueueOffsetJvmHolder; /** * 寫隊列偏移器, jvm 運行時使用該值, 該值被定時刷新到 mapdb 中 * * 會有部分數據重複狀況 */ private AtomicLong writerOfQueueOffsetJvmHolder; private final String readerOffsetCacheKeyName = "loghub_appender_queue_key_read_offset"; private final String writerOffsetCacheKeyName = "loghub_appender_queue_key_write_offset"; /** * mapdb 構造方法,給出隊列持有者 * */ public LocalDiskEnhancedQueueManagerMapDbImpl() { mapDb = DBMaker.fileDB(getDbFilePath()) .checksumHeaderBypass() .closeOnJvmShutdown() .fileChannelEnable() .fileMmapEnableIfSupported() // 嘗試修復刪除元素後磁盤文件大小不變化的bug .cleanerHackEnable() .concurrencyScale(128) .make(); initQueueOffsetHolder(); initQueueOwner(); initCleanUselessSpaceJob(); } /** * 初始化隊列偏移器 */ private void initQueueOffsetHolder() { queueOffsetDiskHolder = mapDb.treeMap(QUEUE_OFFSET_HOLDER_BTREE_TABLE, Serializer.STRING, Serializer.LONG) .createOrOpen(); initQueueReaderOffset(); initQueueWriterOffset(); } /** * 初始化讀偏移數據 */ private void initQueueReaderOffset() { Long readerQueueOffsetFromDisk = queueOffsetDiskHolder.get(readerOffsetCacheKeyName); if(readerQueueOffsetFromDisk == null) { readerOfQueueOffsetJvmHolder = new AtomicLong(1); } else { readerOfQueueOffsetJvmHolder = new AtomicLong(readerQueueOffsetFromDisk); } } /** * 初始化寫偏移數據 */ private void initQueueWriterOffset() { Long writerQueueOffsetFromDisk = queueOffsetDiskHolder.get(writerOffsetCacheKeyName); if(writerQueueOffsetFromDisk == null) { writerOfQueueOffsetJvmHolder = new AtomicLong(1); } else { writerOfQueueOffsetJvmHolder = new AtomicLong(writerQueueOffsetFromDisk); } } /** * 刷入最新的讀偏移 */ private void flushQueueReaderOffset() { queueOffsetDiskHolder.put(readerOffsetCacheKeyName, readerOfQueueOffsetJvmHolder.get()); } /** * 刷入最新的讀偏移 */ private void flushQueueWriterOffset() { queueOffsetDiskHolder.put(writerOffsetCacheKeyName, writerOfQueueOffsetJvmHolder.get()); } /** * 初始化隊列容器 */ private void initQueueOwner() { // indexTreeListQueue = db.indexTreeList(LOG_ITEM_LIST_TABLE, Serializer.BYTE_ARRAY).createOrOpen(); // bTreeQueue = mapDb.treeMap(LOG_ITEM_BTREE_TABLE, // Serializer.BYTE_ARRAY, Serializer.BYTE) // .counterEnable() // .valuesOutsideNodesEnable() // .createOrOpen(); // treeSetQueue = mapDb.treeSet(LOG_ITEM_TREE_SET_TABLE, Serializer.BYTE_ARRAY) // .createOrOpen(); concurrentMapQueue = mapDb.hashMap(LOG_ITEM_HASH_MAP_TABLE, Serializer.LONG, Serializer.BYTE_ARRAY) .counterEnable() // 當處理能力不好時,就將該日誌打印丟掉 .expireMaxSize(100 * 10000 * 10000L) // 3小時後還沒消費就過時了 .expireAfterCreate(3L, TimeUnit.HOURS) .expireAfterGet() .createOrOpen(); } /** * 清理無用空間,如磁盤文件等等 */ private void initCleanUselessSpaceJob() { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Async-MapDbSpaceCleaner")); // 每過10分鐘清理一次無用空間,看狀況調整 scheduledExecutorService.scheduleAtFixedRate(() -> { mapDb.getStore().compact(); }, 0L, 10L, TimeUnit.MINUTES); // 每過10s刷入一次讀寫偏移,容許重複和丟失 scheduledExecutorService.scheduleAtFixedRate(() -> { flushQueueWriterOffset(); flushQueueReaderOffset(); }, 30L, 10L, TimeUnit.SECONDS); } /** * 獲取文件存儲位置,考慮後續擴展被子類覆蓋 * * @return db文件地址 */ protected String getDbFilePath() { return DEFAULT_DB_FILE; } /** * 獲取下一個隊列讀編號 (確保準確性可承受,性能可承受) * * @return 隊列編號 */ private long getNextReaderId() { return readerOfQueueOffsetJvmHolder.incrementAndGet(); } /** * 獲取下一個隊列寫編號 (確保準確性可承受,性能可承受) * * @return 隊列編號 */ private long getNextWriterId() { return writerOfQueueOffsetJvmHolder.incrementAndGet(); } @Override public boolean push(byte[] itemBytes) { // return indexTreeListQueue.add(itemBytes); // bTreeQueue.put(itemBytes, (byte)1 ); // treeSetQueue.add(itemBytes); concurrentMapQueue.put(getNextWriterId(), itemBytes); return true; } @Override public byte[] pollFirstItem() { // 使用時不得使用修改元素方法 // return indexTreeListQueue.remove(index); // Map.Entry<byte[], Byte> entry = bTreeQueue.pollFirstEntry(); // return treeSetQueue.pollFirst(); return concurrentMapQueue.remove(getNextReaderId()); } @Override public boolean isEmpty() { // 隊列爲空,不必定表明就沒有可供讀取的元素了,由於 counter 可能落後於併發寫操做了 // 隊列不爲空,不必定表明就必定有可供讀取的元素,由於 counter 可能落後於併發 remove 操做了 // 當讀指針等於寫指針時,則表明全部元素已被讀取完成,應該是比較準確的空斷定標準 return concurrentMapQueue.isEmpty() || readerOfQueueOffsetJvmHolder.get() == writerOfQueueOffsetJvmHolder.get(); } @Override public void close() { flushQueueWriterOffset(); flushQueueReaderOffset(); mapDb.close(); } @Override public void reset() { concurrentMapQueue.clear(); // 同步兩個值,非準確的 readerOfQueueOffsetJvmHolder.set(writerOfQueueOffsetJvmHolder.get()); logger.error("【mapdb緩存】讀寫指針衝突,強制重置指針,請注意排查併發問題. reader:{}, writer:{}", readerOfQueueOffsetJvmHolder.get(), writerOfQueueOffsetJvmHolder.get()); } }
如上,就是使用 mapdb的hashMap 實現了磁盤隊列功能,主要思路以下:
1. 使用一個long的自增數據做爲 hashMap 的key,將隊列存入value中;
2. 使用另外一個 long 的自增指針作爲讀key, 依次讀取數據;
3. 讀寫指針都按期刷入磁盤,以防出異常crash時沒法恢復;
4. 當實在出現了未預料的bug時,容許直接丟棄衝突日誌,從一個新的讀取點開始新的工做;
最後,再加一個工廠類,生成mapdb隊列實例: LocalDiskEnhancedQueueManagerFactory
import com.test.biz.cache.impl.LocalDiskEnhancedQueueManagerMapDbImpl; /** * 本地磁盤隊列等實例工廠類 * */ public class LocalDiskEnhancedQueueManagerFactory { /** * 生產一個mapDb實現的隊列實例 * * @return mapdb 隊列實例 */ public static LocalDiskEnhancedQueueManager newMapDbQueue() { return new LocalDiskEnhancedQueueManagerMapDbImpl(); } /** * 生產一個使用 ehcache 實現的隊列實例 * * @return ehcache 隊列實例 */ public static LocalDiskEnhancedQueueManager newEhcacheQueue() { // 有興趣的同窗能夠實現下 return null; } /** * 生產一個使用 fqueue 實現的隊列實例 * * @return fqueue 隊列實例 */ public static LocalDiskEnhancedQueueManager newFQueueQueue() { // 有興趣的同窗能夠實現下, 不過不太建議 return null; } /** * 生產一個使用 本身直接寫磁盤文件 實現的隊列實例 * * @return file 隊列實例 */ public static LocalDiskEnhancedQueueManager newOwnFileQueue() { // 有興趣的同窗能夠挑戰下 return null; } }
這樣,咱們就實現了一個既能知足高併發場景下的日誌打印需求了吧。業務線程優先,日誌線程異步化、可丟棄、cpu佔用少、內存不限制。
老話: 優化之路,道阻且長!