原文:http://blog.csdn.net/changong28/article/details/39325079java
使用Kafka的同窗都知道,咱們每次建立Kafka主題(Topic)的時候能夠指定分區數和副本數等信息,若是將這些屬性配置到server.properties文件中,之後調用Java API生成的主題將使用默認值,先改變須要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,咱們也但願將此過程在Producer調用以前經過API的方式進行設定,無需在以前或以後使用腳本進行操做,因此才了這篇文章。查看源碼發現,其實內部全部的實現都是經過TopicCommand的main方法,在此記錄兩種方式:spa
一、建立主題(Topic).net
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=ycode
【JAVA API方式】:server
二、查看全部主題blog
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181ci
【JAVA API方式】:文檔
三、查看指定主題:get
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topickafka
【JAVA API方式】:
四、修改主題:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
五、刪除出題:
【命令方式】:無
【JAVA API方式】:
另:下文kafka刪除topic的方法(出自 「菜光光的博客」 博客,出處http://caiguangguang.blog.51cto.com/1652935/1548069)
0.8的官方文檔提供了一個刪除topic的命令:
kafka-topics.sh --delete 可是在運行時會報錯找不到這個方法。
kafka-topics.sh最終是運行了kafka.admin.TopicCommand這個類,在0.8的源碼中這個類中沒有找到有delete topic相關的代碼。
在kafka的admin包下,提供了一個DeleteTopicCommand的類,能夠實現刪除topic的功能。
kafka.admin.DeleteTopicCommand
其中刪除topic的具體實現代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import
org.I0Itec.zkclient.ZkClient
import
kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient =
null
try
{
zkClient =
new
ZkClient(zkConnect,
30000
,
30000
, ZKStringSerializer)
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
//其實最終仍是經過刪除zk裏面對應的路徑來實現刪除topic的功能
println(
"deletion succeeded!"
)
}
catch
{
case
e: Throwable =>
println(
"delection failed because of "
+ e.getMessage)
println(Utils.stackTrace(e))
}
finally
{
if
(zkClient !=
null
)
zkClient.close()
}
|
由於這個命令只會刪除zk裏面的信息,真實的數據仍是沒有刪除,因此須要登陸各個broker,把對應的topic的分區數據目錄刪除,也可能正由於這一點,delete命令纔沒有集成到kafka.admin.TopicCommand這個類。