目錄java
在公司裏一直在用RabbitMQ,因爲api已經封裝的很簡單,關於RabbitMQ自己還有封裝的實現沒有了解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文檔,實現了RabbitMQ和spring的集成,對着本身平時的疑惑作了一些總結。
關於RabbitMQ基礎不在詳細講解(本文不適合RabbitMq零基礎),RabbitMQ實戰的1,2,4三章講的很是不錯。由於書中講的都是Python和Php的例子,因此本身結合SpringBoot文檔和朱小廝的博客作了一些總結,寫了一些Springboot的例子。git
SpringAMQP項目對RabbitMQ作了很好的封裝,能夠很方便的手動聲明隊列,交換器,綁定。以下:github
/** * 隊列 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue() { return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true); } /** * 交換器 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange() { return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true); } /** * 聲明綁定關係 * @return */ @Bean Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange, @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY); } /** * 聲明簡單的消費者,接收到的都是原始的{@link Message} * * @param connectionFactory * * @return */ @Bean SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageListener(message -> log.info("simple receiver,message:{}", message)); container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE); return container; }
消費者和生產者均可以聲明,交換器這種通常常常建立,能夠手動建立。須要注意對於沒有路由到隊列的消息會被丟棄。spring
若是是Spring的話還須要聲明鏈接:api
@Bean ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.username}") String userName, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm, @Value("${spring.rabbitmq.virtual-host}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setVirtualHost(vhost); connectionFactory.setPort(port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(isConfirm); }
在配置類使用@EnableRabbit
的狀況下,也能夠基於註解進行聲明,在Bean的方法上加上@RabbitListener
,以下:服務器
/** * 能夠直接經過註解聲明交換器、綁定、隊列。可是若是聲明的和rabbitMq中已經存在的不一致的話 * 會報錯便於測試,我這裏都是不使用持久化,沒有消費者以後自動刪除 * {@link RabbitListener}是能夠重複的。而且聲明隊列綁定的key也能夠有多個. * * @param headers * @param msg */ @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = DKEY ), //手動指明消費者的監聽容器,默認Spring爲自動生成一個SimpleMessageListenerContainer containerFactory = "container", //指定消費者的線程數量,一個線程會打開一個Channel,一個隊列上的消息只會被消費一次(不考慮消息從新入隊列的狀況),下面的表示至少開啓5個線程,最多10個。線程的數目須要根據你的任務來決定,若是是計算密集型,線程的數目就應該少一些 concurrency = "5-10" ) public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); } /** * {@link Queue#ignoreDeclarationExceptions}聲明隊列會忽略錯誤不聲明隊列,這個消費者仍然是可用的 * * @param headers * @param msg */ @RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT)) public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); }
這個比較簡單,默認採用了Java序列化,咱們通常使用的Json格式,因此配置了Jackson,根據本身的狀況來,直接貼代碼:app
@Bean MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
若是是同一個隊列多個消費類型那麼就須要針對每種類型提供一個消費方法,不然找不到匹配的方法會報錯,以下:負載均衡
@Component @Slf4j @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.MULTIPART_HANDLE_KEY ) ) @Profile(SpringConstant.MULTIPART_PROFILE) public class MultipartConsumer { /** * RabbitHandler用於有多個方法時可是參數類型不能同樣,不然會報錯 * * @param msg */ @RabbitHandler public void process(ExampleEvent msg) { log.info("param:{msg = [" + msg + "]} info:"); } @RabbitHandler public void processMessage2(ExampleEvent2 msg) { log.info("param:{msg2 = [" + msg + "]} info:"); } /** * 下面的多個消費者,消費的類型不同沒事,不會被調用,可是若是缺了相應消息的處理Handler則會報錯 * * @param msg */ @RabbitHandler public void processMessage3(ExampleEvent3 msg) { log.info("param:{msg3 = [" + msg + "]} info:"); } }
在上面也看到了@Payload
等註解用於注入消息。這些註解有:框架
這裏有一點主要注意,若是是com.rabbitmq.client.Channel
,org.springframework.amqp.core.Message
和org.springframework.messaging.Message
這些類型,能夠不加註解,直接能夠注入。
若是不是這些類型,那麼不加註解的參數將會被當作消息體。不能多於一個消息體。以下方法ExampleEvent就是默認的消息體:異步
public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);
RabbitMq消費者能夠選擇手動和自動確認兩種模式,若是是自動,消息已到達隊列,RabbitMq對無腦的將消息拋給消費者,一旦發送成功,他會認爲消費者已經成功接收,在RabbitMq內部就把消息給刪除了。另一種就是手動模式,手動模式須要消費者對每條消息進行確認(也能夠批量確認),RabbitMq發送完消息以後,會進入到一個待確認(unacked)的隊列,以下圖紅框部分:
若是消費者發送了ack,RabbitMq將會把這條消息從待確認中刪除。若是是nack而且指明不要從新入隊列,那麼該消息也會刪除。可是若是是nack且指明瞭從新入隊列那麼這條消息將會入隊列,而後從新發送給消費者,被從新投遞的消息消息頭amqp_redelivered屬性會被設置成true,客戶端能夠依靠這點來判斷消息是否被確認,能夠好好利用這一點,若是每次都從新回隊列會致使同一消息不停的被髮送和拒絕。消費者在確認消息以前和RabbitMq失去了鏈接那麼消息也會被從新投遞。因此手動確認模式很大程度上提升可靠性。自動模式的消息能夠提升吞吐量。
spring手動確認消息須要將SimpleRabbitListenerContainerFactory
設置爲手動模式:
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
手動確認的消費者代碼以下:
@SneakyThrows @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.CONFIRM_KEY), containerFactory = "containerWithConfirm") public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag, @Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) { try { log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head); channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("consume confirm error!", e); //這一步千萬不要忘記,不會會致使消息未確認,消息到達鏈接的qos以後便不能再接收新消息 //通常重試確定的有次數,這裏簡單的根據是否已經重發過來來決定重發。第二個參數表示是否從新分發 channel.basicReject(deliveryTag, !redelivered); //這個方法我知道的是比上面多一個批量確認的參數 // channel.basicNack(deliveryTag, false,!redelivered); } }
關於spring的AcknowledgeMode須要說明,他一共有三種模式:NONE,MANUAL,AUTO,默認是AUTO模式。這比RabbitMq原生多了一種。這一點很容易混淆,這裏的NONE對應其實就是RabbitMq的自動確認,MANUAL是手動。而AUTO其實也是手動模式,只不過是Spring的一層封裝,他根據你方法執行的結果自動幫你發送ack和nack。若是方法未拋出異常,則發送ack。若是方法拋出異常,而且不是AmqpRejectAndDontRequeueException
則發送nack,而且從新入隊列。若是拋出異常時AmqpRejectAndDontRequeueException
則發送nack不會從新入隊列。我有一個例子專門測試NONE,見CunsumerWithNoneTest
。
還有一點須要注意的是消費者有一個參數prefetch,它表示的是一個
考慮這樣一個場景:你發送了一個消息給RabbitMq,RabbitMq接收了可是存入磁盤以前服務器就掛了,消息也就丟了。爲了保證消息的投遞有兩種解決方案,最保險的就是事務(和DB的事務沒有太大的可比性), 可是由於事務會極大的下降性能,會致使生產者和RabbitMq之間產生同步(等待確認),這也違背了咱們使用RabbitMq的初衷。因此通常不多采用,這就引入第二種方案:發送者確認模式。
發送者確認模式是指發送方發送的消息都帶有一個id,RabbitMq會將消息持久化到磁盤以後通知生產者消息已經成功投遞,若是由於RabbitMq內部的錯誤會發送nack。注意這裏的發送者和RabbitMq之間是異步的,因此相較於事務機制性能大大提升。其實不少操做都是不能保證絕對的百分之一百的成功,哪怕採用了事務也是如此,可靠性和性能不少時候須要作一些取捨,想不少互聯網公司吹噓的5個9,6個9也是同樣的道理。若是不是重要的消息如:性能計數器,徹底能夠不採用發送者確認模式。
這裏有一點我當時糾結了好久,我一直覺得發送者確認模式的回調是客戶端的ack觸發的,這裏是大大的誤解!發送者確認模式和消費者沒有一點關係,消費者確認也和發送者沒有一點關係,二者都是在和RabbitMq打交道,發送者不會管消費者有沒有收到,只要消息到了RabbitMq而且已經持久化便會通知生產者,這個ack是RabbitMq自己發出的,和消費者無關
發送者確認模式須要將Channel設置成Confirm模式,這樣纔會收到通知。Spring中須要將鏈接設置成Confirm模式:
connectionFactory.setPublisherConfirms(isConfirm);
而後在RabbitTemplate中設置確認的回調,correlationData是消息的id,以下(只是簡單打印下):
// 設置RabbitTemplate每次發送消息都會回調這個方法 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));
發送時須要給出惟一的標識(CorrelationData
):
rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY, new ExampleEvent(i, "confirm message id:" + i), new CorrelationData(Integer.toString(i)));
還有一個參數須要說下:mandatory。這個參數爲true表示若是發送消息到了RabbitMq,沒有對應該消息的隊列。那麼會將消息返回給生產者,此時仍然會發送ack確認消息。
設置RabbitTemplate的回調以下:
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外若是是RabbitMq內部的錯誤,不會調用該方法。因此若是消息特別重要,對於未確認的消息,生產者應該在內存用保存着,在確認時候根據返回的id刪除該消息。若是是nack能夠將該消息記錄專門的日誌或者轉發到相應處理的邏輯進行後續補償。RabbitTemplate也能夠配置RetryTemplate,發送失敗時直接進行重試,具體仍是要結合業務。
最後關於發送者確認須要提的是spring,由於spring默認的Bean是單例的,因此針對不一樣的確認方案(其實有不一樣的確認方案是比較合理的,不少消息不須要確認,有些須要確認)須要配置不一樣的bean.
上面也提到了若是消費者拋出異常時默認的處理邏輯。另外咱們還能夠給消費者配置RetryTemplate,若是是採用SpringBoot的話,能夠在application.yml配置中配置以下:
spring: rabbitmq: listener: retry: # 重試次數 max-attempts: 3 # 開啓重試機制 enabled: true
如上,若是消費者失敗的話會進行重試,默認是3次。注意這裏的重試機制RabbitMq是爲感知的!到達3次以後會拋出異常調用MessageRecoverer
。默認的實現爲RejectAndDontRequeueRecoverer,也就是打印異常,發送nack,不會從新入隊列。
我想既然配置了重試機制消息確定是很重要的,消息確定不能丟,僅僅是日誌可能會由於日誌滾動丟失並且信息不明顯,因此咱們要講消息保存下來。能夠有以下這些方案:
我比較推薦前兩種。這裏說下死信隊列,死信隊列其實就是普通的隊列,只不過一個隊列聲明的時候指定的屬性,會將死信轉發到該交換器中。聲明死信隊列方法以下:
@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT, arguments = { @Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE), @Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY) }), key = RabbitMQConstant.DEFAULT_KEY ))
其實也就只是在聲明的時候多加了兩個參數x-dead-letter-exchange和x-dead-letter-routing-key。這裏一開始踩了一個坑,由於@QueueBinding
註解中也有arguments屬性,我一開始將參數聲明到@QueueBinding
中,致使一直沒綁定成功。若是綁定成功能夠在控制檯看到queue的Featrues有DLX(死信隊列交換器)和DLK(死信隊列綁定)。以下:
咱們用到的就是第一種。
原本生產者和消費者是沒有耦合的,可是能夠經過一些屬性產生耦合。在早期版本中,若是一個生產者想要收到消費者的回覆,實現方案是生產者在消息頭中加入reply-to屬性也就是隊列(通常是私有,排他,用完即銷燬)的名字,而後在這個隊列上進行監聽,消費者將回復發送到這個隊列中。RabbitMq3.3以後有了改進,就是不用沒有都去建立一個臨時隊列,這樣很耗費性能,能夠採用drect-to模式,省去了每次建立隊列的性能損耗,可是仍是要建立一次隊列。如今Spring默認的就是這個模式。RabbitTemplate中有一系列的sendAndReceiveXX
方法。默認等待5秒,超時返回null。用
法和不帶返回的差很少。
消費者的方法經過返回值直接返回消息(下面的方法是有返回值的):
public String receive(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:"); return REPLY; }
這裏的提一下最後一個註解@SendTo
,用在消費方法上,指明返回值的目的地,默認不用的話就是返回給發送者,能夠經過這個註解改變這種行爲。以下代碼:
@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.REPLY_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.REPLY_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.REPLY_KEY ) ) @SendTo("queue.reply.s") public ExampleEvent log(ExampleEvent event) { log.info("log receive message:O{}", event); return new ExampleEvent(1, "log result"); }
上面的代碼就是會將消息直接發送到默認交換器,而且以queue.reply.s做爲路由鍵。@SendTo的格式爲exchange/routingKey用法以下:
這裏還須要提一下,由於默認全部的隊列都會綁定到空交換器,而且以隊列名字做爲Routekey, 因此SendTo裏面能夠直接填寫隊列名字機會發送到相應的隊列.如日誌隊列。由於RPC模式不經常使用,專業的東西作專業的事,就像咱們通常不用Redis來作消息隊列同樣(雖然他也能夠實現),通常公司都有特定的技術棧,確定有更合適的RPC通訊框架。固然若是要跨語言的集成這個方案也是一種不錯的方案,能夠繼續考慮採用異步發送AsyncRabbitTemplate
來下降延遲等優化方案!
RabbitMQ底層的消費模型有兩種Push和Pull。我在網上查閱資料的時候發現有不少教程採用了pull這種模式。RabbitMq實戰和
RabbitMQ之Consumer消費模式(Push & Pull)都指出這種模式性能低,會影響消息的吞吐量,增長沒必要要的IO,因此除非有特殊的業務需求,不要採用這種方案。Spring的封裝就是採用了push的方案。
這裏講的是消費者的,生產者沒什麼好講的。先看消息流轉圖:
圖中橢圓表示線程,矩形是隊列。消息到達AMQP的鏈接線程,而後分發到client線程池,隨後分發到監聽器。注意除了監聽器的線程,其餘都是在com.rabbitmq.client.impl.AMQConnection
中建立的線程,咱們對線程池作一些修改。鏈接線程名字不能修改就是AMQP Connection打頭。心跳線程能夠設置setConnectionThreadFactory來設置名字。以下:
connectionFactory.setConnectionThreadFactory(new ThreadFactory() { public final AtomicInteger id = new AtomicInteger(); @Override public Thread newThread(Runnable r) { return new Thread(r, MessageFormat.format("amqp-heart-{0}", id.getAndIncrement())); } });
client線程池見:com.rabbitmq.client.impl.ConsumerWorkService
構造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。
final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() { public final AtomicInteger id = new AtomicInteger(); @Override public Thread newThread(Runnable r) { return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement())); } });
listener的線程設置以下:
simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));
注意:SimpleAsyncTaskExecutor每次執行一個任務都會新建一個線程,對於生命週期很短的任務不要使用這個線程池(如client線程池的任務), 這裏的消費者線程生命週期直到SimpleMessageListenerContainer中止因此沒有適合這個場景
修改過以後的線程以下:
消息投遞過程以下:
public void startMainLoop() { MainLoop loop = new MainLoop(); final String name = "AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); }
上面的是默認的消費者監聽器。SpringAMQP 2.0引入了一個新的監聽器實現DirectMessageListenerContainer
。這個實現最大的變化在於消費者的處理邏輯不是在本身的線程池中執行而是直接在client線程池中處理,這樣最明顯的是省去了線程的上下文切換的開銷,並且設計上也變得更爲直觀。因此若是採用這個監聽器須要覆蓋默認的線程池加大Connection的線程池。採用這個監聽器只須要設置@RabbitListener
的containerFactory屬性。聲明方法以下:
@Bean DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory(); directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors()); directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory); directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); directRabbitListenerContainerFactory.setConsumersPerQueue(10); return directRabbitListenerContainerFactory; }
這時的消息流轉圖以下:
還有一些關於監聽器的例子和Springboot配置我放在了源碼裏,這裏再也不講述。