本文主要研究一下storm client的netty buffer watermarkhtml
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.javajava
/** * Netty based messaging: The netty write buffer high watermark in bytes. * <p> * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()} will start to * return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK * low water mark}. * </p> */ @isInteger @isPositiveNumber public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark"; /** * Netty based messaging: The netty write buffer low watermark in bytes. * <p> * Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water * mark} and then dropped down below this value, the netty {@code Channel.isWritable()} will start to return true. * </p> */ @isInteger @isPositiveNumber public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
# The netty write buffer high watermark in bytes. # If the number of bytes queued in the netty's write buffer exceeds this value, the netty client will block # until the value falls below the low water mark. storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB # The netty write buffer low watermark in bytes. # Once the number of bytes queued in the write buffer exceeded the high water mark and then # dropped down below this value, any blocked clients will unblock and start processing further messages. storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
在defaults.yaml文件中,low.watermark默認大小爲8388608,即8M;high.watermark默認大小爲16777216,即16M
storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.javaapache
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host, int port) { this.topoConf = topoConf; closing = false; this.scheduler = scheduler; int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK)); int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK)); // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false)); LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}", host, port, bufferSize, lowWatermark, highWatermark); int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1); // Initiate connection to remote destination this.eventLoopGroup = eventLoopGroup; // Initiate connection to remote destination bootstrap = new Bootstrap() .group(this.eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_SNDBUF, bufferSize) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark)) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf)); dstAddress = new InetSocketAddress(host, port); dstAddressPrefixedName = prefixedName(dstAddress); launchChannelAliveThread(); scheduleConnect(NO_DELAY_MS); int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); batcher = new MessageBuffer(messageBatchSize); String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY); if (clazz == null) { waitStrategy = new WaitStrategyProgressive(); } else { waitStrategy = ReflectionUtils.newInstance(clazz); } waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT); }
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.javabootstrap
/** * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer. * <p> * If the number of bytes queued in the write buffer exceeds the * {@linkplain #high high water mark}, {@link Channel#isWritable()} * will start to return {@code false}. * <p> * If the number of bytes queued in the write buffer exceeds the * {@linkplain #high high water mark} and then * dropped down below the {@linkplain #low low water mark}, * {@link Channel#isWritable()} will start to return * {@code true} again. */ public final class WriteBufferWaterMark { private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024; private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024; public static final WriteBufferWaterMark DEFAULT = new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false); private final int low; private final int high; /** * Create a new instance. * * @param low low water mark for write buffer. * @param high high water mark for write buffer */ public WriteBufferWaterMark(int low, int high) { this(low, high, true); } /** * This constructor is needed to keep backward-compatibility. */ WriteBufferWaterMark(int low, int high, boolean validate) { if (validate) { if (low < 0) { throw new IllegalArgumentException("write buffer's low water mark must be >= 0"); } if (high < low) { throw new IllegalArgumentException( "write buffer's high water mark cannot be less than " + " low water mark (" + low + "): " + high); } } this.low = low; this.high = high; } /** * Returns the low water mark for the write buffer. */ public int low() { return low; } /** * Returns the high water mark for the write buffer. */ public int high() { return high; } @Override public String toString() { StringBuilder builder = new StringBuilder(55) .append("WriteBufferWaterMark(low: ") .append(low) .append(", high: ") .append(high) .append(")"); return builder.toString(); } }
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.javaapi
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); private volatile int unwritable; /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. */ public long bytesBeforeWritable() { long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark(); // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? 0 : bytes; } return 0; } /** * Decrement the pending bytes which will be written at some point. * This method is thread-safe! */ void decrementPendingOutboundBytes(long size) { decrementPendingOutboundBytes(size, true, true); } private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } } private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.javaapp
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); private volatile int unwritable; /** * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did * not exceed the write watermark of the {@link Channel} and * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to * {@code false}. */ public boolean isWritable() { return unwritable == 0; } /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. */ public long bytesBeforeUnwritable() { long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize; // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability. // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized // together. totalPendingSize will be updated before isWritable(). if (bytes > 0) { return isWritable() ? bytes : 0; } return 0; } /** * Increment the pending bytes which will be written at some point. * This method is thread-safe! */ void incrementPendingOutboundBytes(long size) { incrementPendingOutboundBytes(size, true); } private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } } private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } } private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }
默認16M
)以及storm.messaging.netty.buffer.low.watermark(默認8M
)其實配置的是netty的ChannelOption.WRITE_BUFFER_WATER_MARK目前來看這兩個方法貌似調用的比較少
)目前應該是這兩個方法起做用
),當小於lowWatermark或者大於highWatermark的時候,分別觸發setWritable及setUnwritable,更改ChannelOutboundBuffer的unwritable字段,進而影響isWritable方法;在isWritable爲true的時候會立馬執行寫請求,當返回false的時候,寫請求會被放入隊列等待isWritable爲true時才能執行這些堆積的寫請求