RocketMQ Source Code Analysis: NamesrvController Startup (2), Creating the mqclient (1)

Foreword

This continues from the previous part.

 

Source code analysis

Back to this method: org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start

@Override
    public void  start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: // the service has only been created, not yet started
                this.serviceState = ServiceState.START_FAILED;
                this.defaultMQAdminExt.changeInstanceNameToPID();
                // create the mqclient instance =>
                this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
                // register the admin service handler =>
                boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
                        + "] has created already, specifed another name please."
                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
                }

                // start the mqclient =>
                mqClientInstance.start();
                log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The AdminExt service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
            default:
                break;
        }
    }
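The switch above is a small lifecycle guard: the state is set to START_FAILED before any work is done and only moved to RUNNING once every step has succeeded, so an exception halfway through leaves the object marked as failed. A minimal sketch of that pattern, assuming a hypothetical class `LifecycleSketch` and a `Runnable` standing in for the real startup work (neither is part of RocketMQ):

```java
class LifecycleSketch {
    // Same four state names as in the listing above.
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // Hypothetical start(): `init` stands in for creating, registering and starting the client.
    void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                // Mark as failed first; only a fully successful run overwrites this.
                state = ServiceState.START_FAILED;
                init.run();
                state = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all reject a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch ok = new LifecycleSketch();
        ok.start(() -> { });
        System.out.println(ok.state());

        LifecycleSketch bad = new LifecycleSketch();
        try {
            bad.start(() -> { throw new RuntimeException("boom"); });
        } catch (RuntimeException e) {
            System.out.println(bad.state());
        }
    }
}
```

Note that the real method resets the state to CREATE_JUST when registration fails because the group name is already taken; the sketch leaves that branch out.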

Enter this method, org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook), which creates the mqclient instance.

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        // fetch the client instance from the local cache; a ConcurrentHashMap is a common choice for a simple local cache and performs well
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
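The get-then-putIfAbsent sequence above lets several threads race to create an instance for the same client id while guaranteeing that all of them end up with the same one. A minimal sketch of the idiom, assuming a hypothetical `Client` class in place of MQClientInstance and a made-up client id string:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ClientCacheSketch {
    // Hypothetical stand-in for MQClientInstance.
    static final class Client {
        final String clientId;
        final int index;

        Client(String clientId, int index) {
            this.clientId = clientId;
            this.index = index;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            // Optimistically build a candidate; it is discarded if another thread won the race.
            instance = new Client(clientId, indexGenerator.getAndIncrement());
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientCacheSketch cache = new ClientCacheSketch();
        Client a = cache.getOrCreate("10.0.0.1@1234"); // illustrative id only
        Client b = cache.getOrCreate("10.0.0.1@1234");
        System.out.println(a == b); // true: the second call returns the cached instance
    }
}
```

One consequence of this design is that the index generator can skip values when a candidate loses the race, which is harmless as long as the index is only used as a label.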

Enter this method, org.apache.rocketmq.client.impl.factory.MQClientInstance#registerAdminExt, which registers the admin service.

public boolean registerAdminExt(final String group, final MQAdminExtInner admin) {
    if (null == group || null == admin) {
        return false;
    }

    MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
    if (prev != null) {
        log.warn("the admin group[{}] exist already.", group);
        return false;
    }

    return true;
}

Enter this method, which starts the mqclient: org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST: // created only, not yet started
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified, look up the address from the name server: when no name server address was given at startup, fetch it
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        // fetch the name server address
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel =>
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

Starting the mqclient opens the request-response channel. Enter this method: org.apache.rocketmq.remoting.netty.NettyRemotingClient#start

@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
                // set the socket send and receive buffer sizes; the default value is 65535
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        // event executor group that runs the handlers below
                        defaultEventExecutorGroup,
                        // register the netty encoder =>
                        new NettyEncoder(),
                        // register the netty decoder =>
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        // netty connection management handler =>
                        new NettyConnectManageHandler(),
                        // register the netty client handler =>
                        new NettyClientHandler());
                }
            });
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    // scan for expired requests =>
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }
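The timer at the end of start() runs scanResponseTable 3 seconds after startup and then once per second. That method is not shown in this article, so the following is only a guess at its general shape: walk a table of pending requests and drop the ones whose deadline has passed. The class `ResponseTableScanSketch`, the `PendingRequest` type, and the exact expiry rule are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ResponseTableScanSketch {
    // Hypothetical record of an in-flight request.
    static final class PendingRequest {
        final long beginTimestamp;
        final long timeoutMillis;

        PendingRequest(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Keyed by request id (the "opaque" field of a command).
    private final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    void register(int opaque, long beginTimestamp, long timeoutMillis) {
        responseTable.put(opaque, new PendingRequest(beginTimestamp, timeoutMillis));
    }

    // Removes every entry whose deadline is at or before nowMillis; returns how many were removed.
    int scan(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest pending = it.next().getValue();
            if (pending.beginTimestamp + pending.timeoutMillis <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        ResponseTableScanSketch table = new ResponseTableScanSketch();
        table.register(1, 0L, 3000L);   // deadline at t = 3000
        table.register(2, 0L, 10000L);  // deadline at t = 10000
        System.out.println(table.scan(5000L)); // removes only request 1
        System.out.println(table.size());
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real periodic task would read the clock itself.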

The Netty encoder

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            // encode the header; RocketMQ only encodes the header, the body is written as raw bytes
            ByteBuffer header = remotingCommand.encodeHeader();
            // write the header into the buffer
            out.writeBytes(header);
            // get the body
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                // write the body
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            // close the channel when an exception occurs
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

To encode the header, enter this method: org.apache.rocketmq.remoting.protocol.RemotingCommand#encodeHeader(int)

public ByteBuffer encodeHeader(final int bodyLength) {
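        // 1> header length size (4 bytes for the header length field)
        int length = 4;
        // 2> header data length
        byte[] headerData;
        // encode the header data
        headerData = this.headerEncode();
        length += headerData.length;
        // 3> body data length
        length += bodyLength;
        // allocate the buffer: 4 bytes for the total length field plus the header part; the body is written separately by the encoder
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);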
        // length
        result.putInt(length);
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // header data
        result.put(headerData);
        result.flip();
        return result;
    }
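The arithmetic in encodeHeader is easier to follow with concrete numbers. The sketch below rebuilds the same buffer for a 2-byte header and a 3-byte body. The helper `markProtocolType` is not shown in this article; the version here (type code in the first byte, header length in the remaining three) is an assumption chosen to match the getHeaderLength and getProtocolType calls on the decode side, and the type code 1 used in main is likewise only illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameLayoutSketch {
    // Assumed layout: 1 byte serialization type followed by a 3-byte big-endian header length.
    static byte[] markProtocolType(int headerLength, byte typeCode) {
        byte[] result = new byte[4];
        result[0] = typeCode;
        result[1] = (byte) ((headerLength >> 16) & 0xFF);
        result[2] = (byte) ((headerLength >> 8) & 0xFF);
        result[3] = (byte) (headerLength & 0xFF);
        return result;
    }

    // Same steps as the listing above, with the header bytes passed in directly.
    static ByteBuffer encodeHeader(byte[] headerData, int bodyLength, byte typeCode) {
        int length = 4;                 // header length field
        length += headerData.length;    // header data
        length += bodyLength;           // body data
        // The buffer holds the total length field and the header part, but not the body.
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
        result.putInt(length);
        result.put(markProtocolType(headerData.length, typeCode));
        result.put(headerData);
        result.flip();
        return result;
    }

    public static void main(String[] args) {
        byte[] header = "{}".getBytes(StandardCharsets.UTF_8); // 2 bytes
        ByteBuffer buf = encodeHeader(header, 3, (byte) 1);
        System.out.println(buf.remaining()); // 10 = 4 (total length) + 4 (header length) + 2 (header)
        System.out.println(buf.getInt(0));   // 9  = 4 + 2 + 3, the bytes that follow the total length field
    }
}
```

The total length written first therefore counts everything after itself, which is what lets the receiving side cut the stream into frames before it looks at the header.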

Enter this method: org.apache.rocketmq.remoting.protocol.RemotingCommand#headerEncode

private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            // RocketMQ protocol encoding
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            // JSON encoding
            return RemotingSerializable.encode(this);
        }
    }

Enter this method for RocketMQ protocol encoding: org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolEncode

public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
        // String remark
        byte[] remarkBytes = null;
        int remarkLen = 0;
        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
            remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
            remarkLen = remarkBytes.length;
        }

        // HashMap<String, String> extFields
        byte[] extFieldsBytes = null;
        int extLen = 0;
        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
            // serialize the map-form parameters
            extFieldsBytes = mapSerialize(cmd.getExtFields());
            extLen = extFieldsBytes.length;
        }

        // compute the total length
        int totalLen = calTotalLen(remarkLen, extLen);
        // allocate the buffer
        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
        // int code(~32767)
        headerBuffer.putShort((short) cmd.getCode());
        // LanguageCode language
        headerBuffer.put(cmd.getLanguage().getCode());
        // int version(~32767)
        headerBuffer.putShort((short) cmd.getVersion());
        // int opaque
        headerBuffer.putInt(cmd.getOpaque());
        // int flag
        headerBuffer.putInt(cmd.getFlag());
        // String remark
        if (remarkBytes != null) {
            headerBuffer.putInt(remarkBytes.length);
            headerBuffer.put(remarkBytes);
        } else {
            headerBuffer.putInt(0);
        }
        // HashMap<String, String> extFields;
        if (extFieldsBytes != null) {
            headerBuffer.putInt(extFieldsBytes.length);
            headerBuffer.put(extFieldsBytes);
        } else {
            headerBuffer.putInt(0);
        }

        return headerBuffer.array();
    }

Creating the Netty decoder

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
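            // Netty's LengthFieldBasedFrameDecoder handles the TCP sticky-packet problem here; it makes it easy to define rules in the message header, such as the total message length and the serialization type, and to decode according to them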
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();
            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
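The constructor arguments (FRAME_MAX_LENGTH, 0, 4, 0, 4) tell LengthFieldBasedFrameDecoder that the length field starts at offset 0, is 4 bytes long, needs no adjustment, and that the first 4 bytes (the length field itself) are stripped from each frame. The following sketch imitates that framing rule on a plain java.nio.ByteBuffer so it can run without Netty. It is an illustration of the rule, not Netty's implementation, and the class and method names are made up.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class LengthFieldFramingSketch {
    // Same default as the frameMaxLength system property in the listing above.
    static final int FRAME_MAX_LENGTH = 16777216;

    // Extracts every complete frame from `in`; an incomplete trailing frame is left unread.
    static List<byte[]> splitFrames(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int frameLength = in.getInt(); // 4-byte length field at offset 0
            if (frameLength < 0 || frameLength > FRAME_MAX_LENGTH) {
                throw new IllegalStateException("bad frame length: " + frameLength);
            }
            if (in.remaining() < frameLength) {
                in.reset(); // wait for more bytes; keep the length field for the next attempt
                break;
            }
            byte[] frame = new byte[frameLength]; // length field already stripped
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer stream = ByteBuffer.allocate(64);
        stream.putInt(3).put(new byte[] {1, 2, 3}); // complete frame
        stream.putInt(2).put(new byte[] {4, 5});    // complete frame, stuck to the first
        stream.putInt(5).put(new byte[] {6});       // only 1 of 5 bytes has arrived
        stream.flip();

        List<byte[]> frames = splitFrames(stream);
        System.out.println(frames.size());      // 2
        System.out.println(stream.remaining()); // 5 unread bytes: length field plus 1 payload byte
    }
}
```

Because the length field is stripped, RemotingCommand.decode receives a buffer that begins directly with the header length field.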

Enter this method for message decoding: org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(java.nio.ByteBuffer)

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
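        // raw header length field: serialization type in the high byte, header length in the low 3 bytes
        int oriHeaderLen = byteBuffer.getInt();
        // actual header length
        int headerLength = getHeaderLength(oriHeaderLen);
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        // decode according to the serialization type carried in the header =>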
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;
        return cmd;
    }
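The decode method relies on two helpers, getHeaderLength and getProtocolType, that this article does not list. A minimal sketch of how a frame can be taken apart under the assumption that the first 4 bytes carry the serialization type in the top byte and the header length in the lower 24 bits; the `Decoded` holder class and the type code used in main are hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameDecodeSketch {
    // Hypothetical holder for the three parts of a frame.
    static final class Decoded {
        final int protocolType;
        final byte[] header;
        final byte[] body;

        Decoded(int protocolType, byte[] header, byte[] body) {
            this.protocolType = protocolType;
            this.header = header;
            this.body = body;
        }
    }

    // Assumed bit layout of the 4-byte header length field.
    static int getHeaderLength(int word) {
        return word & 0xFFFFFF;
    }

    static int getProtocolType(int word) {
        return (word >> 24) & 0xFF;
    }

    // `frame` is what the frame decoder hands over: the total length field is already stripped.
    static Decoded decode(ByteBuffer frame) {
        int length = frame.limit();
        int oriHeaderLen = frame.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);
        byte[] header = new byte[headerLength];
        frame.get(header);
        // Whatever remains after the 4-byte field and the header is the body.
        int bodyLength = length - 4 - headerLength;
        byte[] body = null;
        if (bodyLength > 0) {
            body = new byte[bodyLength];
            frame.get(body);
        }
        return new Decoded(getProtocolType(oriHeaderLen), header, body);
    }

    public static void main(String[] args) {
        byte[] header = "{}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(4 + header.length + 3);
        frame.putInt((1 << 24) | header.length); // type 1, header length 2
        frame.put(header);
        frame.put(new byte[] {9, 9, 9});
        frame.flip();

        Decoded d = decode(frame);
        System.out.println(d.protocolType + " " + d.header.length + " " + d.body.length); // 1 2 3
    }
}
```

Packing the type into the length field means the header length is limited to 24 bits, which is far above the 16 MB frame limit's practical needs.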

For header decoding, enter this method: org.apache.rocketmq.remoting.protocol.RemotingCommand#headerDecode

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON: // decode the header as JSON
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ: // RocketMQ protocol deserialization
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

For RocketMQ protocol decoding, enter this method: org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolDecode

public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
        RemotingCommand cmd = new RemotingCommand();
        // wrap the header byte array in a buffer
        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
        // int code(~32767)
        cmd.setCode(headerBuffer.getShort());
        // LanguageCode language
        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
        // int version(~32767)
        cmd.setVersion(headerBuffer.getShort());
        // int opaque
        cmd.setOpaque(headerBuffer.getInt());
        // int flag
        cmd.setFlag(headerBuffer.getInt());
        // String remark
        int remarkLength = headerBuffer.getInt();
        if (remarkLength > 0) {
            byte[] remarkContent = new byte[remarkLength];
            headerBuffer.get(remarkContent);
            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
        }

        // HashMap<String, String> extFields
        int extFieldsLength = headerBuffer.getInt();
        if (extFieldsLength > 0) {
            byte[] extFieldsBytes = new byte[extFieldsLength];
            headerBuffer.get(extFieldsBytes);
            // deserialize the map-form data
            cmd.setExtFields(mapDeserialize(extFieldsBytes));
        }
        return cmd;
    }
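The binary header written by rocketMQProtocolEncode and read back by rocketMQProtocolDecode is a fixed sequence of fields: code (2 bytes), language (1), version (2), opaque (4), flag (4), then a length-prefixed remark and a length-prefixed extFields block. The sketch below round-trips a simplified header with an empty extFields block. The class and method names are made up, and the total-length formula is inferred from the field order in the listings rather than taken from calTotalLen, which is not shown.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class HeaderRoundTripSketch {
    // Writes the fields in the same order as the encode listing; extFields is always empty here.
    static byte[] encode(int code, byte language, int version, int opaque, int flag, String remark) {
        byte[] remarkBytes = remark == null ? new byte[0] : remark.getBytes(StandardCharsets.UTF_8);
        // 2 + 1 + 2 + 4 + 4 fixed bytes, 4 + n for the remark, 4 for the empty extFields length.
        int totalLen = 2 + 1 + 2 + 4 + 4 + 4 + remarkBytes.length + 4;
        ByteBuffer buf = ByteBuffer.allocate(totalLen);
        buf.putShort((short) code);
        buf.put(language);
        buf.putShort((short) version);
        buf.putInt(opaque);
        buf.putInt(flag);
        buf.putInt(remarkBytes.length);
        buf.put(remarkBytes);
        buf.putInt(0);
        return buf.array();
    }

    static int decodeCode(byte[] header) {
        return ByteBuffer.wrap(header).getShort();
    }

    static String decodeRemark(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        buf.getShort(); // code
        buf.get();      // language
        buf.getShort(); // version
        buf.getInt();   // opaque
        buf.getInt();   // flag
        int remarkLength = buf.getInt();
        if (remarkLength <= 0) {
            return null;
        }
        byte[] remark = new byte[remarkLength];
        buf.get(remark);
        return new String(remark, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] header = encode(105, (byte) 0, 300, 7, 0, "ok");
        System.out.println(header.length);        // 23 = 21 fixed bytes + 2 remark bytes
        System.out.println(decodeCode(header));   // 105
        System.out.println(decodeRemark(header)); // ok
    }
}
```

The `(~32767)` notes in the listings matter: because code and version are written as shorts, a value such as 40000 would come back as -25536 after the round trip.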

To be continued.

 

Closing remarks

This analysis reflects only my personal views and is for reference only.

 

 
