RabbitMQ是很是流行的消息中間件,你們都知道經過集羣可以增大它的吞吐量,那麼針對單個隊列,集羣能增大他的吞吐量嗎?若是不能,咱們要怎麼作呢?java
問題
RabbitMQ是很是流行的消息中間件,你們都知道經過集羣可以增大它的吞吐量,那麼針對單個隊列,集羣能增大他的吞吐量嗎?若是不能,咱們要怎麼作呢?node
答案是集羣並不能的增長單個隊列的吞吐量,這是由於RabbitMQ的普通集羣只是共享元數據信息,達到將整個集羣規模的隊列擴大以增長吞吐量的目的。普通集羣甚至不能保證消息數據的高可用,任意一個broker宕機,都會致使這個broker上的隊列不可用。nginx
而鏡像隊列也僅僅只是保證了實現鏡像複製的隊列的高可用。消費者並不能併發消費複製出來的隊列。git
那麼RabbitMQ是否也能提升相似Kafka的topic分區的機制,來加大單個主題隊列的吞吐量呢?github
經過使用 RabbitMQ Sharding 或Consistent-hash Sharding Exchange 插件來更加靈活地動態均衡隊列壓力,能夠更從容地達到百萬併發的性能。web
這裏我重點介紹下,RabbitMQ Sharding 插件,有興趣的夥伴能夠本身研究下Consistent-hash Sharding Exchange,二者的基本思路一致,都是根據Routeing Key的hash值將消息分發到分片隊列中。spring
原理介紹
官網:https://github.com/rabbitmq/rabbitmq-shardingswift
rabbitmq sharding插件爲您自動對隊列進行分區,也就是說,一旦您將一個exchange 定義爲sharded,那麼在每一個集羣節點上自動建立支持隊列,並在它們之間共享消息。rabbitmq sharding向使用者顯示了一個隊列,但它多是後臺運行在它後面的多個隊列。rabbitmq sharding插件爲您提供了一個集中的位置,經過向集羣中的其餘節點添加隊列,您能夠將消息以及跨多個節點的負載平衡發送到該位置。bash
插件安裝
查看當前插件
find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list
微信
若是有沒有對應的插件,本身下載後複製插件到指定的目錄
手動下載安裝,https://www.rabbitmq.com/community-plugins/
RabbitMQ的有些插件沒有集成在初始的安裝中,它們須要額外安裝,這些文件的後綴爲.ez,安裝時須要將.ez文件拷貝到安裝的插件目錄。如下是不一樣系統中默認安裝的插件目錄路徑:
插件安裝完成後能夠經過命令sudo rabbitmq-plugins list查看已有插件列表,eg:
安裝插件:
./rabbitmq-plugins enable rabbitmq_sharding
service rabbitmq-server restart
隊列分片
1.配置策略
find / -name rabbitmqctl
cd /usr/sbin/
./rabbitmqctl set_policy history-shard "^history" \ '{"shards-per-node": 2, "routing-key": "1234"}' \ --apply-to exchanges
說明:
經過rabbitmqctl set_policy設置新增策略,策略名稱爲history-shard,
匹配規則爲^history,shards-per-node表示每一個節點上分片出2個分片隊列,
routing-key爲1234,應用到全部交換器exchanges上去匹配執行。
二、在管理界面,手動建立名稱爲history的交換器,交換器類型選擇x-consistent-hash
如上圖所示,說明咱們的消息分片已經成功。
可是,咱們怎麼去消費history交換器下的消息呢?
代碼實戰
一、引入rabbitmq的依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、生產者
public class ProductTest {
private static final String EXCHANGE_NAME = "history";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 10000; i++) {
//第一個參數是交換器名稱,第二個參數是routeing key ,注意這裏的routeing key必定要是隨機的,否則消息都會發送到同一個隊列中
channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(10000);
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
消費者向名稱爲history的交換器,發送10000條routeing key爲0~10000,內容爲hello的消息
三、消費者
public class ConsumerTest {
private static final String QUEUE_NAME = "history";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//每次抓取的消息數量
channel.basicQos(32);
for (int i = 0; i < 10; i++) {
Consumer consumer = new MyConsumer(channel);
channel.basicConsume(QUEUE_NAME,consumer);
}
TimeUnit.SECONDS.sleep(120);
channel.close();
connection.close();
}
private static class MyConsumer extends DefaultConsumer{
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息爲:" + new String(body));
super.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
}
10個消費者去消費名稱爲history的隊列,消費者均勻分佈在每一個隊列上(每一個隊列上綁定了5個),每次抓取32個消息去消費。
總結
經過rabbitmq-sharding插件,將本來單個隊列history的分紅了2個隊列,可是對消費者來講,仍是消費的原來的history隊列,而不用管底層實際對應的物理隊列。極大的提升了單個隊列在大併發下的吞吐量。
感謝每一次關注和點贊
本文分享自微信公衆號 - 跟着老萬學java(douzhe_2019)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。