轉載請註明原文地址:http://www.javashuo.com/article/p-qkuxhakj-ne.htmlhtml
AMQP(Advanced Message Queuing Protocol)高級消息隊列協議,是基於JMS進行進一步擴展和優化的異步消息處理協議。spring
其在JMS的基礎上,提供了更多的方法。安全
AMQP引入了消息交換機Exchange的概念,實現了消息生產者與消息隊列之間的解耦。消息再也不直接發送到隊列或者主題,而是統一發送給Exchange,由交換機根據路由規則,將消息分發到不一樣隊列中。服務器
AMQP還引入了Channel概念,將一個connection細分爲不一樣channel,適用於多線程場景下,消息消費者與AMQP服務器只需創建一個TCP鏈接便可,各個線程對應不一樣channel,經過channel實現消息的提取。網絡
發佈者(Publisher)發佈消息(Message),傳送到broker;多線程
在broker中,消息被交換機(Exchange)根據路由規則,經過binding傳送到不一樣的隊列;app
最後 AMQP 代理會將消息投遞給訂閱了此隊列的消費者(push API),或者消費者按照需求自行獲取(pull API)。異步
Direct:當消息的routing key 與 binding 的 routing key 直接匹配,消息路由到該隊列函數
Topic: 當消息routing key 與 binding 的 routing key 符合通配符匹配,消息路由到該隊列(請百度通配符匹配)spring-boot
Headers: 當消息參數表中的頭信息和值都與 binding參數表中匹配的話,消息路由到該隊列
Fanout: 任何消息直接匹配到全部隊列上
1)配置pom包:spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2)配置RabbitMQ服務器信息
首先,須要一臺主機安裝並啓動RabbitMQ。
而後在項目中配置:
spring.application.name=應用名 spring.rabbitmq.host=RabbitMQ服務器的host spring.rabbitmq.port=5672 spring.rabbitmq.username=RabbitMQ服務器的登陸賬號 spring.rabbitmq.password=密碼
3)配置消息隊列
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
4)使用模版,實現消息生產者
@component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context);//自動轉換爲消息對象並要求發送到hello隊列 } }
5)實現消息接收者——建立監聽器,監聽hello隊列,一旦有消息則調用process函數進行處理
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
二、Python使用RabbitMQ
1)下載RabbitMQ並解壓
下載網址: http://www.rabbitmq.com/install-generic-unix.html
解壓後,進入 sbin 目錄, 運行 server。
默認端口爲5672。
2)pip安裝AMQP協議實現模塊——pika
3)消息生產者:
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='first', type='fanout') channel.queue_declare(queue='hello') channel.queue_bind(exchange='first', queue='hello') channel.basic_publish(exchange='first', routing_key='', body='Hello World!')
4)消息消費者
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):#消息處理函數 print body channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()
5)查看rabbitMQ的服務狀態
使用 rabbitmqctl 這個工具。例如:查看當前的隊列狀況
./rabbitmqctl list_queues