1. 消息:指的是在應用之間傳送的數據,好比json字符串、純文本字符串等java
2. 消息隊列中間件:指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下進行進程之間的通訊。如今經常使用的消息中間件有RabbitMQ、ActiveMQ、Kafka等shell
3. 消息隊列中間件的消息傳遞模式:json
4. 消息中間件的做用:vim
參考在Centos中RabbitMQ的安裝步驟centos
一、首先安裝erlang ,下載erlang的安裝包到centos上,bash
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
而且進行安裝 rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
查看是夠安裝成功多線程
[root@localhost Desktop]# erl併發
Erlang/OTP 19 [erts-8.0.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]異步
Eshell V8.0.3 (abort with ^G)
輸入halt().退出erl
二、安裝rabbitMQ
第一種在線下載,先下載async
rpm:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下載完成後安裝:
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
三、開放端口
能夠選擇直接關閉防火牆,執行命令
systemctl stop firewalld.service
或者
vim /etc/sysconfig/iptables
#添加一下內容
#RabbitMQ
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 25672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5672 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 4369 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 5671 -j ACCEPT
##RabbitMQ
四、啓動rabbit
service rabbitmq-service start
或者
/sbin/service rabbitmq-server start
五、訪問
遊覽器輸入下列地址,便可進入RabbitMQ的管理界面:
http://localhost:15672/
1. 建立Maven工廠,導入RabbitMQ的Java客戶端的相關jar包:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
2. 添加RabbitMQ的管理員帳號:在CentOS中啓動RabbitMQ後,執行指令,添加一個root用戶,而且密碼爲123456
rabbitmqctl add_user root 123456
設置帳戶權限,開放全部權限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
設置帳戶爲管理員角色
rabbitmqctl set_user_tags root administrator
3. 消息產生者客戶端代碼:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitMQProducer { private static final String IP="192.168.10.128"; private static final String USER="root"; private static final String PASSWORD="123456"; private static final int PORT=5672; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=null; Channel channel=null; try { String exchange_demo="exchange_demo";//聲明一個交換器名稱 String queue_demo="queue_demo";//聲明一個隊列名稱 String route_demo="route_demo";//聲明一個路由鍵,用於綁定交換器和隊列 ConnectionFactory fac=new ConnectionFactory();//獲取一個rabbitMQ鏈接池,並設置相關參數 fac.setHost(IP); fac.setPassword(PASSWORD); fac.setUsername(USER); fac.setPort(PORT); //從鏈接池中獲取一個rabbitMQ鏈接 connection=fac.newConnection(); channel=connection.createChannel();//建立一個頻道 channel.exchangeDeclare(exchange_demo, "direct",false,false,null);//建立一個type爲direct,持久化的、非自動刪除的交換器 channel.queueDeclare(queue_demo, true, false, false, null);//建立一個持久化、非排他的、非自動刪除的交換器 channel.queueBind(queue_demo, exchange_demo, route_demo);//將交換器和隊列經過路由鍵綁定 //發送一條消息 String message="Hello World"; channel.basicPublish(exchange_demo, route_demo, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } finally { //關閉資源 if(connection!=null){ if(channel!=null){ channel.close();//能夠不用關閉,當connection關閉後,channel也會自動關閉 } connection.close(); } } } }
4. 消息消費者客戶端:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitMQConsumer { private static final String IP="192.168.10.128"; private static final String USER="root"; private static final String PASSWORD="123456"; private static final int PORT=5672; private static final String QUEUE_NAME="queue_demo"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=null; Address[] address={new Address(IP,PORT)}; try { String queue_demo="queue_demo";//聲明一個隊列名稱 ConnectionFactory fac=new ConnectionFactory();//獲取一個rabbitMQ鏈接池,並設置相關參數 fac.setPassword(PASSWORD); fac.setUsername(USER); //從鏈接池中獲取一個rabbitMQ鏈接 connection=fac.newConnection(address); final Channel channel=connection.createChannel();//建立一個頻道 channel.basicQos(64);//設置客戶端最多接收未被ack的消息個數 Consumer con=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("get message:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue_demo, con); if(channel!=null){ channel.close(); } } finally { //關閉資源 if(connection!=null){ connection.close(); } } } }