ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、權限、角色等概念,那在RocketMQ中上述會對應哪些對象呢?java
另外,RocketMQ還支持按照客戶端IP進行白名單設置。數組
在講解如何使用ACL以前,咱們先簡單看一下RocketMQ ACL的請求流程: bash
對於上述具體的實現,將在後續文章中重點講解,本文的目的只是但願給讀者一個大概的瞭解。acl默認的配置文件名:plain_acl.yml,須要放在${ROCKETMQ_HOME}/store/config目錄下。下面對其配置項一一介紹。ide
全局白名單,其類型爲數組,即支持多個配置。其支持的配置格式以下:spa
配置用戶信息,該類型爲數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。3d
登陸用戶名,長度必須大於6個字符。code
登陸密碼。長度必須大於6個字符。cdn
用戶級別的IP地址白名單。其類型爲一個字符串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。中間件
boolean類型,設置是不是admin。以下權限只有admin=true時纔有權限執行。對象
默認topic權限。該值默認爲DENY(拒絕)。
默認消費組權限,該值默認爲DENY(拒絕),建議值爲SUB。
設置topic的權限。其類型爲數組,其可選擇值在下節介紹。
設置消費組的權限。其類型爲數組,其可選擇值在下節介紹。能夠爲每一消費組配置不同的權限。
上面定義了全局白名單、用戶級別的白名單,用戶級別的權限,爲了更好的配置ACL權限規則,下面給出權限匹配邏輯。
首先,須要在broker.conf文件中,增長參數aclEnable=true。並拷貝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目錄。
broker.conf的配置文件以下:
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10915
storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
aclEnable=true
複製代碼
plain_acl.yml文件內容以下:
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTest=PUB
groupPerms:
# the group should convert to retry topic
- oms_consumer_group=DENY
- accessKey: admin
secretKey: 12345678
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
複製代碼
從上面的配置可知,用戶RocketMQ只能發送TopicTest的消息,其餘topic無權限發送;拒絕oms_consumer_group消費組的消息消費,其餘消費組默承認消費。
public class AclProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}
}
複製代碼
運行效果如圖所示:
public class AclConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}
}
複製代碼
發現並不沒有消費消息,符合預期。
關於RocketMQ ACL的使用就介紹到這裏了,下一篇將介紹RocketMQ ACL實現原理。
做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。