RabbitMQ指南(Java)

原文地址:http://www.rabbitmq.com/getstarted.html 翻譯得很差,歡迎指出。html

1、Hello World

一、基本概念介紹

RabbitMQ是一個消息代理(或者說消息隊列),它的主要意圖很明顯,就是接收和轉發消息。你能夠把它想象成一個郵局:當你把一封郵件放入郵箱,郵遞員會幫你把郵件送到收件人的手上。在這裏,RabbitMQ就比如一個郵箱、郵局或者郵遞員。java

RabbitMQ和郵局的主要區別在於,RabbitMQ不是處理郵件,而是接收、存儲和將消息以二進制的方式轉發出去。shell


在這裏,咱們先說明一些RabbitMQ中涉及到的術語。編程

  • 生產者(Producer)。生產表示只負責發送的意思,一個只負責發送消息的程序稱爲一個生產者,咱們經過一個P來表示一個生產者,以下圖:
    Producerwindows

  • 隊列(Queue)。隊列就比如一個郵箱,它在RabbitMQ的內部。雖然消息在RabbitMQ和程序之間傳遞,可是它們是存儲在隊列中的。一個隊列沒有大小的限制,你想存儲多少條消息就存儲多少條,它的本質是一個無限大的緩衝區。任何生產者均可以往一個隊列裏發送消息,一樣的,任何消費者也能夠從一個隊列那接收到消息。咱們用下圖來表示一個隊列,隊列上面的文字表示這個隊列的名字:
    Queue數組

  • 消費者(Consumer)。接收和發送的過程很相似,一個消費者程序一般是等待別人發送消息給它。咱們經過一個C來表示一個消費者,以下圖:
    Consumer緩存

注意一點,消費者、生產者和消息隊列能夠不用運行在同一臺機器上。實際上,在大多數的應用程序中,它們並非在同一臺機器上運行的。安全

二、"Hello World"

在這一小節中,咱們將編寫兩個Java程序,一個做爲生產者,發送一條簡單的消息;另外一個做爲消費者,接收並打印出接收到的消息。在這裏,咱們先不討論Java API的具體細節,而是先編寫出一個可運行的「Hello World」程序。服務器

在下圖中,「P」表示一個生產者,"C"表示一個消費者,中間的矩形表示一個隊列。併發

一個簡單的結構

RabbitMQ會涉及許多協議,其中AMQP協議是一個開放式的、通用的消息協議。RabbitMQ支持不少編程語言,咱們這裏經過RabbitMQ提供的Java客戶端來進行演示。請自行下載、安裝RabbitMQ和相關jar包。

(1)發送

發送端

在下面的代碼中,咱們用Send來表示一個發送端,用Recv來表示一個接收端。發送端會先鏈接RabbitMQ,併發送一個簡單的消息後關閉。


對於Send而言,咱們須要導入一些相關的類:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

首先,先創建一個類,併爲隊列起一個名:

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) 
        throws Exception {
        ...
    }
}

而後咱們建立一個connection:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

connection是一個抽象的sockect鏈接,在鏈接時,咱們須要注意一下登陸認證信息。在這裏咱們鏈接到本地機器,因此填寫上「localhost」。若是咱們想要鏈接到其餘機器上,只須要填寫上其餘機器的IP地址便可。

接下來,咱們建立一個channel,大多數的API均可以經過這個channel來獲取。

若是要發送消息,咱們必須先聲明一個隊列,而後咱們才能夠把消息發送到這個隊列裏去:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

聲明一個隊列後,僅當隊列不存在時,隊列纔會被建立。另外,消息的內容是一個字節數組,因此咱們須要先對消息進行編碼。最後,咱們須要關閉鏈接。

channel.close();
connection.close();

以上就是Send.java類的全部代碼

發送端運行不起來

若是你是第一次使用RabbitMQ,並且沒有看到打印的「Sent」信息,你可能會奇怪是哪裏出問題了。這裏有多是RabbitMQ在啓動時沒有足夠的磁盤空間(默認狀況下最少須要1Gb的空間),因此它會拒絕接收任何消息。經過查看日誌文件能夠肯定是不是由這個緣由所形成的。若是有必要,咱們也能夠經過修改配置文件中的disk_free_limit來下降大小的限制。

(2)接收

上面的代碼是發送端的。咱們的接收端是從RabbitMQ那獲取消息,因此並非像發送端那樣發送一個簡單的消息,而是須要一直監聽獲取消息,並打印輸出。

接收端

對於接收端而言,它須要導入以下的類:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer是一個實現了Consumer接口的類,咱們可使用它來獲取發送過來的消息。

跟Send同樣,咱們先建立一個connection和channel,並聲明一個接收消息的隊列。注意一點,這裏的隊列名要和Send的相匹配。

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv)
        throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        ...
    }
}

在這裏咱們一樣聲明瞭一個隊列,由於咱們可能在啓動發送端以前就先啓動了接收端,在咱們開始接收消息以前,咱們要先確認隊列是否存在。

咱們告訴RabbitMQ經過這個隊列給咱們發送消息,所以RabbitMQ會異步的給咱們推送消息,咱們須要提供一個回調對象用來緩存消息,直到咱們準備使用它。這就是DefaultConsumer所作的事情。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

以上就是Recv.java類的全部代碼

三、代碼整合

編譯並運行以上程序,你會看到接收端將會收到並打印出來自發送端的「Hello World!」消息。

你能夠在類路徑下編譯這些文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

爲了運行它們,你須要rabbitma-client.jar和它的依賴文件。在一個終端運行發送者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

而後運行接收者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

在windows環境中,咱們使用分號代替冒號來分隔classpath上的選項。

Hint

你能夠設置一個環境變量到classpath中:

 $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
 $ java -cp $CP Send
在windows上:
 > set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
 > java -cp %CP% Send

四、我的補充

(1)登陸認證

在實際使用中,RabbitMQ並非簡單的經過指定一個IP就能夠進行鏈接的,它還須要指定端口號、用戶名和密碼。就像是這樣:

factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");

當沒有明確指明登陸認證信息的時候,就會使用默認值來進行登陸,以上都是默認的認證信息。

另外,咱們也能夠經過設置URI來進行鏈接,URI的格式以下:

amqp://username:password@host:port

factory.setUri("amqp://guest:guest@localhost:5672");

2、工做隊列

這裏寫圖片描述
在第一部分中,咱們的程序結構很是簡單。在接下來,咱們將會建立一個工做隊列,向多個消費者分發任務。

一、準備

在以前的程序中,咱們發送了一個包含「Hello World!」的消息,如今咱們發送一些字符串用來表示複雜的任務。在這裏,由於咱們沒有真正意義上的任務,好比調整圖片的大小或者渲染pdf文件,因此咱們經過Thread.sleep()函數來模擬程序的處理時間。咱們用字符串中的句號來表明它的複雜度,每個句號表示要花費一秒的工做時間。例如,一個「Hello...」的字符串就表示它須要3秒鐘的處理時間。

咱們會稍微修改咱們以前程序中的Send.java代碼,使其容許發送包含任意內容的消息。這個程序將會定時向隊列中發送任務,咱們把它命名爲NewTask.java:

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些幫助咱們從命令行的參數中獲取消息的函數:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

咱們的Recv.java程序也須要做出一些修改:它須要從隊列中獲取消息,並統計消息中有多少個「.」,而後sleep相應的時間。咱們把它取名爲Worker.java:

final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
        }
    }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

編譯以上程序:

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

二、輪詢分發

使用任務隊列的優點之一是任務的並行處理。若是如今積壓了一大堆任務,咱們僅須要添加更多的消費者便可,這是很容易擴展的。

首先,咱們嘗試同時運行兩個消費者實例,他們都會從隊列裏去獲取消息,以下。

你須要打開三個控制檯,其中兩個用來運行消費者程序,分別稱爲C1和C2

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

第三個控制檯用來運行生產者程序。一旦你開啓了消費者程序,就能夠啓動生產者了:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

讓咱們看看消費者獲得了什麼消息:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默認狀況下,RabbitMQ會一直把消息發送給下一個消費者,平均狀況下,每一個消費者得到的消息是同樣多的。這種分發方式叫作輪詢,你能夠嘗試運行更多的消費者。

三、消息確認

處理一個任務可能須要花費數秒時間,你可能會好奇若是一個消費者執行了一個長任務,而且在完成處理前就掛了的狀況下會發送什麼事。就拿咱們當前的代碼來講,一旦RabbitMQ將消息傳遞給消費者,消息就會從內存中刪除。在這種狀況下,若是你強行關閉正在運行的消費者,那麼它正在處理的消息就會丟失。那些發送給這個消費者但尚未開始處理的消息也會一併丟失。

可是,咱們並不想丟失任何消息。實際上,若是一個消費者掛了,咱們更但願將消息傳遞給其餘的消費者消費。

爲了保證消息不會丟失,RabbitMQ支持消息確認(acknowledgments)機制。Ack是由消費者發送的,用來告訴RabbitMQ這個消息已經接收到並處理完成,能夠從內存中刪除它了。

若是一個消費者沒有發送ack就掛了,RabbitMQ會認爲這個消息沒有處理完成並將消息從新入隊。若是這時有其餘消費者在運行,它將會把這個消息發送給另外一個消費者。經過這種方式,即便消費者掛了,也能夠確保消息不會丟失。

在這裏不存在超時的概念,只有在消費者掛了的狀況下,RabbitMQ纔會重發消息,不然就會一直等待消息的處理,即便須要花費很長的時間來處理。

消息確認機制默認狀況下是開啓的。如今咱們要關閉它,改成手動提交確認信號。當處理完一個任務後,咱們將手動提交。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

經過這段代碼,咱們能夠保證即便你經過Ctrl+C來關閉一個消費者,也不會丟失任何消息。那些沒有發送確認信號的消息將會很快被重發。

忘記發送確認信號

忘記寫basicAck是一個很廣泛的錯誤,可是這會產生嚴重的後果。當你的客戶端退出後,全部的消息將會被從新發送,RabbitMQ會愈來愈佔內存,由於它不會刪除那些沒有發送確認信號的消息。

想要調試這種類型的錯誤,你可使用rabbitmqctl打印出messages_unacknowledged屬性:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

四、消息持久化

咱們已經學習瞭如何保證在消費者掛了的狀況下,消息也不會丟失。可是若是RabbitMQ掛了,咱們的消息仍然會丟失。

當RabbitMQ退出或崩潰時,它將會丟失全部隊列和消息,除非你讓它不要這麼作。經過兩個方面能夠保證消息不會丟失:對消息和隊列進行持久化處理。

首先,咱們須要保證RabbitMQ不會丟失咱們的隊列。爲此,咱們須要將一個隊列聲明爲一個持久化的隊列:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

雖然這個代碼是正確的,可是它卻不會執行,由於咱們已經定義了一個非持久化的hello隊列。RabbitMQ不容許使用不一樣的參數去從新定義一個已經存在的隊列,若是你強行這樣作,它將會返回一個錯誤。有一個快速的解決方案,就是從新聲明一個不一樣名字的隊列,好比task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

須要在生產者和消費者的代碼中對queuqDeclare進行修改。

至此,咱們能夠保證即便RabbitMQ重啓,task_queue隊列也不會丟失。如今,咱們須要對消息進行持久化,經過設置MessageProperties(實現了BasicProperties)的值爲PERSISTENT_TEXT_PLAIN便可。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意:

消息的持久化並不能徹底保證消息不會丟失,雖然它會告訴RabbitMQ把消息保存在硬盤上,可是從接收消息到保存消息之間,仍是須要必定的時間的。一樣,RabbitMQ沒有對每一個消息作fsync(2)——消息僅僅存在於緩存中,並無真正的寫入硬盤。因此這個持久化並非健壯的,可是對於簡單的工做隊列來講已經徹底足夠了。若是你須要更強大的持久化的話,你能夠考慮使用publisher confirms機制。

五、公平分發

你可能會注意到,分發的過程並非咱們所但願的那樣。例如在某一狀況下有兩個消費者,RabbitMQ默認將序號爲奇數的消息發送給第一個消費者,序號爲偶數的消息發送給另外一個消費者,當序號爲奇數的消息都是一些很耗時的任務,而序號爲偶數的消息都是一些小任務,那麼就會形成第一個消費者一直處於忙碌狀態,而另外一個消費者處理完畢後會處於空等待的狀態。RabbitMQ並不會在乎這些事情,它只會一直均勻的分發消息。

這種狀況的發生是由於RabbitMQ只負責分發消息,它並不關心一個消費者返回了多少個確認信號(即處理了多少條消息),它只是盲目的將第n條消息往第n個消費者發送罷了。

這裏寫圖片描述

爲了解決這個問題,咱們可使用basicQos方法,設置prefetchCount = 1。它將會告訴RabbitMQ不要同時給一個消費者超過一個任務,換句話說,就是在一個消費者發送確認信息以前,不要再給它發送新消息了。取而代之的是將消息發送給下一個空閒的消費者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意隊列的大小

若是全部的消費者都處於繁忙中,你的隊列很快就會被佔滿。你須要注意這件事,而且添加更多的消費者或者經過其餘策略來解決他。

六、代碼整合

最終的NewTask.java以下:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) 
                      throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);

        channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }      
    //...
}

Worker.java的代碼:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");   
            doWork(message); 
            System.out.println(" [x] Done" );

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
    //...
}

3、發佈和訂閱

在以前的示例中,咱們建立了一個工做隊列,在這以前,咱們都假設每個消息都準確的發送到一個消費者那裏。在接下來,咱們將作一些徹底不一樣的事情——將一個消息發送到多個消費者,這種模式被稱爲發佈和訂閱模式。

爲了說明這個模式,咱們將會建立一個簡單的日誌系統,它由兩部分程序組成,一個是發送日誌消息,另外一個是接收並打印日誌消息。

在咱們的日誌系統中,每個運行的接收程序都會獲取一個消息的拷貝副本。經過這種方式,咱們可讓一個消費者把日誌記錄到硬盤中,同時可讓另外一個消費者把日誌輸出到屏幕上。

在本質上,發送日誌消息至關於廣播到全部接收者。

一、交換機

在以前,咱們都是直接從一個隊列中發送或獲取消息。如今是時候介紹RabbitMQ中的full messaging模型了。

讓咱們快速複習下在先前部分中咱們所學的知識:

  • 一個發送消息的生產者是一個用戶程序。
  • 一個存儲消息的隊列是一個緩衝區。
  • 一個接收消息的消費者是一個用戶程序。

    在RabbitMQ的消息模型中,核心思想是生產者不直接將消息發送給隊列。實際上,生產者甚至徹底不知道消息會被髮送到哪一個隊列中。


    相反,生產者只能將消息發送到交換機上。交換機是一個很是簡單的東西,它一邊從生產者那裏接收消息,一邊向隊列推送消息。交換機必須確切的知道它想要把消息發送給哪些接收者。例如是否發送到一個特定的隊列中?仍是發送給不少個隊列?或者是把消息丟棄等等。這些東西都經過交換機的類型來規定。


    這裏寫圖片描述

    交換機的類型包括:direct, topic, headers和fanout。咱們先關注fanout。讓咱們先建立一個這種類型的交換機,並稱呼它爲logs:

channel.exchangeDeclare("logs", "fanout");

fanout類型的交換機很是簡單,經過它的名字,你可能已經猜出它的用處了。它將會以廣播的方式把接收到的消息發送到它所知道的隊列中去,這個正是咱們所須要的。

交換機列表

經過使用rabbitmqctl命令,咱們能夠列出服務器中的全部交換機:

$
sudo rabbitmqctl list_exchanges Listing exchanges ...
        direct amq.direct      direct amq.fanout      fanout amq.headers     headers amq.match       headers amq.rabbitmq.log      
topic amq.rabbitmq.trace      topic amq.topic       topic logs   
fanout ...done.

在這個列表裏,有一些以amq.*開頭的交換機和默認(沒有名字)的交換機。這些都是默認建立的,並且你不太可能會使用到它們。

匿名交換機

在以前的部分中,咱們對交換機毫無概念,但仍然能將消息發送到隊列中,那是由於咱們使用了默認的交換機,經過使用空串("")來標識它。

回想一下以前是如何發送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

這裏的第一個參數就是交換機的名字。空串表示它是默認交換機或者是匿名交換機:若是routingKey存在的話,消息將經過routingKey路由到特定的隊列中去。

如今,咱們定義本身命名的交換機:

channel.basicPublish( "logs", "", null, message.getBytes());

二、臨時隊列

你可能會想起咱們以前使用的隊列是有特定的名稱的(hello和task_queue)。對於咱們來講,爲一個隊列命名是很是有必要的,咱們須要指定多個消費者到同一個隊列獲取消息。當你想在多個生產者和消費者之間共用一個隊列,那麼爲隊列命名就很是重要了。

可是對於咱們目前來講還不須要。咱們想要監聽全部日誌消息,而不是它的一個子集。一樣,咱們只會對最新的消息感興趣,而不是舊消息。爲了解決這個問題,咱們須要作兩件事。

第一,不管何時鏈接RabbitMQ,咱們都須要一個新的空隊列。爲了這樣作,咱們會建立一個隨機的名字,或者直接讓服務器給咱們一個隨機的名字。

第二,一旦咱們與消費者斷開,隊列應該被自動刪除。

在Java中,當咱們使用無參的queueDeclare(),它將會爲咱們建立一個非持久化的、專用的、自動刪除、帶隨機名字的隊列:

String queueName = channel.queueDeclare().getQueue();

在這裏,隊列名queueName的值是一個隨機產生的字符串,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。

三、綁定

這裏寫圖片描述

咱們已經建立一個fanout類型的交換機和一個隊列。如今咱們須要告訴交換機發送消息到咱們的隊列裏。在這裏,把交換機和隊列之間的關係稱爲綁定。

channel.queueBind(queueName, "logs", "");

從如今開始,logs交換機將會把消息發送到咱們的隊列中去。

綁定列表

你能夠經過使用rabbitmqctl list_bindings來列出全部綁定。

四、代碼整合

這裏寫圖片描述

生產者程序負責發送日誌消息,看起來跟之前的代碼沒有什麼區別。最重要的改變是,如今咱們把消息發送到咱們的logs交換機中,而不是匿名交換機。在發送時咱們須要指定一個routingKey,可是在使用fanout類型的交換機時,routingKey的值會被忽略。這是咱們的EmitLog.java程序:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

正如你所看到的,在創建一個connection以後,咱們聲明瞭一個交換機。這個步驟是必須的,由於RabbitMQ禁止把消息發送到一個不存在的交換機。

若是交換機上沒有綁定任何隊列的話,消息將會被丟棄。可是這個對咱們來講是能夠接受的,若是沒有消費者監聽,咱們能夠安全的丟棄消息。

ReceiveLogs.java的代碼以下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

像以前那樣編譯程序,我已經編譯完成了。

$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

若是你想要把日誌保存到文件中,你只須要打開控制檯並輸入:

$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

若是你但願在屏幕中看到日誌,新建一個終端並運行:

$ java -cp .:rabbitmq-client.jar ReceiveLogs

固然,爲了發送日誌,輸入:

$ java -cp .:rabbitmq-client.jar EmitLog

使用rabbitmqctl list_bindings,你能夠驗證這代碼確實建立和綁定了咱們想要的隊列。隨着兩個ReceiveLogs.java程序的運行,你能夠看到以下的信息:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

這個結果簡單明瞭:數據從logs交換機發送到服務器安排的兩個隊列中去,那正是咱們所指望的。

4、路由

在上一節,咱們創建了一個簡單的日誌系統,咱們能夠將日誌信息廣播到多個接收者那裏去。

接下來,咱們將要實現只訂閱部分消息。例如,咱們只將error類型的日誌信息保存到硬盤中去,固然依舊在控制檯中打印出全部的日誌信息。

一、綁定

在以前的例子中,咱們已經創建了綁定,你能夠回顧一下代碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個交換機跟一個隊列之間的綁定,能夠理解爲:就是這個隊列只對這個交換機發送過來的消息感興趣。

綁定還有另外的一個routingKey參數,爲了不跟basic_publish的參數搞混,咱們把它稱爲binding key。如下是咱們經過一個key建立的一個綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

這個binding key的意義取決於交換機的類型,咱們以前使用的fanout類型的交換機是會忽略它的值的。

二、direct交換類型

咱們以前的日誌系統是將全部消息廣播到全部消費者那裏去。咱們想要擴展它,讓它能根據消息的嚴重性來進行過濾。例如,咱們只把error類型的錯誤日誌記錄到硬盤中去,而不是將硬盤空間浪費在warning或info類型的日誌上。

咱們使用的fanout類型的交換機沒有太多的靈活性,它只能無腦的進行廣播。

因此咱們將會使用direct類型的交換機來替換它。direct類型交換機的路由規則很簡單——它只將消息發送到那些binding key和routing key徹底匹配的隊列中去。

爲了說明清楚,咱們看一下下面的結構圖:

這裏寫圖片描述

在這裏,咱們能夠看到direct類型的交換機X綁定了兩個隊列。第一個隊列經過一個orange關鍵字來綁定,第二個隊列綁定了兩個關鍵字,分別爲black和green。


經過這樣的綁定,當交換器接收到routing key爲orange的消息的時候,就會發送到隊列Q1中去;當接收到 routing key爲black或green的消息的時候,就會發送到隊列Q2中去;其餘類型的消息則會被丟棄。

三、多重綁定

這裏寫圖片描述

用一個binding key綁定多個隊列是合法的,在上面的例子中,咱們還可使用black將X和Q1綁定起來。在這裏,direct類型的交換機就跟fanout類型的交換機同樣,會把消息發送到全部的匹配的隊列中去。一個routing key爲black的消息將會被髮送到Q1和Q2中去。

四、發送日誌

咱們將會在咱們的日誌系統中使用這種模型,使用direct類型的交換機來替換fanout類型的交換機。咱們能夠把routing key做爲日誌消息的嚴重級別,經過這種方式,接收程序就能夠選擇對應的級別進行接收。總之,讓咱們先看一下如何發送日誌:

首先,咱們須要建立一個交換機:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

而後咱們準備發送一條消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

在這裏爲了簡化,咱們先約定日誌的級別分爲info、warning和error三種。

五、訂閱

接收消息的部分跟以前的程序沒有什麼不一樣。惟一的區別就是咱們要爲隊列建立它感興趣的binding key。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

六、代碼整合

這裏寫圖片描述

EmitLogDirect.java的代碼:

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java的代碼

import com.rabbitmq.client.*;
import java.io.IOException;
    
public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

像以前那樣編譯它們。爲了方便,當咱們運行實例時,咱們使用一個$CP的環境變量來表示類的路徑。

若是你只想保存warning和error類型的日誌到文件裏去,你能夠打開控制檯,並輸入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

若是你想要在屏幕中看到全部的日誌信息,你能夠打開一個新的終端並作以下操做:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

另外,例如想要發送error類型的日誌信息,能夠輸入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

5、topic類型

在上一部分中,咱們改進了咱們的日誌系統,使用direct類型的交換機代替fanout類型的交換機,獲得了一個具備選擇性的日誌接收系統。

雖然使用了direct類型的交換機改進了咱們的系統,可是它依然不完善,它不能根據多個條件進行路由。

在咱們的日誌系統中,咱們可能不只想要獲得各類級別的日誌,還想要獲得日誌的發送源。你可能從syslog unix tool瞭解過這個概念,它基於severity(info/warn/crit...) 和facility (auth/cron/kern...)來路由日誌信息。

那將會給咱們帶來更多的靈活性,由於咱們可能只須要監聽那些來自cron的critical日誌,而不是全部的kern日誌。

爲了在咱們的日誌系統中實現此功能,咱們須要使用更加複雜的類型——topic類型的交換機

一、topic類型的交換機

發送到topic類型的交換機的消息不能是隨意的routing key,它必須是一個經過點分隔的字符串列表。字符串的內容什麼均可以,可是咱們通常會給它一個有意義的名字。例如一些合法的routing key就像這樣:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。Routing key中你想要寫多少個單詞均可以,它的長度上線是255個字節。

Binding key也是同樣。Topic類型的交換機的邏輯跟direct類型的很是類似,它也是把消息發送到與routing key匹配的隊列中去。然而,它的binding key卻有兩種很是特殊的用法:

  • *(星號)能代替任何一個單詞。
  • (井號)能代替0個或任意多個單詞。

下面這個例子很容易理解:

這裏寫圖片描述

在這個例子中,咱們發送的消息都是用來描述動物的。每一個消息的routing key都包含三個單詞,第一個單詞用來描述動物的速度,第二個用來表示顏色,第三個用來表示物種:" . . "。


咱們爲交換機和隊列建立三個綁定關係,隊列Q1經過".orange."來綁定,隊列Q2經過 "..rabbit"和"lazy.#"來綁定。


這些綁定關係能夠歸納爲:

  • Q1只對橙色的動物感興趣。
  • Q2想要了解兔子和那些行動緩慢的動物。

一個routing key爲"quick.orange.rabbit"的消息將會被髮送到全部隊列。帶有"lazy.orange.elephant"的消息也會被髮往全部隊列中去。而對於"quick.orange.fox"來講,它只會被髮送到Q1隊列中,"lazy.brown.fox"只會被髮送到Q2隊列中。而"lazy.pink.rabbit"只會被髮送到Q2隊列中一次,儘管它匹配了Q2隊列的兩個routing key。對於"quick.brown.fox"來講,沒有routing key和它匹配,因此它會被丟棄。

若是咱們不遵照以前的約定,發送一條只帶一個單詞或四個單詞的消息,例如"orange"或"quick.orange.male.rabbit",會發生什麼事呢?好吧,這些消息都會由於沒有找到匹配routing key而被丟棄。

另外一方面,對於"lazy.orange.male.rabbit"來講,雖然它包含四個單詞,可是它卻和最後一個routing key("lazy.#")匹配,因此它會被髮送到Q2隊列中去。

Topic類型的交換機

Topic類型的交換機的功能十分強大,能夠媲美其餘類型的交換機。

當一個隊列的binding key爲一個「#」,它將接收全部消息,就像fanout類型的交換機同樣。

當binding key中沒有包含「*」或「#」字符的話,topic類型的交換機就至關於direct類型的交換機。

二、代碼整合

咱們將在咱們的日誌系統中使用topic類型的交換機。咱們假設日誌消息的routing key經過兩個單詞組成,就像" . "。

代碼幾乎跟先前的同樣。

EmitLogTopic.java的代碼以下:

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代碼以下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

接收全部日誌類型:

$ java -cp $CP ReceiveLogsTopic "#"

從"kern"那接收日誌信息:

$ java -cp $CP ReceiveLogsTopic "kern.*"

或者只接收「critical」類型的日誌:

$ java -cp $CP ReceiveLogsTopic "*.critical"

你還能夠創建多個綁定關係:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

或者直接發送routing key爲"kern.critical"類型的日誌:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
<p><b>一些有趣的問題:</b></p>

- "*"會匹配到routing key爲空的消息嗎?
- "#.*"會匹配到". ."嗎?或者只匹配一個單詞嗎?
- "a.*.#」和"a.#"有什麼不一樣?
相關文章
相關標籤/搜索