Apache Pulsar 2.6.0 重磅發佈,新特性獨家解讀

在 Pulsar 2.5.2 版本發佈後的 1 個月,2020 年 6 月 18 日,Apache Pulsar 正式發佈了 2.6.0 版本!java

Pulsar 2.6.0 版本新增了諸多功能,並修復了大量漏洞,覆蓋存儲端、Broker 端、客戶端、Pulsar Functions、Pulsar IO、Pulsar SQL、Pulsar proxy 和安全等多方面,一如既往地豐富和完善了 Pulsar 做爲一個雲原生流數據平臺的能力。git

2.6.0 版本總共接受了來自社區約 450 個 commits,愈來愈多的代碼貢獻來自於中國開發者,中國力量愈加迅猛。github

如下是 2.6.0 版本重要新增功能的詳細信息。shell

Pulsar 核心

[PIP-37] 支持傳輸大消息體的消息

經過將大消息體的消息拆分紅多個 chunk,該 PIP 支持生產和消費大消息體的消息。目前,該功能僅對 non-shared subscription 有效,並對客戶端有改動。如需使用該功能,你須要將 Pulsar 客戶端升級至 2.6.0。使用該特性能夠在生產端啓用消息 trunking 機制。apache

client.newProducer()
	.topic("my-topic")
	.enableChunking(true)
	.create();
複製代碼
  • 更多關於 PIP-37 的信息,參閱這裏api

  • 更多關於該功能的代碼實現細節, 參閱 PR-4440緩存

[PIP-39] 新增 system topic,用於存儲 namespace 更改事件

在 Pulsar 2.6.0 之前,Pulsar 只能設置 namespace 級策略,屬於同一 namespace 的全部 topic 都遵循相同的 namespace 策略,但許多用戶但願能設置 topic 級別策略。另外,不使用和 namespace 級策略的實現方式是由於更多的 Topic 策略會加劇 ZooKeeper 負擔,而 system topic 的設計初衷是但願將 topic 策略存儲在 topic(而不是 ZooKeeper)中。該 PIP 是實現 topic 級策略的第一步,基於此,將來能實現更多相關功能。安全

  • 更多關於 PIP-39 的信息,參閱這裏bash

  • 更多關於該功能的代碼實現細節, 參閱 PR-4955app

[PIP-45] 支持可插拔元數據接口

該 PIP 支持 Pulsar 使用其餘 metastore 服務(而不是 ZooKeeper),並支持 ManagedLedger 使用 MetadataStore 接口。經過 MetadataStore 接口,能較容易地增長其餘元數據服務(例如 etcd)。

  • 更多關於 PIP-45 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-5358

[PIP-54] 支持在 batch index 級確認消息

在 Pulsar 2.6.0 之前,broker 僅在 batch message 級追蹤消息確認狀態。若是部分批量消息中已被確認,消息從新發送給 consumer 時仍會收到「部分批量消息已確認」的信息。該 PIP 支持在 batch index 級確認消息。默認狀況下,該功能未開啓。如需開啓,你能夠在 broker.conf 文件中設置。

batchIndexAcknowledgeEnable=true
複製代碼
  • 更多關於 PIP-54 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-6052

[PIP-58] 支持 consumer 設置自定義消息重發延時

對於許多在線業務系統而言,業務邏輯處理時常出現各類異常,所以須要從新消費消息,且用戶但願能夠自定義設置延遲時間。在 Pulsar 2.6.0 以前,當客戶端發送 nack 至 broker,Pulsar 會馬上重發消息。從 Pulsar 2.6.0 開始,你能夠爲每條消息設置重發延時。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                	.maxRedeliverCount(maxRedeliveryCount)
                   .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscribe();

consumer.reconsumeLater(message, 10 , TimeUnit.SECONDS);
複製代碼
  • 更多關於 PIP-58 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-6449

[PIP-60] 支持 SNI 路由,以支持更多 proxy server

在 Pulsar 2.6.0 以前,Pulsar 不支持使用其餘 proxy(例如,Apache Traffic Server、HAProxy、Nginx 和 Envoy)的 SNI 路由,這些 proxy 的擴展性和安全性更高,而且大多支持 SNI 路由。SNI 路由能在不中斷 SSL 鏈接的狀況下,將流量路由至目的地。

  • 更多關於 PIP- 60 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-6566

[PIP-61] 支持多個 advertised address

該 PIP 容許 broker 暴露多個 advertised listeners,並支持內網和外網流量分離。你能夠在 broker.conf 文件中指定多個 advertised listeners。

advertisedListeners=internal:pulsar://192.168.1.11:6660,external:pulsar://110.95.234.50:6650
複製代碼

你也能夠爲客戶端指定 listener 名稱。

PulsarClient.builder()
.serviceUrl(url)
.listenerName("internal")
.build();
複製代碼
  • 更多關於 PIP-61 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-6903

[PIP-65] Pulsar IO sources 支持 BatchSource

該 PIP 新增瞭如下功能:新增 BatchSource 接口,用於開發基於 batch 的 connector;新增 BatchSourceTriggerer 接口,用於觸發 BatchSource 收集數據;提供了 BatchSourceExecutor 的系統實現。

  • 更多關於 PIP-65 的信息,參閱這裏

  • 更多關於該功能的代碼實現細節, 參閱 PR-7090

[Load balancer][PR-6772] 添加 ThresholdShedder 策略

ThresholdShedder 策略比 LoadSheddingStrategy 策略更靈活。ThresholdShedder 策略計算 broker 的平均資源使用狀況。每一個 broker 資源使用狀況會與該平均值進行對比。若是超過(平均值+閾值),則觸發 namespace bundle 轉移至其餘低負載 broker 。你能夠在 broker.conf 文件中啓用該功能。

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
複製代碼

你也能夠自定義 ThresholdShedder 策略的參數。

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in ThresholdSheddler strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in ThresholdSheddler strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
# It only takes effect in ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-6772

[Key Shared][PR-6791] 在 Key_Shared 訂閱中增長一致性 hashing 分配

在 Pulsar 2.6.0 以前,Key_Shared 訂閱是經過使用 hash range 自動分裂來實現,該方法基於在新 consumer 加入或離開時分裂現有已分配 hash range。

Pulsar 2.6.0 爲 Key_Shared 訂閱新增一致性 hash 分配。你能夠在 broker.conf 文件中啓用該功能。自動分裂(auto split)方法仍默認開啓。

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
# consistent hashing to reassign keys to new consumers
subscriptionKeySharedUseConsistentHashing=false

# On KeyShared subscriptions, number of points in the consistent-hashing ring.
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100
複製代碼

後續 Pulsar 版本計劃默認開啓一致性 hash 分配功能。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6791

[Key Shared][PR-7106][PR-7108] 解決了新增 consumer 時,KeyShared dispatcher 出現的亂序問題

該 PR 對 Key_Shared 訂閱功能很是重要。在 Pulsar 2.6.0 以前,當新增 consumer(c2)進入且現有 consumer(c1)退出時,順序在 Key_Shared dispatcher 中不能保證。這是由於以前分配至 c1 的消息和 key 可能路由至 c2,這可能致使 Key_Shared 訂閱中同一個 key 的消息順序分發保證失效。

該 PR 作出瞭如下改動:爲了保證消息按序分配,在以前的消息被確認以前,新增 consumer 會變成「暫停」狀態。

若是你仍想使用鬆散的順序分發保證,能夠在 consumer 端進行如下設置。

pulsarClient.newConsumer()
	.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()。setAllowOutOfOrderDelivery(true))
	.subscribe();
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-7106 and PR-7108

[Key Shared][PR-5928] 支持 key hash range reader

該 PR 支持 reader 讀取某幾個 hash range 的消息。Broker 僅發送 key 的 hash 在 hash range 範圍內的消息。

另外,你也能夠爲 reader 指定多個 key hash range。

pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
                    .create();
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-5928

[PR-5390] 將基於 JNI 的庫替換成 AirCompressor(Java 壓縮庫)

在 Pulsar 2.6.0 以前,基於 JNI 的庫用於壓縮數據,但這些庫有容量開銷,且會影響 JNI 開銷(一般它在壓縮較小 payload 時計算)。

該 PR 將 LZ四、ZStd 和 Snappy 的壓縮庫替換成 AirCompressor,它是 Presto 使用的純 Java 壓縮庫。

  • 更多關於該功能的代碼實現細節, 參閱 PR-5390

[PR-5985] 支持多個 Pulsar 集羣使用相同的 BookKeeper 集羣

該 PR 容許多個 Pulsar 集羣使用指定 BookKeeper 集羣(經過指定 BookKeeper 客戶端至 BookKeeper 集羣的 ZooKeeper 鏈接字符串)。

該 PR 新增配置項 bookkeeperMetadataServiceUri,用於發現 BookKeeper 集羣元數據存儲和使用元數據服務 URI,以初始化 BookKeeper 客戶端。

# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
# and resolving its metadata service location.
# This value can be fetched using `bookkeeper shell whatisinstanceid` command in BookKeeper cluster.
# For example: zk+hierarchical://localhost:2181/ledgers
# The metadata service uri list can also be semicolon separated values like below:
# zk+hierarchical://zk1:2181;zk2:2181;zk3:2181/ledgers
bookkeeperMetadataServiceUri=
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-5985

[PR-6077] 支持在全部 subscription 已消費至最新消息後,刪除非活躍 topic

在 Pulsar 2.6.0 以前,Pulsar 支持刪除非活躍 topic(這些 topic 不包括 producer 和 subscription)。該 PR 支持當全部 topic 的 subscription 已消費至最新消息後且不存在活躍 producer 或 consumer 時,刪除非活躍 topic。

你能夠在 broker.conf 文件中設置使用該功能。計劃未來能在 namespace 級使用該功能。

# Set the inactive topic delete mode. Default is delete_when_no_subscriptions
# 'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers
# 'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)
# and no active producers/consumers
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-6077

[PR-6634] 新增 flag,用於在內存溢出時忽略 broker 宕機

Topic 出現高 dispatch rate 時,可能致使 broker 短暫出現 OOM。一旦內存被釋放,broker 能夠在幾分鐘內恢復正常。但在 Pulsar 2.4.0(更多信息,參閱 PR-4196)中,內存溢出時重啓 broker 會致使集羣不穩定(topic 會在 broker 之間移動)、重啓多個 broker 並擾亂其它 topic。該 PR 新增 flag,在內存溢出時忽略 broker 宕機,避免集羣不穩定。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6634

[PR-6668] 支持配置 ZooKeeper 緩存失效時間

在 Pulsar 2.6.0 以前,沒法設置 ZooKeeper 緩存失效時間,但有許多場景須要設置該值。

如今,你能夠在 broker.conf 文件中設置 ZooKeeper 緩存失效時間。

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-6668

[PR-6719] 優化了 consumer 獲取批量消息功能

當 consumer 向 broker 發送獲取消息請求時,該請求包括獲取消息數量(告知 broker 須要發送多少條消息至 consumer)。若是 producer 使用批量功能生產消息,broker 會根據 entry(而不是單條消息)將數據存儲在 BookKeeper 或 broker 緩存。處理 consumer 獲取請求時,消息數量和 entry 數量之間的映射存在問題。

該 PR 新增了 avgMessagesPerEntry 變量,用於記錄存儲在每條 entry 中的平均消息數量,並在 broker 發送消息至 consumer 時更新平均消息數量。處理 consumer 獲取請求時,avgMessagesPerEntry 變量映射獲取請求數量至 entry 數量。另外,該 PR 向 consumer stats 中增長了 avgMessagePerEntry 指標信息。

你能夠在 broker.conf 中啓用 preciseDispatcherFlowControl

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-6719

[PR-7078] 爲 topic 新增精確發佈速率限制

在 Pulsar 2.6.0 以前,Pulsar 支持發佈速率限制,但它並不是精確控制。而如今,你能夠在 broker.conf 文件中啓用 topic 精確發佈速率限制功能。

preciseTopicPublishRateLimiterEnable=true
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-7078

[PR-7154] 支持 entry 檢查延遲

在 Pulsar 2.6.0 以前,新增 entry 檢查延遲是 10 ms(且用戶沒法設置該值)。如今,對於消費延遲敏感的場景,你能夠在 broker.conf 文件中將新增 entry 檢查延遲設置成更小值(可能下降消費吞吐)或 0。

managedLedgerNewEntriesCheckDelayInMillis=10
複製代碼
  • 更多關於該功能的代碼實現細節, 參閱 PR-7154

[Schema] [PR-7139] KeyValue schema 支持 null key 和 null value。

  • 更多關於該功能的代碼實現細節, 參閱 PR-7139

[PR-7116] 支持設置 maxLedgerRolloverTimeMinutes 參數,用於觸發 ledger 切換

該 PR 實現了一個監測線程,用於檢查當前 topic ledger 是否知足 managedLedgerMaxLedgerRolloverTimeMinutes 條件並觸發 ledger 切換使配置生效。若是觸發 ledger 切換,你能夠關閉當前 ledger 並釋放當前 ledger 的存儲空間。對於不常使用的 topic,當前 ledger 數據可能失效,且當前切換操做僅適用於新增 entry 時。很顯然,這會浪費磁盤空間。

監測線程在固定間隔時間檢查 ledger 是否須要切換。你能夠經過 managedLedgerMaxLedgerRolloverTimeMinutes 設置該時間間隔。

  • 更多關於該功能的代碼實現細節, 參閱 PR-7116

Proxy

[PR-6473] 新增用於獲取 connection 和 topic stats 的 REST API

在 Pulsar 2.6.0 以前,Pulsar proxy 沒有獲取 proxy 內部信息的 stats,例如,有效鏈接、topic stats(log 級)和其它信息。

該 PR 新增用於獲取 connection 和 topic stats 的 REST API。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6473

Admin

[PR-6331] 新增 get-message-by-id 命令

該 PR 新增 get-message-by-id 命令,支持經過 ledger ID 和 entry ID 檢查單條信息。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6331

[PR-6383] 新加強制刪除 subscription 功能

該 PR 新增 deleteForcefully 方法,用於強制刪除 subscription。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6383

Functions

[PR-6895] 內置函數

支持像建立內置 connector 同樣建立內置 function。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6895

[PR-6031] 新增 Go function 心跳和 gRPC 服務

  • 更多關於該功能的代碼實現細節, 參閱 PR-6031

[PR-6348] 支持自定義系統配置

該 PR 支持在提交 function 時自定義系統變量。該功能能夠經過系統變量傳遞認證信息。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6348

[PR-6602] 分離 function worker 和 broker 的 TLS 配置

  • 更多關於該功能的代碼實現細節, 參閱 PR-6602

[PR-6954] 支持在 function 和 source 中建立 consumer

在 Pulsar 2.6.0 以前,你能經過 function context 和 source context 建立 publisher,但不能建立 consumer。該 PR 修復了這一問題。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6954

Pulsar SQL

[PR-6325] 支持 KeyValue schema

在 Pulsar 2.6.0 以前,Pulsar SQL 沒法讀取 KeyValue schema 數據。

該 PR 支持 KeyValue schema,併爲 key field 名稱新增前綴 key.,爲 value field 名稱新增前綴 value.

  • 更多關於該功能的代碼實現細節, 參閱 PR-6325

[PR-4847] 支持多個 Avro schema 版本

在 Pulsar 2.6.0 以前,若是 topic 有多個 Avro schema,使用 Pulsar SQL 查詢 topic 的數據會引發一些問題。從 Pulsar 2.6.0 開始,如需查詢 topic 中的數據,能夠更新 schema,該 schema 能夠兼容 topic 中的全部 schema。

  • 更多關於該功能的代碼實現細節, 參閱 PR-4847

Java client

[PR-6648] 新增 API,用於在關閉 producer 時,Pulsar 客戶端會繼續等待正在傳輸的消息傳輸完成

在 Pulsar 2.6.0 以前,當關閉 producer 時,Pulsar 客戶端會馬上讓正在傳輸的消息失效(即便消息已在 broker 持久化)。大部分狀況下,用戶但願能在關閉 producer 前等待正在傳輸的消息完成(而不是使這些消息失效),但 Pulsar 客戶端 lib 未實現該功能。

該 PR 支持經過 flag(控制是否等待正在傳輸的消息)關閉 API。你能夠在關閉 producer 以前等待正在傳輸的消息,Pulsar 客戶端不會當即使這些消息失效。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6648

[PR-6760] 支持從輸入流中動態加載 TLS certs/key

在 Pulsar 2.6.0 以前,Pulsar 客戶端提供 TLS 認證功能,默認 TLS provider AuthenticationTls 的值爲 cert 和 key 文件的文件路徑,但在有些應用場景中很難爲 TLS 認證存儲 cert 或 key。

該 PR 爲 AuthenticationTls 新增流支持,以提供 X509Certs 和 PrivateKey(當指定 provider 出現流變化時,PrivateKey 會自動更新)。

  • 更多關於該功能的代碼實現細節, 參閱 PR-6760

[PR-6825] 異步發送消息時,若是程序拋出異常,系統會返回 sequence ID

在 Pulsar 2.6.0 以前,若是異步發送消息失敗,程序會拋出異常,但並不顯示哪些消息不正常,用戶也沒法瞭解須要重試哪些消息。

這次改動更新了客戶端。當拋出異常時,程序會設置 sequenceId org.apache.pulsar.client.api.PulsarClientException

  • 更多關於該功能的代碼實現細節, 參閱 PR-6825

參考信息

Pulsar

任何問題或建議,歡迎經過 Pulsar mailing list 或 Slack 聯繫咱們:

期待你爲 Pulsar 的發展添磚加瓦。

若是你對 Pulsar 示例、demo、工具或擴展感興趣,歡迎查閱 StreamNative GitHub

相關文章
相關標籤/搜索