此次咱們看的是客戶端部分。bootstrap
1:在客戶端咱們使用的是註解 @GlobalTransactional。會建立代理 GlobalTransactionScanner。在代理的初始化代碼中,會進行TM和RM的初始化,代碼以下:緩存
private void initClient() { if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //init TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM RMClient.init(applicationId, txServiceGroup); registerSpringShutdownHook(); }
2:在 TMClient 或者 RMClient 的init 方法裏,會建立 NettyClientBootstrap 實例。在 NettyClientBootstrap 構造過程當中,會建立 Bootstrap 實例,也會建立 NioEventLoopGroup 的客戶端事件選擇器。代碼以下:服務器
public class NettyClientBootstrap implements RemotingBootstrap { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class); private final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private EventExecutorGroup defaultEventExecutorGroup; private final AtomicBoolean initialized = new AtomicBoolean(false); public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) { if (nettyClientConfig == null) { nettyClientConfig = new NettyClientConfig(); } this.nettyClientConfig = nettyClientConfig; int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize(); this.transactionRole = transactionRole; this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize)); this.defaultEventExecutorGroup = eventExecutorGroup; }
3:建立以後,會調用 NettyClientBootstrap 的 start 方法,創建netty的客戶端代碼,以下:app
public void start() { this.bootstrap.group(this.eventLoopGroupWorker).channel( //綁定事件選擇器 nettyClientConfig.getClientChannelClazz()).option( //設置通道類型,默認是NioSocketChannel ChannelOption.TCP_NODELAY, true) // TCP不緩存直接發送 .option(ChannelOption.SO_KEEPALIVE, true) // TCP進行心跳檢測 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 設置鏈接超時時間 .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) //設置發送緩存區大小 .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); //設置接受緩衝區大小 bootstrap.handler(new ChannelInitializer<SocketChannel>() { //設置通道處理器 @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), //添加通道空閒心跳處理器 nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds())) .addLast(new ProtocolV1Decoder()) //通道消息解碼處理器 .addLast(new ProtocolV1Encoder()); //通道消息編碼處理器 if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); //添加處理器 ClientHandler } } }); if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) { LOGGER.info("NettyClientBootstrap has started"); } }
4:在seata客戶端,使用netty客戶端的時候,使用了池化技術,其工廠類是 NettyPoolableFactory。在 makeObject 方法中去獲取netty的鏈接通道。獲取通道的代碼以下:ide
public Channel getNewChannel(InetSocketAddress address) { Channel channel; ChannelFuture f = this.bootstrap.connect(address); // 鏈接netty服務器 try { f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 等待鏈接完成 if (f.isCancelled()) { throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server."); } else if (!f.isSuccess()) { throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server."); } else { channel = f.channel(); //獲取通道 } } catch (Exception e) { throw new FrameworkException(e, "can not connect to services-server."); } return channel; }
5:發送消息的示例代碼(這是須要獲取返回值的狀況,若是不須要獲取返回值,直接調用 channel.writeAndFlush()便可):oop
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); futures.put(rpcMessage.getId(), messageFuture); channelWritableCheck(channel, rpcMessage.getBody()); String remoteAddr = ChannelUtil.getAddressFromChannel(channel); doBeforeRpcHooks(remoteAddr, rpcMessage); channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); if (messageFuture1 != null) { messageFuture1.setResultMessage(future.cause()); } destroyChannel(future.channel()); } }); try { Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); doAfterRpcHooks(remoteAddr, rpcMessage, result); return result; } catch (Exception exx) { if (exx instanceof TimeoutException) { throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } } }