說在前面apache
管理請求 LOCK_BATCH_MQ 批量鎖定消息隊列緩存
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#lockBatchMQ批量鎖定消息隊列this
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); // 獲取可鎖定的消息集合=》 Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( requestBody.getConsumerGroup(), requestBody.getMqSet(), requestBody.getClientId()); LockBatchResponseBody responseBody = new LockBatchResponseBody(); responseBody.setLockOKMQSet(lockOKMQSet); response.setBody(responseBody.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch按消費組、消息隊列集合、clientId鎖定消息隊列code
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); for (MessageQueue mq : mqs) { // 當前消息隊列是否鎖定=》 if (this.isLocked(group, mq, clientId)) { lockedMqs.add(mq); } else { notLockedMqs.add(mq); } } if (!notLockedMqs.isEmpty()) { try { this.lock.lockInterruptibly(); try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); // 若是這個組的消息隊列鎖已釋放 if (null == groupValue) { groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } for (MessageQueue mq : notLockedMqs) { LockEntry lockEntry = groupValue.get(mq); if (null == lockEntry) { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", group, clientId, mq); } if (lockEntry.isLocked(clientId)) { lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); lockedMqs.add(mq); continue; } String oldClientId = lockEntry.getClientId(); if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq); lockedMqs.add(mq); continue; } log.warn( "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq); } } finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } } return lockedMqs; }
進入這個方法org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#isLocked按消費組、消息隊列、clientId判斷消息隊列是否是已鎖定blog
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) { // 按組從緩存中查找到消息隊列集合 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (groupValue != null) { // 找到消息隊列的鎖信息 LockEntry lockEntry = groupValue.get(mq); if (lockEntry != null) { // 若是仍是鎖狀態 boolean locked = lockEntry.isLocked(clientId); if (locked) { lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); } return locked; } } return false; }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#lockBatchMQ結束隊列
說在最後get
本次解析僅表明我的觀點,僅供參考。源碼
加入技術微信羣消息隊列
釘釘技術羣