rabbitmq知識庫: http://rabbitmq.org.cn/html
Rabbitmq捨棄了繁重的事務消息而使用了消息確認機制實現了分佈式事務,實在是解耦之一大神器。可是其配置起來挺麻煩,各類參數,各類調整。但國內貌似資料不多,找來找去都找不到,本身擼一發先spring
發送確認用來確保消息是否已送達消息隊列。消息一旦到達消息服務,就會觸發確認機制,能夠分爲兩種狀況:數據庫
消費確認用來確保消費者是否成功的消費了消息。一旦有消費者成功註冊到相應的消息服務,消息將會被消息服務經過basic.deliver推(push)給消費者,此時消息會包含一個deliver tag用來惟一的標識消息。若是此時是手動模式,就須要手動的確認消息已經被成功消費,不然消息服務將會重發消息(由於消息已經持久化到了硬盤上,因此不管消息服務是否是可能掛掉,都會重發消息)。並且必須確認,不管是成功或者失敗,不然會引發很是嚴重的問題緩存
有三種狀況可能進死信交換機服務器
只須要設置一個args,就ok拉網絡
channel.exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);
Channel Prefetch Setting (QoS),表示當前channel中未應答消息的數目,若是超過了,隊列中將再也不接受新的消息。這裏所謂的channel就是指從消息服務到消費者的一個通道,簡單來講就是指消息從消息隊列發送到消費者了,若是沒收到應答,就算是一個Qos併發
加大這個值會增長消息的發送速度(Throughput),可是會加劇消息隊列的內存,因此100-300之間是一個比較理想的狀態,可參考:http://next.rabbitmq.com/conf... :Channel Prefetch Setting (QoS)。分佈式
暴力的設置微100-300是存在一些問題的,若是太大,可能消息所有都壓在消費者中而得不到消費,看起來隊列是空的,實際上所有積壓在客戶端;若是過小則得不到消費,浪費資源。具體該怎麼設置要根據實際的網絡吞吐量、以及消費者的消費能力。好比果消費者很快,是內存操做,那麼你設置很大,甚至不設置均可以;可是若是消費者很慢,好比是個數據庫操做,那麼極可能將消息所有積壓到消費者而得不到響應
。能夠參考http://www.rabbitmq.com/blog/...微服務
Qos同時也會存在一個問題,一個channel是會被多個消費者的,因此必須計算出全部消費者中未應答的數目,這顯然是很是不合理的,並且很麻煩,因此能夠改成設置每一個消費者緩存(Prefetch Buf)能夠容許的最大的數目。高併發
例子:
// 1. 一會兒設置全部的queue都是10條unacknowledged Channel channel = ...; Consumer consumer = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue", false, consumer); //2. 分別設置10條 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2); //3. 分別設置channel和consume,我的不推薦 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10, false); // Per consumer limit channel.basicQos(15, true); // Per channel limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2);
當前最大容許空閒的最大channel數。若是在高併發的環境中,若是值太小的話channel會關關開開很是頻繁。因此在1.6的版本中,spring amqp將這個值從1提升到了25。同時你也能夠從RabbitMQ Admin中心觀察到channel關關開開,那麼就能夠考慮增大cache的值了。
當你遇到 connetion error的錯誤時,就能夠考慮增大channel cache size了。
通道(channel)中的消息標誌,按照正數遞增,消息隊列中用來標識消息的惟一標識
當消息服務器資源不足時,會向全部的生產者發送這個消息,咱們能夠捕獲這個消息並作處理,資源能夠是內存不足,cpu負載太重等等。
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); connection.addBlockedListener(new BlockedListener() { public void handleBlocked(String reason) throws IOException { // Connection is now blocked } public void handleUnblocked() throws IOException { // Connection is now unblocked } });
話說真出現block了也應該是在管理臺直接給出警告,不過也能夠作一下避免異常
全應答標識。若是設置爲true,一條消息應答了,那麼以前的所有消息將被應答。好比目前channel中有delivery tags爲5,6,7,8的消息,那麼一旦8被應答,那麼5,6,7將都被應答,若是設置爲false,那麼5,6,7將不會被應答。(不建議設置,畢竟一個channel中會綁定好多consumer)
當消息服務發生異常時,不會發送basic.ack,反而會發送一個basic.nack,並且不會自動requeue,此時須要消息發送方手動處理,進行重發。只有一種狀況會發送nack:「basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue」
1 mq配置的listener到什麼地方,是配置到每一個微服務、或者是配置到mq中?若是在每一個服務裏面都配置concurency,是否是隨着節點的增長,listener數量也會無限增長?2 ssl和max-queue-length的配置