在 Pulsar 中,Function、Source 和 Sink 都是運行在 Function Worker 上的,關於 Function 的內容能夠參考一篇文章瞭解 Pulsar Functions,關於 Source 和 Sink 的使用能夠參考web
本文介紹了 Functions Worker 的選舉機制。閱讀本文能夠對 Functions Worker 有更加深刻的理解。docker
如下是 Pulsar Functions Worker 的架構圖apache
架構圖中的各個組件 Metadata Mananger、Scheduler Manager、Runtime Manager 和Membership Manager 都是運行在 Worker 上的。bash
咱們來了解下 Worker 的選舉。架構
2.啓動時,每一個 Worker 內部都會啓動一個 Consumer,該 Consumer 用來進行選舉,每一個 Worker 的 id,主機名以及端口號會和該 Consumer 綁定。
3.該 Consumer 基於 Failover 模式啓動(關於 Consumer 的 Failover 訂閱模式能夠參考[這裏](http://pulsar.apache.org/docs...)。鏈接到同一個 Topic,具備相同訂閱名稱的 Consumer 中同一時刻只有一個處於活躍狀態。
4.使用該 Consumer 的 Worker 就是 Leader,它負責進行調度並處理一些其餘操做。eclipse
Worker Service1 和 Worker Service2 爲一個集羣,同時鏈接到 Topic1, 具備相同的訂閱名稱 sub,Leader 會在它們兩個 Worker 中產生。
Worker Service3 爲另外一個集羣,鏈接到 Topic2,訂閱名稱爲 sub3,由於只有一個 Worker,因此它自己就是 Leader。測試
下面咱們來實踐選舉過程。spa
在搭建本次試驗前,須要在電腦上安裝如下依賴,本次試驗是在 Mac 系統上進行的測試。3d
拉取 apachepulsar/pulsar:2.3.1 的鏡像,而後使用 docker run
命令進行啓動,-d
參數使該服務運行在後臺模式,-it
以交互模式運行容器,併爲容器分配一個僞輸入終端。--name
指定該容器使用 pulsar-standalone-function-leader
這個名字。日誌
docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone-function-leader apachepulsar/pulsar:2.3.1 bin/pulsar standalone
docker logs -f 11:17:17.363 [pulsar-web-55-4] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public] 11:17:17.369 [pulsar-web-55-4] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default 11:17:17.370 [pulsar-web-55-4] INFO org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [18/May/2019:11:17:17 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.3.1" 10 11:17:17.377 [pulsar-web-55-12] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
啓動 Pulsar 後出現上面的日誌說明服務啓動成功。
Worker 的選舉是基於 Consumer 的 Failover 模式,所以在本次測試中,直接啓動 Consumer 來模擬選舉。
如上選舉圖所示,本次測試會啓動三個 Failover 模式的 Consumer,其中兩個 Consumer 鏈接到 Topic1,使用的訂閱名稱爲 sub,它們表明一個 Worker 集羣;另外一個 Consumer 鏈接到 Topic2,使用訂閱名稱 sub3 表明另外一個 Worker 集羣。
爲了本次測試,須要開啓四個窗口,分別命名爲 window1,window2,window3,window4。
window4 用來進行驗證。
window1
以 Failover 模式啓動一個消費者,使用訂閱名稱 sub,訂閱到 public/default/topic1
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
window2
以 Failover 模式啓動一個消費者,使用訂閱名稱 sub,訂閱到 public/default/topic1
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover
window3
以 Failover 模式啓動一個消費者,使用訂閱名稱 sub3,訂閱到 public/default/topic2
docker exec -it pulsar-standalone-function-leader /bin/bash ./bin/pulsar-client consume persistent://public/default/topic2 --num-messages 0 --subscription-name sub3 -t Failover
三個窗口的操做都使用了相同的訂閱模式 Failover,window1 和 window2 使用相同的訂閱名稱訂閱到同一個 Topic。
window4
經過命令 topics stats 來獲取 topic1 的統計信息
./bin/pulsar-admin topics stats topic1 { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "averageMsgSize" : 0.0, "storageSize" : 0, "publishers" : [ ], "subscriptions" : { "sub" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "msgBacklog" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "unackedMessages" : 0, "type" : "Failover", "activeConsumerName" : "383dc", "msgRateExpired" : 0.0, "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "383dc", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "address" : "/127.0.0.1:50014", "connectedSince" : "2019-05-18T11:30:34.161Z", "clientVersion" : "2.3.1" }, { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "51911", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "address" : "/127.0.0.1:50018", "connectedSince" : "2019-05-18T11:30:42.742Z", "clientVersion" : "2.3.1" } ] } }, "replication" : { }, "deduplicationStatus" : "Disabled" }
在 Topic1 下出現了一個訂閱,兩個 Consumer,當前的 activeConsumerName 是 383dc
,正是 window1 中的訂閱。這說明當前持有該訂閱的 Worker 爲 Leader。
當把 window1 的訂閱關掉後,再看一下。在 window1 中,使用 CTRL + C 關掉該 Consumer,再回到 window4 查看訂閱。
./bin/pulsar-admin topics stats topic1 { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "averageMsgSize" : 0.0, "storageSize" : 0, "publishers" : [ ], "subscriptions" : { "sub" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "msgBacklog" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "unackedMessages" : 0, "type" : "Failover", "activeConsumerName" : "51911", "msgRateExpired" : 0.0, "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateRedeliver" : 0.0, "consumerName" : "51911", "availablePermits" : 1000, "unackedMessages" : 0, "blockedConsumerOnUnackedMsgs" : false, "metadata" : { }, "connectedSince" : "2019-05-18T11:30:42.742Z", "clientVersion" : "2.3.1", "address" : "/127.0.0.1:50018" } ] } }, "replication" : { }, "deduplicationStatus" : "Disabled" }
能夠看到 window2 中啓動的訂閱被成功激活,當前的 activeConsumerName 是 51911,這時持有該 Consumer 的 Worker 便成爲了 Leader。
以上是在 Connector 運行 Worker 的選舉機制,能夠看到其很是巧妙的運用了 Consumer 的 Failover 模式,來實現 Worker 的高可用機制。