1 . RabbitMQ是一個有Erlang開發的AMQP(Advanced Message Queue)的開源實現java
2 . RabbitMQ的官網:http://www.rabbitmq.comspring
3 . RabbitMQ是一款消息組件,其中必定包含生產者,消費者,消息組件。RabbitMQ中有三個重要組成部分vim
a . Exchange:交換空間瀏覽器
b . Queue:數據隊列bash
c . RoutingKey:隊列路由(若是全部的隊列的RoutingKey都同樣,則屬於廣播小,若是不同,則屬於點對點消息)ide
4 . RabbitMQ中的幾個核心概念spring-boot
a . Broker:消息隊列的服務主機測試
b . Exchange:消息交換機,用於分發消息到隊列ui
c . Queue:消息隊列的載體,每一個消息都會被投入到一個或多個隊列es5
e . Binding:將Exchange與Queue按照RoutingKey規則進行綁定
f . RoutingKey:路由Key,Exchange根據RoutingKey進行消息分發
g . Vhost:虛擬主機,一個Broker能夠有多個Vhost,用於實現用戶(權限)的分離
h . Producer:消息生產者
i . Consumer:消息消費者
j . Channel:消息通道,每一個Channel表明一個會話任務
a . 在這裏安裝Erlang時遇到的坑較多,我的不推薦下載erlang源碼進行解壓縮編譯安裝,由於依賴的庫較多(gcc,libncurses5-dev,.eg):
創建erlang目錄
mkdir -p /usr/local/erlang
進入源碼目錄
cd /user/local/src/otp_src_19.3
編譯配置
./configure --prefix=/usr/local/erlang
編譯安裝
make && make install
配置環境變量
vim /etc/profile
export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:$ERLANG_HOME/bin:
source /etc/profile
b . 本人使用apt-get安裝erlang語言環境
apt-get install erlang
或者apt-get install erlang-nox
c . 測試erlang
輸入
erl
表示進入erlang環境輸入
halt().
退出
a . 根據官網介紹進行安裝
相關命令
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
b . 後臺啓動RabbitMQrabbitmq-server start > /dev/null 2>&1 &
c . 開啓管理頁面插件rabbitmq-plugins enable rabbitmq_management
d . 添加新用戶rabbitmqctl add_user evans 123123
(建立一個用戶名爲evans,密碼爲123123的用戶)
e . 將新用戶設爲管理員rabbitmqctl set_user_tags evans administrator
f . 打開瀏覽器輸入訪問地址http://192.168.1.1:15672訪問RabbitMQ管理頁面
g . 查看RabbitMQ狀態rabbitmqctl status
,關閉RabbitMQrabbitmqctl stop
h . 設置用戶虛擬主機,不然程序沒法鏈接Queue
1 . 在管理界面中增長一個新的Queue
a . Name:隊列名稱
b . Durability:持久化選項:Durable(持久化保存),Transient(即時保存),持久化保存在RabbitMQ宕機或者重啓後,未消費的消息仍然存在,即時保存在RabbitMQ宕機或者重啓後不存在
c . Auto delete:自動刪除
2 . 引入RabbitMQ的Repository
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
3 . 消息生產者MessageProducer.java
package com.evans.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by Evans */ public class MessageProducer { //隊列名稱 private static final String QUEUE_NAME = "first"; //主機IP private static final String HOST="127.0.0.1"; //端口 private static final Integer PORT=5672; //用戶名 private static final String USERNAME="evans"; //密碼 private static final String PASSWORD="evans"; public static void main(String[] args) throws Exception { //建立工廠類 ConnectionFactory factory = new ConnectionFactory(); //設置參數 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); //建立鏈接 Connection connection =factory.newConnection(); //建立Channel Channel channel=connection.createChannel(); //聲明Queue /* * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) * 隊列名稱,是否持久保存,是否爲專用的隊列,是否容許自動刪除,配置參數 * 此處的配置與RabbitMQ管理界面的配置一致 */ channel.queueDeclare(QUEUE_NAME,true,false,true,null); Long start = System.currentTimeMillis(); for (int i=0;i<100;i++){ //發佈消息 /* * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * exchange名稱,RoutingKey,消息參數(消息頭等)(持久化時須要設置),消息體 * MessageProperties有4中針對不一樣場景能夠進行選擇 */ channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,("Message:"+i).getBytes()); } Long end = System.currentTimeMillis(); System.out.println("System cost :"+(end-start)); channel.close(); connection.close(); } }
4 . 運行MessageProduce的Main方法,在管理界面會出現詳細的監控數據,此時消息已經成功發送至RabbitMQ的隊列中
5 . 消息消費者MessageConsumer.java
package com.evans.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by Evans on 2017/7/15. */ public class MessageConsumer { //隊列名稱 private static final String QUEUE_NAME = "first"; //主機IP private static final String HOST="10.0.0.37"; //端口 private static final Integer PORT=5672; //用戶名 private static final String USERNAME="evans"; //密碼 private static final String PASSWORD="evans"; public static void main(String[] args) throws IOException, TimeoutException { //建立工廠類 ConnectionFactory factory = new ConnectionFactory(); //設置參數 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); //建立鏈接 Connection connection =factory.newConnection(); //建立Channel Channel channel=connection.createChannel(); //聲明Queue /* * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) * 隊列名稱,是否持久保存,是否爲專用的隊列,是否容許自動刪除,配置參數 * 此處的配置與RabbitMQ管理界面的配置一致 */ channel.queueDeclare(QUEUE_NAME,true,false,true,null); //這裏須要複寫handleDelivery方法進行消息自定義處理 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("Consume Get Message : "+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
6 . 運行MessageConsumer的Main方法,會進行消息消費處理,此時控制檯會輸出消費的消息,此時完成了消息的生產與消費的基本操做,當存在多個消費者的處理同一個隊列時,RabbitMQ會自動進行均衡負載處理,多個消費者共同來處理消息
Consume Get Message : Message:0 Consume Get Message : Message:1 Consume Get Message : Message:2 ... Consume Get Message : Message:99
7 . RabbitMQ虛擬主機
a . 能夠在管理界面的admin-vhost下設置多個虛擬主機
b . 在程序中經過設置factory參數進行虛擬主機的指定factory.setVirtualHost("firstHost")
8 . Exchange工做模式:topic、direct、fanout
a . 廣播模式(fanout):一條消息被全部的消費者進行處理
① .將消費者與生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "fanout")`方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange ② .此時再次運行生產者和多個消費者,則一個消息會被多個消費者進行消費處理
b . 直連模式(direct):一跳消息根據RoutingKey進行生產者與消費者的匹配,從而達到指定生產者的消息被指定消費者進行處理
① .將生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "direct")`方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange和RoutingKey("mykey") ② .將消費者中的`channel.queueDeclare()`方法替換爲
// 定義EXCHANGE的聲明String channel.exchangeDeclare(EXCHANGE_NAME, "direct") ; // 經過通道獲取一個隊列名稱 String queueName= channel.queueDeclare().getQueue() ; // 進行綁定處理 channel.queueBind(queueName, EXCHANGE_NAME, "mykey") ;
③ .此時RoutingKey做爲惟一標記,這樣就能夠將消息推送到指定的消費者進行處理
c . 主題模式(topic):一條消息被全部的消費者進行處理
① .將生產者中的`channel.queueDeclare()`方法替換爲`channel.exchangeDeclare(EXCHANGE_NAME, "topic") `方法進行Exchange的指定,channel.basicPublish()方法須要指定exchange和RoutingKey("mykey-01") ② .將消費者中的`channel.queueDeclare()`方法替換爲
// 定義EXCHANGE的聲明String channel.exchangeDeclare(EXCHANGE_NAME, "topic") ; // 經過通道獲取一個隊列名稱 String queueName= channel.queueDeclare().getQueue() ; // 進行綁定處理 channel.queueBind(queueName, EXCHANGE_NAME, "mykey-01");
③ .此時主題模式即爲廣播模式與直連模式的混合使用。
1 . 引入srping-rabbit的Repository
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency>
2 . 創建rabbitmq.properties,對RabbitMQ的屬性參數進行設置
# RabbitMQ的主機IP mq.rabbit.host=192.168.68.211 # RabbitMQ的端口 mq.rabbit.port=5672 # RabbitMQ的VHost mq.rabbit.vhost=hello # RabbitMQ的exchange名稱 mq.rabbit.exchange=spring.rabbit # 用戶名 mq.rabbit.username=evans # 密碼 mq.rabbit.password=evans
3 . 生產者XML(需增長xmlns:rabbit命名空間)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd"> <context:component-scan base-package="com.evans.rabbitmq"/> <!--定義rabbitmq配置的相關屬性文件信息--> <context:property-placeholderlocation="classpath:rabbitmq.properties"/> <!--若是要想進行RabbiMQ的操做管理,則首先必定要準備出一個鏈接工廠類--> <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/> <!--全部的鏈接工廠要求被RabbitMQ所管理--> <rabbit:adminconnection-factory="connectionFactory"/> <!--建立一個隊列信息--> <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/> <!--下面實現一個直連的操做模式--> <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true"a uto-delete="true"> <rabbit:bindings> <!--如今要求綁定到指定的隊列之中--> <rabbit:bindingqueue="myQueue" key="key01"/> </rabbit:bindings> </rabbit:direct-exchange> <!--全部整合的消息系統都會有一個模版--> <rabbit:templateid="amqpTemplate" exchange="${mq.rabbit.exchange}" connection-factory="connectionFactory"/> </beans>
4 . 消費者XML(需增長xmlns:rabbit命名空間)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd"> <!--定義rabbitmq配置的相關屬性文件信息--> <context:property-placeholderlocation="classpath:rabbitmq.properties"/> <!--若是要想進行RabbiMQ的操做管理,則首先必定要準備出一個鏈接工廠類--> <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/> <!--全部的鏈接工廠要求被RabbitMQ所管理--> <rabbit:adminconnection-factory="connectionFactory"/> <!--建立一個隊列信息--> <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/> <!--下面實現一個直連的操做模式--> <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true" auto-delete="true"> <rabbit:bindings> <!--如今要求綁定到指定的隊列之中--> <rabbit:bindingqueue="myQueue" key="key01"/> </rabbit:bindings> </rabbit:direct-exchange> <!--定義具體的消費處理類--> <beanid="messageConsumer" class="cn.evans.rabbitmq.MessageConsumer"/> <!--啓動消費監聽程序--> <rabbit:listener-containerconnection-factory="connectionFactory"> <rabbit:listenerref="messageConsumer"queues="myQueue"/> </rabbit:listener-container> </beans>
5 . 生產者
a . 定義消息Service
package com.evans.rabbitmq; /** * Created by Evans */ public interface MessageService { /** * 發送消息 * @param message */ public void sendMessage(String message); }
b . 定義MessageService的實現類
package com.evans.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import javax.annotation.Resource; /** * Created by Evans */ public class MessageServiceImpl implements MessageService { @Resource private AmqpTemplate template; @Override public void sendMessage(String message) { template.convertAndSend("key01",message); } }
5 . 消費者
a .消費者須要實現MessageListener接口
b .消息處理類
package com.evans.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * Created by Evans */ public class MessageConsumer implements MessageListener { @Override public void onMessage(Message message) { System.out.println("Consumer Message: "+ message); } }
1 . 引入SpringBoot的RabbitMQ腳手架
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2 . 配置Application.yml
spring: rabbitmq: host: 10.0.0.37 port: 5672 username: evans password: evans
3 . 配置類
package com.evans.rabbitmq; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by Evans */ @Configuration public class RabbitConfigure { @Bean public Queue firstQueue(){ return new Queue("firstQueue"); } }
4 . 生產者
package com.evans.rabbitmq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; /** * Created by Evans */ @Component public class MessageProducer { @Resource private RabbitTemplate rabbitTemplate; public void send(){ LocalDateTime current =LocalDateTime.now(); System.out.println("Send Message : "+current); rabbitTemplate.convertAndSend("firstQueue","Send Message"+ current); } }
5 . 消費者
package com.evans.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by Evans */ @Component @RabbitListener(queues = "firstQueue") public class MessageConsumer { @RabbitHandler public void consumer(String message){ System.out.println("Consumer Message : "+message); } }
6 . FanoutExchange配置
@Configuration public class FanoutConfiguration { @Bean public Queue fanoutFirstQueue() { return new Queue("fanout.first"); } @Bean public Queue fanoutSecondQueue() { return new Queue("fanout.second"); } @Bean public Queue fanoutThirdQueue() { return new Queue("fanout.third"); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean public Binding bindingExchangeFanoutFirst(Queue fanoutFirstQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutFirstQueue).to(fanoutExchange); } @Bean public Binding bindingExchangeFanoutSecond(Queue fanoutSecondQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutSecondQueue).to(fanoutExchange); } @Bean public Binding bindingExchangeFanoutThird(Queue fanoutThirdQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutThirdQueue).to(fanoutExchange); } }
7 . TopicExchange配置
@Configuration public class TopicConfiguration { @Bean public Queue topicFirstQueue() { return new Queue("topic.first"); } @Bean public Queue topicAnyQueue() { return new Queue("topic.any"); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean public Binding bindingExchangeTopicFirst(Queue topicFirstQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicFirstQueue).to(topicExchange).with("topic.first"); } @Bean public Binding bindingExchangeTopicAny(Queue topicAnyQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicAnyQueue).to(topicExchange).with("topic.#"); } }