消息: 在應用之間傳遞的數據c++
消息中間件:Message Queue Middleware,簡稱MQ,是指利用高效可靠的消息機制進行與平臺無關的數據交流,並基於數通訊機制來進行分佈式系統的集成。經過提供的消息傳遞和消息排隊模型,它能夠在分佈式環境下拓張進程間的通訊.vim
傳遞的模式: 1.點對點(P2P,Point-to-Point)安全
2.發佈訂閱模式(Pub/Sub)app
解耦:在項目的啓動之初來預測未來會碰到什麼需求是極其困難的。消息中間件在處理過程當中間插入一個隱含的,基於數據的接口層,兩邊的處理過程都要實現這一個接口,這容許你獨立的拓展或修改兩邊的處理過程,只要確保他們遵循一樣的接口約束便可。異步
冗餘(存儲):有些狀況下,處理數據的過程會失敗。消息中間件能夠把數據進行持久化直到他們已經被徹底處理,經過這一方式規避了數據丟失的風險。在把一個消息從消息中間件中刪除以前,須要你的處理系統明確地指出這個消息被處理完成,從而確保你地數據被安全地保存知道你使用完畢。分佈式
擴展性:消息中間件解耦了應用地處理過程,因此提供了消息入隊和處理地效率是很容易地,只須要另外增長處理過程便可,不須要改變代碼,也不須要調節參數。ide
削峯:在訪問量劇增地狀況下,應用仍然須要繼續發揮做用,可是這個月地突發流量的狀況的不常見。.net
可恢復性:當系統一部分的組件失效時,不會影響到整個系統。消息中間件下降了進程間的耦合度。線程
順序保證:在大所屬使用場景下,數據處理順序很重要,大部分消息中間件支持必定程度上的順序性。unix
緩衝:在任何重要的系統中,都會存在須要不一樣處理時間的元素。消息中間件經過一個緩衝層來幫助任務最高效率的執行,寫入消息中間件的處理會盡量快速。
異步通訊:在不少時候應用不想也不須要當即處理消息。消息中間件提供了異步通訊機制,容許應用把一些消息放入消息中間件中,但並不當即處理它,在以後須要的時候再慢慢處理。
由於RabbitMQ採用Erlang語言編寫的,因此須要配置erlang的環境,配置以下:
1.安裝gcc和openssl模塊
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
2.安裝
yum -y install ncurses-devel
3.指定編譯後程序的路徑
./configure --prefix=/usr/erlang
4.編譯安裝
make
make install
5.配置環境變量
配置好以後使用source /etc/profile將該配置文件生效,生效以後輸入erl回車能夠看到:
1.這裏我下載的時rabbitmq-server-generic-unix-3.7.9.tar.xz,使用xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz,tar -xvf rabbitmq-server-generic-unix-3.7.9.tar解壓。
2.配置環境變量
vim /etc/profile
3.使用守護線程的模式啓動rabbitmq
rabbitmq-server -detached
4.添加用戶 rabbitmqctl add_user root root123 rabbitmqctl set_permissions -p / root "." "." ".*" rabbitmqctl set_user_tags root administrator
5.雖然已經啓動了rabbitmq,可是後臺管理尚未打開,須要使用下面的命令打開後臺管理。
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
此時仍是有問題,須要關閉防火牆,systemctl stop firewalld.service
此時訪問http://192.168.123.124:15672 登陸後能夠看到以下的界面:
到此爲止咱們就已經對rabbitmq有了基本認識,也能作些基本安裝配置。
簡單的例子:
1.導入依賴:
<dependencies> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies>
消息生產者:
public class DemoProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUNTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.124.129"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root@123"); //建立連接 Connection connection = factory.newConnection(); //建立信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); //建立一個持久化,非排他的,非自動刪除的的隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //將交換器與隊列經過路由鍵綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY); //發送一條持久化的消息:hello world; String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //關閉資源 channel.close(); connection.close(); } }
消息消費者:
public class ConsumerDemo { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUNTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.124.129"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[]{ new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root@123"); //這裏的連接方式和生產者的連接連接方式有所不一樣,須要區別對待 Connection connection = factory.newConnection(addresses); //建立信道 final Channel channel = connection.createChannel(); //設置客戶端最多接收爲被ack的消息個數 channel.basicQos(64); Consumer consumer = new DefaultConsumer(channel) { [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message:" + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } }
執行結果: