【RabbitMq】是一個【AMQP】協議的實現。服務端使用的是Erlang語言進行編寫,那也就是說,咱們要運行它,就要安裝相關Erlang環境。前面說了AMQP最初是爲了解決金融行業的可用性問題,因此Rabbit在高可用方面表現不俗,而且在我看來他是這幾種中間件中最容易上手的一個。並且它在併發方面表現十分出色,能夠實現大概10w的吞吐量。他的特色是:【可靠性、消息集羣、高可用、插件機制(可讓它支持別的協議)、支持多語言客戶端、管理頁面 so on】本篇主要聊聊如何安裝、使用、以及關於他的一些名詞方面的闡述。run。。html
- 個人環境是CentOS7
- http://www.rabbitmq.com/which-erlang.html 頁面查看安裝rabbitmq須要安裝erlang對應的版本,前面是Rabbit的版本,後面是Erlang的對它支持的版本。這裏先後要對應下載,版本必須符合他的要求,我這裏使用的就是第一個。
- https://github.com/rabbitmq/erlang-rpm/releases 中複製對應的版本erlang下載地址
- https://github.com/rabbitmq/rabbitmq-server/tags 中複製對應的版本rabbitmq的下載地址
- 下載Erlang
- 【wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安裝Erlang
- 【sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm】
- 安裝socat
- 【sudo yum install -y socat】
- 下載RabbitMQ
- 【wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
- 安裝RabbitMQ
- 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】
到目前爲止咱們的準備工做完畢,如下是一些啓動和關閉命令。git
【中止服務】:sudo systemctl stop rabbitmq-server 【查詢狀態】:sudo systemctl status rabbitmq-server 【啓動】:sudo systemctl start rabbitmq-server 【設置開啓自啓】:sudo systemctl enable rabbitmq-server github
使用啓動命令啓動後,咱們查詢狀態發現狀態爲 dead,這是由於咱們要啓動他的插件 使用【rabbitmq-plugins list】能夠查詢全部他支持的插件,咱們這裏須要啓動服務器
【rabbitmq-plugins enable rabbitmq_management】併發
執行完成後使用【 cat /etc/rabbitmq/enabled_plugins】就能夠知道是否啓動插件成功,而後再次啓動發現啓動狀態就爲running,使用【netstat -nplt | grep 15672 】發現他的專用端口已經開啓,至此,安裝啓動完畢。這個時候就能夠對它進行訪問了(你的ip:15672),出現下面的圖,就證實搭建成功。這裏注意開放一下端口,不然別的機器沒法訪問:tcp
- sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
- sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
然而咱們用他本身的gust是沒法login in 進去的,由於這個支持在搭建的服務器自己上訪問,那咱們就要建立本身的用戶,而且賦予相應的權限。ide
- 【添加一個admin用戶】:rabbitmqctl add_user admin admin
- 【分配操做權限】:rabbitmqctl set_user_tags admin administrator
- 【分配資源權限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*
使用admin進行登陸,至此,能夠rabbitmq能夠正常使用ui
添加相關依賴spa
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency>生產一個消息插件
View Codepublic class Producer { public static void main(String[] args) { // 一、建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 二、設置鏈接屬性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、從鏈接工廠獲取鏈接 connection = factory.newConnection(); // 四、從連接中建立通道 channel = connection.createChannel(); /** * 五、聲明(建立)隊列 * 若是隊列不存在,纔會建立 * RabbitMQ 不容許聲明兩個隊列名相同,屬性不一樣的隊列,不然會報錯 * * queueDeclare參數說明: * @param queue 隊列名稱 * @param durable 隊列是否持久化 * @param exclusive 是否排他,便是否爲私有的,若是爲true,會對當前隊列加鎖,其它通道不能訪問,而且在鏈接關閉時會自動刪除,不受持久化和自動刪除的屬性控制 * @param autoDelete 是否自動刪除,當最後一個消費者斷開鏈接以後是否自動刪除 * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中全部消息的生命週期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息內容 String message = "Hello World!"; // 六、發送消息 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息已發送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 七、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 八、關閉鏈接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
生產後你去管理頁面查詢,會發現一個消息還未讀取。
消費一個消息(消費後再次查詢,發現ready中沒有東西了)
View Codepublic class Consumer { public static void main(String[] args) { // 一、建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 二、設置鏈接屬性 factory.setHost("你的ip"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、從鏈接工廠獲取鏈接 connection = factory.newConnection("消費者"); // 四、從連接中建立通道 channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); // 六、定義收到消息後的回調 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 七、監聽隊列 channel.basicConsume("queue1", true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("開始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 八、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 九、關閉鏈接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }至此,簡單使用結束。
使用客戶端:這裏首先建立兩個隊列,而後在交換機上模擬發送消息,以topic類型的交換機爲例,他會進行routing key的匹配,在發送消息的時候,把你的routing key 攜帶,便可匹配。
一個topic類型的交換機的例子
View Code/** * Topic--生產者 * * 生產者將消息發送到topic類型的交換器上,和routing的用法相似,都是經過routingKey路由,但topic類型交換器的routingKey支持通配符 */ public class Producer { public static void main(String[] args) { // 一、建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 二、設置鏈接屬性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 三、從鏈接工廠獲取鏈接 connection = factory.newConnection("生產者"); // 四、從連接中建立通道 channel = connection.createChannel(); // 路由關係以下:com.# --> queue-1 *.order.* ---> queue-2 // 消息內容 String message = "Hello A"; // 發送消息到topic_test交換器上 channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已發送!"); // 消息內容 message = "Hello B"; // 發送消息到topic_test交換器上 channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes()); System.out.println("消息 " + message + " 已發送!"); // 消息內容 message = "Hello C"; // 發送消息到topic_test交換器上 channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已發送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 七、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 八、關閉鏈接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } /** * 路由--消費者 * * 消費者經過一個臨時隊列和交換器綁定,接收發送到交換器上的消息 */ public class Consumer { private static Runnable receive = () -> { // 一、建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 二、設置鏈接屬性 factory.setHost("你的ip"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; final String queueName = Thread.currentThread().getName(); try { // 三、從鏈接工廠獲取鏈接 connection = factory.newConnection("消費者"); // 四、從連接中建立通道 channel = connection.createChannel(); // 定義消息接收回調對象 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 監聽隊列 channel.basicConsume(queueName, true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println(queueName + " 開始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 八、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 九、關閉鏈接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }; public static void main(String[] args) { new Thread(receive, "queue1").start(); new Thread(receive, "queue-2").start(); } }
Rabbit名詞介紹
Blocker:一個rabbit服務器就是一個Blocker
虛擬主機(virtual host):一個Blocker中能夠有多個虛擬機,每一個虛擬機相似於一個工做空間,每一個虛擬主機中的消息和其餘虛擬主機的消息不相互影響
connection:消費者和rabbit中間的鏈接,有了這個鏈接,雙方纔能通訊。
RoutingKey:消息被髮給交換機的時候,會攜帶它,這個是用來指定消息的路由規則(能夠爲空)
channel(信道):是在connection上創建的管道,一個connection上能夠創建多個channel,消息經過他們進行傳遞。
BindingKey:Exchange和Queue綁定的關係,Exchange接收到的消息會帶有RoutingKey這個字段。
交換機(exchanger):當rabbit接收到消息後,交換機對這些消息進行轉換,他的類型決定哪一個隊列中應該擁有這些消息,
交換機類型:
- 【direct】:當發送消息的時候,咱們會在消息體上攜帶一個路由鍵【routekey】,若是消息體上你的路由鍵和隊列匹配則發送給對應的隊列。
- 【fanout 】:發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。
- 【headers】:根據發送的消息內容中的【headers】屬性進行匹配,當消息發送到RabbitMQ時會取到該消息的【headers】與Exchange綁定時指定的鍵值對進行匹配,若是匹配到,則對應隊列能夠接受到消息。
- 【topic】:將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」會匹配一個或多個詞,好比【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要隊列能夠匹配到,就能夠接受消息
隊列(queue):rabbit接收到的信息存儲在這裏,消費者也是從這裏獲取的消息。
binder: 隊列和交換機之間的綁定
AMQP(advanced message queuing protocol):
他是應用層協議的一個開放標準,爲面向消息的中間件協議。他分爲三層:
【底層協議層】:主要傳輸二進制數據流,
【中間層】:將客戶端的命令轉發給服務器,而後將服務器的回覆轉給客戶端。【將最高層的用戶層傳遞的信息轉化爲二進制,傳遞給底層。把底層的信息轉化爲客戶端能夠知道的語言。】
【最高層】:提供用戶調用的命令。
流轉流程
生產者:創建鏈接->開啓通道->發送消息->關閉資源
消費者:創建鏈接->開啓通道->接受消息->發送確認消息(告訴rabbit,rabbit修改消息狀態爲已經讀 and so on)->釋放資源