轉 https://blog.csdn.net/leixiaotao_java/article/details/78924863java
一、maven依賴
-
-
<groupId>commons-lang</groupId>
-
<artifactId>commons-lang</artifactId>
-
-
-
-
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>amqp-client</artifactId>
-
<version>
3.4.1</version>
-
二、RabbitMQ重要方法介紹(基本經常使用的)
2.一、建立鏈接
-
-
ConnectionFactory cf =
new ConnectionFactory();
-
-
-
-
-
-
-
-
cf.setPort(AMQP.PROTOCOL.PORT);
-
-
connection = cf.newConnection();
-
-
channel = connection.createChannel();
-
-
-
-
2.二、聲明隊列
-
-
-
-
-
-
-
-
-
-
-
channel.queueDeclare(
"testQueue", true, false, false, null);
此方法通常由Producer調用建立消息隊列。若是由Consumer建立隊列,有可能Producer發佈消息的時候Queue尚未被建立好,會形成消息丟失的狀況。web
2.三、聲明Exchange
-
-
-
-
-
-
-
-
-
-
-
channel.exchangeDeclare(
"leitao","topic", true,false,null);
2.四、將queue和Exchange進行綁定(Binding)
-
-
-
-
-
-
-
-
-
channel.queueBind(
"testQueue", "leitao", "testRoutingKey");
2.五、發佈消息
-
-
-
-
-
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
當exchange的值爲空字符串或者是amq.direct時,此時的交換器類型默認是direct類型,能夠不用單獨聲明Exchange,也不用單獨進行Binding,系統默認將queue名稱做爲RoutingKey進行了綁定。服務器
兩個傳入參數的含義網絡
mandatory異步
當mandatory標誌位設置爲true時,若是exchange根據自身類型和消息routeKey沒法找到一個符合條件的queue,那麼會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置爲false時,出現上述情形broker會直接將消息扔掉。maven
immediateide
當immediate標誌位設置爲true時,若是exchange在將消息路由到queue(s)時發現對於的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的全部queue(一個或者多個)都沒有消費者時,該消息會經過basic.return方法返還給生產者。函數
歸納來講,mandatory標誌告訴服務器至少將該消息route到一個隊列中,不然將消息返還給生產者;immediate標誌告訴服務器若是該消息關聯的queue上有消費者,則立刻將消息投遞給它,若是全部queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。工具
注意:在RabbitMQ3.0之後的版本里,去掉了immediate參數的支持,發送帶immediate=true標記的publish會返回以下錯誤:性能
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。
爲何取消支持:immediate標記會影響鏡像隊列性能,增長代碼複雜性,並建議採用「TTL」和「DLX」等方式替代。
2.6、接收消息
-
-
-
-
-
-
-
channel.basicQos(
10,false);
-
-
channel.basicQos(
15,true);
-
-
-
-
-
-
-
-
-
-
-
channel.basicConsume(queueName,
false, Consumer);
2.七、Consumer處理消息
-
-
-
-
-
-
-
-
-
void handleDelivery(String consumerTag,
-
-
AMQP.BasicProperties properties,
-
-
三、Producer消息確認機制
3.一、什麼是生產者消息確認機制?
沒有消息確認模式時,生產者不知道消息是否是已經到達了Broker服務器,這對於一些業務嚴謹的系統來講將是災難性的。消息確認模式能夠採用AMQP協議層面提供的事務機制實現(此文沒有這種實現方式),可是會下降RabbitMQ的吞吐量。RabbitMQ自身提供了一種更加高效的實現方式:confirm模式。
消息生產者經過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。一旦信道被設置成confirm模式,該信道上的全部消息都會被指派一個惟一的ID(從1開始),一旦消息被對應的Exchange接收,Broker就會發送一個確認給生產者(其中deliveryTag就是此惟一的ID),這樣消息生產者就知道消息已經成功到達Broker。
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。
在channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack 。
3.二、開啓confirm模式
如上所說生產者經過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。
注意:已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的。
3.三、普通confirm模式
普通confirm模式是串行的,即每次發送了一次消息,生產者都要等待Broker的確認消息,而後根據確認標記權衡消息重發仍是繼續發下一條。因爲是串行的,在效率上是比較低下的。
(1)重點方法
-
-
-
-
-
-
-
boolean waitForConfirms() throws InterruptedException;
(2)部分使用代碼以下:
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
-
-
if(channel.waitForConfirms())
-
System.out.println(
"send success!");
-
-
System.out.println(
"send error!");
-
-
3.四、批量confirm模式
批量confirm模式是異步的方式,效率要比普通confirm模式高許多,可是此種方式也會形成線程阻塞,想要進行失敗重發就必需要捕獲異常。網絡上還有采用waitForConfirms()實現批量confirm模式的,可是隻要一條失敗了,就必須把這批次的消息通通再重發一次,很是的消耗性能,所以此文不予考慮。
(1)重點代碼
-
-
-
-
-
-
-
void waitForConfirmsOrDie() throws IOException, InterruptedException;
(2)部分代碼以下:
-
-
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
-
-
-
channel.waitForConfirmsOrDie();
3.五、ConfirmListener監聽器模式
RabbitMQ提供了一個ConfirmListener接口專門用來進行確認監聽,咱們能夠實現ConfirmListener接口來建立本身的消息確認監聽。ConfirmListener接口中包含兩個回調方法:
-
-
-
-
void handleAck(long deliveryTag, boolean multiple) throws IOException;
-
-
-
-
-
void handleNack(long deliveryTag, boolean multiple) throws IOException;
其中deliveryTag是Broker給每條消息指定的惟一ID(從1開始);multiple表示是否接收全部的應答消息,好比multiple=true時,發送100條消息成功事後,咱們並不會收到100次handleAck方法調用。
(1)重要方法
-
-
channel.addConfirmListener(
new MyConfirmListener());
(2)部分使用代碼以下:
-
-
-
-
channel.addConfirmListener(
new MyConfirmListener());
-
-
channel.addReturnListener(
new MyReturnListener());
-
-
-
-
channel.basicPublish(
"",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.
-
-
-
-
-
public class MyConfirmListener implements ConfirmListener{
-
-
-
-
-
-
-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
-
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
-
-
-
-
-
-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)失敗!服務器broker丟失了消息");
-
-
-
-
-
-
-
-
-
public class MyReturnListener implements ReturnListener {
-
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
-
BasicProperties properties, byte[] body) throws IOException {
-
System.out.println(
"消息發送到隊列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文本:"+replyText+";失敗消息對象:"+SerializationUtils.deserialize(body));
-
-
四、Consumer消息確認機制
爲了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在註冊消費者時,能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(或磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它。
當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分紅了兩部分:一部分是等待投遞給消費者的消息(web管理界面上的Ready狀態);一部分是已經投遞給消費者,可是尚未收到消費者ack信號的消息(web管理界面上的Unacked狀態)。若是服務器端一直沒有收到消費者的ack信號,而且消費此消息的消費者已經斷開鏈接,則服務器端會安排該消息從新進入隊列,等待投遞給下一個消費者(也可能仍是原來的那個消費者)。
(1)重要方法
-
-
-
-
-
-
-
-
-
-
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
-
-
-
-
-
-
-
-
-
-
void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
-
-
-
-
-
-
-
-
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
-
-
-
-
-
-
void basicReject(long deliveryTag, boolean requeue) throws IOException;
(2)部分使用代碼以下:
-
-
channel.basicConsume(queueName,
false, this);
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
-
-
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"接收並處理消息:"+tagId);
-
-
channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
五、Demo項目總體代碼
此demo就是向RabbitMQ服務器上面發送20個消息,消息體是map,裏面裝的是tagId=數字。而後註冊了兩個消費者,分別處理奇數和偶數。
5.一、鏈接工具類
-
-
-
-
public class ConnectionUtil {
-
-
-
-
-
-
public ConnectionUtil(String queueName) throws IOException {
-
this.queueName = queueName;
-
-
ConnectionFactory cf =
new ConnectionFactory();
-
-
-
-
-
-
-
cf.setPort(AMQP.PROTOCOL.PORT);
-
-
connection = cf.newConnection();
-
-
channel = connection.createChannel();
-
-
-
-
-
-
-
-
-
-
-
channel.queueDeclare(queueName,
true, false, false, null);
-
-
-
public void close() throws IOException{
-
-
-
-
5.二、具體生產者
-
-
-
-
public class MessageProducer {
-
-
private ConnectionUtil connectionUtil;
-
-
public MessageProducer(ConnectionUtil connectionUtil){
-
this.connectionUtil=connectionUtil;
-
-
-
-
-
public void sendMessage(Serializable object) throws IOException{
-
-
-
-
-
-
-
-
-
-
connectionUtil.channel.basicPublish(
"", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
-
System.out.println(
"MessageProducer發送了一條消息:"+object);
-
-
5.三、公共消費者父類
-
-
-
-
public class MessageConsumer implements Consumer {
-
-
protected String consumerTag;
-
-
protected ConnectionUtil connectionUtil;
-
-
public MessageConsumer(ConnectionUtil connectionUtil){
-
this.connectionUtil=connectionUtil;
-
-
-
public void basicConsume(){
-
-
-
-
-
-
-
-
connectionUtil.channel.basicQos(
10,false);
-
connectionUtil.channel.basicQos(
15,true);
-
-
-
-
-
-
-
-
-
-
connectionUtil.channel.basicConsume(connectionUtil.queueName,
false, this);
-
}
catch (IOException e) {
-
-
-
-
-
-
-
-
public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
-
-
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"消費者:"+consumerTag+",註冊成功!");
-
-
-
-
-
-
-
public void handleCancelOk(String consumerTag) {
-
System.out.println(consumerTag+
" 手動取消消費者註冊成功!");
-
-
-
-
-
-
public void handleCancel(String consumerTag) throws IOException {
-
System.out.println(
"由於外部緣由消費者:"+consumerTag+" 取消註冊!");
-
-
-
-
-
-
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
-
System.out.println(
"通道或基礎鏈接被關閉");
-
-
-
-
-
-
-
-
-
-
public void handleRecoverOk(String consumerTag) {
-
-
-
5.四、具體的消費者
-
-
-
-
public class EvenConsumer extends MessageConsumer {
-
-
public EvenConsumer(ConnectionUtil connectionUtil) {
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"EvenConsumer消費者:"+consumerTag+",註冊成功!");
-
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"EvenConsumer接收並處理消息:"+tagId);
-
-
connectionUtil.channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
connectionUtil.channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
-
-
-
-
-
public class OddConsumer extends MessageConsumer {
-
-
public OddConsumer(ConnectionUtil connectionUtil) {
-
-
-
-
-
public void handleConsumeOk(String consumerTag) {
-
this.consumerTag=consumerTag;
-
System.out.println(
"OddConsumer消費者:"+consumerTag+",註冊成功!");
-
-
-
-
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
-
-
-
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
-
int tagId = (Integer) map.get("tagId");
-
-
-
System.out.println(
"OddConsumer接收並處理消息:"+tagId);
-
-
connectionUtil.channel.basicAck(envelope.getDeliveryTag(),
false);
-
-
-
connectionUtil.channel.basicReject(envelope.getDeliveryTag(),
true);
-
-
-
5.五、監聽器
-
-
-
-
public class MyConfirmListener implements ConfirmListener{
-
-
-
-
-
-
-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
-
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
-
-
-
-
-
-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println(
"Exchange接收消息:"+deliveryTag+"(deliveryTag)失敗!服務器broker丟失了消息");
-
-
-
-
-
-
-
-
-
public class MyReturnListener implements ReturnListener {
-
-
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
-
BasicProperties properties, byte[] body) throws IOException {
-
System.out.println(
"消息發送到隊列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文本:"+replyText+";失敗消息對象:"+SerializationUtils.deserialize(body));
-
-
5.六、客戶端
-
-
-
public static void main(String[] args) {
-
-
-
-
-
-
-
-
-
-
}
catch (IOException e) {
-
-
}
catch (InterruptedException e) {
-
-
-
-
-
public void publishMessage() throws IOException, InterruptedException{
-
ConnectionUtil connectionUtil=
new ConnectionUtil("testqueue");
-
MessageProducer producer=
new MessageProducer(connectionUtil);
-
connectionUtil.channel.confirmSelect();
-
-
connectionUtil.channel.addConfirmListener(
new MyConfirmListener());
-
connectionUtil.channel.addReturnListener(
new MyReturnListener());
-
-
-
HashMap<String, Object> map=
new HashMap<String, Object>();
-
-
producer.sendMessage(map);
-
-
-
-
-
public void addConsumer() throws IOException{
-
ConnectionUtil connectionUtil=
new ConnectionUtil("testqueue");
-
OddConsumer odd=
new OddConsumer(connectionUtil);
-
-
EvenConsumer even=
new EvenConsumer(connectionUtil);
-
-
-
-
5.七、測試結果
-
MessageProducer發送了一條消息:{tagId=
1}
-
MessageProducer發送了一條消息:{tagId=
2}
-
MessageProducer發送了一條消息:{tagId=
3}
-
Exchange接收消息:
1(deliveryTag)成功!multiple=false
-
Exchange接收消息:
2(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
4}
-
Exchange接收消息:
3(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
5}
-
Exchange接收消息:
4(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
6}
-
Exchange接收消息:
5(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
7}
-
Exchange接收消息:
6(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
8}
-
Exchange接收消息:
7(deliveryTag)成功!multiple=false
-
Exchange接收消息:
8(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
9}
-
Exchange接收消息:
9(deliveryTag)成功!multiple=false
-
MessageProducer發送了一條消息:{tagId=
10}
-
Exchange接收消息:
10(deliveryTag)成功!multiple=false
-
OddConsumer消費者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,註冊成功!
-
-
-
-
-
-
EvenConsumer消費者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,註冊成功!
-
-
-
-
-
六、Demo完整源碼下載地址
Java使用RabbitMQ完整項目源碼.rar