RabbitMq是一個消息代理:它接收、存儲、轉發消息。它由3個組件構成,生產者、隊列、消費者。ide
生產者:向隊列中發送消息。函數
隊列:存儲生產者發送過來的消息,並轉發給消費者。spa
消費者:接收到隊列轉發過來的消息,消費處理。3d
1.簡單隊列模型圖代理
2.實現生產者code
① 聲明對列名blog
private final static String QUEUE_NAME = "hello.rabbitmq";
② 建立鏈接---鏈接RabbitMQrabbitmq
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = null; try { connection = factory.newConnection(); }catch (Exception e){ e.printStackTrace(); }
③ 建立通道隊列
Channel channel = connection.createChannel();
④ 通道中聲明隊列內存
channel.queueDeclare(QUEUE_NAME ,true,false,false,null);
⑤ 發送消息
String mssage = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
⑥ 關閉鏈接
channel.close();
connection.close();
3.實現消費者
方式一:
①-④:前4步驟與生產者徹底相同
⑤:收到消息以後的回調函數
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); };
⑥:建立消費者。從指定隊列中消費消息
channel.basicConsume(QUEUE_NAME, true,deliverCallback,consumerTag->{});
①-④:與生產這徹底相同。
⑤:定義消費者,該消費者內包含 handleDelivery()方法,當消息到達隊列以後,就會觸發這個方法。
//定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { //獲取到達的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println(message); //手動回執 channel.basicAck(envelope.getDeliveryTag(),false); }
};
⑥:監聽隊列
channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,consumer);
● 輪詢模式(默認)
public void selectAll(){ //MQUtil的具體實現參照簡單隊列生產者中第二步驟 Connection connection = MQUtil.getConnection(); Channel channel = null; try {
//得到信道 channel = connection.createChannel();for(int i=0;i< 50;i++){ String message="消息:"+i;
//聲明隊列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null); //發送消息
channel.basicPublish("",UserQueues.HELLO_RABBITMQ_SELECT_ALL,null,message.getBytes()); Thread.sleep(1000); } channel.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } }
● 公平分發模式
public void selectAll(){ Connection connection = MQUtil.getConnection(); Channel channel = null; try { channel = connection.createChannel(); //限制每次只給同一個消費者發送一條消息,收到消息確認以後再次發送第二條消息
channel.basicQos(1); for(int i=0;i< 50;i++){ String message="消息:"+i; channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null); channel.basicPublish("",UserQueues.HELLO_RABBITMQ_SELECT_ALL,null,message.getBytes()); Thread.sleep(1000); } channel.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } }
● 輪詢模式(默認)public void selectAllUser(byte[] bytes){ Connection connection = MQUtil.getConnection(); try { Channel channel = connection.createChannel(); //定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { //獲取到達的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String message = new String(body); System.out.println(message); } }; //監聽隊列
boolean autoack = true;
channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,autoack,consumer); }catch (Exception e){ e.printStackTrace(); } }
● 公平分發模式
public void selectAllUser(byte[] bytes){ Connection connection = MQUtil.getConnection(); try { Channel channel = connection.createChannel(); channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { //獲取到達的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println(message); //手動回執,處理完消息以後,回饋給隊列 channel.basicAck(envelope.getDeliveryTag(),false); } }; //監聽隊列,第二個參數改成false,意思是關閉自動應答,採用手動應答的模式
boolean autoack = false; channel.basicConsume(UserQueues.HELLO_RABBITMQ_SELECT_ALL,autoack,consumer); }catch (Exception e){ e.printStackTrace(); } }
以上兩種消費者,輪詢模式:採用自動確認模式。一旦rabbitmq將消息發送到消費者,消息就會從內存中刪除,此時若是消費者還沒有消費完成,可是由於某種緣由,消費者掛掉,還沒有消費完成的消息就會丟失。
公平分發模式:採用的手動確認模式。rabbitmq發送消息給消費者,若是此消費者發生異常,該消息將交給另外一個正常的消費者去消費,消費完成以後,消費這發送ack消息告訴rabbitmq。本身消費完成,rabbitmq從內存中刪除該消息。
消息應答主要是爲了解決消費者發生異常時的處理方法,可是當Rabbitmq集羣發生異常時,也會形成消息的丟失。爲了應對這種狀況,咱們須要將RabbitMQ中的消息持久化,將消息保存在磁盤中。
設置方式:
在生產者中實現,在生命隊列時,將第二個參數設置爲true便可;
channel.queueDeclare(UserQueues.HELLO_RABBITMQ_SELECT_ALL,true,false,false,null);
①:將生產者的消息,發送到隊列當中,供消費者消費。
②:在rabbitmq中只有隊列具備存儲能力,交換機是不具有存儲能力的。
③:交換機的幾種類型:
● Fanout(不處理路由鍵)
特色:路由鍵爲空字符串,交換機將消息裝發到全部與之綁定的隊列中。
● Direct(處理路由鍵)
特色:生產者與消費者都有定義的路由鍵,轉發器將消息轉發到與之綁定的而且與生產者有相同路由鍵的隊列當中。
●
P:生產者
X:交換機(轉發器),生產者發送的消息通過交換機,轉發到多個消息隊列當中。
目的:向交換機中發送消息
步驟:
①:經過ConnectionFactory類,與rabbitmq獲取鏈接。
②:經過該鏈接建立一個通道,接下來的工做都是在該通道內部完成。
③:在通道內聲明交換機。
④:向交換機發送消息。
實現:
/** * 向交換機中發送消息 * */ public class Send { public static void main(String[] args) throws Exception{ String message = "rabbitmq的交換機"; //與mq得到鏈接 Connection connection = MQUtil.getConnection(); //建立一個通道 Channel channel = connection.createChannel(); //聲明交換機,第二個參數表明交換機的類型 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,"fanout"); System.out.println(message); //向交換機中發送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,"",null,message.getBytes()); //斷開鏈接 channel.close(); connection.close(); } }
目的:從隊列中消費消息
步驟:
①:經過ConnectionFactory類,與rabbitmq獲取鏈接。
②:經過該鏈接建立信道。
③:在信道內聲明隊列。
④:將隊列綁定到交換機。
⑤:定義消費者。
⑥:監聽隊列。
實現:
public class Reve1 { public static void main(String[] args) throws Exception{ Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_EXCHANGE11,true,false,false,null); //綁定隊列到交換機 channel.queueBind(UserQueues.HELLO_RABBITMQ_EXCHANGE11,MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_FANOUT,""); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("隊列1:"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; //監聽隊列 channel.basicConsume(UserQueues.HELLO_RABBITMQ_EXCHANGE11,false,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }
目的: 聲明Direct類型的交換機,將消息發送到交換機,向交換機發送消息時,設置路由鍵
實現:
public class Send { public static void main(String[] args) throws Exception{ String message="rabbitmq路由模式"; Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,"direct"); System.out.println(message); String routingKey="info"; //向交換機發送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,routingKey,null,message.getBytes()); channel.close(); connection.close(); } }
目的:從隊列中消費消息
實現:
public class Rece1 { public static void main(String[] args) throws Exception{ Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1,true,false,false,null); String routingKey="error"; //將隊列綁定到交換機 channel.queueBind(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1, MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_DIRECT,routingKey); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String(body); System.out.println("消費者1:"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(UserQueues.HELLO_RABBITMQ_ROUTING_EXCHANGE1,false,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }
①:路由模式中交換機與隊列之間使用的是固定的routingKey來綁定的,主題模式中可使用統配符。
②:# 匹配多個,* 匹配單個。
/** * 主題模式: * 第三種交換機:通配符匹配交換機類型 * #:匹配所有 * *:匹配單個 */ public class Send { public static void main(String[] args) throws Exception{ String message="主題模式交換機"; String routingKey="order.save"; System.out.println(message); Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,"topic"); //向交換機發送消息 channel.basicPublish(MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,routingKey,null,message.getBytes()); channel.close(); connection.close(); } }
public class Reve1 { public static void main(String[] args) throws IOException,InterruptedException, TimeoutException { Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,true,false,false,null); //將隊列綁定到交換機 channel.queueBind(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,MessageExchange.RABBITMQ_MESSAGE_EXCHANGE_TOPIC,"order.#"); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消費者1"+message); channel.basicAck(envelope.getDeliveryTag(),false); } }; //定義監聽 boolean autoAck=false; channel.basicConsume(UserQueues.HELLO_RABBITMQ_TOPIC_EXCHANGE1,autoAck,consumer); Thread.sleep(2000); channel.close(); connection.close(); } }