本文主要研究一下 RocketMQ 的 TransientStorePool。
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
public class TransientStorePool { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final int poolSize; private final int fileSize; private final Deque<ByteBuffer> availableBuffers; private final MessageStoreConfig storeConfig; public TransientStorePool(final MessageStoreConfig storeConfig) { this.storeConfig = storeConfig; this.poolSize = storeConfig.getTransientStorePoolSize(); this.fileSize = storeConfig.getMappedFileSizeCommitLog(); this.availableBuffers = new ConcurrentLinkedDeque<>(); } /** * It's a heavy init method. */ public void init() { for (int i = 0; i < poolSize; i++) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); availableBuffers.offer(byteBuffer); } } public void destroy() { for (ByteBuffer byteBuffer : availableBuffers) { final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize)); } } public void returnBuffer(ByteBuffer byteBuffer) { byteBuffer.position(0); byteBuffer.limit(fileSize); this.availableBuffers.offerFirst(byteBuffer); } public ByteBuffer borrowBuffer() { ByteBuffer buffer = availableBuffers.pollFirst(); if (availableBuffers.size() < poolSize * 0.4) { log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size()); } return buffer; } public int availableBufferNums() { if (storeConfig.isTransientStorePoolEnable()) { return availableBuffers.size(); } return Integer.MAX_VALUE; } }
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig { //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; //The directory in which the commitlog is kept @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; //...... @ImportantField private boolean transientStorePoolEnable = false; //...... /** * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is * ASYNC_FLUSH * * @return <tt>true</tt> or <tt>false</tt> */ public boolean isTransientStorePoolEnable() { return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole(); } public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) { this.transientStorePoolEnable = transientStorePoolEnable; } //...... }