使用Java API建立(create),查看(describe),列舉(list),刪除(delete)Kafka主題(Topic)--轉載

原文:http://blog.csdn.net/changong28/article/details/39325079java

使用Kafka的同窗都知道,咱們每次建立Kafka主題(Topic)的時候能夠指定分區數和副本數等信息,若是將這些屬性配置到server.properties文件中,之後調用Java API生成的主題將使用默認值,先改變須要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,咱們也但願將此過程在Producer調用以前經過API的方式進行設定,無需在以前或以後使用腳本進行操做,因此才了這篇文章。查看源碼發現,其實內部全部的實現都是經過TopicCommand的main方法,在此記錄兩種方式:spa

一、建立主題(Topic).net

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=ycode

【JAVA API方式】:server

  1. String[] options = new String[]{  
  2.     "--create",  
  3.     "--zookeeper",  
  4.     "zk_host:port/chroot",  
  5.     "--partitions",  
  6.     "20",  
  7.     "--topic",  
  8.     "my_topic_name",  
  9.     "--replication-factor",  
  10.     "3",  
  11.     "--config",  
  12.     "x=y"  
  13. };  
  14. TopicCommand.main(options);  

二、查看全部主題blog

 

【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181ci

【JAVA API方式】:文檔

  1. String[] options = new String[]{  
  2.     "--list",  
  3.     "--zookeeper",  
  4.     "localhost:2181"  
  5. };  
  6. TopicCommand.main(options);  


三、查看指定主題:get

 

【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topickafka

【JAVA API方式】: 

  1. String[] options = new String[]{  
  2.     "--describe",  
  3.     "--zookeeper",  
  4.     "localhost:2181",  
  5.     "--topic",  
  6.     "my-replicated-topic",  
  7. };  
  8. TopicCommand.main(options);  


四、修改主題:

 

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:

 
  1. String[] options = new String[]{  
  2.     "--alter",  
  3.     "--zookeeper",  
  4.     "zk_host:port/chroot",  
  5.     "--topic",  
  6.     "my_topic_name",  
  7.     "--deleteConfig",  
  8.     "x"  
  9. };  
  10. TopicCommand.main(options);  



五、刪除出題:

   【命令方式】:無

   【JAVA API方式】:

 
    1. String[] options = new String[]{  
    2.     "--zookeeper",  
    3.     "zk_host:port/chroot",  
    4.     "--topic",  
    5.     "my_topic_name"  
    6. };  
    7. DeleteTopicCommand.main(options);  

另:下文kafka刪除topic的方法(出自 「菜光光的博客」 博客,出處http://caiguangguang.blog.51cto.com/1652935/1548069)

0.8的官方文檔提供了一個刪除topic的命令:

kafka-topics.sh --delete 可是在運行時會報錯找不到這個方法。

kafka-topics.sh最終是運行了kafka.admin.TopicCommand這個類,在0.8的源碼中這個類中沒有找到有delete topic相關的代碼。

在kafka的admin包下,提供了一個DeleteTopicCommand的類,能夠實現刪除topic的功能。 

kafka.admin.DeleteTopicCommand 

其中刪除topic的具體實現代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import  org.I0Itec.zkclient.ZkClient
import  kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
     val topic = options.valueOf(topicOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
     var zkClient: ZkClient =  null
     try  {
       zkClient =  new  ZkClient(zkConnect,  30000 30000 , ZKStringSerializer)
       zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))   //其實最終仍是經過刪除zk裏面對應的路徑來實現刪除topic的功能
       println( "deletion succeeded!" )
     }
     catch  {
       case  e: Throwable =>
         println( "delection failed because of "  + e.getMessage)
         println(Utils.stackTrace(e))
     }
     finally  {
       if  (zkClient !=  null )
         zkClient.close()
     }

由於這個命令只會刪除zk裏面的信息,真實的數據仍是沒有刪除,因此須要登陸各個broker,把對應的topic的分區數據目錄刪除,也可能正由於這一點,delete命令纔沒有集成到kafka.admin.TopicCommand這個類。

相關文章
相關標籤/搜索