A Look at Elasticsearch's SizeBlockingQueue

This article takes a look at Elasticsearch's SizeBlockingQueue.

SizeBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> queue;
    private final int capacity;

    private final AtomicInteger size = new AtomicInteger();

    public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {
        assert capacity >= 0;
        this.queue = queue;
        this.capacity = capacity;
    }

    @Override
    public int size() {
        return size.get();
    }

    public int capacity() {
        return this.capacity;
    }

    @Override
    public Iterator<E> iterator() {
        final Iterator<E> it = queue.iterator();
        return new Iterator<E>() {
            E current;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public E next() {
                current = it.next();
                return current;
            }

            @Override
            public void remove() {
                // note, we can't call #remove on the iterator because we need to know
                // if it was removed or not
                if (queue.remove(current)) {
                    size.decrementAndGet();
                }
            }
        };
    }

    @Override
    public E peek() {
        return queue.peek();
    }

    @Override
    public E poll() {
        E e = queue.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = queue.poll(timeout, unit);
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public boolean remove(Object o) {
        boolean v = queue.remove(o);
        if (v) {
            size.decrementAndGet();
        }
        return v;
    }

    /**
     * Forces adding an element to the queue, without doing size checks.
     */
    public void forcePut(E e) throws InterruptedException {
        size.incrementAndGet();
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            size.decrementAndGet();
            throw ie;
        }
    }

    @Override
    public boolean offer(E e) {
        while (true) {
            final int current = size.get();
            if (current >= capacity()) {
                return false;
            }
            if (size.compareAndSet(current, 1 + current)) {
                break;
            }
        }
        boolean offered = queue.offer(e);
        if (!offered) {
            size.decrementAndGet();
        }
        return offered;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("offer with timeout not allowed on size queue");
    }

    @Override
    public void put(E e) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("put not allowed on size queue");
    }

    @Override
    public E take() throws InterruptedException {
        E e;
        try {
            e = queue.take();
            size.decrementAndGet();
        } catch (InterruptedException ie) {
            throw ie;
        }
        return e;
    }

    @Override
    public int remainingCapacity() {
        return capacity() - size.get();
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        int v = queue.drainTo(c);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int v = queue.drainTo(c, maxElements);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return (T[]) queue.toArray(a);
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }
}
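  • SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface; its constructor takes a delegate BlockingQueue and a capacity
  • SizeBlockingQueue keeps an AtomicInteger field named size that tracks how many elements the queue holds; poll, remove, offer, take, drainTo and forcePut all keep this counter up to date
  • offer checks whether the current size is greater than or equal to the capacity and, if so, returns false immediately; otherwise it reserves a slot with compareAndSet before delegating. put and the timed offer both throw IllegalStateException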
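The reserve-then-delegate pattern in offer can be tried out in isolation. The following is only a minimal sketch, not the Elasticsearch class: BoundedOfferSketch is a hypothetical name, it models just offer and poll, and it assumes an unbounded LinkedBlockingQueue as the delegate.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for SizeBlockingQueue (offer/poll only). */
class BoundedOfferSketch<E> {
    // Assumption: an unbounded delegate, so the counter is the only bound.
    private final BlockingQueue<E> delegate = new LinkedBlockingQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    BoundedOfferSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(E e) {
        while (true) {
            int current = size.get();
            if (current >= capacity) {
                return false; // full: reject instead of blocking
            }
            if (size.compareAndSet(current, current + 1)) {
                break; // slot reserved for this caller
            }
            // another thread changed size in between; re-read and retry
        }
        boolean offered = delegate.offer(e);
        if (!offered) {
            size.decrementAndGet(); // give the reserved slot back
        }
        return offered;
    }

    E poll() {
        E e = delegate.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    int size() {
        return size.get();
    }

    public static void main(String[] args) {
        BoundedOfferSketch<String> q = new BoundedOfferSketch<>(2);
        System.out.println(q.offer("a")); // true
        System.out.println(q.offer("b")); // true
        System.out.println(q.offer("c")); // false, capacity of 2 reached
        System.out.println(q.poll());     // a
        System.out.println(q.offer("c")); // true, poll freed a slot
    }
}
```

Reserving the slot on the counter first means the capacity check never has to lock the delegate queue, at the cost of the counter briefly running ahead of the delegate's real contents.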

ResizableBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {

    private volatile int capacity;

    ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
        super(queue, initialCapacity);
        this.capacity = initialCapacity;
    }

    @Override
    public int capacity() {
        return this.capacity;
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, this.capacity());
    }

    /** Resize the limit for the queue, returning the new size limit */
    public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
        assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
        assert optimalCapacity >= 0 : "desired capacity cannot be negative";
        assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
        assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";

        if (optimalCapacity == capacity) {
            // Yahtzee!
            return this.capacity;
        }

        if (optimalCapacity > capacity + adjustmentAmount) {
            // adjust up
            final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else if (optimalCapacity < capacity - adjustmentAmount) {
            // adjust down
            final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else {
            return this.capacity;
        }
    }
}
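  • ResizableBlockingQueue extends SizeBlockingQueue and adds a synchronized (and therefore thread-safe) adjustCapacity method that resizes the queue's capacity: each call moves the capacity by at most adjustmentAmount toward optimalCapacity, clamped to [minCapacity, maxCapacity], and leaves it unchanged when the target is within adjustmentAmount of the current value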
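To see how the stepping behaves over several calls, the same branching can be copied into a small stand-alone class. This is a sketch only: CapacityStepSketch and its adjust method are hypothetical names, and the numbers in main are made up for illustration.

```java
/** Hypothetical stand-alone copy of the adjustCapacity stepping logic. */
class CapacityStepSketch {
    private volatile int capacity;

    CapacityStepSketch(int initialCapacity) {
        this.capacity = initialCapacity;
    }

    /** Moves capacity one step toward optimal and returns the new value. */
    synchronized int adjust(int optimal, int step, int min, int max) {
        if (optimal > capacity + step) {
            // target is more than one step above: grow by one step, capped at max
            capacity = Math.min(max, capacity + step);
        } else if (optimal < capacity - step) {
            // target is more than one step below: shrink by one step, floored at min
            capacity = Math.max(min, capacity - step);
        }
        // otherwise the target is within one step (or equal): leave capacity alone
        return capacity;
    }

    public static void main(String[] args) {
        CapacityStepSketch q = new CapacityStepSketch(100);
        System.out.println(q.adjust(1000, 50, 10, 200)); // 150
        System.out.println(q.adjust(1000, 50, 10, 200)); // 200
        System.out.println(q.adjust(1000, 50, 10, 200)); // 200, already at max
        System.out.println(q.adjust(220, 50, 10, 200));  // 200, target within one step
        System.out.println(q.adjust(0, 50, 10, 200));    // 150
    }
}
```

One consequence of the "more than one step away" test is that the capacity does not necessarily land exactly on the target or on the bounds: starting from 50 with a step of 50, a target of 0 leaves the capacity at 50.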

EsExecutors

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

public class EsExecutors {
	//......

    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
        if (initialQueueCapacity <= 0) {
            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                            initialQueueCapacity);
        }
        ResizableBlockingQueue<Runnable> queue =
                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
                new EsAbortPolicy(), contextHolder);
    }

	//......
}
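  • EsExecutors.newFixed creates an EsThreadPoolExecutor backed by a SizeBlockingQueue when queueCapacity is non-negative (a negative value yields a plain unbounded queue), while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue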
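Why a queue that only supports a non-blocking offer is enough here can be shown with the plain JDK: ThreadPoolExecutor.execute enqueues work through offer, and when offer returns false and no further thread may be started, the rejection handler runs. The sketch below illustrates that with an ArrayBlockingQueue and AbortPolicy standing in for SizeBlockingQueue and EsAbortPolicy; RejectionSketch and countRejections are hypothetical names, and none of the Elasticsearch classes are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a fixed pool with a bounded queue rejects overflow work. */
class RejectionSketch {

    /** Submits `tasks` blocking tasks to a 1-thread pool with 1 queue slot; returns the rejection count. */
    static int countRejections(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // core == max == 1, mirroring the fixed-size shape used by newFixed
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // queue.offer returned false and no thread could be added
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task runs, 1 waits in the queue, the remaining 2 are rejected
        System.out.println(countRejections(4)); // 2
    }
}
```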

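Summary

  • SizeBlockingQueue extends AbstractQueue and implements the BlockingQueue interface; its constructor takes a delegate BlockingQueue and a capacity; it keeps an AtomicInteger field named size that tracks how many elements the queue holds and updates it in poll, remove, offer, take and the other mutating methods; offer returns false immediately when size is greater than or equal to capacity, while put throws IllegalStateException
  • ResizableBlockingQueue extends SizeBlockingQueue and adds a thread-safe adjustCapacity method that resizes the queue's capacity
  • EsExecutors.newFixed creates an EsThreadPoolExecutor backed by a SizeBlockingQueue, while newAutoQueueFixed creates a QueueResizingEsThreadPoolExecutor backed by a ResizableBlockingQueue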
