Pulsar Functions Worker 的選舉機制

摘要

在 Pulsar 中,Function、Source 和 Sink 都是運行在 Function Worker 上的,關於 Function 的內容能夠參考一篇文章瞭解 Pulsar Functions,關於 Source 和 Sink 的使用能夠參考web

本文介紹了 Functions Worker 的選舉機制。閱讀本文能夠對 Functions Worker 有更加深刻的理解。docker

Function Worker 架構

如下是 Pulsar Functions Worker 的架構圖
在這裏插入圖片描述apache

架構圖中的各個組件 Metadata Mananger、Scheduler Manager、Runtime Manager 和Membership Manager 都是運行在 Worker 上的。bash

Worker 的選舉機制

在這裏插入圖片描述

  • 藍色背景的是 Worker
  • 橢圓淺綠色的是訂閱,也是一個 Consumer
  • 方形綠色背景的是 Broker
  • 方形紅色背景的是 Topic

咱們來了解下 Worker 的選舉。架構

  1. 圖中啓動了三個 Worker,名稱分別是 Worker Service1,Worker Service2,Worker Service3。這些 Worker 能夠在同一臺機器上,也能夠在不一樣的機器。

2.啓動時,每一個 Worker 內部都會啓動一個 Consumer,該 Consumer 用來進行選舉,每一個 Worker 的 id,主機名以及端口號會和該 Consumer 綁定。
3.該 Consumer 基於 Failover 模式啓動(關於 Consumer 的 Failover 訂閱模式能夠參考[這裏](http://pulsar.apache.org/docs...)。鏈接到同一個 Topic,具備相同訂閱名稱的 Consumer 中同一時刻只有一個處於活躍狀態。
4.使用該 Consumer 的 Worker 就是 Leader,它負責進行調度並處理一些其餘操做。eclipse

Worker Service1 和 Worker Service2 爲一個集羣,同時鏈接到 Topic1, 具備相同的訂閱名稱 sub,Leader 會在它們兩個 Worker 中產生。
Worker Service3 爲另外一個集羣,鏈接到 Topic2,訂閱名稱爲 sub3,由於只有一個 Worker,因此它自己就是 Leader。測試

測試選舉過程

下面咱們來實踐選舉過程。spa

在搭建本次試驗前,須要在電腦上安裝如下依賴,本次試驗是在 Mac 系統上進行的測試。3d

啓動單機 Pulsar 服務

拉取 apachepulsar/pulsar:2.3.1 的鏡像,而後使用 docker run 命令進行啓動,-d 參數使該服務運行在後臺模式,-it 以交互模式運行容器,併爲容器分配一個僞輸入終端。
--name 指定該容器使用 pulsar-standalone-function-leader 這個名字。日誌

docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone-function-leader apachepulsar/pulsar:2.3.1 bin/pulsar standalone
docker logs -f 
11:17:17.363 [pulsar-web-55-4] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
11:17:17.369 [pulsar-web-55-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
11:17:17.370 [pulsar-web-55-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.2 - - [18/May/2019:11:17:17 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.3.1" 10
11:17:17.377 [pulsar-web-55-12] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]

啓動 Pulsar 後出現上面的日誌說明服務啓動成功。

模擬選舉過程

Worker 的選舉是基於 Consumer 的 Failover 模式,所以在本次測試中,直接啓動 Consumer 來模擬選舉。
如上選舉圖所示,本次測試會啓動三個 Failover 模式的 Consumer,其中兩個 Consumer 鏈接到 Topic1,使用的訂閱名稱爲 sub,它們表明一個 Worker 集羣;另外一個 Consumer 鏈接到 Topic2,使用訂閱名稱 sub3 表明另外一個 Worker 集羣。

爲了本次測試,須要開啓四個窗口,分別命名爲 window1,window2,window3,window4。
window4 用來進行驗證。

window1

以 Failover 模式啓動一個消費者,使用訂閱名稱 sub,訂閱到 public/default/topic1

docker exec -it pulsar-standalone-function-leader /bin/bash
./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover

window2
以 Failover 模式啓動一個消費者,使用訂閱名稱 sub,訂閱到 public/default/topic1

docker exec -it pulsar-standalone-function-leader /bin/bash
./bin/pulsar-client consume persistent://public/default/topic1 --num-messages 0 --subscription-name sub -t Failover

window3
以 Failover 模式啓動一個消費者,使用訂閱名稱 sub3,訂閱到 public/default/topic2

docker exec -it pulsar-standalone-function-leader /bin/bash
./bin/pulsar-client consume persistent://public/default/topic2 --num-messages 0 --subscription-name sub3 -t Failover

三個窗口的操做都使用了相同的訂閱模式 Failover,window1 和 window2 使用相同的訂閱名稱訂閱到同一個 Topic。

window4
經過命令 topics stats 來獲取 topic1 的統計信息

./bin/pulsar-admin topics stats topic1
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 0,
  "publishers" : [ ],
  "subscriptions" : {
    "sub" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "383dc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "383dc",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "address" : "/127.0.0.1:50014",
        "connectedSince" : "2019-05-18T11:30:34.161Z",
        "clientVersion" : "2.3.1"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "51911",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "address" : "/127.0.0.1:50018",
        "connectedSince" : "2019-05-18T11:30:42.742Z",
        "clientVersion" : "2.3.1"
      } ]
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}

在 Topic1 下出現了一個訂閱,兩個 Consumer,當前的 activeConsumerName 是 383dc,正是 window1 中的訂閱。這說明當前持有該訂閱的 Worker 爲 Leader。
當把 window1 的訂閱關掉後,再看一下。在 window1 中,使用 CTRL + C 關掉該 Consumer,再回到 window4 查看訂閱。

./bin/pulsar-admin topics stats topic1
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 0,
  "publishers" : [ ],
  "subscriptions" : {
    "sub" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "51911",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "51911",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-05-18T11:30:42.742Z",
        "clientVersion" : "2.3.1",
        "address" : "/127.0.0.1:50018"
      } ]
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}

能夠看到 window2 中啓動的訂閱被成功激活,當前的 activeConsumerName 是 51911,這時持有該 Consumer 的 Worker 便成爲了 Leader。

總結

以上是在 Connector 運行 Worker 的選舉機制,能夠看到其很是巧妙的運用了 Consumer 的 Failover 模式,來實現 Worker 的高可用機制。

相關文章
相關標籤/搜索