說在前面java
接上次apache
源碼解析json
返回到這個方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#startbootstrap
@Override public void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST://服務只啓動,不建立 this.serviceState = ServiceState.START_FAILED; this.defaultMQAdminExt.changeInstanceNameToPID(); // 建立mqclient對象 =》 this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); // 註冊管理服務處理器=》 boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() + "] has created already, specifed another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 啓動mqclient =》 mqClientInstance.start(); log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The AdminExt service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
進入這個方法org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)建立mqclient對象數組
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); // 從本地緩存中獲取client對象,簡單的通常會concurrentHashMap當本地緩存,性能很高 MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
進入到這個方法org.apache.rocketmq.client.impl.factory.MQClientInstance#registerAdminExt註冊管理服務緩存
public boolean registerAdminExt(final String group, final MQAdminExtInner admin) { if (null == group || null == admin) { return false; } MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin); if (prev != null) { log.warn("the admin group[{}] exist already.", group); return false; } return true; }
進入這個方法啓動mqclient,org.apache.rocketmq.client.impl.factory.MQClientInstance#start微信
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST://僅建立不啓動 this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server 若是啓動的時候命令行沒有指定name server的地址,就去獲取 if (null == this.clientConfig.getNamesrvAddr()) { // 監測鏈接是否可用 this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel 啓動請求響應的channel =》 this.mQClientAPIImpl.start(); // Start various schedule tasks 啓動調度任務 this.startScheduledTask(); // Start pull service 啓動pull服務 this.pullMessageService.start(); // Start rebalance service 啓動負載均衡服務 this.rebalanceService.start(); // Start push service 啓動push服務 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
啓動mqclient,進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#start負載均衡
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 設置請求、響應消息大小值默認是65535 .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( // 添加事件組 defaultEventExecutorGroup, // 註冊netty編碼器 =》 new NettyEncoder(), // 註冊netty解碼器=》 new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), // netty鏈接管理handler=》 new NettyConnectManageHandler(), // 註冊netty client handler=》 new NettyClientHandler()); } }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 掃描廢棄的請求=》 NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
netty編碼器ide
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { // 對消息頭進行編碼,rocketmq只對消息頭進行了編碼 ByteBuffer header = remotingCommand.encodeHeader(); // 往buf中寫消息頭 out.writeBytes(header); // 獲取消息體 byte[] body = remotingCommand.getBody(); if (body != null) { // 寫消息體 out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } // 出現異常關閉channel RemotingUtil.closeChannel(ctx.channel()); } } }
對消息頭進行編碼,進入這個方法org.apache.rocketmq.remoting.protocol.RemotingCommand#encodeHeader(int)oop
public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size 消息頭長度 int length = 4; // 2> header data length byte[] headerData; // 消息頭數據編碼 headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; // 分配緩衝區 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
進入這個方法org.apache.rocketmq.remoting.protocol.RemotingCommand#headerEncode
private byte[] headerEncode() { this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { // mq代理編碼 return RocketMQSerializable.rocketMQProtocolEncode(this); } else { // json編碼 return RemotingSerializable.encode(this); } }
進入這個方法mq協議編碼org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolEncode
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // String remark byte[] remarkBytes = null; int remarkLen = 0; if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8); remarkLen = remarkBytes.length; } // HashMap<String, String> extFields byte[] extFieldsBytes = null; int extLen = 0; if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { // map形式的參數序列化 extFieldsBytes = mapSerialize(cmd.getExtFields()); extLen = extFieldsBytes.length; } // 計算總長 int totalLen = calTotalLen(remarkLen, extLen); // 分配緩衝區 ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag headerBuffer.putInt(cmd.getFlag()); // String remark if (remarkBytes != null) { headerBuffer.putInt(remarkBytes.length); headerBuffer.put(remarkBytes); } else { headerBuffer.putInt(0); } // HashMap<String, String> extFields; if (extFieldsBytes != null) { headerBuffer.putInt(extFieldsBytes.length); headerBuffer.put(extFieldsBytes); } else { headerBuffer.putInt(0); } return headerBuffer.array(); }
建立netty解碼器
public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); } @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { // 這裏經過netty LengthFieldBasedFrameDecoder解決粘包問題,能夠方便在消息頭中定義一些規則,根據必定的規則進行解碼,好比總消息長度、編解碼方式 frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } ByteBuffer byteBuffer = frame.nioBuffer(); return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; } }
進入這個方法,消息解碼org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(java.nio.ByteBuffer)
public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); // 源消息頭長度 int oriHeaderLen = byteBuffer.getInt(); // 消息頭長度 int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); // 根據消息頭中傳入的序列化類型解碼 =》 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
消息頭解碼,進入這個方法org.apache.rocketmq.remoting.protocol.RemotingCommand#headerDecode
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: //header json形式解碼 RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: //mq代理反序列化 RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
mq協議解碼,進入這個方法org.apache.rocketmq.remoting.protocol.RocketMQSerializable#rocketMQProtocolDecode
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) { RemotingCommand cmd = new RemotingCommand(); // 把消息頭byte數組包裝成緩衝區 ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap<String, String> extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); // map形式數據反序列化 cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }
未完待續。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣