以前寫過如何用服務器端的API代碼來獲取訂閱某topic的全部consumer group,參見這裏。使用服務器端的API須要用到kafka.admin.AdminClient類,可是這個類在0.11.0.0版本已經被標記爲不推薦使用了,故目前最合適的方式仍是經過客戶端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人問這個問題,我就嘗試寫了一個。使用以前你須要引入kafka client包依賴(以2.2.0版本爲例)html
Maven:apache
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>安全
Gradle:服務器
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.0'函數
下面是代碼:spa
1 private static List<String> getGroupsForTopic(String brokerServers, String topic) 2 throws ExecutionException, InterruptedException, TimeoutException { 3 Properties props = new Properties(); 4 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers); 5
6 try (AdminClient client = AdminClient.create(props)) { 7 List<String> allGroups = client.listConsumerGroups() 8 .valid() 9 .get(10, TimeUnit.SECONDS) 10 .stream() 11 .map(ConsumerGroupListing::groupId) 12 .collect(Collectors.toList()); 13
14 Map<String, ConsumerGroupDescription> allGroupDetails =
15 client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS); 16
17 final List<String> filteredGroups = new ArrayList<>(); 18 allGroupDetails.entrySet().forEach(entry -> { 19 String groupId = entry.getKey(); 20 ConsumerGroupDescription description = entry.getValue(); 21 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment) 22 .map(MemberAssignment::topicPartitions) 23 .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet())) 24 .anyMatch(tps -> tps.contains(topic)); 25 if (topicSubscribed) 26 filteredGroups.add(groupId); 27 }); 28 return filteredGroups; 29 } 30 }
我會假設你的集羣中沒有配置安全認證和受權機制或者發起此AdminClient的用戶是合法用戶且有CLUSTER以及GROUP的DESCRIBE權限。code
另外值得注意的是,上面這個函數沒法獲取非運行中的consumer group,即雖然一個group訂閱了某topic,可是若它全部的consumer成員都關閉的話這個函數是不會返回該group的。htm