寫這個程序主要是用來理解生產者消費者模型,以及經過這個Demo來理解Redis的單線程取原子任務是怎麼實現的和鞏固一下併發相關的知識;這個雖然是個Demo,可是隻要稍加改下Appender部分也是能夠用於項目中的,假設項目裏確實不須要log4j/logback之類的日誌組件的時候;java
1.利用LinkedList做爲MQ(還能夠用jdk自帶的LinkedBlockingQueue,不過這個Demo主要是爲了更好的理解原理所以寫的比較底層);安全
2.利用一個Daemon線程做爲消費者從MQ裏實時獲取日誌對象/日誌記錄,並將它提交給線程池,由線程池再遍歷全部的appender並調用它們的通知方法,這個地方還能夠根據場景進行效率優化,如將循環遍歷appender改成將每一個appender都再此提交到線程池實現異步通知觀察者;併發
3.爲生產者提供log方法做爲生產日誌記錄的接口,不管是生產日誌對象仍是消費日誌對象在操做隊列時都須要對隊列加鎖,由於我的用的是非併發包裏的;app
4.消費者在獲取以前會先判斷MQ裏是否有數據,有則獲取並提交給線程池處理,不然wait;異步
5.生產者生產了日誌對象後經過notify通知消費者去取,由於只有一個消費者,而生產者是不會wait的所以只須要notify而不用notifyAllide
6.剩下的就看代碼來講明吧;高併發
package me.study.mqlogger.log; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.DEBUG; import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.ERROR; /** * @author wangsong * @version 1.0 * @description 這裏只是作一個簡單的logger實現,不提供Appender之類的功能,主要是用來學習生產者和消費者及MQ的實現原理 * @date 9/26/19 6:07 PM */ public class MyLogger{ private LogLevel loggerLevel = DEBUG; private String charset = "UTF-8"; // 暫且沒用,可是當須要序列化時是可能用到的; // TODO 也能夠直接用LinkedQueue,而後手動經過ReentrantLock來實現併發時的數據安全(synchronized也可) //private BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<LogRecord>(); // 能夠理解爲支持併發的LinkedList // TODO 想了一下既然是要學習原理乾脆就實現的更底層一點 private final Queue<LogRecord> records = new LinkedList<LogRecord>(); // TODO 用於記錄生產了多少條日誌,可供外部獲取 private AtomicLong produceCount = new AtomicLong(0); // TODO 用於記錄消費了多少條日誌 private AtomicLong consumeCount = new AtomicLong(0); // TODO 日誌記錄的Consumer private Thread consumer = new LogDaemon(); public MyLogger(){ consumer.setDaemon(true); consumer.start(); } /** * 對外提供的接口,即log方法就是生產者用於生產日誌數據的接口 * @param msg * @param level */ public void log(String msg, LogLevel level){ Date curr = generateCurrDate(); log(new LogRecord(level, msg, curr)); } /** * 對外提供的接口,即log方法就是生產者用於生產日誌數據的接口 * @param msg */ public void log(String msg){ Date curr = generateCurrDate(); log(new LogRecord(this.loggerLevel, msg, curr)); } /** * 給生產者(即調用log的方法均可以理解爲生產者在生產日誌對象)提供用於生產日誌記錄的接口 * @param record */ public void log(LogRecord record){ // ReentrantLock能夠替代synchronized,不過當前場景下synchronized已經足夠 synchronized (this.records){ // TODO 若是用的是LinkedBlockingQueue是不須要這個的 this.records.offer(record); this.produceCount.incrementAndGet(); this.records.notify(); // TODO 只有一個線程會records.wait(),所以notify()足夠 } } // TODO 相似Redis的那個單線程,用於讀取命令對象,而這裏則是用於讀取LogRecord並經過appender將數據寫到相應位置 private class LogDaemon extends Thread{ private volatile boolean valid = true; // 充當appenders的角色 private List<Writer> appenders = null; private ExecutorService threadPool = new ThreadPoolExecutor(1, 3 , 180000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024)); @Override public void run() { while(this.valid){ // TODO 根據最少知道原則,在這裏不要去想總體裏是否存在打斷此線程的地方,你就認爲此線程是可能被外界打斷的便可,所以須要作必定處理 try { synchronized (MyLogger.this.records) { if (MyLogger.this.records.size() <= 0) { MyLogger.this.records.wait(); } final LogRecord firstRecord = MyLogger.this.records.poll(); MyLogger.this.consumeCount.incrementAndGet(); //threadPool.submit() threadPool.execute(() -> MyLogger.this.notifyAppender(this.appenders, firstRecord)); } }catch (InterruptedException ex){ this.valid = false; ex.printStackTrace(); }catch (Throwable t){ t.printStackTrace(); } } } } private void notifyAppender(final List<Writer> appenders, final LogRecord record) { if(appenders == null){ PrintWriter writer = new PrintWriter(record.level == ERROR ? System.err : System.out); writer.append(record.toString()); writer.flush(); }else{ // TODO 這種是同步的方式,若是是異步的方式能夠將每一個appender的執行都由一個Runnable對象包裝,而後submit給線程池(或者中間加個中間件) for(Writer writer : appenders){ try { writer.append(record.toString()); }catch (IOException ex){ ex.printStackTrace(); } } } } /** * 用於產生當前時間的模塊,防止由於併發而致使LogRecord的timestamp根實際狀況不符 */ private Lock currDateLock = new ReentrantLock(); // 直接用synchronized亦可 private Date generateCurrDate(){ currDateLock.lock(); Date result = new Date(); currDateLock.unlock(); return result; } // 生產者生產的數據對象 public static class LogRecord{ private LogLevel level; private String msg; private Date timestamp; private static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); private SimpleDateFormat dateFormat = DEFAULT_DATE_FORMAT; /*public LogRecord(){ this(INFO, ""); }*/ public LogRecord(LogLevel level, String msg){ this(level, msg, new Date()); // 仍是最好由外界設置timestamp,不然高併發下會比較不許 } // TODO 最好用這個,否則高併發下timestamp容易出現順序不許確的狀況。 public LogRecord(LogLevel level, String msg, Date timestamp){ this.level = level; this.msg = msg; this.timestamp = timestamp; } @Override public String toString(){ return String.format("[Level:%s, Datetime:%s] : %s\n", level, dateFormat.format(timestamp), msg); } public LogLevel getLevel() { return level; } public String getMsg() { return msg; } public void setDateFormat(SimpleDateFormat dateFormat) { this.dateFormat = dateFormat; } public void setTimestamp(Date timestamp) { this.timestamp = timestamp; } } public enum LogLevel{ // TODO 內部enum默認就是static INFO, DEBUG, ERROR } public LogLevel getLoggerLevel() { return loggerLevel; } public void setLoggerLevel(LogLevel loggerLevel) { this.loggerLevel = loggerLevel; } public String getCharset() { return charset; } public void setCharset(String charset) { this.charset = charset; } public AtomicLong getProduceCount() { return produceCount; } public AtomicLong getConsumeCount() { return consumeCount; } }
package me.study.mqlogger; import me.silentdoer.mqlogger.log.MyLogger; import java.util.Scanner; /** * @author wangsong * @version 1.0 * @description the description * @date 9/26/19 10:13 PM */ public class Entrance { private static MyLogger logger = new MyLogger(); public static void main(String[] args){ //logger.setLoggerLevel(MyLogger.LogLevel.ERROR); Scanner scanner = new Scanner(System.in); String line; while(!(line = scanner.nextLine()).equals("exit")){ if(line.equals("")) continue; logger.log(line); System.out.println(String.format("共生產了%s條日誌。", logger.getConsumeCount())); try { Thread.sleep(500); }catch (InterruptedException ex){ } System.out.println(String.format("共消費了%s條日誌。", logger.getProduceCount())); } } }
package me.study.mqlogger; import me.silentdoer.mqlogger.log.MyLogger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author wangsong * @version 1.0 * @description the description * @date 9/26/19 10:32 PM */ public class Entrance2 { private static MyLogger logger = new MyLogger(); public static void main(String[] args){ logger.setLoggerLevel(MyLogger.LogLevel.ERROR); ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int index = i + 1; threadPool.execute(() -> { logger.log(String.format("生產的第%s條記錄。", index)); System.out.println(String.format("共生產了%s條記錄。", index)); }); try { Thread.sleep(100); }catch (InterruptedException ex){ } } try { Thread.sleep(3000); System.out.println(String.format("共%s條記錄被消費。", logger.getConsumeCount())); }catch (InterruptedException ex){ } //threadPool.shutdown(); //threadPool.shutdownNow(); } }
若是想實現像BlockingQueue同樣可以控制MQ的元素個數範圍,則能夠經過ReentrantLock的Confition來實現,即經過lock建立兩個Condition對象,一個用來描述是否MQ中元素達到上限的狀況,一個用於描述MQ中元素降到下限的狀況;學習
不管是達到上限或降到下限都會經過相應的condition對象來阻塞對應的生產者或消費者的生產/消費過程從而實現MQ元素個數的可控性;測試