rocketmq源碼解析請求處理檢查事務狀態

說在前面apache

請求處理 檢查事務狀態bootstrap

 

源碼解析緩存

進入這個方法,檢查事務的狀態,org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)微信

public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final CheckTransactionStateRequestHeader requestHeader =(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());final MessageExt messageExt = MessageDecoder.decode(byteBuffer);if (messageExt != null) {String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {//                按組選擇producerMQProducerInner producer = this.mqClientFactory.selectProducer(group);if (producer != null) {final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug("checkTransactionState, pick producer by group[{}] failed", group);}} else {log.warn("checkTransactionState, pick producer group failed");}} else {log.warn("checkTransactionState, decode message failed");}return null;    }

進入這個方法, 按組選擇producer,org.apache.rocketmq.client.impl.factory.MQClientInstance.selectProducer(String)app

public MQProducerInner selectProducer(final String group) {return this.producerTable.get(group);org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)    }

往上返回到這個方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)async






@Overridepublic void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();@Overridepublic void run() {//                獲取事務監聽器TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();if (transactionCheckListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {//                        檢查本地事務狀態=》localTransactionState = transactionCheckListener.checkLocalTransaction(message);} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}//                    處理事務狀態=》this.processTransactionState(localTransactionState,group,exception);} else {log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);}}//private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {//                    單向結束事務=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);    }

進入這個方法,獲取事務監聽器,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkListener()ide

@Overridepublic TransactionListener checkListener() {if (this.defaultMQProducer instanceof TransactionMQProducer) {TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;return producer.getTransactionListener();}return null;    }

進入這個方法,檢查本地事務狀態,org.apache.rocketmq.example.transaction.TransactionListenerImpl.checkLocalTransaction(MessageExt)this

@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;    }

往上返回到這個方法,處理事務狀態,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.checkTransactionState(...).new Runnable() {...}.processTransactionState(LocalTransactionState, String, Throwable).net




private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {//                    單向結束事務=》DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);    }

往上返回到這個方法,單向結束事務,org.apache.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long)debug

public void endTransactionOneway(final String addr,final EndTransactionRequestHeader requestHeader,final String remark,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);request.setRemark(remark);//        單途請求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis);    }

進入這個方法,單途請求,org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(String, RemotingCommand, long)

@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {//        獲取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {//                    執行請求執行前的鉤子方法this.rpcHook.doBeforeRequest(addr, request);}//               執行單線請求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);//                異常關閉channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}    }

進入這個方法, 獲取channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(String)


private Channel getAndCreateChannel(final String addr) throws InterruptedException {if (null == addr) {//           獲取和namesrv通訊的channel =》return getAndCreateNameserverChannel();}ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}//        根據地址建立channel=》return this.createChannel(addr);    }

進入這個方法,獲取和namesrv通訊的channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel()




private Channel getAndCreateNameserverChannel() throws InterruptedException {String addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}//        從namesrvAddrChoosed中查找namesrv,若是不存在同步輪詢的方式從namesrvAddrList中取final List<String> addrList = this.namesrvAddrList.get();if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {addr = this.namesrvAddrChoosed.get();if (addr != null) {ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}}if (addrList != null && !addrList.isEmpty()) {for (int i = 0; i < addrList.size(); i++) {int index = this.namesrvIndex.incrementAndGet();index = Math.abs(index);index = index % addrList.size();String newAddr = addrList.get(index);this.namesrvAddrChoosed.set(newAddr);log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);//                        同步建立渠道=》Channel channelNew = this.createChannel(newAddr);if (channelNew != null) {return channelNew;}}}} catch (Exception e) {log.error("getAndCreateNameserverChannel: create name server channel exception", e);} finally {this.lockNamesrvChannel.unlock();}} else {log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}return null;    }

進入這個方法,同步建立渠道,org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String)





private Channel createChannel(final String addr) throws InterruptedException {ChannelWrapper cw = this.channelTables.get(addr);//        代碼走到這裏,這裏的邏輯正常狀況下是走不到的,爲了代碼嚴謹性if (cw != null && cw.isOK()) {cw.getChannel().close();channelTables.remove(addr);}if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {boolean createNewConnection;//                爲了代碼嚴謹性,這裏又作了一次判斷cw = this.channelTables.get(addr);if (cw != null) {if (cw.isOK()) {cw.getChannel().close();this.channelTables.remove(addr);createNewConnection = true;//                        若是channel還在用,不讓建立} else if (!cw.getChannelFuture().isDone()) {createNewConnection = false;} else {this.channelTables.remove(addr);createNewConnection = true;}} else {createNewConnection = true;}if (createNewConnection) {//                    從新創建鏈接ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);//                    創建的channel也放到本次緩存中this.channelTables.put(addr, cw);}} catch (Exception e) {log.error("createChannel: create channel exception", e);} finally {this.lockChannelTables.unlock();}} else {log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);}if (cw != null) {ChannelFuture channelFuture = cw.getChannelFuture();if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {//                對channel再次判斷if (cw.isOK()) {log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());return cw.getChannel();} else {log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());}} else {log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),channelFuture.toString());}}return null;    }

往上返回到這個方法, 根據地址建立channel,org.apache.rocketmq.remoting.netty.NettyRemotingClient.createChannel(String)前面介紹過了。

 

往上返回到這個方法,執行單線請求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeOnewayImpl(Channel, RemotingCommand, long)前面介紹過了。

 

往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor.checkTransactionState(ChannelHandlerContext, RemotingCommand)結束。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索