消息隊列解決了什麼問題java
# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq
# docker exec -it 容器號 /bin/bash
# rabbitmqctl add_user [user_name] [pwd]
# rabbitmqctl list_users
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*" # rabbitmqctl list_permissions -p /
# rabbitmqctl set_user_tags asdf administrator
# rabbitmqctl delete_user guest
# rabbitmq-plugins enable rabbitmq_management
# http://IP:15672
Java操做RabbitMQweb
<dependencies> <!-- 引入隊列依賴 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies>
public class connectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址[運行rabbitMQ的地址] factory.setHost("192.168.168.130"); //AMQ的端口號 factory.setPort(5672); //vHost factory.setVirtualHost("lgz"); factory.setUsername("lgz"); factory.setPassword("pwd123456"); return factory.newConnection(); } }
//測試發送信息 public class ProducerSend { private static final String QUEUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("cs"); //獲取連接 Connection connection = connectionUtil.getConnection(); //從連接中獲取通道 Channel channel = connection.createChannel(); //隊列聲明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); String msg="hello simple"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("test success"); channel.close(); connection.close(); } }
public class comsumerGain { private static final String QUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連接 Connection connection = connectionUtil.getConnection(); //建立通道 Channel channel = connection.createChannel(); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { //獲取到達的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body, "utf-8"); System.out.println(msg); } }; //監聽隊列 channel.basicConsume(QUE_NAME,true,consumer); } }
public class WorkQueue { public static final String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //建立鏈接 Connection connection = connectionUtil.getConnection(); //獲取channel連接 Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); for (int i = 0; i <50 ; i++) { String msg="No"+i; System.out.println(msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); } }
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連接 Connection connection = connectionUtil.getConnection(); //獲取channel Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定義一個消費者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //消息觸發 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("結束"); } } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); }
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連接 Connection connection = connectionUtil.getConnection(); //獲取channel Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定義一個消費者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //消息觸發 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("結束"); } } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
public class WorkQueue { public static final String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //建立鏈接 Connection connection = connectionUtil.getConnection(); //獲取channel連接 Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 每一個消費者發送確認消息以前,消息隊列不發送下一個消息到消費者。【一次只處理一個消息】 * */ int prefetchCount=1; channel.basicQos(prefetchCount); for (int i = 0; i <50 ; i++) { String msg="No"+i; System.out.println(msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); } }
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連接 Connection connection = connectionUtil.getConnection(); //獲取channel final Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); //一次只分發一個 //定義一個消費者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //消息觸發 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("結束"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
public class WorkReceive { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連接 Connection connection = connectionUtil.getConnection(); //獲取channel final Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //保證每次只分發一個 channel.basicQos(1); //定義一個消費者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //消息觸發 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("【start】:"); //手動回執一個消息 channel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
若是這種狀況下,殺死正在執行的消費者,就會形成正在處理的信息丟失。算法
//聲明隊列 channel.queueDeclare(QUEUE_NAME,[durable]false,false,false,null); //durable:持久化,
將程序中的durable的false改稱爲true,也是不能夠的。由於定義的QUEUE_NAME表明這個queue是未持久化的,rabbitmq不許從新定義一個已存在的隊列spring
public class Send { public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分發 String msg="hello_exchange"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("send:"+msg); channel.close(); connection.close(); } }
public class Receive1 { public static final String QUEUE_NAME="test_queue_exchange"; public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1);//保證每次只能分發一個 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); } }
public class Receive2{ public static final String QUEUE_NAME="email_queue_exchange"; public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1);//保證每次只能分發一個 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive2"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok2]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); } }
一方面接收生產者的消息,另外一方面向隊列發送消息sql
Fanout(不處理路由鍵)docker
Direct(處理路由鍵)api
public class Send { public static final String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String msg=new String("Hello direct"); String routingKey="warning"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes()); System.out.println("send"+msg); channel.close(); connection.close(); } }
public class Receive1 { public static final String EXCHANGE_NAME="test_exchange_direct"; public static final String QUEUE_NAME="queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); //定義一個消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[done]"+msg); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false;//自動應答false channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
public class Receive2 { public static final String EXCHANGE_NAME="test_exchange_direct"; public static final String QUEUE_NAME="queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); //定義一個消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[done]"+msg); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false;//自動應答false channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
相似於sql中的模糊查詢,springboot
將路由鍵routing key和某個模式盤匹配bash
public class Send { private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String msg="商品"; channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes()); System.out.println("---send"+msg); channel.close(); connection.close(); } }
public class Receive1 { public static final String QUEUE_NAME="test_queue_topic_1"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add"); channel.basicQos(1);//保證每次只能分發一個 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); } }
public class Receive2 { public static final String QUEUE_NAME="test_queue_topic_1=2"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#"); channel.basicQos(1);//保證每次只能分發一個 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); } }
在MQ中能夠經過持久化數據解決rabbitmq服務器異常的數據丟失問題服務器
生產者將消息發送出去之後,如何知道消息到底有沒有到達rabbitmq服務器?
txSelect :用戶將當前channel設置成transaction模式
txCommit:用於提交事務
txRollback:回滾事務
public class TxSend { private static final String QUEUE_NAME="tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String msg="hello autoCommit"; channel.queueDeclare(QUEUE_NAME,false,false,false,null); try { channel.txSelect(); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); channel.txCommit(); int i=1/0; System.out.println(msg); } catch (IOException e) { channel.txRollback(); System.out.println("error"); e.printStackTrace(); } channel.close(); connection.close(); } }
public class TxReceive1 { private static final String QUEUE_NAME="tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("receive"+new String(body,"utf-8")); } }); } }
public class Send1 { private static final String QUEUE_NAME="tx_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //設置爲confirm模式 channel.confirmSelect(); String msg="confirm_text"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); if (!channel.waitForConfirms()){ System.out.println("send message failed"); }else { System.out.println("success"); } channel.close(); connection.close(); } }
public class SendMore { private static final String QUEUE_NAME="confirm_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //設置爲confirm模式 channel.confirmSelect(); String msg="confirm_text"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); //主要進行遍歷發送,串行的【發送完以後在進行確認】 for (int i = 0; i <10 ; i++) { channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } if (!channel.waitForConfirms()){ System.out.println("send message failed"); }else { System.out.println("success"); } channel.close(); connection.close(); } }
//綁定channel與消息隊列 //參數一:隊列名稱【若是不存在該隊列,則自動建立】 //參數二:durable【是否要進行持久化】 //參數三:exclusive【是否獨佔隊列】 //參數四:是否在消費完成以後是否要當即刪除隊列【true:自動刪除】【false:不自動刪除】 //參數五:額外參數 channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args); channel.queueDeclare("helloWorld",false,false,false,null);
//發佈消息 /* * 參數1:【exchange】交換機名稱 * 參數2:【routingKey】隊列名稱 * 參數3:【props】傳遞消息的額外設置 * 參數4:傳遞消息的具體內容【byte類型】 * * */ channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());
//消費信息 /* * 參數一:隊列名稱 * 參數二:開啓消息的自動確認機制 * 參數三:消費時的回調接口 * */ channel.basicConsume("helloWorld",true,consumer);
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.2.6.RELEASE</version> </dependency>
spring: application: name: rabbit-springboot rabbitmq: host: 192.168.168.130 port: 5672 virtual-host: / username: lgz password: pwd123456
@Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("hello","helloWorld"); System.out.println(1); }
@SpringBootTest class BootRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("hello","helloWorld"); System.out.println(1); } @Test public void testWork(){ for (int i = 0; i <5 ; i++) { rabbitTemplate.convertAndSend("work","work模型"); } } @Test public void testFanout(){ rabbitTemplate.convertAndSend("logs","","Fanout model"); } @Test public void testRouting(){ rabbitTemplate.convertAndSend("directs","info","info_key_routing_information"); } @Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save","user.save exchange"); } }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic",name = "topics"), key = {"product.save","product.*"} ) }) public void receive(String msg){ System.out.println("consumer1"+msg); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic",name = "topics"), key = {"user.save","user.*"} ) }) public void receive2(String msg){ System.out.println("consumer2"+msg); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "directs",type = "direct"), key = {"info","error"} ) }) public void receive1(String msg){ System.out.println(msg); }
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive(String message){ System.out.println("message:["+message+"]"); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, //建立臨時隊列 exchange =@Exchange(value = "logs", type = "fanout") //綁定的交換機 ) }) public void receive1(String msg){ System.out.println("["+msg+"]"); }
@Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class Hello { @RabbitHandler public void receive1(String message){ System.out.println("message"+message); } }