說在前面apache
管理請求 UNLOCK_BATCH_MQ 批量解鎖消息隊列緩存
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#unlockBatchMQ批量解鎖消息隊列this
private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); // =》 this.brokerController.getRebalanceLockManager().unlockBatch( requestBody.getConsumerGroup(), requestBody.getMqSet(), requestBody.getClientId()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#unlockBatch按消費組、消息隊列集合、clientId解鎖消息隊列code
public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { try { this.lock.lockInterruptibly(); try { // 從緩存中獲取鎖定的消息隊列 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null != groupValue) { for (MessageQueue mq : mqs) { LockEntry lockEntry = groupValue.get(mq); if (null != lockEntry) { if (lockEntry.getClientId().equals(clientId)) { groupValue.remove(mq); log.info("unlockBatch, Group: {} {} {}", group, mq, clientId); } else { log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}", lockEntry.getClientId(), group, mq, clientId); } } else { log.warn("unlockBatch, but mq not locked, Group: {} {} {}", group, mq, clientId); } } } else { log.warn("unlockBatch, group not exist, Group: {} {}", group, clientId); } } finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#unlockBatchMQ結束blog
說在最後隊列
本次解析僅表明我的觀點,僅供參考。rem
加入技術微信羣get
釘釘技術羣源碼