AMQP與RabbitMQ

 

 

轉載請註明原文地址:http://www.javashuo.com/article/p-qkuxhakj-ne.htmlhtml

 

一:AMQP是什麼

  AMQP(Advanced Message Queuing Protocol)高級消息隊列協議,是基於JMS進行進一步擴展和優化的異步消息處理協議。spring

  其在JMS的基礎上,提供了更多的方法。安全

  AMQP引入了消息交換機Exchange的概念,實現了消息生產者與消息隊列之間的解耦。消息再也不直接發送到隊列或者主題,而是統一發送給Exchange,由交換機根據路由規則,將消息分發到不一樣隊列中。服務器

  AMQP還引入了Channel概念,將一個connection細分爲不一樣channel,適用於多線程場景下,消息消費者與AMQP服務器只需創建一個TCP鏈接便可,各個線程對應不一樣channel,經過channel實現消息的提取。網絡

  

二:AMQP模型

  • Broker: 接收和分發消息的應用,RabbitMQ Server就是一個消息處理實體。
  • Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,相似於網絡中的namespace概念。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange/queue等。
  • Connection: publisher/consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。
  • Channel: 若是每一次訪問RabbitMQ都創建一個Connection,在消息量大的時候創建TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。
  • Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最終被送到這裏等待consumer取走。一個message能夠被同時拷貝到多個queue中。
  • Binding: exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。

 

三:AMQP消息傳送模型

  

  發佈者(Publisher)發佈消息(Message),傳送到broker;多線程

  在broker中,消息被交換機(Exchange)根據路由規則,經過binding傳送到不一樣的隊列;app

  最後 AMQP 代理會將消息投遞給訂閱了此隊列的消費者(push API),或者消費者按照需求自行獲取(pull API)。異步

 

四:Exchange分發策略

  Direct:當消息的routing key 與 binding 的 routing key 直接匹配,消息路由到該隊列函數

  Topic:   當消息routing key 與 binding 的 routing key 符合通配符匹配,消息路由到該隊列(請百度通配符匹配)spring-boot

  Headers:  當消息參數表中的頭信息和值都與 binding參數表中匹配的話,消息路由到該隊列

    Fanout: 任何消息直接匹配到全部隊列上

 

五:RabbitMQ的使用

  一、SpringBoot集成RabbitMQ

  1)配置pom包:spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  2)配置RabbitMQ服務器信息

  首先,須要一臺主機安裝並啓動RabbitMQ。

  而後在項目中配置:

spring.application.name=應用名

spring.rabbitmq.host=RabbitMQ服務器的host
spring.rabbitmq.port=5672
spring.rabbitmq.username=RabbitMQ服務器的登陸賬號
spring.rabbitmq.password=密碼

  3)配置消息隊列

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }

}

  4)使用模版,實現消息生產者

@component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);//自動轉換爲消息對象並要求發送到hello隊列
    }

}

  5)實現消息接收者——建立監聽器,監聽hello隊列,一旦有消息則調用process函數進行處理

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }

}

 

  二、Python使用RabbitMQ

  1)下載RabbitMQ並解壓

  下載網址: http://www.rabbitmq.com/install-generic-unix.html 

  解壓後,進入 sbin 目錄, 運行 server。

  默認端口爲5672。

  2)pip安裝AMQP協議實現模塊——pika

  3)消息生產者:

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='first', type='fanout')

channel.queue_declare(queue='hello')
channel.queue_bind(exchange='first', queue='hello')

channel.basic_publish(exchange='first', routing_key='', body='Hello World!')
  • 獲取鏈接.
  • 從鏈接上獲取一個 channel.
  • 聲明一個 exchange . (只會建立一次)
  • 聲明一個 queue . (只會建立一次)
  • 把 queue 綁定到 exchange 上.
  • 向指定的 exchange 發送一條消息.

 

  4)消息消費者

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):#消息處理函數
    print body

channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
  • 獲取鏈接.
  • 從鏈接上拿到 channel .
  • 聲明須要的 queue .
  • 定義一個從指定 queue 獲取消息的回調處理.
  • 開始接收消息.

 

  5)查看rabbitMQ的服務狀態

  使用 rabbitmqctl 這個工具。例如:查看當前的隊列狀況

./rabbitmqctl list_queues

 

  

六:深刻

  • 持久化
  • 調度策略
  • 分配策略
  • 狀態反饋
相關文章
相關標籤/搜索