public class Provider { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立連接工廠對象 ConnectionFactory factory = new ConnectionFactory(); // 2.設置RabbitMQ服務主機地址,默認localhost factory.setHost("localhost"); // 3.設置RabbitMQ服務端口,默認5672 factory.setPort(5672); // 4.設置虛擬主機名字,默認/ factory.setVirtualHost("/demo1"); // 5.設置用戶鏈接名,默認guest // factory.setUsername("guest"); // 6.設置連接密碼,默認guest // factory.setPassword("guest"); // 7.建立一個新連接 Connection connection = factory.newConnection(); // 8.建立消息通道 Channel channel = connection.createChannel(); // 9.建立隊列 channel.queueDeclare("simple_queue",true,false,false,null); // 10.建立消息 String msg="simple queue demo"; // 11.消息發送 channel.basicPublish("","simple_queue",null,msg.getBytes()); // 12.關閉資源 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立連接工廠對象 ConnectionFactory factory = new ConnectionFactory(); // 2.設置RabbitMQ服務主機地址,默認localhost factory.setHost("localhost"); // 3.設置RabbitMQ服務端口,默認5672 factory.setPort(5672); // 4.設置虛擬主機名字,默認/ factory.setVirtualHost("/demo1"); // 5.設置用戶鏈接名,默認guest // 6.設置連接密碼,默認guest // 7.建立一個新連接 Connection connection = factory.newConnection(); // 8.建立消息通道 Channel channel = connection.createChannel(); // 9.建立隊列 channel.queueDeclare("simple_queue",true,false,false,null); // 10.建立消費者,並設置消息處理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由 String routingKey = envelope.getRoutingKey(); // 交換機 String exchange = envelope.getExchange(); // 消息id long deliveryTag = envelope.getDeliveryTag(); // 消息體 String message = new String(body, "UTF-8"); System.out.println("路由:" + routingKey + ",交換機:" + exchange + ",消息id:" + deliveryTag + ",消息體:" + message); super.handleDelivery(consumerTag, envelope, properties, body); } }; // 11.消息監聽 channel.basicConsume("simple_queue", true, consumer); // 12.關閉資源(不建議關閉,建議一直監聽消息) } }
// 與上面同樣,不過就是建立多個消息消費者去監聽同一個消息隊列,消息分配爲輪詢式
此模式下呢多出一個概念exchange(交換機)能夠在交換機上綁定多個消息隊列,而如今消息產生者將消息發送到交換機,由交換機調度給他下面的消息隊列,此模式下交換機將會將消息發給全部與他綁定的隊列java
// 9.建立隊列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交換機:arg0,交換機名稱 arg1,交換機類型(廣播) channel.exchangeDeclare("demo3Exchange", BuiltinExchangeType.FANOUT); // 將隊列綁定到交換機 channel.queueBind("simple_queue1","demo3Exchange",""); channel.queueBind("simple_queue2","demo3Exchange",""); // 10.建立消息 ....
class Consumer1 ... // 11.監聽 消息隊列simple_queue1 channel.basicConsume("simple_queue1", true, consumer); ... class Consumer2 ... // 11.監聽 消息隊列simple_queue2 channel.basicConsume("simple_queue2", true, consumer); ...
此模式下多出概念路由,基於上一模式,上一模式交換機會將接收到的消息發給全部綁定了的隊列,此模式下接收到的消息會多一個參數Routing,會將消息轉發到對應Routing的隊列api
// 9.建立隊列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交換機:arg0,交換機名稱 arg1,交換機類型(路由對應) channel.exchangeDeclare("demo4Exchange", BuiltinExchangeType.DIRECT); // 將隊列綁定到交換機 // simple_queue1只會接收到routingKey 爲error的消息 channel.queueBind("simple_queue1","demo4Exchange","error"); // simple_queue2會接收到routingKey 爲info,warning,error的消息 channel.queueBind("simple_queue2","demo4Exchange","info"); channel.queueBind("simple_queue2","demo4Exchange","warning"); channel.queueBind("simple_queue2","demo4Exchange","error"); // 10.建立消息 // 11.消息發送 for (int i = 0; i < 100; i++) { // 建立消息 String message = "routing_key:" + i; String routingKey = ""; if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8 routingKey = "error"; }else if (i%5 == 0){ // routing_key_queue2:5 routingKey = "info"; }else { // 0、一、5 routingKey = "warning"; } message += "--->" + routingKey; // 消息發送 channel.basicPublish("demo4Exchange", routingKey, null, message.getBytes()); }
// 同上 消息消費者只須要監聽對應的消息隊列便可
此模式下在上面的基礎上將routingKey改成可使用通配符的模式
通配符規則:
多個單詞之間以」.」分割
‘#’:匹配一個或多個詞
:匹配很少很多剛好1個詞
舉例:
item.#:可以匹配item.insert.abc 或者 item.insert
item.:只能匹配item.insertide
// 9.建立隊列 channel.queueDeclare("simple_queue1",true,false,false,null); channel.queueDeclare("simple_queue2",true,false,false,null); // 建立交換機:arg0,交換機名稱 arg1,交換機類型(主題) channel.exchangeDeclare("demo5Exchange", BuiltinExchangeType.TOPIC); // 將隊列綁定到交換機 channel.queueBind("simple_queue1","demo5Exchange","#"); channel.queueBind("simple_queue2","demo5Exchange","www.#"); channel.queueBind("simple_queue2","demo5Exchange","*.com"); // 10.建立消息 // 11.消息發送 for (int i = 0; i < 100; i++) { // 建立消息 String message = "routing_key:" + i; String routingKey = ""; if (i%2 == 0){ // routing_key_queue一、routing_key_queue2 0、二、四、六、8 routingKey = "www.baidu.com"; }else if (i%5 == 0){ // routing_key_queue2:5 routingKey = "jd.com"; }else { // 0、一、5 routingKey = "dqdwwfwevweevwe21e13r23dr2gerfdqw.dqefw122e.23f.2f.23.f.2.f2.f.24.ff2wf2qef.2"; } message += "--->" + routingKey; // 消息發送 channel.basicPublish("demo5Exchange", routingKey, null, message.getBytes()); }
// 同上 消息消費者只須要監聽對應的消息隊列便可