rabbitMQ是由erlang語言開發的,基於AMQP協議實現的消息隊列。他是一種應用程序之間的通訊方法,在分佈式系統開發中應用很是普遍。html
rabbitMq的有點:java
AMQP(advanced Message Queuing Protocol),是一個提供統一消息服務的應用標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端中間件的產品不一樣和開發語言不一樣的限制。JMS和AMQP的區別在於:JMS是java語言專屬的消息服務標準,他是在api層定義標準,而且只能用於java應用,而AMQP是在協議層定義的標準,是能夠跨語言的。web
發送消息:spring
接受消息:api
若是不想本身下載,須要我這裏的軟件的,能夠在下面評論郵箱,我私發給你。數組
1.安裝erlang的環境,雙擊otp的運行程序,而後一路點擊下一步(next)。瀏覽器
配置環境變量併發
在path中添加erlang的路徑分佈式
2.安裝rabbitMq,雙擊rabbitmq的運行程序ide
安裝完成以後在菜單頁面能夠看到
安裝完RabbitMQ若是想要訪問管理頁面須要在rabbitmq的sbin目錄中使用cmd執行:rabbitmq-plugins.bat enable rabbitmq_management(管理員身份運行此命令)添加可視化插件。
點擊上圖中的start/stop來開啓/中止服務。而後在瀏覽器上輸入地址查看,rabbitMq的默認端口是15672。默認的用戶名和密碼都是guest
若是安裝失敗,須要卸載重裝的時候或者出現rabbitMq服務註冊失敗時,此時須要進入註冊表清理erlang(搜索rabbitMQ,erlsrv將對應的項刪除)
1.添加依賴
<!--添加rabbitMq的依賴--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
2.生產者代碼實現
package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.net.ConnectException; import java.util.concurrent.TimeoutException; /** * @className: producer * @description: rabbitmq的生產者代碼實現 * @author: charon * @create: 2021-01-03 23:10 */ public class Producer { /** * 聲明隊列名 */ private static final String QUEUE = "hello charon"; public static void main(String[] args) { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE, true, false, false, null); String message = "hello charon good evening"; // 發佈消息(交換機,RoutingKey即隊列名,額外的消息屬性,消息內容) channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("發送消息給mq:" + message); } catch (Exception e) { e.printStackTrace(); }finally { // 關閉資源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
3.消費者代碼實現
package rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Consumer * @description: 消費者的代碼實現 * @author: charon * @create: 2021-01-05 08:28 */ public class Consumer { /** * 聲明隊列名 */ private static final String QUEUE = "hello charon"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE, true, false, false, null); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消費者標籤 * @param envelope 信封,能夠獲取交換機等信息 * @param properties 消息屬性 * @param body 消費內容,字節數組,能夠轉成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String exchange = envelope.getExchange(); // long deliveryTag = envelope.getDeliveryTag(); String message = new String(body,"utf-8"); System.out.println("收到的消息是:"+message); } }; // 消費消息(隊列名,是否自動確認,消費方法) channel.basicConsume(QUEUE,true,defaultConsumer); } }
生產者將消息放入到隊列中,消費者能夠有多個,同時監聽同一個隊列。如上圖,消費者c1,c2共同爭搶當前消息隊列的內容,誰先拿到誰負責消費消息,缺點是在高併發的狀況下,默認會產品一個消息被多個消費者共同使用,能夠設置一個鎖開關,保證一條消息只能被一個消費者使用。
上面的代碼,能夠再添加一個消費者,這樣就能夠實現工做隊列的工做模式。
2.Publish/Subscribe 發佈訂閱(共享資源)
X表明rabbitMq內部組件交換機,生產者將消息放入交換機,交換機發布訂閱把消息發送到全部消息隊列中,對應的消費者拿到消息進行消費,對比工做隊列而言,發佈訂閱能夠實現工做隊列的功能,可是比工做隊列更強大。
特色:
1.每一個消費者監聽本身的隊列
2.生產者將消息發送給Broker,由交換機將消息轉發到綁定的此交換機的每一個隊列,每一個綁定交換機的隊列都將接收到消息;
生產者:
package rabbitmq.publish; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Producer * @description: 發佈訂閱的生產者 * @author: charon * @create: 2021-01-07 22:02 */ public class Producer { /**郵件的隊列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的隊列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交換機*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); // 交換機(交換機名稱,交換機類型(fanout:發佈訂閱,direct:routing,topic:主題,headers:header模式)) channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 綁定交換機(隊列名稱,交換機名稱,routingKey(發佈訂閱設置爲空)) channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); // 發送多條消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by publish"; // 指定交換機(交換機,RoutingKey即隊列名,額外的消息屬性,消息內容) channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes()); System.out.println("發送消息給mq:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { // 關閉資源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消費email的消費者:
package rabbitmq.publish; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 郵件的消息消費者 * @author: charon * @create: 2021-01-07 22:14 */ public class EmailConsumer { /**郵件的隊列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的隊列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交換機*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消費者標籤 * @param envelope 信封,能夠獲取交換機等信息 * @param properties 消息屬性 * @param body 消費內容,字節數組,能夠轉成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String exchange = envelope.getExchange(); // long deliveryTag = envelope.getDeliveryTag(); String message = new String(body,"utf-8"); System.out.println("收到的email消息是:"+message); } }; // 消費消息(隊列名,是否自動確認,消費方法) channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
消費短信的消費者:
package rabbitmq.publish; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: SmsConsumer * @description: * @author: charon * @create: 2021-01-07 22:17 */ public class SmsConsumer { /**郵件的隊列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的隊列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交換機*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消費者標籤 * @param envelope 信封,能夠獲取交換機等信息 * @param properties 消息屬性 * @param body 消費內容,字節數組,能夠轉成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的短信消息是:"+message); } }; // 消費消息(隊列名,是否自動確認,消費方法) channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
3.Routing 路由模式
生產者將消息發送給交換機按照路由判斷,交換機根據路由的key,只能匹配上路由key的對應的消息隊列,對應的消費者才能消費消息。
如上圖,rabbitMq根據對應的key,將消息發送到對應的隊列中,error通知將發送到amqp.gen-S9b上,由消費者c1消費。error,info,warning通知將發送到amqp.gen-Ag1上,由消費者c2消費。
特色:
1.每一個消費者監聽本身的隊列,而且設置路由key
2.生產者將消息發送給交換機,由交換機根據路由key來轉發消息到指定的隊列
生產者:
package rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Producer * @description: 路由模式下的生成者 * @author: charon * @create: 2021-01-07 22:34 */ public class Producer { /**郵件的隊列*/ public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email"; /**短信的隊列*/ public static final String QUEUE_ROUTING_SMS = "queue_routing_sms"; /**交換機*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 設置email的路由key */ public static final String ROUTING_EMAIL = "routing_email"; /** 設置sms的路由key */ public static final String ROUTING_SMS = "routing_sms"; public static void main(String[] args) { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null); // 交換機(交換機名稱,交換機類型(fanout:發佈訂閱,direct:routing,topic:主題,headers:header模式)) channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 綁定交換機(隊列名稱,交換機名稱,routingKey) channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL); channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS); // 發送多條消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by routing --email"; // 指定交換機(交換機,RoutingKey,額外的消息屬性,消息內容) channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes()); System.out.println("發送消息給mq:" + message); } // 發送多條消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by routing --sms"; // 指定交換機(交換機,RoutingKey,額外的消息屬性,消息內容) channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes()); System.out.println("發送消息給mq:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { // 關閉資源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消費email的消費者:
package rabbitmq.routing; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 路由模式下的email消費者 * @author: charon * @create: 2021-01-07 22:40 */ public class EmailConsumer { /**郵件的隊列*/ public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email"; /**交換機*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 設置email的路由key */ public static final String ROUTING_EMAIL = "routing_email"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null); channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 綁定隊列並指明路由key channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消費者標籤 * @param envelope 信封,能夠獲取交換機等信息 * @param properties 消息屬性 * @param body 消費內容,字節數組,能夠轉成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的email消息是:"+message); } }; // 消費消息(隊列名,是否自動確認,消費方法) channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer); } }
消費短信的消費者:
package rabbitmq.routing; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 路由模式下的email消費者 * @author: charon * @create: 2021-01-07 22:40 */ public class SmsConsumer { /**郵件的隊列*/ public static final String QUEUE_ROUTING_SMS = "queue_routing_sms"; /**交換機*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 設置email的路由key */ public static final String ROUTING_SMS = "routing_sms"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設置ip,端口,由於是本機,因此直接設置爲127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默認爲15672,通訊端口爲5672 connectionFactory.setPort(5672); // 設置用戶名和密碼 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置虛擬ip,默認爲/,一個rabbitmq的服務能夠設置多個虛擬機,每一個虛擬機就至關於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數好比設置存活時間等) channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 綁定隊列並指明路由key channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS); // 實現消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消費者標籤 * @param envelope 信封,能夠獲取交換機等信息 * @param properties 消息屬性 * @param body 消費內容,字節數組,能夠轉成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的短信消息是:"+message); } }; // 消費消息(隊列名,是否自動確認,消費方法) channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer); } }
4.Topic 主題模式
特色:
1.每一個消費者監聽本身的隊列,而且設置帶通配符的routingkey
2.生產者將消息發送給broker,由交換機及根據路由key來轉發消息到指定的隊列
5.Header 轉發器
取消了路由key,使用header中的key/value(鍵值對)來匹配隊列。
6.RPC 遠程調用
基於direct類型交換機實現。生產者將消息遠程發送給rpc隊列,消費者監聽rpc消息隊列的消息並消息,而後將返回結果放入到響應隊列中,生產者監聽響應隊列中的消息,拿到消費者的處理結果,實現遠程RPC遠程調用。
參考文件:
https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021