原文地址:https://www.xilidou.com/2018/01/22/merge-request/java
在高併發系統中,咱們常常遇到這樣的需求:系統產生大量的請求,可是這些請求實時性要求不高。咱們就能夠將這些請求合併,達到必定數量咱們統一提交。最大化的利用系統性IO,提高系統的吞吐性能。git
因此請求合併框架須要考慮如下兩個需求:github
咱們就聊聊一如何實現這樣一個需求。spring
閱讀這篇文章你將會了解到:shell
咱們就聊一聊實現這個東西的具體思路是什麼。但願你們可以學習到分析問題,設計模塊的一些套路。編程
底層使用什麼數據結構來持有須要合併的請求?安全
ArrayList
來持有。咱們可使用阻塞隊列來持有須要合併的請求。LockSuppor.park()
和LockSuppor.unpark
來暫停和激活操做線程。通過上面的分析,咱們就有了這樣一個數據結構:數據結構
private static class FlushThread<Item> implements Runnable{ private final String name; //隊列大小 private final int bufferSize; //操做間隔 private int flushInterval; //上一次提交的時間。 private volatile long lastFlushTime; private volatile Thread writer; //持有數據的阻塞隊列 private final BlockingQueue<Item> queue; //達成條件後具體執行的方法 private final Processor<Item> processor; //構造函數 public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) { this.name = name; this.bufferSize = bufferSize; this.flushInterval = flushInterval; this.lastFlushTime = System.currentTimeMillis(); this.processor = processor; this.queue = new ArrayBlockingQueue<>(queueSize); } //外部提交數據的方法 public boolean add(Item item){ boolean result = queue.offer(item); flushOnDemand(); return result; } //提供給外部的超時方法 public void timeOut(){ //超過兩次提交超過期間間隔 if(System.currentTimeMillis() - lastFlushTime >= flushInterval){ start(); } } //解除線程的阻塞 private void start(){ LockSupport.unpark(writer); } //當前的數據是否大於提交的條件 private void flushOnDemand(){ if(queue.size() >= bufferSize){ start(); } } //執行提交數據的方法 public void flush(){ lastFlushTime = System.currentTimeMillis(); List<Item> temp = new ArrayList<>(bufferSize); int size = queue.drainTo(temp,bufferSize); if(size > 0){ try { processor.process(temp); }catch (Throwable e){ log.error("process error",e); } } } //根據數據的尺寸和時間間隔判斷是否提交 private boolean canFlush(){ return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval; } @Override public void run() { writer = Thread.currentThread(); writer.setName(name); while (!writer.isInterrupted()){ while (!canFlush()){ //若是線程沒有被打斷,且不達到執行的條件,則阻塞線程 LockSupport.park(this); } flush(); } } }
一般咱們遇到定時相關的需求,首先想到的應該是使用 ScheduledThreadPoolExecutor
定時來調用FlushThread 的 timeOut 方法,若是你想到的是 Thread.sleep()
...那須要再努力學習,多看源碼了。多線程
咱們使用的FlushThread
實現了 Runnable
因此咱們能夠考慮使用線程池來持有多個FlushThread
。併發
因此咱們就有這樣的代碼:
public class Flusher<Item> { private final FlushThread<Item>[] flushThreads; private AtomicInteger index; //防止多個線程同時執行。增長一個隨機數間隔 private static final Random r = new Random(); private static final int delta = 50; private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1); private static ExecutorService POOL = Executors.newCachedThreadPool(); public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) { this.flushThreads = new FlushThread[threads]; if(threads > 1){ index = new AtomicInteger(); } for (int i = 0; i < threads; i++) { final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor); flushThreads[i] = flushThread; POOL.submit(flushThread); //定時調用 timeOut()方法。 TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); } } // 對 index 取模,保證多線程都能被add public boolean add(Item item){ int len = flushThreads.length; if(len == 1){ return flushThreads[0].add(item); } int mod = index.incrementAndGet() % len; return flushThreads[mod].add(item); } //上文已經描述 private static class FlushThread<Item> implements Runnable{ ...省略 } }
public interface Processor<T> { void process(List<T> list); }
咱們寫個測試方法測試一下:
//實現 Processor 將 String 所有輸出 public class PrintOutProcessor implements Processor<String>{ @Override public void process(List<String> list) { System.out.println("start flush"); list.forEach(System.out::println); System.out.println("end flush"); } }
public class Test { public static void main(String[] args) throws InterruptedException { Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor()); int index = 1; while (true){ stringFlusher.add(String.valueOf(index++)); Thread.sleep(1000); } } }
執行的結果:
start flush 1 2 3 end flush start flush 4 5 6 7 end flush
咱們發現並無達到10個數字就觸發了flush。由於出發了超時提交,雖然尚未達到規定的5
個數據,但仍是執行了 flush。
若是咱們去除 Thread.sleep(1000);
再看看結果:
start flush 1 2 3 4 5 end flush start flush 6 7 8 9 10 end flush
每5個數一次提交。完美。。。。
一個比較生動的例子給你們講解了一些多線程的具體運用。學習多線程應該多思考多動手,纔會有比較好的效果。但願這篇文章你們讀完之後有所收穫,歡迎交流。
github地址:https://github.com/diaozxin007/framework
徒手擼框架系列文章地址: