RabbitMQ的安裝與使用

一、主流的消息中間件簡單介紹哦。java

  1)、ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線,而且它一個徹底支持jms(java message service)規範的消息中間件。其豐富的api,多種集羣構建模式使得他成爲業界老牌消息中間件,在中小企業中應用普遍。
若是不是高併發的系統,對於ActiveMQ,是一個不錯的選擇的,豐富的api,讓你開發的很愉快喲。
注意:MQ衡量指標:服務性能,數據存儲,集羣架構。
node

  2)、kafka是LinkedIn開源的分佈式發佈/訂閱消息系統,目前歸屬於Apache頂級項目。kafka主要特色是基於Pull的模式來處理消息消費,最求高吞吐量,一開始的目的就是用於日誌收集和傳輸,0.8版本開始支持複製,不支持事務,對消息的重複,丟失,錯誤沒有嚴格要求,適量產生大量數據的互聯網服務的數據收集業務。web

  3)、RocketMQ是阿里開源的消息中間件,目前也已經孵化爲了Apache頂級項目,它是純java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於kafka,它對消息的可靠傳輸以及事務性作了優化,目前在阿里集團被普遍用於交易,充值,流計算、消息推送、日誌流式處理,binglog分發等場景。vim

  4)、RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現的。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱模式)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
centos

二、RabbitMQ的簡單介紹。api

  RabbitMQ是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。
瀏覽器

三、RabbitMQ高性能的緣由所在是什麼呢?
  答:RabbitMQ所使用的開發語言是ErLang語言,ErLang其最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是很是優秀的。Erlang的優勢,Erlang有着和原生Socket同樣的延遲。性能十分優越。
安全

四、AMQP高級消息隊列協議是什麼?
  答:AMQP全稱是Advanced Message Queuing Protocol(高級消息隊列協議)。AMQP定義是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。
服務器

五、AMQP協議模型。
網絡

詳細介紹以下所示:

1)、Server,又稱爲Broker,接受客戶端的鏈接,實現AMQP實體服務。
2)、Connection,鏈接,應用程序與Broker的網絡鏈接。
3)、Channel,網絡信道,幾乎全部的操做都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可創建多個Channel,每一個Channel表明一個會話任務。
4)、Message,消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Propertie能夠對消息進行修飾,好比消息的優先級,延遲等高級特性,Body則就是消息體內容。
5)、Virtual Host,虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個Virtual Host裏面能夠有若干個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或者Queue。
6)、Exchange,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。
7)、Binding,Exchange和Queue之間的虛擬鏈接,binding中能夠包含routing key。
8)、Routing key,一個路由規則,虛擬機能夠用它來肯定如何路由一個特定消息。
9)、Queue,也稱爲Message Queue,消息隊列,保存消息並將它們轉發給消費者。 

六、RabbitMQ的架構設計以下所示:

七、RabbitMQ的安裝。RabbitMQ的官方網址:https://www.rabbitmq.com/

能夠選擇本身RabbitMQ的版本,以及對應的Erlang的版本。這裏使用rabbitmq-server-3.6.5-1.noarch.rpm一鍵安裝方式進行安裝RabbitMQ的方式。

搭建RabbitMQ所需包:

a)、erlang-18.3-1.el7.centos.x86_64.rpm這個是erlang語言基礎安裝包。

b)、rabbitmq-server-3.6.5-1.noarch.rpm這個是rabbitmq服務端安裝包。

c)、socat-1.7.3.2-1.1.el7.x86_64.rpm這個是socat密鑰。

能夠下載安裝包,而後進行安裝便可:

1 wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
2 wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
3 wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

 首先安裝Erlang的語言基礎安裝包,安裝過程以下所示:

1 [root@slaver4 package]# ls
2 erlang-18.3-1.el7.centos.x86_64.rpm  haproxy-1.6.5.tar.gz  keepalived-1.2.18.tar.gz  rabbitmq_delayed_message_exchange-0.0.1.ez  rabbitmq-server-3.6.5-1.noarch.rpm  socat-1.7.3.2-1.1.el7.x86_64.rpm
3 [root@slaver4 package]# rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
4 Preparing...                          ################################# [100%]
5 Updating / installing...
6    1:erlang-18.3-1.el7.centos         ################################# [100%]
7 [root@slaver4 package]#

開始安裝密鑰包,以下所示:

1 [root@slaver4 package]# rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 
2 warning: socat-1.7.3.2-1.1.el7.x86_64.rpm: Header V4 RSA/SHA1 Signature, key ID 87e360b8: NOKEY
3 Preparing...                          ################################# [100%]
4 Updating / installing...
5    1:socat-1.7.3.2-1.1.el7            ################################# [100%]
6 [root@slaver4 package]#

開始安裝rabbitmq服務器端,以下所示:

1 [root@slaver4 package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 
2 warning: rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA1 Signature, key ID 6026dfca: NOKEY
3 Preparing...                          ################################# [100%]
4 Updating / installing...
5    1:rabbitmq-server-3.6.5-1          ################################# [100%]
6 [root@slaver4 package]#

八、rpm安裝方式已經幫助你配置好了環境這些東西,比解壓縮安裝好點,由於解壓縮安裝還須要手動配置環境變量的。接下來,配置一下RabbitMQ。配置以下所示:

 1 [root@slaver4 package]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/
 2 [root@slaver4 ebin]# ls
 3 background_gc.beam                   rabbit_epmd_monitor.beam               rabbit_plugins_main.beam
 4 delegate.beam                        rabbit_error_logger.beam               rabbit_plugins_usage.beam
 5 delegate_sup.beam                    rabbit_error_logger_file_h.beam        rabbit_policies.beam
 6 dtree.beam                           rabbit_exchange.beam                   rabbit_policy.beam
 7 file_handle_cache.beam               rabbit_exchange_parameters.beam        rabbit_prelaunch.beam
 8 file_handle_cache_stats.beam         rabbit_exchange_type_direct.beam       rabbit_prequeue.beam
 9 gatherer.beam                        rabbit_exchange_type_fanout.beam       rabbit_priority_queue.beam
10 gm.beam                              rabbit_exchange_type_headers.beam      rabbit_queue_consumers.beam
11 lqueue.beam                          rabbit_exchange_type_invalid.beam      rabbit_queue_index.beam
12 mirrored_supervisor_sups.beam        rabbit_exchange_type_topic.beam        rabbit_queue_location_client_local.beam
13 mnesia_sync.beam                     rabbit_file.beam                       rabbit_queue_location_min_masters.beam
14 mochinum.beam                        rabbit_framing.beam                    rabbit_queue_location_random.beam
15 pg2_fixed.beam                       rabbit_guid.beam                       rabbit_queue_location_validator.beam
16 pg_local.beam                        rabbit_hipe.beam                       rabbit_queue_master_location_misc.beam
17 rabbit_access_control.beam           rabbit_limiter.beam                    rabbit_recovery_terms.beam
18 rabbit_alarm.beam                    rabbit_log.beam                        rabbit_registry.beam
19 rabbit_amqqueue_process.beam         rabbit_memory_monitor.beam             rabbit_resource_monitor_misc.beam
20 rabbit_amqqueue_sup.beam             rabbit_mirror_queue_coordinator.beam   rabbit_restartable_sup.beam
21 rabbit_amqqueue_sup_sup.beam         rabbit_mirror_queue_master.beam        rabbit_router.beam
22 rabbit.app                           rabbit_mirror_queue_misc.beam          rabbit_runtime_parameters.beam
23 rabbit_auth_mechanism_amqplain.beam  rabbit_mirror_queue_mode_all.beam      rabbit_sasl_report_file_h.beam
24 rabbit_auth_mechanism_cr_demo.beam   rabbit_mirror_queue_mode.beam          rabbit_ssl.beam
25 rabbit_auth_mechanism_plain.beam     rabbit_mirror_queue_mode_exactly.beam  rabbit_sup.beam
26 rabbit_autoheal.beam                 rabbit_mirror_queue_mode_nodes.beam    rabbit_table.beam
27 rabbit.beam                          rabbit_mirror_queue_slave.beam         rabbit_trace.beam
28 rabbit_binding.beam                  rabbit_mirror_queue_sync.beam          rabbit_upgrade.beam
29 rabbit_boot_steps.beam               rabbit_mnesia.beam                     rabbit_upgrade_functions.beam
30 rabbit_channel_sup.beam              rabbit_mnesia_rename.beam              rabbit_variable_queue.beam
31 rabbit_channel_sup_sup.beam          rabbit_msg_file.beam                   rabbit_version.beam
32 rabbit_cli.beam                      rabbit_msg_store.beam                  rabbit_vhost.beam
33 rabbit_client_sup.beam               rabbit_msg_store_ets_index.beam        rabbit_vm.beam
34 rabbit_connection_helper_sup.beam    rabbit_msg_store_gc.beam               supervised_lifecycle.beam
35 rabbit_connection_sup.beam           rabbit_node_monitor.beam               tcp_listener.beam
36 rabbit_control_main.beam             rabbit_parameter_validation.beam       tcp_listener_sup.beam
37 rabbit_ctl_usage.beam                rabbit_password.beam                   truncate.beam
38 rabbit_dead_letter.beam              rabbit_password_hashing_md5.beam       vm_memory_monitor.beam
39 rabbit_diagnostics.beam              rabbit_password_hashing_sha256.beam    worker_pool.beam
40 rabbit_direct.beam                   rabbit_password_hashing_sha512.beam    worker_pool_sup.beam
41 rabbit_disk_monitor.beam             rabbit_plugins.beam                    worker_pool_worker.beam
42 [root@slaver4 ebin]# vim rabbit.app

修改內容如是:{loopback_users, <<"guest">>},修改成{loopback_users, [guest]}。這個是用戶的設置。必須修改的。

九、RabbitMQ安裝成功之後,就能夠進行RabbitMQ的服務啓動和中止。

 1 [root@slaver4 ~]# rabbitmq-server start &
 2 [1] 14092
 3 [root@slaver4 ~]# 
 4               RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
 5   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
 6   ##  ##
 7   ##########  Logs: /var/log/rabbitmq/rabbit@slaver4.log
 8   ######  ##        /var/log/rabbitmq/rabbit@slaver4-sasl.log
 9   ##########
10               Starting broker...
11  completed with 0 plugins.
12 
13 [root@slaver4 ~]#

啓動完成之後,如何驗證啓動是否正常呢,使用以下命令能夠查看RabbitMQ啓動是否正常。能夠看到RabbitMQ的進程號,以及協議名稱等等。

1 [root@slaver4 ~]# lsof -i:5672
2 COMMAND   PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
3 beam    14206 rabbitmq   48u  IPv6  70172      0t0  TCP *:amqp (LISTEN)
4 [root@slaver4 ~]# 

如何中止RabbitMQ呢,可使用以下所示中止方式,以下所示:

1 [root@slaver4 ~]# rabbitmqctl stop
2 Stopping and halting node rabbit@slaver4 ...
3 Gracefully halting Erlang VM

可使用[root@slaver4 ~]# rabbitmq-plugins list命令查看默認提供了什麼樣的插件。

 1 [root@slaver4 ~]# rabbitmq-plugins list
 2  Configured: E = explicitly enabled; e = implicitly enabled
 3  | Status:   * = running on rabbit@slaver4
 4  |/
 5 [  ] amqp_client                       3.6.5
 6 [  ] cowboy                            1.0.3
 7 [  ] cowlib                            1.0.1
 8 [  ] mochiweb                          2.13.1
 9 [  ] rabbitmq_amqp1_0                  3.6.5
10 [  ] rabbitmq_auth_backend_ldap        3.6.5
11 [  ] rabbitmq_auth_mechanism_ssl       3.6.5
12 [  ] rabbitmq_consistent_hash_exchange 3.6.5
13 [  ] rabbitmq_event_exchange           3.6.5
14 [  ] rabbitmq_federation               3.6.5
15 [  ] rabbitmq_federation_management    3.6.5
16 [  ] rabbitmq_jms_topic_exchange       3.6.5
17 [  ] rabbitmq_management               3.6.5
18 [  ] rabbitmq_management_agent         3.6.5
19 [  ] rabbitmq_management_visualiser    3.6.5
20 [  ] rabbitmq_mqtt                     3.6.5
21 [  ] rabbitmq_recent_history_exchange  1.2.1
22 [  ] rabbitmq_sharding                 0.1.0
23 [  ] rabbitmq_shovel                   3.6.5
24 [  ] rabbitmq_shovel_management        3.6.5
25 [  ] rabbitmq_stomp                    3.6.5
26 [  ] rabbitmq_top                      3.6.5
27 [  ] rabbitmq_tracing                  3.6.5
28 [  ] rabbitmq_trust_store              3.6.5
29 [  ] rabbitmq_web_dispatch             3.6.5
30 [  ] rabbitmq_web_stomp                3.6.5
31 [  ] rabbitmq_web_stomp_examples       3.6.5
32 [  ] sockjs                            0.3.4
33 [  ] webmachine                        1.10.3

那麼安裝RabbitMQ成功之後,如何安裝管理臺或者管控臺的插件呢,以下所示操做:

 1 [root@slaver4 ~]# rabbitmq-plugins enable rabbitmq_management
 2 The following plugins have been enabled:
 3   mochiweb
 4   webmachine
 5   rabbitmq_web_dispatch
 6   amqp_client
 7   rabbitmq_management_agent
 8   rabbitmq_management
 9 
10 Applying plugin configuration to rabbit@slaver4... started 6 plugins.
11 [root@slaver4 ~]# 

安裝好管控臺插件之後就可使用瀏覽器進行驗證(管控臺的默認端口號是15672,5672是java端通訊的端口號,25672是集羣進行通訊的端口號),訪問地址如是:http://192.168.110.133:15672/。帳號和密碼默認就是guest喲。

十、命令行和管控臺的基本操做。

 1 經常使用命令以下所示:
 2 # 關閉應用
 3 [root@slaver4 ~]# rabbitmqctl stop_app
 4 # 啓動應用
 5 [root@slaver4 ~]# rabbitmqctl start_app
 6 # 節點狀態,查看集羣節點狀態是什麼樣子的
 7 [root@slaver4 ~]# rabbitmqctl status
 8 # 添加用戶
 9 [root@slaver4 ~]# rabbitmqctl add_user username password
10 # 列出全部用戶
11 [root@slaver4 ~]# rabbitmqctl list_users
12 # 刪除用戶
13 [root@slaver4 ~]# rabbitmqctl delete_user username
14 # 清除用戶權限
15 [root@slaver4 ~]# rabbitmqctl clear_permissions -p vhostpath username
16 # 列出用戶權限
17 [root@slaver4 ~]# rabbitmqctl list_user_permissions username
18 # 修改用戶密碼
19 [root@slaver4 ~]# rabbitmqctl change_password username newpassword
20 # 設置用戶權限
21 [root@slaver4 ~]# rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
22 
23 RabbitMQ支持對虛擬主機,交換機,隊列這些進行操做。經常使用命令以下所示:
24 # 建立虛擬主機
25 [root@slaver4 ~]# rabbitmqctl add_vhost vhostpath
26 # 列出全部虛擬主機
27 [root@slaver4 ~]# rabbitmqctl list_vhosts
28 # 列出虛擬主機上全部權限
29 [root@slaver4 ~]# rabbitmqctl list_permissions -p vhostpath
30 # 刪除虛擬主機
31 [root@slaver4 ~]# rabbitmqctl delete_vhosts vhostpath
32 # 列出全部隊列信息
33 [root@slaver4 ~]# rabbitmqctl list_queues
34 # 清除隊列裏的信息
35 [root@slaver4 ~]# rabbitmqctl -p vhostpath purge_queue blue
36 
37 命令行和管控臺的高級操做。
38 # 移除全部數據,要在rabbitmqctl stop_app以後使用
39 [root@slaver4 ~]# rabbitmqctl reset
40 # 組成集羣命令,ram是加入節點的時候能夠指定存儲模式。
41 [root@slaver4 ~]# rabbitmqctl join_cluster <clusternode> [--ram]
42 # 查看集羣的狀態
43 [root@slaver4 ~]# rabbitmqctl cluster_status
44 # 修改集羣節點的存儲形式
45 [root@slaver4 ~]# rabbitmqctl change_cluster_node_type disc | ram
46 # 忘記節點(摘除節點)
47 [root@slaver4 ~]# rabbitmqctl forget_cluster_node [--offline]
48 # 修改節點名稱
49 [root@slaver4 ~]# rabbitmqctl rename_cluster_node oladnode1 newnode1 [oldnode2] [newnode2...]

命令行能夠操做的命令,在管控臺也能夠進行響應的操做,下面是管控臺的菜單欄介紹:

十一、RabbitMQ的消息生產和消息。

生產者生產消息的代碼以下所示:

 1 package com.example.bie;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.BasicProperties;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         ConnectionFactory,獲取鏈接工廠。
16  * 
17  *         Connection,一個鏈接。
18  * 
19  *         Channel,數據通訊信道,能夠發送和接受消息。
20  * 
21  *         Queue,具體的消息存儲隊列。
22  * 
23  *         Producer和Consumer,生產和消費者。
24  */
25 public class RabbitMqProducer {
26 
27     public static void main(String[] args) {
28         try {
29             // 一、建立一個ConnectionFactory
30             ConnectionFactory connectionFactory = new ConnectionFactory();
31             // 配置服務器ip地址,端口號,虛擬主機
32             connectionFactory.setHost("192.168.110.133");
33             connectionFactory.setPort(5672);
34             connectionFactory.setVirtualHost("/");
35 
36             // 二、建立鏈接工廠建立鏈接
37             Connection connection = connectionFactory.newConnection();
38 
39             // 三、經過connection建立一個Channel
40             Channel channel = connection.createChannel();
41 
42             // 四、經過Channel發送數據。消息組成部分就是props(即消息的附加屬性)和body(消息實體)。
43             // 生產者發送消息,只須要指定exchange和routingKey。
44             String exchange = "";// 數據通訊信道,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。
45             // 一個路由規則,虛擬機能夠用它來肯定如何路由一個特定消息。
46             String routingKey = "queue001";
47             BasicProperties props = null;// 消息的附加屬性
48             // 循環發送消息
49             System.out.println("開始生產消息......");
50             for (int i = 0; i < 100; i++) {
51                 // 消息實體
52                 // String msg = "Hello RabbitMQ!";
53                 byte[] body = (String.valueOf(i) + " hello RabbitMQ!!!").getBytes();
54                 // 若是exchange是空的話,會使用AMQP default這個Exchange。
55                 // 而後會根據routingKey的名稱去隊列裏面找到名稱對應的,而後將消息路由過去。
56                 channel.basicPublish(exchange, routingKey, props, body);
57             }
58 
59             // 五、關閉鏈接,原則,由小到大進行關閉
60             channel.close();
61             connection.close();
62         } catch (IOException e) {
63             e.printStackTrace();
64         } catch (TimeoutException e) {
65             e.printStackTrace();
66         }
67     }
68 
69 }

消費者消費消息的代碼以下所示:

 1 package com.example.bie;
 2 
 3 import java.io.IOException;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
 9 import com.rabbitmq.client.Channel;
10 import com.rabbitmq.client.Connection;
11 import com.rabbitmq.client.ConnectionFactory;
12 import com.rabbitmq.client.Consumer;
13 import com.rabbitmq.client.ConsumerCancelledException;
14 import com.rabbitmq.client.QueueingConsumer;
15 import com.rabbitmq.client.QueueingConsumer.Delivery;
16 import com.rabbitmq.client.ShutdownSignalException;
17 
18 /**
19  * 
20  * @author biehl
21  *
22  */
23 public class RabbitMqConsumer {
24 
25     public static void main(String[] args) {
26         try {
27             // 一、建立一個ConnectionFactory
28             ConnectionFactory connectionFactory = new ConnectionFactory();
29             // 配置服務器ip地址,端口號,虛擬主機
30             connectionFactory.setHost("192.168.110.133");
31             connectionFactory.setPort(5672);
32             connectionFactory.setVirtualHost("/");
33 
34             // 二、建立鏈接工廠建立鏈接
35             Connection connection = connectionFactory.newConnection();
36 
37             // 三、經過connection建立一個Channel
38             Channel channel = connection.createChannel();
39 
40             // 四、建立(聲明)一個隊列
41             String queue = "queue001";// 隊列
42             boolean durable = true;// 是否持久化,true是持久化,false是不持久化
43             // 獨佔的方式,只有一個channel能夠去監聽,其餘channel不能進行監聽。保證了順序消費。
44             boolean exclusive = false;
45             boolean autoDelete = false;// 隊列沒有和Exchange綁定,就進行自動刪除
46             // 擴展參數
47             Map<String, Object> arguments = new HashMap<String, Object>();
48             DeclareOk declareOk = channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
49             System.out.println("consumerCount: " + declareOk.getConsumerCount());
50 
51             // 五、建立消費者,指定參數,消費者創建在那個channel鏈接之上
52             QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
53 
54             // 六、對channel進行設置。queue是設置要消費的隊列名稱。
55             boolean autoAck = true;// 是否自動簽收。
56             Consumer callback = queueingConsumer;//
57             channel.basicConsume(queue, autoAck, callback);
58 
59             // 七、獲取消息
60             // 消費者建立起來了,消費者監聽的隊列建立起來了。接下來就獲取消息。
61             // delivery是消息封裝的對象
62             System.out.println("等待消費......");
63             while (true) {
64                 // 獲取消息
65                 Delivery delivery = queueingConsumer.nextDelivery();
66                 String body = new String(delivery.getBody());
67                 System.out.println("消費端body: " + body);
68                 System.out.println("envelope" + delivery.getEnvelope().toString());
69             }
70 
71             // 八、關閉鏈接,原則,由小到大進行關閉
72             // channel.close();
73             // connection.close();
74         } catch (IOException e) {
75             e.printStackTrace();
76         } catch (TimeoutException e) {
77             e.printStackTrace();
78         } catch (ShutdownSignalException e) {
79             e.printStackTrace();
80         } catch (ConsumerCancelledException e) {
81             e.printStackTrace();
82         } catch (InterruptedException e) {
83             e.printStackTrace();
84         }
85     }
86 
87 }

實現的效果,除了控制檯的輸出,你也能夠在管控臺裏面查看對應的效果,如鏈接Connection的個數、Channel的個數、Exchange的個數、Queue的個數、Consumer的個數、以及主頁折線圖展現的最新消息個數、消費速率等等信息。觀察這些變化以達到監控的目的。

十二、RabbitMQ的Exchange交換機,Exchange接受消息,而且根據路由鍵轉發消息所綁定的隊列。交換機的屬性以下所示:

a)、Name:交換機的名稱。
b)、Type:交換機的類型direct,topic,fanout,headers。
  1)、Direct Exchange(即直連交換機),全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。
    注意:Direct模式可使用RabbitMQ自帶的Exchange,default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。
  2)、Topic Exchange(即話題交換機),全部發送到Topic Exchange的消息被轉發到全部關心RouteKey中指定Topic的Queue上。Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic。注意:可使用通配符進行模糊匹配。符號,"#"匹配一個或者多個詞,符號"*"匹配很少很多一個詞。
  3)、Fanout Exchange,不處理路由鍵,只須要簡單的將隊列綁定到交換機上面。發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上面。Fanout交換機轉發消息是最快的。
  4)、Headers Exchange,根據消息頭進行路由,不是很經常使用。
c)、Durability:是否須要持久化,true爲持久化。
d)、Auto Delete:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange。
e)、Internal:當前Exchange是否用於RabbitMQ內部使用,默認爲false。
f)、Arguments:擴展參數,用戶擴展AMQP協議自制定化使用。

 

做者:別先生

博客園:https://www.cnblogs.com/biehongli/

若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。

相關文章
相關標籤/搜索