rabbitmq分片插件rabbitmq-sharding,你會了嗎?

RabbitMQ是很是流行的消息中間件,你們都知道經過集羣可以增大它的吞吐量,那麼針對單個隊列,集羣能增大他的吞吐量嗎?若是不能,咱們要怎麼作呢?java

南山遠眺

問題

RabbitMQ是很是流行的消息中間件,你們都知道經過集羣可以增大它的吞吐量,那麼針對單個隊列,集羣能增大他的吞吐量嗎?若是不能,咱們要怎麼作呢?node

答案是集羣並不能的增長單個隊列的吞吐量,這是由於RabbitMQ的普通集羣只是共享元數據信息,達到將整個集羣規模的隊列擴大以增長吞吐量的目的。普通集羣甚至不能保證消息數據的高可用,任意一個broker宕機,都會致使這個broker上的隊列不可用。nginx

而鏡像隊列也僅僅只是保證了實現鏡像複製的隊列的高可用。消費者並不能併發消費複製出來的隊列。git

那麼RabbitMQ是否也能提升相似Kafka的topic分區的機制,來加大單個主題隊列的吞吐量呢?github

經過使用 RabbitMQ Sharding 或Consistent-hash Sharding Exchange 插件來更加靈活地動態均衡隊列壓力,能夠更從容地達到百萬併發的性能。web

這裏我重點介紹下,RabbitMQ Sharding 插件,有興趣的夥伴能夠本身研究下Consistent-hash Sharding Exchange,二者的基本思路一致,都是根據Routeing Key的hash值將消息分發到分片隊列中。spring

原理介紹

官網:https://github.com/rabbitmq/rabbitmq-shardingswift

rabbitmq sharding插件爲您自動對隊列進行分區,也就是說,一旦您將一個exchange 定義爲sharded,那麼在每一個集羣節點上自動建立支持隊列,並在它們之間共享消息。rabbitmq sharding向使用者顯示了一個隊列,但它多是後臺運行在它後面的多個隊列。rabbitmq sharding插件爲您提供了一個集中的位置,經過向集羣中的其餘節點添加隊列,您能夠將消息以及跨多個節點的負載平衡發送到該位置。bash

插件安裝

查看當前插件
find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list
微信

若是有沒有對應的插件,本身下載後複製插件到指定的目錄
手動下載安裝,https://www.rabbitmq.com/community-plugins/

RabbitMQ的有些插件沒有集成在初始的安裝中,它們須要額外安裝,這些文件的後綴爲.ez,安裝時須要將.ez文件拷貝到安裝的插件目錄。如下是不一樣系統中默認安裝的插件目錄路徑:


插件安裝完成後能夠經過命令sudo rabbitmq-plugins list查看已有插件列表,eg:

安裝插件:
./rabbitmq-plugins enable rabbitmq_sharding


重啓生效:
service rabbitmq-server restart

隊列分片

1.配置策略

   
   
    
    
             
    
    

   
   
    
    
             
    
    
find / -name rabbitmqctl
cd /usr/sbin/
./rabbitmqctl set_policy history-shard "^history" \ '{"shards-per-node": 2, "routing-key": "1234"}' \ --apply-to exchanges

說明:
經過rabbitmqctl set_policy設置新增策略,策略名稱爲history-shard,
匹配規則爲^history,shards-per-node表示每一個節點上分片出2個分片隊列,
routing-key爲1234,應用到全部交換器exchanges上去匹配執行。

二、在管理界面,手動建立名稱爲history的交換器,交換器類型選擇x-consistent-hash


如上圖所示,說明咱們的消息分片已經成功。

可是,咱們怎麼去消費history交換器下的消息呢?

代碼實戰

一、引入rabbitmq的依賴包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、生產者

public class ProductTest {
    private static final String EXCHANGE_NAME = "history";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("wanli");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
        for (int i = 0; i < 10000; i++) {
            //第一個參數是交換器名稱,第二個參數是routeing key ,注意這裏的routeing key必定要是隨機的,否則消息都會發送到同一個隊列中
            channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
        }
        channel.waitForConfirmsOrDie(10000);

        TimeUnit.SECONDS.sleep(5);

        channel.close();
        connection.close();
    }
}

消費者向名稱爲history的交換器,發送10000條routeing key爲0~10000,內容爲hello的消息

三、消費者

public class ConsumerTest {
    private static final String QUEUE_NAME = "history";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("wanli");
        factory.setPort(5672);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();

        //每次抓取的消息數量
        channel.basicQos(32);
        for (int i = 0; i < 10; i++) {
            Consumer consumer = new MyConsumer(channel);
            channel.basicConsume(QUEUE_NAME,consumer);
        }
        TimeUnit.SECONDS.sleep(120);

        channel.close();
        connection.close();

    }

    private  static class MyConsumer extends DefaultConsumer{
        public MyConsumer(Channel channel) {
            super(channel);
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到的消息爲:" + new String(body));
            super.getChannel().basicAck(envelope.getDeliveryTag(),false);
        }
    }
}

10個消費者去消費名稱爲history的隊列,消費者均勻分佈在每一個隊列上(每一個隊列上綁定了5個),每次抓取32個消息去消費。

總結

經過rabbitmq-sharding插件,將本來單個隊列history的分紅了2個隊列,可是對消費者來講,仍是消費的原來的history隊列,而不用管底層實際對應的物理隊列。極大的提升了單個隊列在大併發下的吞吐量。

感謝每一次關注和點贊


本文分享自微信公衆號 - 跟着老萬學java(douzhe_2019)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索