在上文中介紹瞭如何搭建一個Kafka服務,那麼在開發中咱們要如何去訪問、集成Kafka呢?這就須要使用到本文將要介紹的Kafka客戶端API。下圖是官方文檔中的一個圖,形象表示了能與Kafka集成的客戶端類型:java
這些客戶端經過API與Kafka進行集成,Kafka的五類客戶端API類型以下:web
在接下來的篇章中將會演示AdminClient API的具體使用,其他的API則會在後續的文章中進行介紹。首先,咱們在IDEA中建立一個Spring Boot工程,該工程的pom.xml
文件內容以下:spring
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.zj.study</groupId> <artifactId>kafka-study</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-study</name> <description>Kafka study project for Spring Boot</description> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Kafka 客戶端依賴 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
經常使用的AdminClient API對象以下:apache
顯然,操做AdminClient API的前提是須要建立一個AdminClient
實例。代碼示例:api
/** * 配置並建立AdminClient */ public static AdminClient adminClient() { Properties properties = new Properties(); // 配置Kafka服務的訪問地址及端口號 properties.setProperty(AdminClientConfig. BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 建立AdminClient實例 return AdminClient.create(properties); }
建立了AdminClient
的實例對象後,咱們就能夠經過它提供的方法操做Kafka,經常使用的方法以下:bash
方法名稱 | 做用 |
---|---|
createTopics |
建立一個或多個Topic |
listTopics |
查詢Topic列表 |
deleteTopics |
刪除一個或多個Topic |
describeTopics |
查詢Topic的描述信息 |
describeConfigs |
查詢Topic、Broker等的全部配置項信息 |
alterConfigs |
用於修改Topic、Broker等的配置項信息(該方法在新版本中被標記爲已過時) |
incrementalAlterConfigs |
一樣也是用於修改Topic、Broker等的配置項信息,但功能更多、更靈活,用於代替alterConfigs |
createPartitions |
用於調整Topic的Partition數量,只能增長不能減小或刪除,也就是說新設置的Partition數量必須大於等於以前的Partition數量 |
Tips:maven
describeTopics
和describeConfigs
的意義主要是在監控上,不少用於監控Kafka的組件都會使用到這兩個API,由於經過這兩個API能夠獲取到Topic自身和周邊的詳細信息使用createTopics
方法能夠建立Topic,傳入的參數也與kafka-topics.sh
命令腳本的參數同樣。代碼示例:ide
/** * 建立topic */ public static void createTopic() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); // topic的名稱 String name = "MyTopic3"; // partition數量 int numPartitions = 1; // 副本數量 short replicationFactor = 1; NewTopic topic = new NewTopic(name, numPartitions, replicationFactor); CreateTopicsResult result = adminClient.createTopics(List.of(topic)); // 避免客戶端鏈接太快斷開而致使Topic沒有建立成功 Thread.sleep(500); // 獲取topic設置的partition數量 System.out.println(result.numPartitions(name).get()); }
listTopics
方法用於查詢Topic列表,經過傳入ListTopicsOptions
參數能夠設置一些可選項。代碼示例:spring-boot
/** * 查詢Topic列表 */ public static void topicLists() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); ListTopicsResult result1 = adminClient.listTopics(); // 打印Topic的名稱 System.out.println(result1.names().get()); // 打印Topic的信息 System.out.println(result1.listings().get()); ListTopicsOptions options = new ListTopicsOptions(); // 是否列出內部使用的Topic options.listInternal(true); ListTopicsResult result2 = adminClient.listTopics(options); System.out.println(result2.names().get()); }
關於listInternal
選項:性能
listInternal
選項在Kafka 0.x的版本里是沒有的,由於在0.x版本中Kafka是將consumer的offset信息存儲在Zookeeper裏,但因爲Zookeeper同步consumer的offset信息比較慢,因而在1.x後就遷移到Kafka的Topic中進行存儲了,這也是爲了提升吞吐量和性能
deleteTopics
方法能夠刪除一個或多個Topic,代碼示例:
/** * 刪除Topic */ public static void delTopics() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); DeleteTopicsResult result = adminClient.deleteTopics(List.of("MyTopic1")); System.out.println(result.all().get()); }
一個Topic會有自身的描述信息,例如:partition
的數量,副本集的數量,是否爲internal
等等。AdminClient
中提供了describeTopics
方法來查詢這些描述信息。代碼示例:
/** * 查詢Topic的描述信息 */ public static void describeTopics() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); DescribeTopicsResult result = adminClient.describeTopics(List.of("MyTopic")); Map<String, TopicDescription> descriptionMap = result.all().get(); descriptionMap.forEach((key, value) -> System.out.println("name: " + key + ", desc: " + value)); }
輸出的內容以下:
name: MyTopic, desc: (name=MyTopic, internal=false, partitions=(partition=0, leader=127.0.0.1:9092 (id: 0 rack: null), replicas=127.0.0.1:9092 (id: 0 rack: null), isr=127.0.0.1:9092 (id: 0 rack: null)), authorizedOperations=null)
除了Kafka自身的配置項外,其內部的Topic也會有很是多的配置項,咱們能夠經過describeConfigs
方法來獲取某個Topic中的配置項信息。代碼示例:
/** * 查詢Topic的配置信息 */ public static void describeConfig() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); ConfigResource configResource = new ConfigResource( ConfigResource.Type.TOPIC, "MyTopic" ); DescribeConfigsResult result = adminClient.describeConfigs(List.of(configResource)); Map<ConfigResource, Config> map = result.all().get(); map.forEach((key, value) -> System.out.println("name: " + key.name() + ", desc: " + value)); }
輸出的內容以下,會輸出全部的配置信息,內容比較多:
name: ConfigResource(type=TOPIC, name='MyTopic'), desc: Config(entries=[ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.5-IV0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
除了能夠查看Topic的配置項信息外,AdminClient
還提供了相關方法來修改Topic配置項的值。在早期版本中,使用alterConfigs
方法來修改配置項。代碼示例:
/** * 修改Topic的配置信息 */ public static void alterConfig() throws Exception { // 指定ConfigResource的類型及名稱 ConfigResource configResource = new ConfigResource( ConfigResource.Type.TOPIC, "MyTopic" ); // 配置項以ConfigEntry形式存在 Config config = new Config(List.of( new ConfigEntry("preallocate", "true") )); AdminClient adminClient = adminClient(); Map<ConfigResource, Config> configMaps = new HashMap<>(); configMaps.put(configResource, config); AlterConfigsResult result = adminClient.alterConfigs(configMaps); System.out.println(result.all().get()); } public static void main(String[] args) throws Exception { alterConfig(); describeConfig(); }
執行以上代碼,控制檯輸出以下,能夠看到成功將preallocate
配置項的值改成了true
:
在新版本中則是使用incrementalAlterConfigs
方法來修改Topic的配置項,該方法使用起來相對於alterConfigs
要略微複雜一些,但所以功能更多、更靈活。代碼示例:
/** * 修改Topic的配置信息 */ public static void incrementalAlterConfig() throws Exception { // 指定ConfigResource的類型及名稱 ConfigResource configResource = new ConfigResource( ConfigResource.Type.TOPIC, "MyTopic" ); // 配置項一樣以ConfigEntry形式存在,只不過增長了操做類型 // 以及可以支持操做多個配置項,相對來講功能更多、更靈活 Collection<AlterConfigOp> configs = List.of( new AlterConfigOp( new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET ) ); AdminClient adminClient = adminClient(); Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>(); configMaps.put(configResource, configs); AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps); System.out.println(result.all().get()); } public static void main(String[] args) throws Exception { incrementalAlterConfig(); describeConfig(); }
incrementalAlterConfigs
方法可能會存在些問題,對單實例的Kafka支持得不是很好,會出現沒法成功修改配置項的狀況,此時就可使用alterConfigs
方法來代替,這也是爲何這裏要介紹兩種方法的使用方式執行以上代碼,控制檯輸出以下,能夠看到成功將preallocate
配置項的值改成了false
:
在建立Topic時咱們須要設定Partition的數量,但若是以爲初始設置的Partition數量太少了,那麼就可使用createPartitions
方法來調整Topic的Partition數量,可是須要注意在Kafka中Partition只能增長不能減小。代碼示例:
/** * 增長Partition數量,目前Kafka不支持刪除或減小Partition */ public static void incrPartitions() throws ExecutionException, InterruptedException { AdminClient adminClient = adminClient(); Map<String, NewPartitions> newPartitions = new HashMap<>(); // 將MyTopic的Partition數量調整爲2 newPartitions.put("MyTopic", NewPartitions.increaseTo(2)); CreatePartitionsResult result = adminClient.createPartitions(newPartitions); System.out.println(result.all().get()); } public static void main(String[] args) throws Exception { incrPartitions(); describeTopics(); }
執行以上代碼,控制檯輸出以下,能夠看到成功爲該Topic增長了一個Partition:
partition=0
,第二個partition=1