由Flink與Kafka實踐探究Kafka的兩個問題

筆者在某次實踐過程當中,搭建了一個Flink監控程序,監控wikipedia編輯,對編輯者編輯的字節數進行實時計算,最終把數據sink到kafka的消費者中展現出來,監控程序自己比較簡單,只要在程序中指定好WikipediaEditsSource源並配置好sink與kafka關聯就能夠,相似一個略微複雜版的wordcount,按照網絡上的教程,在實踐的最後,開啓zookeeper服務和kafka服務,接着用bootstrap

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

這條命令建立一個名爲wiki-resulttopic,而後運行監控程序,最後用緩存

kafka-console-consumer --bootstrap-server --zookeeper localhost: 9092--topic wiki-result

啓動消費者,就能夠在終端窗口裏觀察到源源不斷的wikipedia數據網絡

由Flink與Kafka實踐探究Kafka的兩個問題

當筆者次日再次跑這個監控程序時,發現上次執行的命令ide

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

是生產者命令,然而此例中的生產者其實是Fink監控程序,那麼原做者爲什麼使用kafka-console-producer命令去建立topic而不是用kafka-topics命令呢?命令行

kafka-console-producer --topic wiki-result  --broker-list localhost:9092

命令是生產者指定topic,是否自動建立了topic呢?線程

筆者嘗試把現有的topic:wiki-result刪掉,而後從新建立topic,提示以下,並無真正刪除,爲此筆者去查了下相關資料,將topic建立與刪除的原理完全弄懂了。3d

由Flink與Kafka實踐探究Kafka的兩個問題

在 Kafka 中,Topic 是一個存儲消息的邏輯概念,不一樣的topic在物理上來講是分開存儲的,能夠有多個producer向他push消息,也能夠有多個consumer去pull消息,每一個 Topic 能夠劃分多個分區,每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。每一個消息在被添加到分區時,都會被分配一個連續的序列號 offset,它是消息在此分區中的惟一編號,Kafka 經過 offset 保證消息在分區內的順序,offset 的順序不跨分區,即 Kafka 只保證在同一個分區內的消息是有序的。code

由Flink與Kafka實踐探究Kafka的兩個問題

經過命令server

kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test-topic

建立了1個名爲test-topic的topic,擁有1個分區,每一個分區分配2個副本。建立邏輯如圖,總的來講就是後臺邏輯會監聽zookeeper下對應的目錄節點,一旦發起topic建立命令,該命令會建立新的數據節點從而觸發後臺的建立邏輯。對象

由Flink與Kafka實踐探究Kafka的兩個問題

命令行部分比較直白,無非就是一些基本校驗,分配副本(儘量保證分區的副本平均分配到每一個broker上),把分配方案持久化到zookeeper的/brokers/topics/<topic>節點下。

後臺邏輯部分主要由controller負責,controller內部保存了不少信息,其中有一個分區狀態機,用於記錄topic各個分區的狀態。這個狀態機內部註冊了一些zookeeper監聽器。Controller在啓動的時候會建立這些監聽器。其中一個監聽器(TopicChangeListener)就是用於監聽zookeeper的/brokers/topics目錄的子節點變化的。一旦該目錄子節點數發生變化就會調用這個監聽器的處理方法。TopicChangeListener監聽器一方面會更新controller的緩存信息(好比更新集羣當前全部的topic列表以及更新新增topic的分區副本分配方案緩存等),另外一方面就是建立對應的分區及其副本對象併爲每一個分區肯定leader副本及ISR。至此,整個topic的建立就完成了!

除了使用kafka-topics –create建立topic外,還能夠使用kafka-console-producer發佈消息時建立,kafka第一步先獲取topic的leader信息,當發現不可用的時候,在去建立此topic。

Kafka 刪除topic的命令:

kafka-topics.sh --zookeeper localhost:2181 --delete –topic test-topic

然而此命令不能真正刪除topic,只是在zookeeper的/admin/delete_topics下建立一個臨時節點。

Kafka controller在啓動的時候會註冊對於Zookeeper節點/admin/delete_topics的子節點變動監聽器,並建立一個單獨的線程,執行topic刪除的操做,監聽器捕獲到刪除時建立的臨時節點,馬上觸發刪除邏輯,查詢test-topic是否正在被使用,根據其狀態決定是否刪除。

那麼何時線程會真正刪除此topic呢?只有當在server.properties配置了delete.topic.enable=true時並從新啓動Kafka,此Topic纔會被真正刪除。

至此Topic的建立和刪除原理已經清楚了,而對於在實踐過程當中遇到的問題也清晰了。

相關文章
相關標籤/搜索