一、主流的消息中間件簡單介紹哦。java
1)、ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線,而且它一個徹底支持jms(java message service)規範的消息中間件。其豐富的api,多種集羣構建模式使得他成爲業界老牌消息中間件,在中小企業中應用普遍。
若是不是高併發的系統,對於ActiveMQ,是一個不錯的選擇的,豐富的api,讓你開發的很愉快喲。
注意:MQ衡量指標:服務性能,數據存儲,集羣架構。node
2)、kafka是LinkedIn開源的分佈式發佈/訂閱消息系統,目前歸屬於Apache頂級項目。kafka主要特色是基於Pull的模式來處理消息消費,最求高吞吐量,一開始的目的就是用於日誌收集和傳輸,0.8版本開始支持複製,不支持事務,對消息的重複,丟失,錯誤沒有嚴格要求,適量產生大量數據的互聯網服務的數據收集業務。web
3)、RocketMQ是阿里開源的消息中間件,目前也已經孵化爲了Apache頂級項目,它是純java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於kafka,它對消息的可靠傳輸以及事務性作了優化,目前在阿里集團被普遍用於交易,充值,流計算、消息推送、日誌流式處理,binglog分發等場景。spring
4)、RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現的。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱模式)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
apache
二、RabbitMQ的簡單介紹。vim
RabbitMQ是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據(即RabbitMQ能夠實現跨語言、跨平臺操做),RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。
centos
三、RabbitMQ高性能的緣由所在是什麼呢?
答:RabbitMQ所使用的開發語言是ErLang語言,ErLang其最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是很是優秀的。Erlang的優勢,Erlang有着和原生Socket同樣的延遲。性能十分優越。api
四、AMQP高級消息隊列協議是什麼?
答:AMQP全稱是Advanced Message Queuing Protocol(高級消息隊列協議)。AMQP定義是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。瀏覽器
五、AMQP協議模型。
安全
詳細介紹以下所示:
1)、Server,又稱爲Broker,接受客戶端的鏈接,實現AMQP實體服務。
2)、Connection,鏈接,應用程序與Broker的網絡鏈接。
3)、Channel,網絡信道,幾乎全部的操做都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可創建多個Channel,每一個Channel表明一個會話任務。
4)、Message,消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Propertie能夠對消息進行修飾,好比消息的優先級,延遲等高級特性,Body則就是消息體內容。
5)、Virtual Host,虛擬地址,用於進行邏輯隔離,最上層的消息路由。一個Virtual Host裏面能夠有若干個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或者Queue。
6)、Exchange,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。
7)、Binding,Exchange和Queue之間的虛擬鏈接,binding中能夠包含routing key。
8)、Routing key,一個路由規則,虛擬機能夠用它來肯定如何路由一個特定消息。
9)、Queue,也稱爲Message Queue,消息隊列,保存消息並將它們轉發給消費者。
六、RabbitMQ的架構設計以下所示:
七、RabbitMQ的安裝。RabbitMQ的官方網址:https://www.rabbitmq.com/
能夠選擇本身RabbitMQ的版本,以及對應的Erlang的版本。這裏使用rabbitmq-server-3.6.5-1.noarch.rpm一鍵安裝方式進行安裝RabbitMQ的方式。必定要注意RabbitMQ的版本和Erlang的版本對應哦。點進去Erlang version能夠本身對照版本。
搭建RabbitMQ所需包:
a)、erlang-18.3-1.el7.centos.x86_64.rpm這個是erlang語言基礎安裝包。
b)、rabbitmq-server-3.6.5-1.noarch.rpm這個是rabbitmq服務端安裝包。
c)、socat-1.7.3.2-1.1.el7.x86_64.rpm這個是socat密鑰。
能夠下載安裝包,而後進行安裝便可:
1 wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm 2 wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm 3 wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
首先安裝Erlang的語言基礎安裝包,安裝過程以下所示:
1 [root@slaver4 package]# ls 2 erlang-18.3-1.el7.centos.x86_64.rpm haproxy-1.6.5.tar.gz keepalived-1.2.18.tar.gz rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq-server-3.6.5-1.noarch.rpm socat-1.7.3.2-1.1.el7.x86_64.rpm 3 [root@slaver4 package]# rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 4 Preparing... ################################# [100%] 5 Updating / installing... 6 1:erlang-18.3-1.el7.centos ################################# [100%] 7 [root@slaver4 package]#
開始安裝密鑰包,以下所示:
1 [root@slaver4 package]# rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm 2 warning: socat-1.7.3.2-1.1.el7.x86_64.rpm: Header V4 RSA/SHA1 Signature, key ID 87e360b8: NOKEY 3 Preparing... ################################# [100%] 4 Updating / installing... 5 1:socat-1.7.3.2-1.1.el7 ################################# [100%] 6 [root@slaver4 package]#
開始安裝rabbitmq服務器端,以下所示:
1 [root@slaver4 package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 2 warning: rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA1 Signature, key ID 6026dfca: NOKEY 3 Preparing... ################################# [100%] 4 Updating / installing... 5 1:rabbitmq-server-3.6.5-1 ################################# [100%] 6 [root@slaver4 package]#
八、rpm安裝方式已經幫助你配置好了環境這些東西,比解壓縮安裝好點,由於解壓縮安裝還須要手動配置環境變量的。接下來,配置一下RabbitMQ。配置以下所示:
1 [root@slaver4 package]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/ 2 [root@slaver4 ebin]# ls 3 background_gc.beam rabbit_epmd_monitor.beam rabbit_plugins_main.beam 4 delegate.beam rabbit_error_logger.beam rabbit_plugins_usage.beam 5 delegate_sup.beam rabbit_error_logger_file_h.beam rabbit_policies.beam 6 dtree.beam rabbit_exchange.beam rabbit_policy.beam 7 file_handle_cache.beam rabbit_exchange_parameters.beam rabbit_prelaunch.beam 8 file_handle_cache_stats.beam rabbit_exchange_type_direct.beam rabbit_prequeue.beam 9 gatherer.beam rabbit_exchange_type_fanout.beam rabbit_priority_queue.beam 10 gm.beam rabbit_exchange_type_headers.beam rabbit_queue_consumers.beam 11 lqueue.beam rabbit_exchange_type_invalid.beam rabbit_queue_index.beam 12 mirrored_supervisor_sups.beam rabbit_exchange_type_topic.beam rabbit_queue_location_client_local.beam 13 mnesia_sync.beam rabbit_file.beam rabbit_queue_location_min_masters.beam 14 mochinum.beam rabbit_framing.beam rabbit_queue_location_random.beam 15 pg2_fixed.beam rabbit_guid.beam rabbit_queue_location_validator.beam 16 pg_local.beam rabbit_hipe.beam rabbit_queue_master_location_misc.beam 17 rabbit_access_control.beam rabbit_limiter.beam rabbit_recovery_terms.beam 18 rabbit_alarm.beam rabbit_log.beam rabbit_registry.beam 19 rabbit_amqqueue_process.beam rabbit_memory_monitor.beam rabbit_resource_monitor_misc.beam 20 rabbit_amqqueue_sup.beam rabbit_mirror_queue_coordinator.beam rabbit_restartable_sup.beam 21 rabbit_amqqueue_sup_sup.beam rabbit_mirror_queue_master.beam rabbit_router.beam 22 rabbit.app rabbit_mirror_queue_misc.beam rabbit_runtime_parameters.beam 23 rabbit_auth_mechanism_amqplain.beam rabbit_mirror_queue_mode_all.beam rabbit_sasl_report_file_h.beam 24 rabbit_auth_mechanism_cr_demo.beam rabbit_mirror_queue_mode.beam rabbit_ssl.beam 25 rabbit_auth_mechanism_plain.beam rabbit_mirror_queue_mode_exactly.beam rabbit_sup.beam 26 rabbit_autoheal.beam rabbit_mirror_queue_mode_nodes.beam rabbit_table.beam 27 rabbit.beam rabbit_mirror_queue_slave.beam rabbit_trace.beam 28 rabbit_binding.beam rabbit_mirror_queue_sync.beam rabbit_upgrade.beam 29 rabbit_boot_steps.beam rabbit_mnesia.beam rabbit_upgrade_functions.beam 30 rabbit_channel_sup.beam rabbit_mnesia_rename.beam rabbit_variable_queue.beam 31 rabbit_channel_sup_sup.beam rabbit_msg_file.beam rabbit_version.beam 32 rabbit_cli.beam rabbit_msg_store.beam rabbit_vhost.beam 33 rabbit_client_sup.beam rabbit_msg_store_ets_index.beam rabbit_vm.beam 34 rabbit_connection_helper_sup.beam rabbit_msg_store_gc.beam supervised_lifecycle.beam 35 rabbit_connection_sup.beam rabbit_node_monitor.beam tcp_listener.beam 36 rabbit_control_main.beam rabbit_parameter_validation.beam tcp_listener_sup.beam 37 rabbit_ctl_usage.beam rabbit_password.beam truncate.beam 38 rabbit_dead_letter.beam rabbit_password_hashing_md5.beam vm_memory_monitor.beam 39 rabbit_diagnostics.beam rabbit_password_hashing_sha256.beam worker_pool.beam 40 rabbit_direct.beam rabbit_password_hashing_sha512.beam worker_pool_sup.beam 41 rabbit_disk_monitor.beam rabbit_plugins.beam worker_pool_worker.beam 42 [root@slaver4 ebin]# vim rabbit.app
修改內容如是:{loopback_users, <<"guest">>},修改成{loopback_users, [guest]}。這個是用戶的設置。必須修改的。
九、RabbitMQ安裝成功之後,就能夠進行RabbitMQ的服務啓動和中止。
1 [root@slaver4 ~]# rabbitmq-server start & 2 [1] 14092 3 [root@slaver4 ~]# 4 RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc. 5 ## ## Licensed under the MPL. See http://www.rabbitmq.com/ 6 ## ## 7 ########## Logs: /var/log/rabbitmq/rabbit@slaver4.log 8 ###### ## /var/log/rabbitmq/rabbit@slaver4-sasl.log 9 ########## 10 Starting broker... 11 completed with 0 plugins. 12 13 [root@slaver4 ~]#
啓動完成之後,如何驗證啓動是否正常呢,使用以下命令能夠查看RabbitMQ啓動是否正常。能夠看到RabbitMQ的進程號,以及協議名稱等等。
1 [root@slaver4 ~]# lsof -i:5672 2 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME 3 beam 14206 rabbitmq 48u IPv6 70172 0t0 TCP *:amqp (LISTEN) 4 [root@slaver4 ~]#
如何中止RabbitMQ呢,可使用以下所示中止方式,以下所示:
1 [root@slaver4 ~]# rabbitmqctl stop 2 Stopping and halting node rabbit@slaver4 ... 3 Gracefully halting Erlang VM
可使用[root@slaver4 ~]# rabbitmq-plugins list命令查看默認提供了什麼樣的插件。
1 [root@slaver4 ~]# rabbitmq-plugins list 2 Configured: E = explicitly enabled; e = implicitly enabled 3 | Status: * = running on rabbit@slaver4 4 |/ 5 [ ] amqp_client 3.6.5 6 [ ] cowboy 1.0.3 7 [ ] cowlib 1.0.1 8 [ ] mochiweb 2.13.1 9 [ ] rabbitmq_amqp1_0 3.6.5 10 [ ] rabbitmq_auth_backend_ldap 3.6.5 11 [ ] rabbitmq_auth_mechanism_ssl 3.6.5 12 [ ] rabbitmq_consistent_hash_exchange 3.6.5 13 [ ] rabbitmq_event_exchange 3.6.5 14 [ ] rabbitmq_federation 3.6.5 15 [ ] rabbitmq_federation_management 3.6.5 16 [ ] rabbitmq_jms_topic_exchange 3.6.5 17 [ ] rabbitmq_management 3.6.5 18 [ ] rabbitmq_management_agent 3.6.5 19 [ ] rabbitmq_management_visualiser 3.6.5 20 [ ] rabbitmq_mqtt 3.6.5 21 [ ] rabbitmq_recent_history_exchange 1.2.1 22 [ ] rabbitmq_sharding 0.1.0 23 [ ] rabbitmq_shovel 3.6.5 24 [ ] rabbitmq_shovel_management 3.6.5 25 [ ] rabbitmq_stomp 3.6.5 26 [ ] rabbitmq_top 3.6.5 27 [ ] rabbitmq_tracing 3.6.5 28 [ ] rabbitmq_trust_store 3.6.5 29 [ ] rabbitmq_web_dispatch 3.6.5 30 [ ] rabbitmq_web_stomp 3.6.5 31 [ ] rabbitmq_web_stomp_examples 3.6.5 32 [ ] sockjs 0.3.4 33 [ ] webmachine 1.10.3
那麼安裝RabbitMQ成功之後,如何安裝管理臺或者管控臺的插件呢,以下所示操做:
1 [root@slaver4 ~]# rabbitmq-plugins enable rabbitmq_management 2 The following plugins have been enabled: 3 mochiweb 4 webmachine 5 rabbitmq_web_dispatch 6 amqp_client 7 rabbitmq_management_agent 8 rabbitmq_management 9 10 Applying plugin configuration to rabbit@slaver4... started 6 plugins. 11 [root@slaver4 ~]#
安裝好管控臺插件之後就可使用瀏覽器進行驗證(管控臺的默認端口號是15672,5672是java端通訊的端口號,25672是集羣進行通訊的端口號),訪問地址如是:http://192.168.110.133:15672/。帳號和密碼默認就是guest喲。
十、命令行和管控臺的基本操做。
1 經常使用命令以下所示: 2 # 關閉應用 3 [root@slaver4 ~]# rabbitmqctl stop_app 4 # 啓動應用 5 [root@slaver4 ~]# rabbitmqctl start_app 6 # 節點狀態,查看集羣節點狀態是什麼樣子的 7 [root@slaver4 ~]# rabbitmqctl status 8 # 添加用戶 9 [root@slaver4 ~]# rabbitmqctl add_user username password 10 # 列出全部用戶 11 [root@slaver4 ~]# rabbitmqctl list_users 12 # 刪除用戶 13 [root@slaver4 ~]# rabbitmqctl delete_user username 14 # 清除用戶權限 15 [root@slaver4 ~]# rabbitmqctl clear_permissions -p vhostpath username 16 # 列出用戶權限 17 [root@slaver4 ~]# rabbitmqctl list_user_permissions username 18 # 修改用戶密碼 19 [root@slaver4 ~]# rabbitmqctl change_password username newpassword 20 # 設置用戶權限 21 [root@slaver4 ~]# rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*" 22 23 RabbitMQ支持對虛擬主機,交換機,隊列這些進行操做。經常使用命令以下所示: 24 # 建立虛擬主機 25 [root@slaver4 ~]# rabbitmqctl add_vhost vhostpath 26 # 列出全部虛擬主機 27 [root@slaver4 ~]# rabbitmqctl list_vhosts 28 # 列出虛擬主機上全部權限 29 [root@slaver4 ~]# rabbitmqctl list_permissions -p vhostpath 30 # 刪除虛擬主機 31 [root@slaver4 ~]# rabbitmqctl delete_vhosts vhostpath 32 # 列出全部隊列信息 33 [root@slaver4 ~]# rabbitmqctl list_queues 34 # 清除隊列裏的信息 35 [root@slaver4 ~]# rabbitmqctl -p vhostpath purge_queue blue 36 37 命令行和管控臺的高級操做。 38 # 移除全部數據,要在rabbitmqctl stop_app以後使用 39 [root@slaver4 ~]# rabbitmqctl reset 40 # 組成集羣命令,ram是加入節點的時候能夠指定存儲模式。 41 [root@slaver4 ~]# rabbitmqctl join_cluster <clusternode> [--ram] 42 # 查看集羣的狀態 43 [root@slaver4 ~]# rabbitmqctl cluster_status 44 # 修改集羣節點的存儲形式 45 [root@slaver4 ~]# rabbitmqctl change_cluster_node_type disc | ram 46 # 忘記節點(摘除節點) 47 [root@slaver4 ~]# rabbitmqctl forget_cluster_node [--offline] 48 # 修改節點名稱 49 [root@slaver4 ~]# rabbitmqctl rename_cluster_node oladnode1 newnode1 [oldnode2] [newnode2...]
命令行能夠操做的命令,在管控臺也能夠進行響應的操做,下面是管控臺的菜單欄介紹:
十一、RabbitMQ的消息生產和消費。生產者Producer發送一條消息,將消息投遞到Rabbitmq的集羣中即Broker中。消費端進行監聽,監聽Rabbitmq隊列,獲取到數據進行消費。
1)、ConnectionFactory,獲取鏈接工廠,須要配置相關信息ip地址、端口號port,虛擬主機vhost。
2)、Connection,經過鏈接工廠獲取到一個鏈接。
3)、Channel,經過鏈接建立一個Channel,網絡通訊信道,能夠發送和接收消息。Channel是Rabbitmq全部進行數據交互的關鍵。
4)、Queue,建立一個隊列,具體的消息存儲隊列。真正的物理的隊列,存在於RabbitMQ的Broker上面。進行存儲消息的功能。
5)、Producer生產者,生產者生產消息和Consumer消費者,消費者消費消息。
方式一,因爲使用的maven構建的springboot2.x版本的項目,引入的依賴包以下所示:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.1.1.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.bie</groupId> 14 <artifactId>rabbitmq</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>rabbitmq</name> 17 <description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.springframework.boot</groupId> 34 <artifactId>spring-boot-starter-test</artifactId> 35 <scope>test</scope> 36 </dependency> 37 <dependency> 38 <groupId>org.springframework.boot</groupId> 39 <artifactId>spring-boot-starter-amqp</artifactId> 40 </dependency> 41 </dependencies> 42 43 <build> 44 <plugins> 45 <plugin> 46 <groupId>org.springframework.boot</groupId> 47 <artifactId>spring-boot-maven-plugin</artifactId> 48 </plugin> 49 </plugins> 50 </build> 51 52 </project>
配置application.properties的配置文件,將rabbitmq所在的服務器地址,端口號,帳號,密碼,以及隊列的名稱。
1 # 給當前項目起名稱. 2 spring.application.name=rabbitmq 3 4 # 配置rabbitmq的參數. 5 # rabbitmq服務器的ip地址. 6 spring.rabbitmq.host=192.168.110.133 7 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號. 8 spring.rabbitmq.port=5672 9 # rabbitmq的帳號. 10 spring.rabbitmq.username=guest 11 # rabbitmq的密碼. 12 spring.rabbitmq.password=guest 13 14 # 隊列的名稱 15 rabbitmq.queue=queue001
首先建立一個隊列,在項目啓動的時候,就進行加載,方便生產者生產的消息保存到隊列裏面。
1 package com.example.bie.config; 2 3 import org.springframework.amqp.core.Queue; 4 import org.springframework.beans.factory.annotation.Value; 5 import org.springframework.context.annotation.Bean; 6 import org.springframework.context.annotation.Configuration; 7 8 /** 9 * 10 * @author biehl 11 * 12 * @Configuration項目啓動加載本類 13 * 14 */ 15 @Configuration 16 public class RabbitMqQueueConfig { 17 18 @Value("${rabbitmq.queue}") 19 private String queueName; 20 21 /** 22 * 建立一個隊列 23 * 24 * @return 25 */ 26 @Bean 27 public Queue createQueue() { 28 return new Queue(this.queueName); 29 } 30 31 }
而後,建立好生產者和消費者之後,可使用web項目的請求,建立一個控制類,來發送消息,觸發生產者生產消息,觸發消費者消費消息。
1 package com.example.bie.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.example.bie.rabbitmq.producer.RabbitmqProducer; 9 10 /** 11 * 12 * @author biehl 13 * 14 */ 15 @Controller 16 public class RabbitmqController { 17 18 @Autowired 19 private RabbitmqProducer rabbitmqProducer; 20 21 @RequestMapping(value = "/sendMessage") 22 @ResponseBody 23 public void rabbitmqSendMessage() { 24 String msg = "消息產===>生者<===消息message: "; 25 for (int i = 0; i < 10000; i++) { 26 rabbitmqProducer.producer(msg + i); 27 } 28 } 29 30 }
生產者生產消息的,實現類,以下所示:
1 package com.example.bie.rabbitmq.producer; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * 10 * @author biehl 11 * 12 * RabbitmqProducer消息發送者 13 * 14 * @Component加入到容器中. 15 * 16 */ 17 @Component 18 public class RabbitmqProducer { 19 20 @Autowired 21 private AmqpTemplate rabbitmqAmqpTemplate; 22 23 @Value("${rabbitmq.queue}") 24 private String queueName; 25 26 /** 27 * 發送消息的方法 28 */ 29 public void producer(String msg) { 30 // 向消息隊列中發送消息 31 // 參數1,隊列的名稱 32 // 參數2,發送的消息 33 this.rabbitmqAmqpTemplate.convertAndSend(this.queueName, msg); 34 } 35 36 }
消費者消費消息的實現類,以下所示:
1 package com.example.bie.rabbitmq.consumer; 2 3 import org.springframework.amqp.rabbit.annotation.RabbitListener; 4 import org.springframework.beans.factory.annotation.Value; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * 9 * @author biehl 10 * 11 * RabbitmqConsumer消息消費者 12 * 13 * 消費者是根據消息隊列的監聽器,進行消息的接收和消費。 14 * 15 * 消息隊列發生變化,消息事件就會產生,觸發方法進行消息的接收。 16 * 17 */ 18 @Component 19 public class RabbitmqConsumer { 20 21 @Value("${rabbitmq.queue}") 22 private String queueName; 23 24 /** 25 * 消費者消費消息,接受消息的方法,採用消息隊列監聽機制. 26 * 27 * @RabbitListener 28 * 29 * 意思是當隊列發生變化,消息事件產生了或者生產者發送消息了。 30 * 31 * 立刻就會觸發這個方法,進行消息的消費。 32 */ 33 @RabbitListener(queues = "queue001") 34 public void consumer(String msg) { 35 // 打印消息 36 System.out.println("消費者===>消費<===消息message: " + msg); 37 } 38 39 }
springboot2.x版本的主啓動類,以下所示:
1 package com.example; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class RabbitmqApplication { 8 9 public static void main(String[] args) { 10 SpringApplication.run(RabbitmqApplication.class, args); 11 } 12 13 }
效果以下所示:
方式二,或者使用下面這種方式,直接進行生產者生產消息和消費者消費消息的測試,生產者生產消息的代碼以下所示:
1 package com.example.bie; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.AMQP.BasicProperties; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 /** 12 * 13 * @author biehl 14 * 15 * ConnectionFactory,獲取鏈接工廠。 16 * 17 * Connection,一個鏈接。 18 * 19 * Channel,數據通訊信道,能夠發送和接受消息。 20 * 21 * Queue,具體的消息存儲隊列。 22 * 23 * Producer和Consumer,生產和消費者。 24 */ 25 public class RabbitMqProducer { 26 27 public static void main(String[] args) { 28 try { 29 // 一、建立一個ConnectionFactory 30 ConnectionFactory connectionFactory = new ConnectionFactory(); 31 // 配置服務器ip地址,端口號,虛擬主機 32 connectionFactory.setHost("192.168.110.133"); 33 connectionFactory.setPort(5672); 34 connectionFactory.setVirtualHost("/"); 35 36 // 二、建立鏈接工廠建立鏈接 37 Connection connection = connectionFactory.newConnection(); 38 39 // 三、經過connection建立一個Channel 40 Channel channel = connection.createChannel(); 41 42 // 四、經過Channel發送數據。消息組成部分就是props(即消息的附加屬性)和body(消息實體)。 43 // 生產者發送消息,只須要指定exchange和routingKey。 44 String exchange = "";// 數據通訊信道,交換機,接受消息,根據路由鍵轉發消息到綁定的隊列。 45 // 一個路由規則,虛擬機能夠用它來肯定如何路由一個特定消息。 46 String routingKey = "queue001"; 47 BasicProperties props = null;// 消息的附加屬性 48 // 循環發送消息 49 System.out.println("開始生產消息......"); 50 for (int i = 0; i < 100; i++) { 51 // 消息實體 52 // String msg = "Hello RabbitMQ!"; 53 byte[] body = (String.valueOf(i) + " hello RabbitMQ!!!").getBytes(); 54 // 若是exchange是空的話,會使用AMQP default這個Exchange。 55 // 而後會根據routingKey的名稱去隊列裏面找到名稱對應的,而後將消息路由過去。 56 channel.basicPublish(exchange, routingKey, props, body); 57 } 58 59 // 五、關閉鏈接,原則,由小到大進行關閉 60 channel.close(); 61 connection.close(); 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } catch (TimeoutException e) { 65 e.printStackTrace(); 66 } 67 } 68 69 }
消費者消費消息的代碼以下所示:
1 package com.example.bie; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.concurrent.TimeoutException; 7 8 import com.rabbitmq.client.AMQP.Queue.DeclareOk; 9 import com.rabbitmq.client.Channel; 10 import com.rabbitmq.client.Connection; 11 import com.rabbitmq.client.ConnectionFactory; 12 import com.rabbitmq.client.Consumer; 13 import com.rabbitmq.client.ConsumerCancelledException; 14 import com.rabbitmq.client.QueueingConsumer; 15 import com.rabbitmq.client.QueueingConsumer.Delivery; 16 import com.rabbitmq.client.ShutdownSignalException; 17 18 /** 19 * 20 * @author biehl 21 * 22 */ 23 public class RabbitMqConsumer { 24 25 public static void main(String[] args) { 26 try { 27 // 一、建立一個ConnectionFactory 28 ConnectionFactory connectionFactory = new ConnectionFactory(); 29 // 配置服務器ip地址,端口號,虛擬主機 30 connectionFactory.setHost("192.168.110.133"); 31 connectionFactory.setPort(5672); 32 connectionFactory.setVirtualHost("/"); 33 34 // 二、建立鏈接工廠建立鏈接 35 Connection connection = connectionFactory.newConnection(); 36 37 // 三、經過connection建立一個Channel 38 Channel channel = connection.createChannel(); 39 40 // 四、建立(聲明)一個隊列 41 String queue = "queue001";// 隊列 42 boolean durable = true;// 是否持久化,true是持久化,false是不持久化 43 // 獨佔的方式,只有一個channel能夠去監聽,其餘channel不能進行監聽。保證了順序消費。 44 boolean exclusive = false; 45 boolean autoDelete = false;// 隊列沒有和Exchange綁定,就進行自動刪除 46 // 擴展參數 47 Map<String, Object> arguments = new HashMap<String, Object>(); 48 DeclareOk declareOk = channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); 49 System.out.println("consumerCount: " + declareOk.getConsumerCount()); 50 51 // 五、建立消費者,指定參數,消費者創建在那個channel鏈接之上 52 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 53 54 // 六、對channel進行設置。queue是設置要消費的隊列名稱。 55 boolean autoAck = true;// 是否自動簽收。 56 Consumer callback = queueingConsumer;// 57 channel.basicConsume(queue, autoAck, callback); 58 59 // 七、獲取消息 60 // 消費者建立起來了,消費者監聽的隊列建立起來了。接下來就獲取消息。 61 // delivery是消息封裝的對象 62 System.out.println("等待消費......"); 63 while (true) { 64 // 獲取消息 65 Delivery delivery = queueingConsumer.nextDelivery(); 66 String body = new String(delivery.getBody()); 67 System.out.println("消費端body: " + body); 68 System.out.println("envelope" + delivery.getEnvelope().toString()); 69 } 70 71 // 八、關閉鏈接,原則,由小到大進行關閉 72 // channel.close(); 73 // connection.close(); 74 } catch (IOException e) { 75 e.printStackTrace(); 76 } catch (TimeoutException e) { 77 e.printStackTrace(); 78 } catch (ShutdownSignalException e) { 79 e.printStackTrace(); 80 } catch (ConsumerCancelledException e) { 81 e.printStackTrace(); 82 } catch (InterruptedException e) { 83 e.printStackTrace(); 84 } 85 } 86 87 }
實現的效果,除了控制檯的輸出,你也能夠在管控臺裏面查看對應的效果,如鏈接Connection的個數、Channel的個數、Exchange的個數、Queue的個數、Consumer的個數、以及主頁折線圖展現的最新消息個數、消費速率等等信息。觀察這些變化以達到監控的目的。
十二、RabbitMQ的Exchange交換機。Exchange接受消息(即生產者生產的消息,將消息投遞到交換機Exchange上面),而且根據路由鍵轉發消息所綁定的隊列。
RabbitMQ架構圖,概述,以下所示:
1)、藍色的框,主要表示,生產者客戶端將消息投遞(Send Message)到交換機Exchange上面,經過路由關係,將生產者生產的消息路由到指定的隊列裏面。
2)、綠色的框,主要表示,消費者客戶端監聽隊列裏面的消息(Receive Message),進行消費。消費者客戶端和隊列創建了監聽,而後接收消息。
3)、紅色的虛線框,主要表示,RabbitMQ Server服務器。
4)、黃色的框,主要表示,路由鍵Routing key,一個綁定的關係,即交換機Exchange和隊列Queue創建一個綁定Binding。交換機Exchange上面的消息到達那個隊列Queue的規則主要是由路由鍵Routing key來指定的。
1三、交換機的屬性,以下所示:
1)、Name:交換機的名稱,能夠本身指定交換機的名稱。
2)、Type:交換機的類型direct,topic,fanout,headers。
a)、Direct Exchange(即直連交換機,路由鍵Routing key必須一致性),全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。路由規則:Direct Exchange直連交換機,Routing key的名稱必須徹底匹配(即生產者生產消息攜帶的路由鍵和將交換機和隊列綁定的路由鍵必須一致),就會將交換機Exchange上面的消息發送到(路由到)隊列Queue上面。
注意:Direct模式可使用RabbitMQ自帶的Exchange,即default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。
b)、Topic Exchange(即話題交換機,路由鍵Routing key規則匹配或者成爲模糊匹配),全部發送到Topic Exchange的消息被轉發到全部關心RouteKey中指定Topic的Queue上。Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic。注意:可使用通配符進行模糊匹配。符號,"#"匹配一個或者多個詞,符號"*"匹配很少很多一個詞(即*號僅僅能夠匹配一個詞)。路由規則:生產者生產的消息攜帶的路由鍵Routing key,若是交換機與隊列Queue綁定的路由鍵,和生產者生產消息攜帶的路由鍵規則匹配上,就能夠將交換機上面的消息發送到該隊列上。
c)、Fanout Exchange(即廣播交換機,沒有路由鍵Routing key的概念),不處理路由鍵,只須要簡單的將隊列綁定到交換機上面。發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上面(即,一個或者多個隊列綁定交換機,那麼交換機會將消息轉發到一個或者多個隊列上面)。Fanout交換機轉發消息是最快的(性能最好,由於廣播交換機,不作匹配,沒有路由規則)。
d)、Headers Exchange,根據消息頭進行路由,不是很經常使用。
3)、Durability:是否須要持久化,true爲持久化,false表示非持久化。
4)、Auto Delete:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange。值爲true表示自動刪除,值爲false表示不進行自動刪除。
5)、Internal:當前Exchange是否用於RabbitMQ內部使用,默認爲false。基本不使用該屬性。
6)、Arguments:擴展參數,用戶擴展AMQP協議自制定化使用。
1四、RabbitMQ的綁定Binding。
答:Binding綁定,是Exchange和Exchange、Queue之間的鏈接關係。即交換機和交換機能夠綁定,交換機和隊列能夠進行綁定。Binding中能夠包含Routing key或者參數。
1五、RabbitMQ的消息隊列Queue。
答:消息隊列Queue,實際存儲消息數據,在實際的物理磁盤中有一塊空間建立隊列。包含的屬性有,Durability是否持久化,Durable是持久化,Transient是不進行持久化。Auto delete,若是選擇yes表明當最後一個監聽被移除以後,該Queue會自動被刪除。
1六、RabbitMQ的消息Message。
答:消息Message,服務器和應用程序之間傳送的數據。消息本質就是一段數據,由Properties和Payload(即Body)組成。經常使用屬性,delivery mode(消息到Broker上,能夠作持久化,也能夠作內存級別的非持久化),headers(自定義屬性)。content_type,content_encoding(字符集),priority(優先級)。
correlation_id(惟一id),reply_to(消息失敗了返回到那個隊列),expiration(消息的過時時間),message_id(消息的id)。timestamp,type,user_id,app_id,cluster_id。
1七、RabbitMQ的虛擬主機Virtual host。
答:虛擬主機Virtual host,用於進行邏輯隔離,最上層的消息路由。虛擬主機不是物理的概念。一個Virtual Host裏面能夠有若個幹Exchange和Queue。同一個Virtual Host裏面不能有相同名稱的Exchange或者Queue。
做者:別先生
博客園:https://www.cnblogs.com/biehongli/
若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。