譯文|如何將 Pulsar 用做消息隊列

原文做者爲 Luk Perkins,來自 Splunk 團隊。
文章翻譯已得到原做者受權。

消息隊列是大多數大規模數據架構的主要組件。若是必須對數據進行實時處理,那麼使用消息隊列是很好的選擇。前端

image

數據處理管道會發生各類故障,數據 consumer 可能會受到延遲或徹底不能工做,網絡分區可能會暫時切斷整個 consumer 組與數據管道的鏈接等。數據庫

有些狀況必須使用消息隊列,例如:後端

  • 開發拼車應用程序,不考慮高峯時段的使用峯值,須要確保每一個乘車請求最終只匹配到一位司機
  • 金融級事務交易管道須要同步請求處理,以防止數據丟失
  • 搭建基於微服務的處理管道,前端爲具備多個寫入端點的 REST API(每秒進行數千次運算),須要確保即便後端微服務出現故障,全部的工做對象都保留在系統中

消息隊列如何工做

下圖爲消息隊列常見工做方式(並對故障作出響應)的示意圖:服務器

image

在上圖中,producer 一、二、3 和 4 經過消息 broker 將消息發送到管道,而 consumer 一、二、3 和 4 處理(而後確認)這些消息。在本示例中,當 consumer 1 出現故障時,會出現很是嚴重的問題。Producer 會繼續將數據傳送到系統中,但 consumer 1 不能繼續處理消息。Broker 應該開始存儲全部本來將會用於 consumer 1 的消息數據,直到 consumer 1 可以繼續處理消息。網絡

從這個示例能夠看出,對於堆棧中任何重要的消息隊列而言,穩定的存儲組件都必不可少。幸運的是,消息隊列與支持消息隊列的存儲系統同樣性能良好。若是存儲組件易發故障、受到損壞,或運行緩慢,於是即使僅有一個組件出現故障,也不能很好地應對,那麼強烈建議你們更換存儲部件。數據結構

引入 Apache Pulsar

通常而言,由不一樣的系統處理訂閱-發佈消息和消息隊列。例如,典型的技術棧可能使用 Apache Kafka 處理髮布-訂閱消息,使用 RabbitMQ 處理消息隊列。在這種狀況下,雖然系統工做良好,可是你須要同時部署、管理多個消息系統。架構

我最喜歡 Apache Pulsar 的一點就是,它能夠輕鬆鏈接訂閱-發佈消息和消息隊列。Pulsar 是第一個爲了同時處理訂閱-發佈消息和消息隊列而開源的消息系統。運維

由於使用 Apache BookKeeper 分佈式日誌存儲數據庫做爲存儲組件,Pulsar 能夠輕鬆地同時支持訂閱-發佈消息和消息隊列。BookKeeper 做爲日誌存儲系統,基於消息 topic 數據結構而構建,支持水平擴展(增長 「bookie」 數量便可擴展容量),且運行迅速。分佈式

Pulsar 支持兩種基本的 topic 類型:持久 topic 與非持久 topic。用戶能夠根據名稱辨別 topic 類型,由於類型即爲 topic 名稱的「schema」(相似於 https 是 URL https://google.com 的 schema)。
持久 topic 的名稱格式爲:persistent://public/default/some-topic,而非持久 topic 的名稱格式爲:non-persistent://public/default/some-topic。微服務

用戶使用持久 topic 時,Pulsar 將全部未確認消息(即未處理消息)存儲在 BookKeeper 中的多個「bookie」服務器上。

Pulsar 的確支持非持久 topic,可是咱們建議用戶只在能夠接受丟失消息的用例中,使用非持久消息。對於具備消息隊列功能的 topic,毫不應該使用非持久 topic。與將消息數據存儲在內存中相比,這種存儲方式具備不少優點。

如何將 Apache Pulsar 用做消息隊列

Pulsar 無需特殊配置或調整,便可支持兩種用例,所以在使用方面具備必定的優點。重點在於如何使用 Pulsar,以下圖所示:

image

發佈-訂閱 producer 和 consumer 經過發佈-訂閱 topic 進行通訊,而隊列 producer 和 consumer 經過隊列 topic 進行通訊。不須要「標記」topic,也不須要預先指定 topic 爲實時 topic 或隊列 topic。

消息隊列 topic 須要 consumer 使用共享訂閱,而不能是獨佔訂閱(exclusive)或災備訂閱(failover)。另外,全部 consumer 必須使用相同的訂閱名稱,不然就不是同一訂閱。當 consumer 在 topic 上建立共享訂閱後,Pulsar 會自動在接收消息的 consumer 之間進行負載平衡,對於消息隊列來講,這是最理想的狀態。

如下代碼展現了五個 Java consumer 使用共享訂閱監聽同一 topic 的場景:

String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
String MQ_TOPIC = "persistent://public/default/message-queue-topic";
String SUBSCRIPTION = "sub-1";
// Pulsar client
PulsarClient client = PulsarClient.builder()
 .serviceUrl(PULSAR_SERVICE_URL)
 .build();
// Base consumer builder for instantiating multiple consumers
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
 .topic(MQ_TOPIC)
 .subscriptionName(SUBSCRIPTION)
 .subscriptionType(SubscriptionType.Shared)
 .messageListener(messageCallback);
// Create five consumers (mq-consumer-0, mq-consumer-1, etc.)
IntStream.range(0, 4).forEach(i -> {
 String name = String.format("mq-consumer-%d", i);
 consumerBuilder
 .consumerName(name)
 .subscribe();
});

控制消息調度

吞吐量在消息隊列中尤其重要。若是消息隊列沒有足夠的吞吐量來處理周圍數據管道所須要的內容,那麼消息隊列可能不只性能不夠好,甚至會產生一些負面影響。若是使用 Pulsar 做爲消息隊列,則能夠經過調整 consumer 的配置來微調處理吞吐量

默認狀況下,Apache Pulsar consumer 有一個接收隊列,用於一次處理多條消息。用戶能夠自行配置單個 consumer 接收隊列的大小(默認值爲 1000 條消息)。

理想狀況下,應該根據 consumer 處理消息的速度來設置接收隊列的大小。若是能夠很是快速地處理消息(只需幾毫秒),那麼建議將接收隊列的大小設置爲較大的值,由於這樣有助於最大化 consumer 的處理吞吐量。

可是若是處理消息須要較長時間,最好將接收隊列的大小設置爲較小的值。若是 consumer 正在執行的任務屬於 CPU 密集型,也就是說任務處理須要幾秒鐘甚至更久,則建議將接收隊列的大小設置爲個位數或 1,這樣負載平衡器可以在 consumer 之間合理地分發消息。

在下面這段代碼中,consumer 接收隊列比較小(Java):

Consumer<byte[]> consumer = client.newConsumer()
 .topic("slow-processing-topic")
 .subscriptionType(SubscriptionType.Shared)
 .subscriptionName("sub-1")
 .receiverQueueSize(5)
 .messageListener(messageCallback)
 .subscribe();

接收隊列的默認值適用於不少用例。可是建議用戶稍微留意一下接收隊列,以避免在後續工做中須要進行調優。

一個消息平臺,兩種用例場景

若是想在不一樣用例場景中同時運行多個消息平臺,你們能夠考慮使用 Pulsar。Pulsar 同時支持兩種主要的消息用例——發佈-訂閱消息(尤爲是持久消息)和消息隊列,而且運行速度快、可擴展,還能夠減輕運維管理負擔。

相關文章
相關標籤/搜索