Rabbitmq各參數詳解

rabbitmq知識庫: http://rabbitmq.org.cn/html

前言:

Rabbitmq捨棄了繁重的事務消息而使用了消息確認機制實現了分佈式事務,實在是解耦之一大神器。可是其配置起來挺麻煩,各類參數,各類調整。但國內貌似資料不多,找來找去都找不到,本身擼一發先spring

1 發送確認

發送確認用來確保消息是否已送達消息隊列。消息一旦到達消息服務,就會觸發確認機制,能夠分爲兩種狀況:數據庫

  1. 對於沒法被路由的消息,一旦無法找到一個隊列來消費它,就會觸發確認沒法消費,此時ack=false。旦有一種狀況例外,就是連exchange都無法找到,若是設置了mandatory, 此時就會先觸發basic.return,就是會先觸發returncallback回調。
  2. 對於能夠被路由的消息,當消息被(全部的?)queue接受時,會觸發ack=true;對於設置了持久化(persistent)的消息,當消息成功的持久化到硬盤上纔會觸發;對於設置了鏡像(mirror)的消息,那麼是當全部的mirror接受到這個消息。

2 消費確認(Delivery Acknowledgements)

消費確認用來確保消費者是否成功的消費了消息。一旦有消費者成功註冊到相應的消息服務,消息將會被消息服務經過basic.deliver推(push)給消費者,此時消息會包含一個deliver tag用來惟一的標識消息。若是此時是手動模式,就須要手動的確認消息已經被成功消費,不然消息服務將會重發消息(由於消息已經持久化到了硬盤上,因此不管消息服務是否是可能掛掉,都會重發消息)。並且必須確認,不管是成功或者失敗,不然會引發很是嚴重的問題緩存

3 死信交換機(Dead Letter Exchanges)

有三種狀況可能進死信交換機服務器

  1. 被reject或者nack,而且requeue設置爲false
  2. 消息最大存活時間(TTL)超時
  3. 消息數量超過最大隊列長度

只須要設置一個args,就ok拉網絡

channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

4 Qos

clipboard.png

Channel Prefetch Setting (QoS),表示當前channel中未應答消息的數目,若是超過了,隊列中將再也不接受新的消息。這裏所謂的channel就是指從消息服務到消費者的一個通道,簡單來講就是指消息從消息隊列發送到消費者了,若是沒收到應答,就算是一個Qos併發

加大這個值會增長消息的發送速度(Throughput),可是會加劇消息隊列的內存,因此100-300之間是一個比較理想的狀態,可參考:http://next.rabbitmq.com/conf... :Channel Prefetch Setting (QoS)。分佈式

暴力的設置微100-300是存在一些問題的,若是太大,可能消息所有都壓在消費者中而得不到消費,看起來隊列是空的,實際上所有積壓在客戶端;若是過小則得不到消費,浪費資源。具體該怎麼設置要根據實際的網絡吞吐量、以及消費者的消費能力。好比果消費者很快,是內存操做,那麼你設置很大,甚至不設置均可以;可是若是消費者很慢,好比是個數據庫操做,那麼極可能將消息所有積壓到消費者而得不到響應
。能夠參考http://www.rabbitmq.com/blog/...微服務

Qos同時也會存在一個問題,一個channel是會被多個消費者的,因此必須計算出全部消費者中未應答的數目,這顯然是很是不合理的,並且很麻煩,因此能夠改成設置每一個消費者緩存(Prefetch Buf)能夠容許的最大的數目。高併發

clipboard.png

例子:

// 1. 一會兒設置全部的queue都是10條unacknowledged
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

//2. 分別設置10條
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

//3. 分別設置channel和consume,我的不推薦
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

5 channelCacheSize

當前最大容許空閒的最大channel數。若是在高併發的環境中,若是值太小的話channel會關關開開很是頻繁。因此在1.6的版本中,spring amqp將這個值從1提升到了25。同時你也能夠從RabbitMQ Admin中心觀察到channel關關開開,那麼就能夠考慮增大cache的值了。

當你遇到 connetion error的錯誤時,就能夠考慮增大channel cache size了。

6 其它參數和配置

delivery tags

通道(channel)中的消息標誌,按照正數遞增,消息隊列中用來標識消息的惟一標識

Blocked Connection Notifications

當消息服務器資源不足時,會向全部的生產者發送這個消息,咱們能夠捕獲這個消息並作處理,資源能夠是內存不足,cpu負載太重等等。

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
connection.addBlockedListener(new BlockedListener() {
    public void handleBlocked(String reason) throws IOException {
        // Connection is now blocked
    }

    public void handleUnblocked() throws IOException {
        // Connection is now unblocked
    }
});

話說真出現block了也應該是在管理臺直接給出警告,不過也能夠作一下避免異常

Multiple

全應答標識。若是設置爲true,一條消息應答了,那麼以前的所有消息將被應答。好比目前channel中有delivery tags爲5,6,7,8的消息,那麼一旦8被應答,那麼5,6,7將都被應答,若是設置爲false,那麼5,6,7將不會被應答。(不建議設置,畢竟一個channel中會綁定好多consumer)

basic.nack

當消息服務發生異常時,不會發送basic.ack,反而會發送一個basic.nack,並且不會自動requeue,此時須要消息發送方手動處理,進行重發。只有一種狀況會發送nack:「basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue」

1 mq配置的listener到什麼地方,是配置到每一個微服務、或者是配置到mq中?若是在每一個服務裏面都配置concurency,是否是隨着節點的增長,listener數量也會無限增長?2 ssl和max-queue-length的配置

相關文章
相關標籤/搜索