如何構建「高性能」「大小無限」(磁盤)隊列?

假設場景:java

  1. 針對一個高併發的應用,你是否會選擇打印訪問日誌?
  2. 針對分佈式的應用,你是否會選擇將全部日誌打印到日誌中心?git

解決方案:github

  1. 若是若是你選擇爲了性能,不打印日誌,那無可厚非。可是你得考慮清楚,出問題的時候是否可以作到快速排查?
  2. 你以爲日誌分佈在各臺機器上很方便,那不用日誌中心也行!數據庫

  若是,你仍是會選擇打印大量的訪問日誌,若是你仍是會選擇打印日誌到日誌中心,那麼本文對你有用!json

  若是本身實現一個日誌中心,不說很難吧,也仍是要費很大力氣的,好比性能,好比容量大小!緩存

  因此,本文選擇阿里雲的 loghub 做爲日誌中心,收集全部日誌!安全

loghub 常規操做:網絡

  在提出本文主題以前,我們要看看loghub本身的方式,以及存在的問題!
  在官方接入文檔裏,就建議我們使用 logProducer 接入。數據結構

  其實 logProducer 已經作了太多的優化,好比當日志包數據達到必定數量,才統一進行發送,異步發送等等!併發

  至於爲何還會存在本篇文章,則是因爲這些優化還不夠,好比 這些日誌發送仍然會影響業務性能,仍然會受到內存限制,仍然會搶佔大量cpu。。。

  好吧,接入方式:

  1. 引入maven依賴:

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log-logback-appender</artifactId>
            <version>0.1.13</version>
        </dependency>

 

  2. logback中添加appender:

    <appender name="LOGHUB-APPENDER" class="appender:com.aliyun.openservices.log.logback.LoghubAppender">
        <endpoint>${loghub.endpoint}</endpoint>
        <accessKeyId>${loghub.accessKeyId}</accessKeyId>
        <accessKey>${loghub.accessKey}</accessKey>
        <projectName>${loghub.projectName}</projectName>
        <logstore>test-logstore</logstore>
        <topic>${loghub.topic}</topic>
        <packageTimeoutInMS>1500</packageTimeoutInMS>
        <logsCountPerPackage>4096</logsCountPerPackage>
        <!-- 4718592=4M, 3145728=3M, 2097152=2M -->
        <logsBytesPerPackage>3145728</logsBytesPerPackage>
        <!-- 17179869184=2G(溢出丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M-->
        <memPoolSizeInByte>536870912</memPoolSizeInByte>
        <retryTimes>1</retryTimes>
        <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>
    <root level="${logging.level}">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="LOGHUB-APPENDER" />
    </root>

 

  3. 在代碼中進行日誌打印:

    private static Logger logger = LoggerFactory.getLogger(MyClass.class);
    logger.warn("give me five: {}", name);

 

看似高效接入,存在的問題:

  1. loghub日誌的發送是異步的沒錯,可是當發送網絡很慢時,將會出現大量內存堆積;
  2. 堆積也不怕,如上配置,當堆積內存達到必定限度時,就不會再大了。他是怎麼辦到的?其實就是經過一個鎖,將後續全部請求所有阻塞了,這想一想都以爲可怕;
  3. 網絡慢咱們能夠多開幾個發送線程嘛,是的,這樣能在必定程度上緩解發送問題,可是基本也無補,另外,日誌發送線程開多以後,線程的調度將會更可怕,而這只是一個無關緊要的功能而已啊;

 

針對以上問題,咱們能作什麼?

  1. 去除沒必要要的日誌打印,這不是廢話嘛,能這麼幹早幹了!
  2. 在網絡慢的時候,減小日誌打印;這有點牽強,不過能夠試試!
  3. 直接使用異步線程進行日誌接收和發送,從根本上解決問題!
  4. 若是使用異步線程進行發送,那麼當日志大量堆積怎麼辦?
  5. 使用本地文件存儲須要進行發送的日誌,解決大量日誌堆積問題,待網絡暢通後,快速發送!

 

  考慮到使用異步線程發送日誌、使用本地磁盤存儲大量日誌堆積,問題應該基本都解決了!
  可是具體怎麼作呢?
  如何異步?
  如何存儲磁盤?

  這些都是很現實的問題!

  若是看到這裏,以爲很low的同窗,基本能夠撤了!

 

下面咱們來看看具體實施方案:

1. 如何異步?

  能想像到的,基本就是使用一個隊列來接收日誌寫請求,而後,開另外的消費線程進行消費便可!

  可是,這樣會有什麼問題?由於外部請求訪問進來,都是併發的,這個隊列得線程安全吧!用 synchronized ? 用阻塞隊列?

  總之,看起來都會有一個並行轉串行的問題,這會給應用併發能力帶去打擊的!

  因此,咱們得減輕這鎖的做用。咱們可使用多個隊列來解決這個問題,相似於分段鎖!若是併發能力不夠,則增長鎖數量便可!

  提及來仍是很抽象吧,現成的代碼擼去吧!

  1. 覆蓋原來的 logProducer 的 appender, 使用本身實現的appender, 主要就是解決異步問題:

    <appender name="LOGHUB-APPENDER" class="com.test.AsyncLoghubAppender">
        <endpoint>${loghub.endpoint}</endpoint>
        <accessKeyId>${loghub.accessKeyId}</accessKeyId>
        <accessKey>${loghub.accessKey}</accessKey>
        <projectName>${loghub.projectName}</projectName>
        <logstore>apollo-alarm</logstore>
        <topic>${loghub.topic}</topic>
        <packageTimeoutInMS>1500</packageTimeoutInMS>
        <logsCountPerPackage>4096</logsCountPerPackage>
        <!-- 4718592=4M, 3145728=3M, 2097152=2M -->
        <logsBytesPerPackage>3145728</logsBytesPerPackage>
        <!-- 17179869184=2G(溢出丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M-->
        <memPoolSizeInByte>536870912</memPoolSizeInByte>
        <retryTimes>1</retryTimes>
        <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>

 

  2. 接下來就是核心的異步實現: AsyncLoghubAppender

import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.classic.spi.StackTraceElementProxy;
import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.util.IOUtils;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.logback.LoghubAppender;
import com.aliyun.openservices.log.logback.LoghubAppenderCallback;
import com.test.biz.cache.LocalDiskEnhancedQueueManager;
import com.test.biz.cache.LocalDiskEnhancedQueueManagerFactory;
import com.test.model.LoghubItemsWrapper;
import com.taobao.notify.utils.threadpool.NamedThreadFactory;
import org.joda.time.DateTime;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 異步寫loghub appender, 解決框架的appender 沒法承受高併發寫的問題
 *
 */
public class AsyncLoghubAppender<E> extends LoghubAppender<E> {

    /**
     * put 線程,從業務線程接收消息過來
     */
    private ExecutorService puterExecutor;

    /**
     * 隊列搬運線程執行器
     */
    private ExecutorService takerExecutor;

    /**
     * mapdb 操做腳手架
     */
    private LocalDiskEnhancedQueueManager localDiskEnhancedQueueManager;

    /**
     * 日誌消息傳球手
     */
    private List<LinkedBlockingQueue<LoghubItemsWrapper>> distributeLogItemPoster;

    // puter 的線程數,與cpu核數保持一致
    private final int puterThreadNum = 4;

    // taker 的線程數,能夠稍微少點
    private final int takerThreadNum = 1;

    @Override
    public void start() {
        super.start();
        // 開啓單個put 線程
        doStart();
    }

    private void doStart() {
        initMapDbQueue();
        initPosterQueue();
        startPutterThread();
        startTakerThread();
    }

    /**
     * 初始化 mapdb 數據庫
     */
    private void initMapDbQueue() {
        localDiskEnhancedQueueManager = LocalDiskEnhancedQueueManagerFactory.newMapDbQueue();
    }

    /**
     * 初始化消息傳球手數據
     */
    private void initPosterQueue() {
        distributeLogItemPoster = new ArrayList<>();
        for(int i = 0; i < puterThreadNum; i++) {
            distributeLogItemPoster.add(new LinkedBlockingQueue<>(10000000));
        }
    }

    /**
     * 開啓 putter 線程組,此線程組不該慢於業務線程太多,不然致使內存溢出
     */
    private void startPutterThread() {
        puterExecutor = Executors.newFixedThreadPool(puterThreadNum,
                new NamedThreadFactory("Async-LoghubItemPoster"));
        for(int i = 0; i < puterThreadNum; i++) {
            puterExecutor.execute(new InnerQueuePuterThread(distributeLogItemPoster.get(i)));
        }
    }

    /**
     * 初始化取數線程組,此線程組能夠運行慢
     */
    private void startTakerThread() {
        takerExecutor = Executors.newFixedThreadPool(takerThreadNum,
                new NamedThreadFactory("Async-LoghubAppender"));
        for(int i = 0; i < takerThreadNum; i++) {
            takerExecutor.execute(new InnerQueueTakerThread());
        }
    }

    @Override
    public void stop() {
        super.stop();
        localDiskEnhancedQueueManager.close();
    }
    
    // copy from parent
    @Override
    public void append(E eventObject) {
        try {
            appendEvent(eventObject);
        } catch (Exception e) {
            addError("Failed to append event.", e);
        }
    }

    /**
     * 優雅停機
     */
    public void shutdown() {
        puterExecutor.shutdown();
        try {
            puterExecutor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            addError("【日誌appender】loghub shutdown interupt", e);
            Thread.currentThread().interrupt();
        }
    }

    // modify from parent
    private void appendEvent(E eventObject) {
        //init Event Object
        if (!(eventObject instanceof LoggingEvent)) {
            return;
        }
        LoggingEvent event = (LoggingEvent) eventObject;

        List<LogItem> logItems = new ArrayList<>();
        LogItem item = new LogItem();
        logItems.add(item);
        item.SetTime((int) (event.getTimeStamp() / 1000));

        DateTime dateTime = new DateTime(event.getTimeStamp());
        item.PushBack("time", dateTime.toString(formatter));
        item.PushBack("level", event.getLevel().toString());
        item.PushBack("thread", event.getThreadName());

        StackTraceElement[] caller = event.getCallerData();
        if (caller != null && caller.length > 0) {
            item.PushBack("location", caller[0].toString());
        }

        String message = event.getFormattedMessage();
        item.PushBack("message", message);

        IThrowableProxy iThrowableProxy = event.getThrowableProxy();
        if (iThrowableProxy != null) {
            String throwable = getExceptionInfo(iThrowableProxy);
            throwable += fullDump(event.getThrowableProxy().getStackTraceElementProxyArray());
            item.PushBack("throwable", throwable);
        }

        if (this.encoder != null) {
            // 框架也未處理好該問題,暫時忽略
//            item.PushBack("log", new String(this.encoder.encode(eventObject)));
        }

        LoghubItemsWrapper itemWrapper = new LoghubItemsWrapper();
        itemWrapper.setLogItemList(logItems);
        putItemToPoster(itemWrapper);

    }

    /**
     * 將隊列放入 poster 中
     *
     * @param itemsWrapper 日誌信息
     */
    private void putItemToPoster(LoghubItemsWrapper itemsWrapper) {
        try {
            LinkedBlockingQueue<LoghubItemsWrapper> selectedQueue = getLoadBalancedQueue();
            selectedQueue.put(itemsWrapper);
        } catch (InterruptedException e) {
            addError("【日誌appender】放入隊列中斷");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 選擇一個隊列進行日誌放入
     *
     * @return 隊列容器
     */
    private LinkedBlockingQueue<LoghubItemsWrapper> getLoadBalancedQueue() {
        long selectQueueIndex = System.nanoTime() % distributeLogItemPoster.size();
        return distributeLogItemPoster.get((int) selectQueueIndex);
    }

    // copy from parent
    private String fullDump(StackTraceElementProxy[] stackTraceElementProxyArray) {
        StringBuilder builder = new StringBuilder();
        for (StackTraceElementProxy step : stackTraceElementProxyArray) {
            builder.append(CoreConstants.LINE_SEPARATOR);
            String string = step.toString();
            builder.append(CoreConstants.TAB).append(string);
            ThrowableProxyUtil.subjoinPackagingData(builder, step);
        }
        return builder.toString();
    }

    // copy from parent
    private String getExceptionInfo(IThrowableProxy iThrowableProxy) {
        String s = iThrowableProxy.getClassName();
        String message = iThrowableProxy.getMessage();
        return (message != null) ? (s + ": " + message) : s;
    }

    class InnerQueuePuterThread implements Runnable {

        private LinkedBlockingQueue<LoghubItemsWrapper> queue;

        public InnerQueuePuterThread(LinkedBlockingQueue<LoghubItemsWrapper> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            // put the item to mapdb
            while (!Thread.interrupted()) {
                LoghubItemsWrapper itemsWrapper = null;
                try {
                    itemsWrapper = queue.take();
                } catch (InterruptedException e) {
                    addError("【日誌appender】poster隊列中斷");
                    Thread.currentThread().interrupt();
                }
                if(itemsWrapper != null) {
                    flushLogItemToMapDb(itemsWrapper);
                }
            }
        }

        /**
         * 將內存隊列存儲到 mapdb 中, 由消費線程獲取
         *
         * @param itemsWrapper 日誌信息
         */
        private void flushLogItemToMapDb(LoghubItemsWrapper itemsWrapper) {
            byte[] itemBytes = JSONObject.toJSONBytes(itemsWrapper.getLogItemList());
            localDiskEnhancedQueueManager.push(itemBytes);
        }
    }

    /**
     * for debug, profiler for mapdb
     */
    private static final AtomicLong takerCounter = new AtomicLong(0);

    class InnerQueueTakerThread implements Runnable {

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            while (!Thread.interrupted()) {
                //item = fullLogQueues.take();      // take items without lock
                try {
                    while (localDiskEnhancedQueueManager.isEmpty()) {
                        Thread.sleep(100L);
                    }
                }
                catch (InterruptedException e) {
                    addError("【日誌appender】中斷異常", e);
                    Thread.currentThread().interrupt();
                    break;
                }
                byte[] itemBytes = localDiskEnhancedQueueManager.pollFirstItem();
                try {
                    if(itemBytes != null
                            && itemBytes != localDiskEnhancedQueueManager.EMPTY_VALUE_BYTE_ARRAY) {
                        List<LogItem> itemWrapper = JSONObject.parseArray(
                                new String(itemBytes, IOUtils.UTF8),
                                LogItem.class);
                        if(itemWrapper != null) {
                            doSend(itemWrapper);
                        }
                    }
                    else {
                        // 若是數據不爲空,且一直在循環,說明存在異常,暫時處理爲重置隊列,但應從根本上解決問題
                        localDiskEnhancedQueueManager.reset();
                    }
                }
                catch (Exception e) {
                    addError("【日誌appender】json解析異常", e);
                }
                // for debug test, todo: 上線時去除該代碼
                if(takerCounter.incrementAndGet() % 1000 == 0) {
                    System.out.println(LocalDateTime.now() + " - "
                            + Thread.currentThread().getName() + ": per 1000 items took time: "
                            + (System.currentTimeMillis() - startTime) + " ms.");
                    startTime = System.currentTimeMillis();
                }
            }
        }

        /**
         * 發送數據邏輯,主要爲 loghub
         *
         * @param item logItem
         */
        private void doSend(List<LogItem> item) {
            AsyncLoghubAppender.this.doSendToLoghub(item);
        }
    }

    /**
     * 發送數據邏輯,loghub
     *
     * @param item logItem
     */
    private void doSendToLoghub(List<LogItem> item) {
        producer.send(projectConfig.projectName, logstore, topic, source, item,
                new LoghubAppenderCallback<>(AsyncLoghubAppender.this,
                        projectConfig.projectName, logstore, topic, source, item));
    }

}

  如上實現,簡單說明下:

  1. 開啓n個消費線程的 distributeLogItemPoster 阻塞隊列,用於接收業務線程發來的日誌請求;
  2. 開啓n個消費線程, 將從業務線程接收過來的請求隊列,放入磁盤隊列中,從而避免可能內存溢出;
  3. 開啓m個taker線程,從磁盤隊列中取出數據,進行loghub的發送任務;

  如上,咱們已經徹底將日誌的發送任務轉移到異步來處理了!

  可是,這樣真的就ok了嗎?磁盤隊列是什麼?可靠嗎?性能如何?線程安全嗎?

 

2. 如何存儲磁盤隊列?

  好吧。我們這裏使用的是 mapdb 來實現的磁盤隊列, mapdb 的 github star數超3k, 應該仍是不錯了!

  可是,它更多的是用來作磁盤緩存,隊列並無過多關注,無論怎麼樣,咱們仍是能夠選擇的!

  mapdb項目地址: https://github.com/jankotek/mapdb

  其實mapdb有幾個現成的隊列可用: IndexTreeList, TreeSet. 可是咱們仔細看下他的官宣,看到這些數據結構只支持少許數據時的存儲,在數據量巨大以後,性能徹底沒法保證,甚至 poll 一個元素須要1s+ 。

  因此,還得拋棄其隊列實現,只是本身實現一個了,其 HashTree 是個不錯的選擇, 使用 HashTree 來實現隊列,惟一的難點在於,如何進行元素迭代;(你們不仿先自行思考下)

  下面咱們來看下個人一個實現方式:

import com.test.biz.cache.LocalDiskEnhancedQueueManager;
import com.taobao.notify.utils.threadpool.NamedThreadFactory;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.NavigableSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * MapDb 實現的內存隊列工具類
 *
 */
public class LocalDiskEnhancedQueueManagerMapDbImpl implements LocalDiskEnhancedQueueManager {

    private static final Logger logger = LoggerFactory.getLogger(LocalDiskEnhancedQueueManagerMapDbImpl.class);

    /**
     * 默認存儲文件
     */
    private final String DEFAULT_DB_FILE = "/opt/mapdb/logappender.db";

    /**
     * 隊列名
     */
    private final String LOG_ITEM_LIST_TABLE = "hub_log_appender";
    private final String LOG_ITEM_TREE_SET_TABLE = "hub_log_appender_tree_set";
    private final String LOG_ITEM_HASH_MAP_TABLE = "hub_log_appender_hash_map";
    private final String LOG_ITEM_BTREE_TABLE = "hub_log_appender_btree";

    private final String QUEUE_OFFSET_HOLDER_BTREE_TABLE = "queue_offset_holder_btree_table";

    /**
     * db 實例
     */
    private final DB mapDb;

//    private IndexTreeList<byte[]> indexTreeListQueue;

    /**
     * 僞裝是個隊列
     */
    private NavigableSet<byte[]> treeSetQueue;
    private BTreeMap<byte[], Byte> bTreeQueue;
    private ConcurrentMap<Long, byte[]> concurrentMapQueue;

    /**
     * 隊列偏移量持有器, 對於小容量的節點使用 btree 處理很好
     */
    private BTreeMap<String, Long> queueOffsetDiskHolder;

    /**
     * 讀隊列偏移器, jvm 運行時使用該值, 該值被定時刷新到 mapdb 中
     *
     *      會有部分數據重複狀況
     *
     */
    private AtomicLong readerOfQueueOffsetJvmHolder;

    /**
     * 寫隊列偏移器, jvm 運行時使用該值, 該值被定時刷新到 mapdb 中
     *
     *      會有部分數據重複狀況
     */
    private AtomicLong writerOfQueueOffsetJvmHolder;

    private final String readerOffsetCacheKeyName = "loghub_appender_queue_key_read_offset";
    private final String writerOffsetCacheKeyName = "loghub_appender_queue_key_write_offset";

    /**
     * mapdb 構造方法,給出隊列持有者
     *
     */
    public LocalDiskEnhancedQueueManagerMapDbImpl() {
        mapDb = DBMaker.fileDB(getDbFilePath())
                .checksumHeaderBypass()
                .closeOnJvmShutdown()
                .fileChannelEnable()
                .fileMmapEnableIfSupported()
                // 嘗試修復刪除元素後磁盤文件大小不變化的bug
                 .cleanerHackEnable()
                .concurrencyScale(128)
                .make();
        initQueueOffsetHolder();
        initQueueOwner();
        initCleanUselessSpaceJob();
    }

    /**
     * 初始化隊列偏移器
     */
    private void initQueueOffsetHolder() {
        queueOffsetDiskHolder = mapDb.treeMap(QUEUE_OFFSET_HOLDER_BTREE_TABLE,
                                    Serializer.STRING, Serializer.LONG)
                                    .createOrOpen();
        initQueueReaderOffset();
        initQueueWriterOffset();
    }

    /**
     * 初始化讀偏移數據
     */
    private void initQueueReaderOffset() {
        Long readerQueueOffsetFromDisk = queueOffsetDiskHolder.get(readerOffsetCacheKeyName);
        if(readerQueueOffsetFromDisk == null) {
            readerOfQueueOffsetJvmHolder = new AtomicLong(1);
        }
        else {
            readerOfQueueOffsetJvmHolder = new AtomicLong(readerQueueOffsetFromDisk);
        }
    }

    /**
     * 初始化寫偏移數據
     */
    private void initQueueWriterOffset() {
        Long writerQueueOffsetFromDisk = queueOffsetDiskHolder.get(writerOffsetCacheKeyName);
        if(writerQueueOffsetFromDisk == null) {
            writerOfQueueOffsetJvmHolder = new AtomicLong(1);
        }
        else {
            writerOfQueueOffsetJvmHolder = new AtomicLong(writerQueueOffsetFromDisk);
        }
    }

    /**
     * 刷入最新的讀偏移
     */
    private void flushQueueReaderOffset() {
        queueOffsetDiskHolder.put(readerOffsetCacheKeyName, readerOfQueueOffsetJvmHolder.get());
    }

    /**
     * 刷入最新的讀偏移
     */
    private void flushQueueWriterOffset() {
        queueOffsetDiskHolder.put(writerOffsetCacheKeyName, writerOfQueueOffsetJvmHolder.get());
    }

    /**
     * 初始化隊列容器
     */
    private void initQueueOwner() {
//        indexTreeListQueue = db.indexTreeList(LOG_ITEM_LIST_TABLE, Serializer.BYTE_ARRAY).createOrOpen();
//        bTreeQueue = mapDb.treeMap(LOG_ITEM_BTREE_TABLE,
//                                            Serializer.BYTE_ARRAY, Serializer.BYTE)
//                                            .counterEnable()
//                                            .valuesOutsideNodesEnable()
//                                            .createOrOpen();
//        treeSetQueue = mapDb.treeSet(LOG_ITEM_TREE_SET_TABLE, Serializer.BYTE_ARRAY)
//                                            .createOrOpen();
        concurrentMapQueue = mapDb.hashMap(LOG_ITEM_HASH_MAP_TABLE, Serializer.LONG, Serializer.BYTE_ARRAY)
                                        .counterEnable()
                                        // 當處理能力不好時,就將該日誌打印丟掉
                                        .expireMaxSize(100 * 10000 * 10000L)
                                        // 3小時後還沒消費就過時了
                                        .expireAfterCreate(3L, TimeUnit.HOURS)
                                        .expireAfterGet()
                                        .createOrOpen();
    }

    /**
     * 清理無用空間,如磁盤文件等等
     */
    private void initCleanUselessSpaceJob() {
        ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Async-MapDbSpaceCleaner"));
        // 每過10分鐘清理一次無用空間,看狀況調整
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            mapDb.getStore().compact();
        }, 0L, 10L, TimeUnit.MINUTES);

        // 每過10s刷入一次讀寫偏移,容許重複和丟失
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            flushQueueWriterOffset();
            flushQueueReaderOffset();
        }, 30L, 10L, TimeUnit.SECONDS);
    }

    /**
     * 獲取文件存儲位置,考慮後續擴展被子類覆蓋
     *
     * @return db文件地址
     */
    protected String getDbFilePath() {
        return DEFAULT_DB_FILE;
    }

    /**
     * 獲取下一個隊列讀編號 (確保準確性可承受,性能可承受)
     *
     * @return 隊列編號
     */
    private long getNextReaderId() {
        return readerOfQueueOffsetJvmHolder.incrementAndGet();
    }

    /**
     * 獲取下一個隊列寫編號 (確保準確性可承受,性能可承受)
     *
     * @return 隊列編號
     */
    private long getNextWriterId() {
        return writerOfQueueOffsetJvmHolder.incrementAndGet();
    }

    @Override
    public boolean push(byte[] itemBytes) {
//        return indexTreeListQueue.add(itemBytes);
//        bTreeQueue.put(itemBytes, (byte)1 );
//        treeSetQueue.add(itemBytes);
        concurrentMapQueue.put(getNextWriterId(), itemBytes);
        return true;
    }

    @Override
    public byte[] pollFirstItem() {
        // 使用時不得使用修改元素方法
//        return indexTreeListQueue.remove(index);
//         Map.Entry<byte[], Byte> entry = bTreeQueue.pollFirstEntry();
//        return treeSetQueue.pollFirst();
        return concurrentMapQueue.remove(getNextReaderId());
    }

    @Override
    public boolean isEmpty() {
        // 隊列爲空,不必定表明就沒有可供讀取的元素了,由於 counter 可能落後於併發寫操做了
        // 隊列不爲空,不必定表明就必定有可供讀取的元素,由於 counter 可能落後於併發 remove 操做了
        // 當讀指針等於寫指針時,則表明全部元素已被讀取完成,應該是比較準確的空斷定標準
        return concurrentMapQueue.isEmpty()
                    || readerOfQueueOffsetJvmHolder.get() == writerOfQueueOffsetJvmHolder.get();
    }

    @Override
    public void close() {
        flushQueueWriterOffset();
        flushQueueReaderOffset();
        mapDb.close();
    }

    @Override
    public void reset() {
        concurrentMapQueue.clear();
        // 同步兩個值,非準確的
        readerOfQueueOffsetJvmHolder.set(writerOfQueueOffsetJvmHolder.get());
        logger.error("【mapdb緩存】讀寫指針衝突,強制重置指針,請注意排查併發問題. reader:{}, writer:{}",
                            readerOfQueueOffsetJvmHolder.get(), writerOfQueueOffsetJvmHolder.get());
    }

}

  如上,就是使用 mapdb的hashMap 實現了磁盤隊列功能,主要思路以下:

  1. 使用一個long的自增數據做爲 hashMap 的key,將隊列存入value中;
  2. 使用另外一個 long 的自增指針作爲讀key, 依次讀取數據;
  3. 讀寫指針都按期刷入磁盤,以防出異常crash時沒法恢復;
  4. 當實在出現了未預料的bug時,容許直接丟棄衝突日誌,從一個新的讀取點開始新的工做;

  最後,再加一個工廠類,生成mapdb隊列實例: LocalDiskEnhancedQueueManagerFactory

import com.test.biz.cache.impl.LocalDiskEnhancedQueueManagerMapDbImpl;

/**
 * 本地磁盤隊列等實例工廠類
 *
 */
public class LocalDiskEnhancedQueueManagerFactory {

    /**
     * 生產一個mapDb實現的隊列實例
     *
     * @return mapdb 隊列實例
     */
    public static LocalDiskEnhancedQueueManager newMapDbQueue() {
        return new LocalDiskEnhancedQueueManagerMapDbImpl();
    }

    /**
     * 生產一個使用 ehcache 實現的隊列實例
     *
     * @return ehcache 隊列實例
     */
    public static LocalDiskEnhancedQueueManager newEhcacheQueue() {
        // 有興趣的同窗能夠實現下
        return null;
    }

    /**
     * 生產一個使用 fqueue 實現的隊列實例
     *
     * @return fqueue 隊列實例
     */
    public static LocalDiskEnhancedQueueManager newFQueueQueue() {
        // 有興趣的同窗能夠實現下, 不過不太建議
        return null;
    }


    /**
     * 生產一個使用 本身直接寫磁盤文件 實現的隊列實例
     *
     * @return file 隊列實例
     */
    public static LocalDiskEnhancedQueueManager newOwnFileQueue() {
        // 有興趣的同窗能夠挑戰下
        return null;
    }

}

  這樣,咱們就實現了一個既能知足高併發場景下的日誌打印需求了吧。業務線程優先,日誌線程異步化、可丟棄、cpu佔用少、內存不限制。

 

老話: 優化之路,道阻且長!

相關文章
相關標籤/搜索