說在前面
請求處理:檢查事務狀態
源碼解析
進入這個方法,檢查事務的狀態,org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final CheckTransactionStateRequestHeader requestHeader =(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());final MessageExt messageExt = MessageDecoder.decode(byteBuffer);if (messageExt != null) {String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {// 按組選擇producerMQProducerInner producer = this.mqClientFactory.selectProducer(group);if (producer != null) {final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug("checkTransactionState, pick producer by group[{}] failed", group);}} else {log.warn("checkTransactionState, pick producer group failed");}} else {log.warn("checkTransactionState, decode message failed");}return null; }
進入這個方法, 按組選擇producer,org.apache.rocketmq.client.impl.factory.MQClientInstance.selectProducer(String)
public MQProducerInner selectProducer(final String group) {return this.producerTable.get(group);org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader) }
往上返回到這個方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)
@Overridepublic void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();@Overridepublic void run() {// 獲取事務監聽器TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();if (transactionCheckListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {// 檢查本地事務狀態=》localTransactionState = transactionCheckListener.checkLocalTransaction(message);} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}// 處理事務狀態=》this.processTransactionState(localTransactionState,group,exception);} else {log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);}}//private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case 
ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {// 單向結束事務=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request); }
進入這個方法,獲取事務監聽器,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkListener()
/**
 * Returns the transaction listener of the wrapped producer.
 *
 * @return the listener configured on the producer when it is a
 *         {@code TransactionMQProducer}; {@code null} for any other producer type
 */
@Override
public TransactionListener checkListener() {
    if (!(this.defaultMQProducer instanceof TransactionMQProducer)) {
        return null;
    }
    return ((TransactionMQProducer) defaultMQProducer).getTransactionListener();
}
進入這個方法,檢查本地事務狀態,org.apache.rocketmq.example.transaction.TransactionListenerImpl.checkLocalTransaction(MessageExt)
/**
 * Maps the status recorded in {@code localTrans} for the message's transaction id
 * to a local transaction state.
 *
 * @param msg the message being checked; only its transaction id is read
 * @return {@code UNKNOW} for status 0, {@code ROLLBACK_MESSAGE} for status 2,
 *         and {@code COMMIT_MESSAGE} for status 1, any other status, or no recorded status
 */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    final Integer recorded = localTrans.get(msg.getTransactionId());
    if (recorded == null) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    switch (recorded.intValue()) {
        case 0:
            return LocalTransactionState.UNKNOW;
        case 2:
            return LocalTransactionState.ROLLBACK_MESSAGE;
        default:
            // Covers status 1 as well as any unexpected value.
            return LocalTransactionState.COMMIT_MESSAGE;
    }
}
往上返回到這個方法,處理事務狀態,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(...).new Runnable() {...}.processTransactionState(LocalTransactionState, String, Throwable)
private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {// 單向結束事務=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request); }
往上返回到這個方法,單向結束事務,org.apache.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long)
public void endTransactionOneway(final String addr,final EndTransactionRequestHeader requestHeader,final String remark,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);request.setRemark(remark);// 單途請求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis); }
進入這個方法,單途請求,org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(String, RemotingCommand, long)
@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 獲取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {// 執行請求執行前的鉤子方法this.rpcHook.doBeforeRequest(addr, request);}// 執行單線請求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);// 異常關閉channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);} }
進入這個方法, 獲取channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(String)
private Channel getAndCreateChannel(final String addr) throws InterruptedException {if (null == addr) {// 獲取和namesrv通訊的channel =》return getAndCreateNameserverChannel();}ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}// 根據地址建立channel=》return this.createChannel(addr); }
進入這個方法,獲取和namesrv通訊的channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel()
private Channel getAndCreateNameserverChannel() throws InterruptedException {String addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}// 從namesrvAddrChoosed中查找namesrv,若是不存在同步輪詢的方式從namesrvAddrList中取final List<String> addrList = this.namesrvAddrList.get();if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}if (addrList != null && !addrList.isEmpty()) {for (int i = 0; i < addrList.size(); i++) {int index = this.namesrvIndex.incrementAndGet();index = Math.abs(index);index = index % addrList.size();String newAddr = addrList.get(index);this.namesrvAddrChoosed.set(newAddr);log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);// 同步建立渠道=》Channel channelNew = this.createChannel(newAddr);if (channelNew != null) {return channelNew;}}}} catch (Exception e) {log.error("getAndCreateNameserverChannel: create name server channel exception", e);} finally {this.lockNamesrvChannel.unlock();}} else {log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}return null; }
進入這個方法,同步建立渠道,org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String)
private Channel createChannel(final String addr) throws InterruptedException {ChannelWrapper cw = this.channelTables.get(addr);// 代碼走到這裏,這裏的邏輯正常狀況下是走不到的,爲了代碼嚴謹性if (cw != null && cw.isOK()) {cw.getChannel().close();channelTables.remove(addr);}if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {boolean createNewConnection;// 爲了代碼嚴謹性,這裏又作了一次判斷cw = this.channelTables.get(addr);if (cw != null) {if (cw.isOK()) {cw.getChannel().close();this.channelTables.remove(addr);createNewConnection = true;// 若是channel還在用,不讓建立} else if (!cw.getChannelFuture().isDone()) {createNewConnection = false;} else {this.channelTables.remove(addr);createNewConnection = true;}} else {createNewConnection = true;}if (createNewConnection) {// 從新創建鏈接ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);// 創建的channel也放到本次緩存中this.channelTables.put(addr, cw);}} catch (Exception e) {log.error("createChannel: create channel exception", e);} finally {this.lockChannelTables.unlock();}} else {log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}if (cw != null) {ChannelFuture channelFuture = cw.getChannelFuture();if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {// 對channel再次判斷if (cw.isOK()) {log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());return cw.getChannel();} else {log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());}} else {log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),channelFuture.toString());}}return null; }
往上返回到這個方法, 根據地址建立channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String)前面介紹過了。
往上返回到這個方法,執行單線請求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeOnewayImpl(Channel, RemotingCommand, long)前面介紹過了。
往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣