併發編程源碼分析一之Log接口

併發編程實踐部分源碼使用github裏面部分開源框架源碼,從新整理,若有錯誤或版權問題,請指出指正。java

歡迎star、fork,讀書筆記系列會同步更新mysql

gitgit

https://github.com/xuminwlt/j360-jdkgithub

模塊j360-jdk-applicationsql


前言

日常開發過程當中使用併發框架的場景在SSH的框架裏面直接接觸的機會並非不少,基本上大量使用的就是部分原子類,然而在底層實現中,爲了實現高併發的可能,jdk併發編程框架幾乎所謂不在,每一個併發編程類以及其參數都有各自所擅長的場景,併發編程框架不是一卡通,可是卻很像搭積木,經過組合幾乎不少的併發場景都不在話下,這裏就是用批量日誌管理接口的場景中,如何使用併發編程類實現高併發下的日誌管理。數據庫


內容

業務日誌在每一個系統中都是不可或缺的功能點,業務日誌的輸出也有多種的輸出形式,會話界面、文本、數據庫等等,一般業務日誌在系統中的埋點會做爲一個日誌單元,一個業務系統天天產生的日誌數量=每一個處理流程*埋點*pv(或者同單位request),在高併發系統中,單個集羣節點處理日誌也一般採用併發框架批量進行處理,這裏分別設置兩個條件來對收集的日誌進行處理:
編程

1:每nSeconds批量將收集的日誌隊列輸出併發

2:日誌隊列佔用的內存>設置的閾值(保護n秒內的隊列過大)app


類圖

包結構

類說明

分別表明工廠類、日誌方法接口類、代理類以及實現類的工廠類、方法實現類
框架

先看接口類的方法:

public interface BizLogger {
    public void log(BizLogPo bizLogPo);

    public void log(List<BizLogPo> bizLogPos);
}

分別處理單條和多條日誌

再看會話打印實現類:

private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName());

@Override
public void log(BizLogPo jobLogPo) {
    LOGGER.info(JSONUtils.toJSONString(jobLogPo));
}

@Override
public void log(List<BizLogPo> jobLogPos) {
    for (BizLogPo jobLogPo : jobLogPos) {
        log(jobLogPo);
    }
}

這裏使用slf4j接口類做爲會話打印的輸出接口,實現類一樣可使用mysql、file等輸出形式

經過工廠類配置得到實現類,一般會使用多個實現類提供接口,而多個實現類難以同時對實現過程進行控制,這裏引入了代理類來調用具體的實現類,而併發編程框架一般在代理類中進行集中控制管理:

public class BizLoggerDelegate implements BizLogger {

    private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class);

    // 3S 檢查輸盤一第二天志
    private int flushPeriod;

    private BizLogger jobLogger;
    private boolean lazyLog = false;
    private ScheduledExecutorService executor;
    private ScheduledFuture scheduledFuture;
    private BlockingQueue<BizLogPo> memoryQueue;
    // 日誌批量刷盤數量
    private int batchFlushSize = 100;
    private int overflowSize = 10000;
    // 內存中最大的日誌量閥值
    private int maxMemoryLogSize;
    private AtomicBoolean flushing = new AtomicBoolean(false);

    public BizLoggerDelegate(Config config) {
        BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() {
            @Override
            public BizLogger getJobLogger() {
                return new ConsoleBizLogger();
            }
        };
        jobLogger = jobLoggerFactory.getJobLogger();
        lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false);
        if (lazyLog) {

            // 無界Queue
            memoryQueue = new LinkedBlockingQueue<BizLogPo>();
            maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000);
            flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3);

            executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger"));
            scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (flushing.compareAndSet(false, true)) {
                            checkAndFlush();
                        }
                    } catch (Throwable t) {
                        LOGGER.error("CheckAndFlush log error", t);
                    }
                }
            }, flushPeriod, flushPeriod, TimeUnit.SECONDS);

        }
    }

    /**
     * 檢查內存中是否有日誌,若是有就批量刷盤
     */
    private void checkAndFlush() {
        try {
            int nowSize = memoryQueue.size();
            if (nowSize == 0) {
                return;
            }
            List<BizLogPo> batch = new ArrayList<BizLogPo>();
            for (int i = 0; i < nowSize; i++) {
                BizLogPo jobLogPo = memoryQueue.poll();
                batch.add(jobLogPo);

                if (batch.size() >= batchFlushSize) {
                    flush(batch);
                }
            }
            if (batch.size() > 0) {
                flush(batch);
            }

        } finally {
            flushing.compareAndSet(true, false);
        }
    }

    private void checkOverflowSize() {
        if (memoryQueue.size() > overflowSize) {
            throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available");
        }
    }

    private void flush(List<BizLogPo> batch) {
        boolean flushSuccess = false;
        try {
            jobLogger.log(batch);
            flushSuccess = true;
        } finally {
            if (!flushSuccess) {
                memoryQueue.addAll(batch);
            }
            batch.clear();
        }
    }

    /**
     * 檢查內存中的日誌量是否超過閥值,若是超過須要批量刷盤日誌
     */
    private void checkCapacity() {
        if (memoryQueue.size() > maxMemoryLogSize) {
            // 超過閥值,須要批量刷盤
            if (flushing.compareAndSet(false, true)) {
                // 這裏能夠採用new Thread, 由於這裏只會同時new一個
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            checkAndFlush();
                        } catch (Throwable t) {
                            LOGGER.error("Capacity full flush error", t);
                        }
                    }
                }).start();
            }
        }
    }

    @Override
    public void log(BizLogPo jobLogPo) {
        if (jobLogPo == null) {
            return;
        }
        if (lazyLog) {
            checkOverflowSize();
            memoryQueue.offer(jobLogPo);
            checkCapacity();
        } else {
            jobLogger.log(jobLogPo);
        }
    }

    @Override
    public void log(List<BizLogPo> jobLogPos) {
        if (CollectionUtils.isEmpty(jobLogPos)) {
            return;
        }
        if (lazyLog) {
            checkOverflowSize();
            for (BizLogPo jobLogPo : jobLogPos) {
                memoryQueue.offer(jobLogPo);
            }
            // checkCapacity
            checkCapacity();
        } else {
            jobLogger.log(jobLogPos);
        }
    }

}


這裏寫一個測試類測試下結果:

@Test
public void loggerTest() throws InterruptedException {
    Config config = new Config();
    config.setParameter("biz.logger","console");

    List<BizLogPo> list = new ArrayList<BizLogPo>();
    for(int i =0;i<=10;i++){
        BizLogPo jobLogPo = new BizLogPo();
        jobLogPo.setMsg("hello" + i);
        list.add(jobLogPo);
    }
    TimeUnit.SECONDS.sleep(5);
    BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config);
    jobLoggerDelegate.log(list);
}

一般開發使用Spring環境時,新增Spring適配工廠類,經過Spring配置下:

public class BizLoggerFactoryBean implements FactoryBean<BizLogger>,
        InitializingBean, DisposableBean {

        public BizLogger getBizLogger() {
                return bizLogger;
        }

        public void setBizLogger(BizLogger bizLogger) {
                this.bizLogger = bizLogger;
        }

        private BizLogger bizLogger;

        @Override
        public void destroy() throws Exception {

        }

        @Override
        public BizLogger getObject() throws Exception {
                return bizLogger;
        }

        @Override
        public Class<?> getObjectType() {
                return bizLogger.getClass();
        }

        @Override
        public boolean isSingleton() {
                return true;
        }

        @Override
        public void afterPropertiesSet() throws Exception {

        }
}

新增Spring的配置類:

LoggerSpringConfig ApplicationContextAware {

    ApplicationContext (ApplicationContext applicationContext) BeansException {
        .= applicationContext}

    (=)
    BizLogger () Exception {
        BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()}

}

新增Spring的測試類:

@Test
public void loggerSpringTest(){
    ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class);
    BizLogger bizLogger = (BizLogger) context.getBean("bizLogger");
    List<BizLogPo> list = new ArrayList<BizLogPo>();
    for(int i =0;i<=10;i++){
        BizLogPo jobLogPo = new BizLogPo();
        jobLogPo.setMsg("hello" + i);
        list.add(jobLogPo);
    }
    bizLogger.log(list);
}


實際使用中,把LoggerDelelate配置到Bean裏面便可。

到這裏一個簡單的使用併發框架實現的日誌處理接口完成了,固然實現類中可使用批量SQL的形式進行處理,加強sql的吞吐量。

相關文章
相關標籤/搜索