原文做者爲 Luk Perkins,來自 Splunk 團隊。
文章翻譯已得到原做者受權。
消息隊列是大多數大規模數據架構的主要組件。若是必須對數據進行實時處理,那麼使用消息隊列是很好的選擇。前端
數據處理管道會發生各類故障,數據 consumer 可能會受到延遲或徹底不能工做,網絡分區可能會暫時切斷整個 consumer 組與數據管道的鏈接等。數據庫
有些狀況必須使用消息隊列,例如:後端
下圖爲消息隊列常見的工做方式(並對故障作出響應)的示意圖:服務器
在上圖中,producer 一、二、3 和 4 經過消息 broker 將消息發送到管道,而 consumer 一、二、3 和 4 處理(而後確認)這些消息。在本示例中,當 consumer 1 出現故障時,會出現很是嚴重的問題。Producer 會繼續將數據傳送到系統中,但 consumer 1 不能繼續處理消息。Broker 應該開始存儲全部本來將會用於 consumer 1 的消息數據,直到 consumer 1 可以繼續處理消息。網絡
從這個示例能夠看出,對於堆棧中任何重要的消息隊列而言,穩定的存儲組件都必不可少。幸運的是,消息隊列與支持消息隊列的存儲系統同樣性能良好。若是存儲組件易發故障、受到損壞,或運行緩慢,於是即使僅有一個組件出現故障,也不能很好地應對,那麼強烈建議你們更換存儲部件。數據結構
通常而言,由不一樣的系統處理訂閱-發佈消息和消息隊列。例如,典型的技術棧可能使用 Apache Kafka 處理髮布-訂閱消息,使用 RabbitMQ 處理消息隊列。在這種狀況下,雖然系統工做良好,可是你須要同時部署、管理多個消息系統。架構
我最喜歡 Apache Pulsar 的一點就是,它能夠輕鬆鏈接訂閱-發佈消息和消息隊列。Pulsar 是第一個爲了同時處理訂閱-發佈消息和消息隊列而開源的消息系統。運維
由於使用 Apache BookKeeper 分佈式日誌存儲數據庫做爲存儲組件,Pulsar 能夠輕鬆地同時支持訂閱-發佈消息和消息隊列。BookKeeper 做爲日誌存儲系統,基於消息 topic 數據結構而構建,支持水平擴展(增長 「bookie」 數量便可擴展容量),且運行迅速。分佈式
Pulsar 支持兩種基本的 topic 類型:持久 topic 與非持久 topic。用戶能夠根據名稱辨別 topic 類型,由於類型即爲 topic 名稱的「schema」(相似於 https 是 URL https://google.com 的 schema)。
持久 topic 的名稱格式爲:persistent://public/default/some-topic,而非持久 topic 的名稱格式爲:non-persistent://public/default/some-topic。微服務
用戶使用持久 topic 時,Pulsar 將全部未確認消息(即未處理消息)存儲在 BookKeeper 中的多個「bookie」服務器上。
Pulsar 的確支持非持久 topic,可是咱們建議用戶只在能夠接受丟失消息的用例中,使用非持久消息。對於具備消息隊列功能的 topic,毫不應該使用非持久 topic。與將消息數據存儲在內存中相比,這種存儲方式具備不少優點。
Pulsar 無需特殊配置或調整,便可支持兩種用例,所以在使用方面具備必定的優點。重點在於如何使用 Pulsar,以下圖所示:
發佈-訂閱 producer 和 consumer 經過發佈-訂閱 topic 進行通訊,而隊列 producer 和 consumer 經過隊列 topic 進行通訊。不須要「標記」topic,也不須要預先指定 topic 爲實時 topic 或隊列 topic。
消息隊列 topic 須要 consumer 使用共享訂閱,而不能是獨佔訂閱(exclusive)或災備訂閱(failover)。另外,全部 consumer 必須使用相同的訂閱名稱,不然就不是同一訂閱。當 consumer 在 topic 上建立共享訂閱後,Pulsar 會自動在接收消息的 consumer 之間進行負載平衡,對於消息隊列來講,這是最理想的狀態。
如下代碼展現了五個 Java consumer 使用共享訂閱監聽同一 topic 的場景:
String PULSAR_SERVICE_URL = "pulsar://localhost:6650"; String MQ_TOPIC = "persistent://public/default/message-queue-topic"; String SUBSCRIPTION = "sub-1"; // Pulsar client PulsarClient client = PulsarClient.builder() .serviceUrl(PULSAR_SERVICE_URL) .build(); // Base consumer builder for instantiating multiple consumers ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer() .topic(MQ_TOPIC) .subscriptionName(SUBSCRIPTION) .subscriptionType(SubscriptionType.Shared) .messageListener(messageCallback); // Create five consumers (mq-consumer-0, mq-consumer-1, etc.) IntStream.range(0, 4).forEach(i -> { String name = String.format("mq-consumer-%d", i); consumerBuilder .consumerName(name) .subscribe(); });
吞吐量在消息隊列中尤其重要。若是消息隊列沒有足夠的吞吐量來處理周圍數據管道所須要的內容,那麼消息隊列可能不只性能不夠好,甚至會產生一些負面影響。若是使用 Pulsar 做爲消息隊列,則能夠經過調整 consumer 的配置來微調處理吞吐量。
默認狀況下,Apache Pulsar consumer 有一個接收隊列,用於一次處理多條消息。用戶能夠自行配置單個 consumer 接收隊列的大小(默認值爲 1000 條消息)。
理想狀況下,應該根據 consumer 處理消息的速度來設置接收隊列的大小。若是能夠很是快速地處理消息(只需幾毫秒),那麼建議將接收隊列的大小設置爲較大的值,由於這樣有助於最大化 consumer 的處理吞吐量。
可是若是處理消息須要較長時間,最好將接收隊列的大小設置爲較小的值。若是 consumer 正在執行的任務屬於 CPU 密集型,也就是說任務處理須要幾秒鐘甚至更久,則建議將接收隊列的大小設置爲個位數或 1,這樣負載平衡器可以在 consumer 之間合理地分發消息。
在下面這段代碼中,consumer 接收隊列比較小(Java):
Consumer<byte[]> consumer = client.newConsumer() .topic("slow-processing-topic") .subscriptionType(SubscriptionType.Shared) .subscriptionName("sub-1") .receiverQueueSize(5) .messageListener(messageCallback) .subscribe();
接收隊列的默認值適用於不少用例。可是建議用戶稍微留意一下接收隊列,以避免在後續工做中須要進行調優。
若是想在不一樣用例場景中同時運行多個消息平臺,你們能夠考慮使用 Pulsar。Pulsar 同時支持兩種主要的消息用例——發佈-訂閱消息(尤爲是持久消息)和消息隊列,而且運行速度快、可擴展,還能夠減輕運維管理負擔。