本文主要研究一下 RocketMQ 的 NettyClientConfig。
org/apache/rocketmq/remoting/netty/NettyClientConfig.java
public class NettyClientConfig { /** * Worker thread number */ private int clientWorkerThreads = 4; private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE; private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE; private int connectTimeoutMillis = 3000; private long channelNotActiveInterval = 1000 * 60; /** * IdleStateEvent will be triggered when neither read nor write was performed for * the specified period of this time. Specify {@code 0} to disable */ private int clientChannelMaxIdleTimeSeconds = 120; private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean clientPooledByteBufAllocatorEnable = false; private boolean clientCloseSocketIfTimeout = false; private boolean useTLS; //...... }
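這裏主要有幾個參數:clientWorkerThreads(默認 4)、clientCallbackExecutorThreads(默認爲可用處理器數)、clientOnewaySemaphoreValue 與 clientAsyncSemaphoreValue(默認值取自 NettySystemConfig)、connectTimeoutMillis(默認 3000)、channelNotActiveInterval(默認 60000)、clientChannelMaxIdleTimeSeconds(默認 120)、clientSocketSndBufSize 與 clientSocketRcvBufSize(默認值取自 NettySystemConfig)、clientPooledByteBufAllocatorEnable(默認 false)、clientCloseSocketIfTimeout(默認 false)以及 useTLS。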
org/apache/rocketmq/remoting/netty/NettySystemConfig.java
/**
 * Process-wide defaults for the Netty remoting layer.
 *
 * <p>Each value is read once, during class initialization, from the JVM system
 * property named by the matching {@code COM_ROCKETMQ_REMOTING_*} constant, and
 * falls back to the literal default shown when the property is not set. A
 * non-numeric value for an integer property makes {@link Integer#parseInt(String)}
 * throw during class initialization.
 */
public class NettySystemConfig {
    public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
        "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE =
        "com.rocketmq.remoting.socket.sndbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE =
        "com.rocketmq.remoting.socket.rcvbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientAsyncSemaphoreValue";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientOnewaySemaphoreValue";

    /** Pooled ByteBuf allocator switch; {@code false} unless the property parses as {@code true}. */
    public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
        Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));

    /** Default permit count for asynchronous invocations (65535 when the property is unset). */
    public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));

    /** Default permit count for one-way invocations (65535 when the property is unset). */
    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));

    /** Default socket send buffer size (65535 when the property is unset); intentionally non-final. */
    public static int socketSndbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));

    /** Default socket receive buffer size (65535 when the property is unset); intentionally non-final. */
    public static int socketRcvbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final long LOCK_TIMEOUT_MILLIS = 3000; private final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final Timer timer = new Timer("ClientHouseKeepingService", true); private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>(); private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; /** * Invoke the callback methods in this executor when process response. 
*/ private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; private RPCHook rpcHook; public NettyRemotingClient(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, null); } public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } }); if (nettyClientConfig.isUseTLS()) { try { sslContext = TlsHelper.buildSslContext(true); log.info("SSL enabled for client"); } catch (IOException e) { log.error("Failed to create SSLContext", e); } catch (CertificateException e) { log.error("Failed to create SSLContext", e); throw new RuntimeException("Failed to create SSLContext", e); } } } private static int initValueIndex() { Random r = new Random(); return Math.abs(r.nextInt() % 999) % 999; } @Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); 
@Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } } //...... }
RocketMQ 的 NettyClientConfig 指定了 NettyRemotingClient 的相關參數。這裏除了 Netty 相關的配置,還指定了兩個信號量:clientOnewaySemaphoreValue 和 clientAsyncSemaphoreValue。clientOnewaySemaphoreValue 用於限制 invokeOnewayImpl 方法的併發調用數,clientAsyncSemaphoreValue 用於限制 invokeAsyncImpl 方法的併發調用數。