新建一個maven項目,在pom.xml文件加入如下依賴html
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>新建一個P1類
package com.rabbitMQ.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @create 2019/11/20-11:23 */ public class P1 { public static void main(String[] args) throws IOException, TimeoutException { //消息隊列名字 String queueName="queue"; //實例鏈接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置地址 connectionFactory.setHost("192.168.128.233"); //設置端口 connectionFactory.setPort(5672); //設置用戶名 connectionFactory.setUsername("mowen"); //設置密碼 connectionFactory.setPassword("123456"); //獲取鏈接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //聲明隊列。 //參數1:隊列名 //參數2:持久化 (true表示是,隊列將在服務器重啓時依舊存在) //參數3:獨佔隊列(建立者能夠使用的私有隊列,斷開後自動刪除) //參數4:當全部消費者客戶端鏈接斷開時是否自動刪除隊列 //參數5:隊列的其餘參數 channel.queueDeclare(queueName,true,false,false,null); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本發佈消息 // 第一個參數爲交換機名稱、 // 第二個參數爲隊列映射的路由key、 // 第三個參數爲消息的其餘屬性、 // 第四個參數爲發送信息的主體 channel.basicPublish("",queueName,null,msg.getBytes()); } channel.close(); connection.close(); } }
運行後再瀏覽器進入RabbitMQ的控制檯,切換到queue看到java
新建一個C1類瀏覽器
package com.rabbitMQ.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @create 2019/11/20-13:12 */ public class C1 { public static void main(String[] args) throws IOException, TimeoutException { //消息隊列名字 String queueName="queue"; //實例鏈接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置地址 connectionFactory.setHost("192.168.128.233"); //設置端口 connectionFactory.setPort(5672); //設置用戶名 connectionFactory.setUsername("mowen"); //設置密碼 connectionFactory.setPassword("123456"); //獲取鏈接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); // 建立一個消費者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消費收到消息的時候調用的回調 System.out.println("C3接收到:" + new String(body)); } }; //把消費着綁定到指定隊列 //第一個是隊列名 //第二個是 是否自動確認 //第三個是消費者 channel.basicConsume(queueName,true,consumer); } }
運行後輸出爲服務器
消費者通常都不會關閉,會一直等待隊列消息,能夠手動關閉程序。maven
channel.basicConsume(queueName,true,consumer);中的true爲收到消息後自動確認,改成false取消自動確認。ide
在handleDelivery方法最後面用性能
// 手動確認
// 確認收到消息
channel.basicAck(envelope.getDeliveryTag(),false);
來收到手動確認消息。消費者能夠有多個而且能夠同時消費一個隊列;ui
當有多個消費者同時消費同一個隊列時,收到的消息是平均分配的(消費者沒收到以前已經確認每一個消費者受到的消息),3d
但當其中一個消費者性能差的話,會影響其餘的消費者,由於還要等它收完消息,這樣會拖累其餘消費者。code
能夠設置channel 的basicQos方法
//設置最多接受消息數量 // 設置了這個參數以後要吧自動確認關掉 channel.basicQos(1);
扇形交換機是基本的交換機類型,會把收到的消息以廣播的形式發送到綁定的隊列裏,由於不須要通過條件篩選,因此它的速度最快。
在生產者項目新建一個fanout類
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class fanout { public static void main(String[] args) throws IOException, TimeoutException { //交換機名字 String exchangeName="fanout"; //交換機名字類型 String exchangeType="fanout"; //消息隊列名字 String queueName1="fanout.queue1"; String queueName2="fanout.queue2"; String queueName3="fanout.queue3"; //實例鏈接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置地址 connectionFactory.setHost("192.168.128.233"); //設置端口 connectionFactory.setPort(5672); //設置用戶名 connectionFactory.setUsername("mowen"); //設置密碼 connectionFactory.setPassword("123456"); //獲取鏈接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //聲明隊列。 //參數1:隊列名 //參數2:持久化 (true表示是,隊列將在服務器重啓時依舊存在) //參數3:獨佔隊列(建立者能夠使用的私有隊列,斷開後自動刪除) //參數4:當全部消費者客戶端鏈接斷開時是否自動刪除隊列 //參數5:隊列的其餘參數 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //聲明交換機 channel.exchangeDeclare(exchangeName,exchangeType); //隊列綁定到交換機 channel.queueBind(queueName1,exchangeName,""); channel.queueBind(queueName2,exchangeName,""); channel.queueBind(queueName3,exchangeName,""); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本發佈消息 // 第一個參數爲交換機名稱、 // 第二個參數爲隊列映射的路由key、 // 第三個參數爲消息的其餘屬性、 // 第四個參數爲發送信息的主體 channel.basicPublish(exchangeName,"",null,msg.getBytes()); } channel.close(); connection.close(); } }
運行後在RabbitMQ網頁管理後臺的queue會看到
切換到Exchanges會看到一個
就是咱們聲明的交換機,點擊會看到咱們綁定的隊列
直連交換機會帶路由功能,隊列經過routing_key與直連交換機綁定,發送消息須要指定routing_key,交換機收到消息時,交換機會根據routing_key發送到指定隊列裏,一樣的routing_key能夠支持多個隊列。
在生產者項目新建direct類
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class direct { public static void main(String[] args) throws IOException, TimeoutException { String exchangeName="direct"; String exchangeType="direct"; //消息隊列名字 String queueName1="direct.queue1"; String queueName2="direct.queue2"; String queueName3="direct.queue3"; //實例鏈接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置地址 connectionFactory.setHost("192.168.128.233"); //設置端口 connectionFactory.setPort(5672); //設置用戶名 connectionFactory.setUsername("mowen"); //設置密碼 connectionFactory.setPassword("123456"); //獲取鏈接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //聲明隊列。 //參數1:隊列名 //參數2:持久化 (true表示是,隊列將在服務器重啓時依舊存在) //參數3:獨佔隊列(建立者能夠使用的私有隊列,斷開後自動刪除) //參數4:當全部消費者客戶端鏈接斷開時是否自動刪除隊列 //參數5:隊列的其餘參數 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //聲明交換機 channel.exchangeDeclare(exchangeName,exchangeType); //隊列綁定到交換機並指定rouing_key channel.queueBind(queueName1,exchangeName,"key1"); channel.queueBind(queueName2,exchangeName,"key2"); channel.queueBind(queueName3,exchangeName,"key1"); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本發佈消息 // 第一個參數爲交換機名稱、 // 第二個參數爲隊列映射的路由key、 // 第三個參數爲消息的其餘屬性、 // 第四個參數爲發送信息的主體 channel.basicPublish(exchangeName,"key1",null,msg.getBytes()); } channel.close(); connection.close(); } }
運行後到後臺的queue會看到
切換到Exchanges會看到
點擊進去
主題交換機的routing_key能夠有必定的規則,交換機和隊列的routing_key須要採用*.#.*…..的格式
每一個部分用.分開
*表明一個單詞(不是字符)
#表明任意數量(0或n個)單詞
在生產者項目新進topic類
package com.rabbitMQ.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author mowen * @date 2019/11/20-11:23 */ public class topic { public static void main(String[] args) throws IOException, TimeoutException { String exchangeName="topic"; String exchangeType="topic"; //消息隊列名字 String queueName1="topic.queue1"; String queueName2="topic.queue2"; String queueName3="topic.queue3"; //實例鏈接工廠 ConnectionFactory connectionFactory=new ConnectionFactory(); //設置地址 connectionFactory.setHost("192.168.128.233"); //設置端口 connectionFactory.setPort(5672); //設置用戶名 connectionFactory.setUsername("mowen"); //設置密碼 connectionFactory.setPassword("123456"); //獲取鏈接(跟jdbc很像) Connection connection = connectionFactory.newConnection(); //建立通道 Channel channel = connection.createChannel(); //聲明隊列。 //參數1:隊列名 //參數2:持久化 (true表示是,隊列將在服務器重啓時依舊存在) //參數3:獨佔隊列(建立者能夠使用的私有隊列,斷開後自動刪除) //參數4:當全部消費者客戶端鏈接斷開時是否自動刪除隊列 //參數5:隊列的其餘參數 channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); channel.queueDeclare(queueName3,true,false,false,null); //聲明交換機 channel.exchangeDeclare(exchangeName,exchangeType); //隊列綁定到交換機並指定rouing_key channel.queueBind(queueName1,exchangeName,"com.aaa.*"); channel.queueBind(queueName2,exchangeName,"com.*.topic"); channel.queueBind(queueName3,exchangeName,"com.bbb.*"); for (int i = 0; i < 10; i++) { String msg="msg"+i; // 基本發佈消息 // 第一個參數爲交換機名稱、 // 第二個參數爲隊列映射的路由key、 // 第三個參數爲消息的其餘屬性、 // 第四個參數爲發送信息的主體 channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes()); } channel.close(); connection.close(); } }
運行後,到後臺queue會看到
切換到Exchanges會看到
點擊進入會看到