最近由於工做緣由使用到RabbitMQ,以前也接觸過其餘的mq消息中間件,從實際使用感受來看,卻不太同樣,正好趁着週末,能夠好好看一下RabbitMQ的相關知識點;但願能夠經過一些學習,能夠搞清楚如下幾點java
<!-- more -->git
相關博文,歡迎查看:github
在開始以前,先得搭建基本的環境,由於我的主要是mac進行的開發,全部寫了一篇mac上如何安裝rabbitmq的教程,能夠經過 《mac下安裝和測試rabbitmq》 查看服務器
下面簡單說一下Linux系統下,能夠如何安裝ide
Centos 系統:學習
# 安裝erlang rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm yum install erlang # 安裝RabbitMQ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
啓動和查看的命令測試
# 完成後啓動服務: service rabbitmq-server start # 能夠查看服務狀態: service rabbitmq-server status
rabbitmq-plugins enable rabbitmq_management
, 默認的端口號爲15672直接使用amqp-client客戶端作基本的數據讀寫,先不考慮Spring容器的場景,咱們能夠怎樣進行塞數據,而後又怎樣能夠從裏面獲取數據;ui
在實際使用以前,有必要了解一下RabbitMQ的幾個基本概念,即什麼是Queue,Exchange,Binding,關於這些基本概念,能夠參考博文:3d
首先是創建鏈接,通常須要設置服務器的IP,端口號,用戶名密碼之類的,公共代碼以下code
public class RabbitUtil { public static ConnectionFactory getConnectionFactory() { //建立鏈接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); return factory; } }
要使用,基本的就須要一個消息投遞和一個消息消費兩方,線看消息生產者的通常寫法
public class MsgProducer { public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立鏈接 Connection connection = factory.newConnection(); //建立消息通道 Channel channel = connection.createChannel(); // 聲明exchange中的消息爲可持久化,不自動刪除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 發佈消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); channel.close(); connection.close(); } }
針對上面的代碼,結合RabbitMQ的基本概念進行分析
疑問:
結合上面的代碼和分析,大膽的預測下消費者的流程
下面給出一個mq推數據的消費過程
public class MsgConsumer { public static void consumerMsg(String exchange, String queue, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //建立鏈接 Connection connection = factory.newConnection(); //建立消息信道 final Channel channel = connection.createChannel(); //消息隊列 channel.queueDeclare(queue, true, false, false, null); //綁定隊列到交換機 channel.queueBind(queue, exchange, routingKey); System.out.println("[*] Waiting for message. To exist press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 取消自動ack channel.basicConsume(queue, false, consumer); } }
直接在前面的基礎上進行測試,咱們定義一個新的exchange名爲direct.exchange
,而且制定ExchangeType爲直接路由方式 (先無論這種寫法的合理性)
public class DirectProducer { private static final String EXCHANGE_NAME = "direct.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { DirectProducer directProducer = new DirectProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
上面的代碼執行一遍以後,看控制檯會發現新增了一個Exchange
一樣的咱們寫一下對應的消費者,一個用來消費aaa,一個消費bbb
public class DirectConsumer { private static final String exchangeName = "direct.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(exchangeName, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{"aaa", "bbb"}; String[] queueNames = new String[]{"qa", "qb"}; for (int i = 0; i < 2; i++) { consumer.msgConsumer(queueNames[i], routingKey[i]); } Thread.sleep(1000 * 60 * 10); } }
執行上面的代碼以後,就會多兩個Queue,且增長了Exchange到Queue的綁定
當上面兩個代碼配合起來使用時,就能夠看到對於消費者而言,qa一直消費的是偶數,qb一直消費的是奇數,一次輸出以下:
[qa] Waiting for message. To exist press CTRL+C [qb] Waiting for message. To exist press CTRL+C [qa] Received 'hello >>> 0 [qb] Received 'hello >>> 1 [qa] Received 'hello >>> 2 [qb] Received 'hello >>> 3 [qa] Received 'hello >>> 4 ...
有了上面的case以後,這個的實現和測試就比較簡單了
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String[] routingKey = new String[]{"aaa", "bbb"}; String msg = "hello >>> "; for (int i = 0; i < 30; i++) { directProducer.publishMsg(routingKey[i % 2], msg + i); } System.out.println("----over-------"); } }
這個的輸出就比較有意思了,fa,fb兩個隊列均可以接收到發佈的消息,並且單獨的執行一次上面的投遞數據以後,發現fa/fb兩個隊列的數據都是30條
而後消費的結果以下
[qa] Waiting for message. To exist press CTRL+C [qb] Waiting for message. To exist press CTRL+C [qa] Received 'hello >>> 0 [qb] Received 'hello >>> 0 [qa] Received 'hello >>> 1 [qb] Received 'hello >>> 1 [qb] Received 'hello >>> 2 [qa] Received 'hello >>> 2 [qa] Received 'hello >>> 3 [qb] Received 'hello >>> 3 [qb] Received 'hello >>> 4 [qa] Received 'hello >>> 4 ...
代碼和上面差很少,就不重複拷貝了,接下來卡另外幾個問題
在上面的基礎使用中,會有幾個疑問以下:
以上內容,留待下一篇進行講解
一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激