seata中netty的使用源碼(二)

此次咱們看的是客戶端部分。bootstrap

1:在客戶端咱們使用的是註解 @GlobalTransactional。會建立代理 GlobalTransactionScanner。在代理的初始化代碼中,會進行TM和RM的初始化,代碼以下:緩存

private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}

2:在 TMClient 或者 RMClient 的init 方法裏,會建立 NettyClientBootstrap 實例。在 NettyClientBootstrap 構造過程當中,會建立 Bootstrap 實例,也會建立 NioEventLoopGroup 的客戶端事件選擇器。代碼以下:服務器

public class NettyClientBootstrap implements RemotingBootstrap {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    
    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
        NettyPoolKey.TransactionRole transactionRole) {
        if (nettyClientConfig == null) {
            nettyClientConfig = new NettyClientConfig();
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
            selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
}

3:建立以後,會調用 NettyClientBootstrap 的 start 方法,創建netty的客戶端代碼,以下:app

public void start() {
    this.bootstrap.group(this.eventLoopGroupWorker).channel( //綁定事件選擇器
        nettyClientConfig.getClientChannelClazz()).option( //設置通道類型,默認是NioSocketChannel
        ChannelOption.TCP_NODELAY, true) // TCP不緩存直接發送
        .option(ChannelOption.SO_KEEPALIVE, true) // TCP進行心跳檢測
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 設置鏈接超時時間
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) //設置發送緩存區大小
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); //設置接受緩衝區大小
          
    bootstrap.handler(new ChannelInitializer<SocketChannel>() { //設置通道處理器
            @Override
       public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(
                    new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), //添加通道空閒心跳處理器
                         nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                         nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    .addLast(new ProtocolV1Decoder()) //通道消息解碼處理器
                    .addLast(new ProtocolV1Encoder()); //通道消息編碼處理器
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers); //添加處理器 ClientHandler
                 }
            }
        });
        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
      }
}

4:在seata客戶端,使用netty客戶端的時候,使用了池化技術,其工廠類是 NettyPoolableFactory。在 makeObject 方法中去獲取netty的鏈接通道。獲取通道的代碼以下:ide

public Channel getNewChannel(InetSocketAddress address) {
    Channel channel;
    ChannelFuture f = this.bootstrap.connect(address); // 鏈接netty服務器
    try {
        f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 等待鏈接完成
        if (f.isCancelled()) {
            throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
        } else if (!f.isSuccess()) {
            throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
        } else {
            channel = f.channel(); //獲取通道
        }
    } catch (Exception e) {
        throw new FrameworkException(e, "can not connect to services-server.");
    }
    return channel;
}

5:發送消息的示例代碼(這是須要獲取返回值的狀況,若是不須要獲取返回值,直接調用 channel.writeAndFlush()便可):oop

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });
    try {
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        return result;
    } catch (Exception exx) {
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }
}
相關文章
相關標籤/搜索