徒手擼框架--高併發環境下的請求合併

原文地址:https://www.xilidou.com/2018/01/22/merge-request/java

在高併發系統中,咱們常常遇到這樣的需求:系統產生大量的請求,可是這些請求實時性要求不高。咱們就能夠將這些請求合併,達到必定數量咱們統一提交。最大化的利用系統性IO,提高系統的吞吐性能。git

因此請求合併框架須要考慮如下兩個需求:github

  1. 當請求收集到必定數量時提交數據
  2. 一段時間後若是請求沒有達到指定的數量也進行提交

咱們就聊聊一如何實現這樣一個需求。spring

閱讀這篇文章你將會了解到:shell

  • ScheduledThreadPoolExecutor
  • 阻塞隊列
  • 線程安全的參數
  • LockSuppor的使用

設計思路和實現

咱們就聊一聊實現這個東西的具體思路是什麼。但願你們可以學習到分析問題,設計模塊的一些套路。編程

  1. 底層使用什麼數據結構來持有須要合併的請求?安全

    • 既然咱們的系統是在高併發的環境下使用,那咱們確定不能使用,普通的ArrayList來持有。咱們可使用阻塞隊列來持有須要合併的請求。
    • 咱們的數據結構須要提供一個 add() 的方法給外部,用於提交數據。當外部add數據之後,須要檢查隊列裏面的數據的個數是否達到咱們限額?達到數量提交數據,不達到繼續等待。
    • 數據結構還須要提供一個timeOut()的方法,外部有一個計時器定時調用這個timeOut方法,若是方法被調用,則直接向遠程提交數據。
    • 條件知足的時候線程執行提交動做,條件不知足的時候線程應當暫停,等待隊列達到提交數據的條件。因此咱們能夠考慮使用 LockSuppor.park()LockSuppor.unpark 來暫停和激活操做線程。

通過上面的分析,咱們就有了這樣一個數據結構:數據結構

private static class FlushThread<Item> implements Runnable{

        private final String name;

        //隊列大小
        private final int bufferSize;
        //操做間隔
        private int flushInterval;

        //上一次提交的時間。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有數據的阻塞隊列
        private final BlockingQueue<Item> queue;

        //達成條件後具體執行的方法
        private final Processor<Item> processor;

        //構造函數
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交數據的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供給外部的超時方法
        public void timeOut(){
            //超過兩次提交超過期間間隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }
        
        //解除線程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //當前的數據是否大於提交的條件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //執行提交數據的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List<Item> temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根據數據的尺寸和時間間隔判斷是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //若是線程沒有被打斷,且不達到執行的條件,則阻塞線程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }
  1. 如何實現定時提交呢?

一般咱們遇到定時相關的需求,首先想到的應該是使用 ScheduledThreadPoolExecutor定時來調用FlushThread 的 timeOut 方法,若是你想到的是 Thread.sleep()...那須要再努力學習,多看源碼了。多線程

  1. 怎樣進一步的提高系統的吞吐量?

咱們使用的FlushThread 實現了 Runnable 因此咱們能夠考慮使用線程池來持有多個FlushThread併發

因此咱們就有這樣的代碼:

public class Flusher<Item> {

    private final FlushThread<Item>[] flushThreads;

    private AtomicInteger index;

    //防止多個線程同時執行。增長一個隨機數間隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {

        this.flushThreads = new FlushThread[threads];


        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定時調用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 對 index 取模,保證多線程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已經描述
    private static class FlushThread<Item> implements Runnable{
        ...省略
    }
}
  1. 面向接口編程,提高系統擴展性:
public interface Processor<T> {
    void process(List<T> list);
}

使用

咱們寫個測試方法測試一下:

//實現 Processor 將 String 所有輸出
public class PrintOutProcessor implements Processor<String>{
    @Override
    public void process(List<String> list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}
public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

執行的結果:

start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

咱們發現並無達到10個數字就觸發了flush。由於出發了超時提交,雖然尚未達到規定的5
個數據,但仍是執行了 flush。

若是咱們去除 Thread.sleep(1000); 再看看結果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush

每5個數一次提交。完美。。。。

總結

一個比較生動的例子給你們講解了一些多線程的具體運用。學習多線程應該多思考多動手,纔會有比較好的效果。但願這篇文章你們讀完之後有所收穫,歡迎交流。

github地址:https://github.com/diaozxin007/framework

徒手擼框架系列文章地址:

徒手擼框架--實現IoC

徒手擼框架--實現Aop

相關文章
相關標籤/搜索