併發編程實踐部分源碼使用github裏面部分開源框架源碼,從新整理,若有錯誤或版權問題,請指出指正。java
歡迎star、fork,讀書筆記系列會同步更新mysql
gitgit
https://github.com/xuminwlt/j360-jdkgithub
模塊j360-jdk-applicationsql
日常開發過程當中使用併發框架的場景在SSH的框架裏面直接接觸的機會並非不少,基本上大量使用的就是部分原子類,然而在底層實現中,爲了實現高併發的可能,jdk併發編程框架幾乎所謂不在,每一個併發編程類以及其參數都有各自所擅長的場景,併發編程框架不是一卡通,可是卻很像搭積木,經過組合幾乎不少的併發場景都不在話下,這裏就是用批量日誌管理接口的場景中,如何使用併發編程類實現高併發下的日誌管理。數據庫
業務日誌在每一個系統中都是不可或缺的功能點,業務日誌的輸出也有多種的輸出形式,會話界面、文本、數據庫等等,一般業務日誌在系統中的埋點會做爲一個日誌單元,一個業務系統天天產生的日誌數量=每一個處理流程*埋點*pv(或者同單位request),在高併發系統中,單個集羣節點處理日誌也一般採用併發框架批量進行處理,這裏分別設置兩個條件來對收集的日誌進行處理:
編程
1:每nSeconds批量將收集的日誌隊列輸出併發
2:日誌隊列佔用的內存>設置的閾值(保護n秒內的隊列過大)app
分別表明工廠類、日誌方法接口類、代理類以及實現類的工廠類、方法實現類
框架
先看接口類的方法:
public interface BizLogger { public void log(BizLogPo bizLogPo); public void log(List<BizLogPo> bizLogPos); }
分別處理單條和多條日誌
再看會話打印實現類:
private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName()); @Override public void log(BizLogPo jobLogPo) { LOGGER.info(JSONUtils.toJSONString(jobLogPo)); } @Override public void log(List<BizLogPo> jobLogPos) { for (BizLogPo jobLogPo : jobLogPos) { log(jobLogPo); } }
這裏使用slf4j接口類做爲會話打印的輸出接口,實現類一樣可使用mysql、file等輸出形式
經過工廠類配置得到實現類,一般會使用多個實現類提供接口,而多個實現類難以同時對實現過程進行控制,這裏引入了代理類來調用具體的實現類,而併發編程框架一般在代理類中進行集中控制管理:
public class BizLoggerDelegate implements BizLogger { private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class); // 3S 檢查輸盤一第二天志 private int flushPeriod; private BizLogger jobLogger; private boolean lazyLog = false; private ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; private BlockingQueue<BizLogPo> memoryQueue; // 日誌批量刷盤數量 private int batchFlushSize = 100; private int overflowSize = 10000; // 內存中最大的日誌量閥值 private int maxMemoryLogSize; private AtomicBoolean flushing = new AtomicBoolean(false); public BizLoggerDelegate(Config config) { BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() { @Override public BizLogger getJobLogger() { return new ConsoleBizLogger(); } }; jobLogger = jobLoggerFactory.getJobLogger(); lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false); if (lazyLog) { // 無界Queue memoryQueue = new LinkedBlockingQueue<BizLogPo>(); maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000); flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3); executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger")); scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { if (flushing.compareAndSet(false, true)) { checkAndFlush(); } } catch (Throwable t) { LOGGER.error("CheckAndFlush log error", t); } } }, flushPeriod, flushPeriod, TimeUnit.SECONDS); } } /** * 檢查內存中是否有日誌,若是有就批量刷盤 */ private void checkAndFlush() { try { int nowSize = memoryQueue.size(); if (nowSize == 0) { return; } List<BizLogPo> batch = new ArrayList<BizLogPo>(); for (int i = 0; i < nowSize; i++) { BizLogPo jobLogPo = memoryQueue.poll(); batch.add(jobLogPo); if (batch.size() >= batchFlushSize) { flush(batch); } } if (batch.size() > 0) { flush(batch); } } finally { flushing.compareAndSet(true, false); } } private void checkOverflowSize() { if (memoryQueue.size() > overflowSize) { throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available"); } } private void flush(List<BizLogPo> batch) { boolean flushSuccess = false; try { jobLogger.log(batch); flushSuccess = true; } finally { if (!flushSuccess) { memoryQueue.addAll(batch); } batch.clear(); } } /** * 檢查內存中的日誌量是否超過閥值,若是超過須要批量刷盤日誌 */ private void checkCapacity() { if (memoryQueue.size() > maxMemoryLogSize) { // 超過閥值,須要批量刷盤 if (flushing.compareAndSet(false, true)) { // 這裏能夠採用new Thread, 由於這裏只會同時new一個 new Thread(new Runnable() { @Override public void run() { try { checkAndFlush(); } catch (Throwable t) { LOGGER.error("Capacity full flush error", t); } } }).start(); } } } @Override public void log(BizLogPo jobLogPo) { if (jobLogPo == null) { return; } if (lazyLog) { checkOverflowSize(); memoryQueue.offer(jobLogPo); checkCapacity(); } else { jobLogger.log(jobLogPo); } } @Override public void log(List<BizLogPo> jobLogPos) { if (CollectionUtils.isEmpty(jobLogPos)) { return; } if (lazyLog) { checkOverflowSize(); for (BizLogPo jobLogPo : jobLogPos) { memoryQueue.offer(jobLogPo); } // checkCapacity checkCapacity(); } else { jobLogger.log(jobLogPos); } } }
這裏寫一個測試類測試下結果:
@Test public void loggerTest() throws InterruptedException { Config config = new Config(); config.setParameter("biz.logger","console"); List<BizLogPo> list = new ArrayList<BizLogPo>(); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } TimeUnit.SECONDS.sleep(5); BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config); jobLoggerDelegate.log(list); }
一般開發使用Spring環境時,新增Spring適配工廠類,經過Spring配置下:
public class BizLoggerFactoryBean implements FactoryBean<BizLogger>, InitializingBean, DisposableBean { public BizLogger getBizLogger() { return bizLogger; } public void setBizLogger(BizLogger bizLogger) { this.bizLogger = bizLogger; } private BizLogger bizLogger; @Override public void destroy() throws Exception { } @Override public BizLogger getObject() throws Exception { return bizLogger; } @Override public Class<?> getObjectType() { return bizLogger.getClass(); } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { } }
新增Spring的配置類:
LoggerSpringConfig ApplicationContextAware { ApplicationContext (ApplicationContext applicationContext) BeansException { .= applicationContext} (=) BizLogger () Exception { BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()} }
新增Spring的測試類:
@Test public void loggerSpringTest(){ ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class); BizLogger bizLogger = (BizLogger) context.getBean("bizLogger"); List<BizLogPo> list = new ArrayList<BizLogPo>(); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } bizLogger.log(list); }
實際使用中,把LoggerDelelate配置到Bean裏面便可。
到這裏一個簡單的使用併發框架實現的日誌處理接口完成了,固然實現類中可使用批量SQL的形式進行處理,加強sql的吞吐量。