A Look at Flink's MemoryPool

This article takes a look at Flink's MemoryPool.

MemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

abstract static class MemoryPool {

        abstract int getNumberOfAvailableMemorySegments();

        abstract MemorySegment allocateNewSegment(Object owner);

        abstract MemorySegment requestSegmentFromPool(Object owner);

        abstract void returnSegmentToPool(MemorySegment segment);

        abstract void clear();
    }
  • MemoryPool defines five abstract methods: getNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, and clear. It has two subclasses, HybridHeapMemoryPool and HybridOffHeapMemoryPool.
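
To make the contract concrete, here is a minimal JDK-only sketch of how the five methods relate to each other. SketchPool and ArraySketchPool are hypothetical names, and a plain byte[] stands in for MemorySegment; this is not Flink's code, only an illustration of the pooled versus unpooled paths.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for MemoryPool; byte[] replaces MemorySegment.
abstract class SketchPool {
    abstract int getNumberOfAvailableSegments();
    abstract byte[] allocateNewSegment();        // unpooled: fresh memory, pool untouched
    abstract byte[] requestSegmentFromPool();    // pooled: hands out a pre-allocated buffer
    abstract void returnSegmentToPool(byte[] segment);
    abstract void clear();
}

// Hypothetical implementation backed by a deque of pre-allocated arrays.
class ArraySketchPool extends SketchPool {
    private final ArrayDeque<byte[]> available;
    private final int segmentSize;

    ArraySketchPool(int numInitialSegments, int segmentSize) {
        this.available = new ArrayDeque<>(numInitialSegments);
        this.segmentSize = segmentSize;
        for (int i = 0; i < numInitialSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    @Override int getNumberOfAvailableSegments() { return available.size(); }
    @Override byte[] allocateNewSegment() { return new byte[segmentSize]; }
    @Override byte[] requestSegmentFromPool() { return available.remove(); }
    @Override void returnSegmentToPool(byte[] segment) { available.add(segment); }
    @Override void clear() { available.clear(); }
}

class PoolContractDemo {
    public static void main(String[] args) {
        SketchPool pool = new ArraySketchPool(2, 1024);

        byte[] pooled = pool.requestSegmentFromPool();
        System.out.println(pool.getNumberOfAvailableSegments()); // 1

        byte[] unpooled = pool.allocateNewSegment();
        System.out.println(pool.getNumberOfAvailableSegments()); // still 1

        pool.returnSegmentToPool(pooled);
        System.out.println(pool.getNumberOfAvailableSegments()); // 2

        pool.clear();
        System.out.println(pool.getNumberOfAvailableSegments()); // 0
    }
}
```

Only requestSegmentFromPool and returnSegmentToPool change the available count; allocateNewSegment bypasses the pool entirely.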

HybridHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

static final class HybridHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<byte[]> availableMemory;

        private final int segmentSize;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(new byte[segmentSize]);
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            byte[] buf = availableMemory.remove();
            return  MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
                availableMemory.add(heapSegment.getArray());
                heapSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
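  • HybridHeapMemoryPool extends MemoryPool and is backed by JVM heap memory. Its constructor takes numInitialSegments and segmentSize and uses them to pre-fill availableMemory, an ArrayDeque whose element type is byte[].
  • allocateNewSegment calls MemorySegmentFactory.allocateUnpooledSegment to allocate unpooled memory. requestSegmentFromPool calls availableMemory.remove() and then wraps the array in a MemorySegment via MemorySegmentFactory.wrapPooledHeapMemory. Note that it calls remove() without checking the size of the ArrayDeque, so a request against an empty pool throws NoSuchElementException.
  • returnSegmentToPool only accepts segments whose class is exactly HybridMemorySegment and throws IllegalArgumentException for anything else. It first puts the segment's byte[] back into availableMemory and then calls heapSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().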
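
The unchecked remove() mentioned above can be reproduced with the JDK alone. The sketch below is an illustration under assumptions: EmptyPoolDemo, requestUnchecked and requestChecked are hypothetical names, and the guarded variant is just one way a caller could fail with a clearer message; it is not taken from Flink.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;

class EmptyPoolDemo {

    // Mirrors the shape of requestSegmentFromPool: remove() with no size check.
    static byte[] requestUnchecked(ArrayDeque<byte[]> availableMemory) {
        return availableMemory.remove(); // throws NoSuchElementException when empty
    }

    // Hypothetical guarded variant: check first, then fail with a descriptive message.
    static byte[] requestChecked(ArrayDeque<byte[]> availableMemory) {
        if (availableMemory.isEmpty()) {
            throw new IllegalStateException("No pooled segments left");
        }
        return availableMemory.remove();
    }

    public static void main(String[] args) {
        ArrayDeque<byte[]> availableMemory = new ArrayDeque<>();
        availableMemory.add(new byte[32 * 1024]);

        requestUnchecked(availableMemory); // drains the only pooled array

        try {
            requestUnchecked(availableMemory);
        } catch (NoSuchElementException e) {
            System.out.println("unchecked request failed: NoSuchElementException");
        }

        try {
            requestChecked(availableMemory);
        } catch (IllegalStateException e) {
            System.out.println("checked request failed: " + e.getMessage());
        }
    }
}
```

Since the pool itself does not guard against exhaustion, a caller would need to compare the requested count against getNumberOfAvailableMemorySegments() before asking for segments.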

HybridOffHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

static final class HybridOffHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<ByteBuffer> availableMemory;

        private final int segmentSize;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            ByteBuffer buf = availableMemory.remove();
            return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
                ByteBuffer buf = hybridSegment.getOffHeapBuffer();
                availableMemory.add(buf);
                hybridSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
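  • HybridOffHeapMemoryPool extends MemoryPool and is backed by off-heap memory. Its constructor takes numInitialSegments and segmentSize and uses them to pre-fill availableMemory, an ArrayDeque whose element type is ByteBuffer; each buffer is created with ByteBuffer.allocateDirect.
  • allocateNewSegment calls MemorySegmentFactory.allocateUnpooledOffHeapMemory to allocate unpooled off-heap memory. requestSegmentFromPool calls availableMemory.remove() and then wraps the buffer in a MemorySegment via MemorySegmentFactory.wrapPooledOffHeapMemory. As in the heap pool, remove() is called without checking the size of the ArrayDeque, so a request against an empty pool throws NoSuchElementException.
  • returnSegmentToPool only accepts segments whose class is exactly HybridMemorySegment and throws IllegalArgumentException for anything else. It first puts the segment's ByteBuffer (obtained via getOffHeapBuffer()) back into availableMemory and then calls hybridSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().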
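
The off-heap variant differs mainly in what the deque holds. The following JDK-only sketch pre-allocates direct buffers and cycles one through a request and a return; OffHeapPoolDemo and newPool are hypothetical names, the sizes are arbitrary, and the MemorySegment wrapping done by Flink is left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

class OffHeapPoolDemo {

    // Pre-allocates direct (off-heap) buffers, like the constructor loop shown above.
    static ArrayDeque<ByteBuffer> newPool(int numInitialSegments, int segmentSize) {
        ArrayDeque<ByteBuffer> availableMemory = new ArrayDeque<>(numInitialSegments);
        for (int i = 0; i < numInitialSegments; i++) {
            availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
        }
        return availableMemory;
    }

    public static void main(String[] args) {
        ArrayDeque<ByteBuffer> availableMemory = newPool(2, 4096);

        ByteBuffer buf = availableMemory.remove();   // "request" a pooled buffer
        System.out.println(buf.isDirect());          // true
        System.out.println(buf.capacity());          // 4096

        buf.putLong(0, 42L);                         // write something into the segment
        availableMemory.add(buf);                    // "return" it: appended at the tail

        // add() appends and remove() takes from the head, so the other buffer comes out first.
        ByteBuffer next = availableMemory.remove();
        System.out.println(next != buf);             // true

        // The returned buffer is reused as-is; this sketch does not zero it on return.
        ByteBuffer again = availableMemory.remove();
        System.out.println(again == buf);            // true
        System.out.println(again.getLong(0));        // 42
    }
}
```

Because buffers are handed out again without being cleared in this sketch, code reusing a pooled buffer should not assume it starts zeroed.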

Summary

  • MemoryPool defines five abstract methods: getNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, and clear. It has two subclasses, HybridHeapMemoryPool and HybridOffHeapMemoryPool.
  • HybridHeapMemoryPool is backed by JVM heap memory. Its constructor pre-fills availableMemory, an ArrayDeque of byte[], from numInitialSegments and segmentSize. allocateNewSegment calls MemorySegmentFactory.allocateUnpooledSegment to allocate unpooled memory; requestSegmentFromPool calls availableMemory.remove() and wraps the array via MemorySegmentFactory.wrapPooledHeapMemory, without checking the size of the ArrayDeque first; returnSegmentToPool only accepts HybridMemorySegment, puts its byte[] back into availableMemory and then calls heapSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().
  • HybridOffHeapMemoryPool is backed by off-heap memory. Its constructor pre-fills availableMemory, an ArrayDeque of ByteBuffer, from numInitialSegments and segmentSize. allocateNewSegment calls MemorySegmentFactory.allocateUnpooledOffHeapMemory to allocate unpooled off-heap memory; requestSegmentFromPool calls availableMemory.remove() and wraps the buffer via MemorySegmentFactory.wrapPooledOffHeapMemory, again without checking the size of the ArrayDeque first; returnSegmentToPool only accepts HybridMemorySegment, puts its ByteBuffer back into availableMemory and then calls hybridSegment.free(). getNumberOfAvailableMemorySegments returns availableMemory.size(), and clear calls availableMemory.clear().
