RabbitMQ入門(4)--路由

###路由 ###(使用Java客戶端) 在先前的指南中,咱們創建了一個簡單的日誌系統。咱們能夠將咱們的日誌信息廣播到多個接收者。 在這部分的指南中,咱們將要往其中添加一個功能-讓僅僅訂閱一個消息的子集成爲可能。例如,咱們能夠直接將關鍵的錯誤信息指向到日誌文件(保存在愛硬盤空間),同時依舊能打印全部日誌信息到平臺上。java

###綁定python

在以前的例子裏咱們已經建立綁定。你能夠回顧下代碼:git

channel.queueBind(queueName, EXCHANGE_NAME, "");

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange. 一個綁定是一個交換所和一個隊列之間的關係。這個很容易理解爲:這個隊列是對這 交換機的消息感興趣。github

綁定能夠帶上額外的路由關鍵字參數。爲了消除對basic_publish參數的迷惑,咱們將會將它稱之爲綁定關鍵字。如下是咱們如何經過一個關鍵字建立一個綁定:算法

channel.queueBind(queueName, EXCHANGE_NAME, "black");

這綁定關鍵字的意義取決於 交換機類型。這fanout 交換機,咱們以前使用的那個,僅僅忽略它的值。shell

###直接交換 咱們當前的日誌系統將全部消息廣播到全部消費者。咱們想擴展它,讓其容許依據其嚴格的規則過濾消息。例如咱們可能想讓一個往硬盤中寫日誌消息的程序僅僅接收關鍵的錯誤,而不是將硬盤空間浪費在警告和信息的日誌消息上。 咱們使用fanout類型的交換機,那個不會給咱們太多的靈活性-它僅僅能勝任沒頭腦的廣播。windows

咱們可使用direct類型的交換機來替代。一個direct交換機背後的路由算法是簡單的-一個消息將會進入那些隊列的綁定關鍵字與消息中路由關鍵字匹配的隊列中。this

爲了說明那個,考慮接下來結構: direct-exchange.png 在這個結構裏,咱們看見了這direct類型的交換機綁定了兩個隊列。第一個隊列裝有orange綁定關鍵字,這第二個有兩個綁定,一個是black綁定關鍵字而且另外一個是green關鍵之。 在這個結構裏,發送到交換機裏的消息,其中消息中帶路由關鍵字爲orange將要路由到隊列Q1上,消息中帶路由關鍵字爲blackgreen將路由到隊列Q2上。全部其餘類型的消息會被丟棄。 ###多種綁定 direct-exchange-multiple.png 將一個綁定關鍵字綁定到貨個隊列上是十分合法的。在咱們例子中使用綁定關鍵字blackXQ1綁定在一塊兒。既然那樣,這direct類型的交換機與fanout類型類似,一樣會廣播這消息到全部符合的隊列中。一個路由關鍵位balck的關鍵字將會被傳遞到Q1Q2。 ###發出日誌 咱們將會爲咱們的日誌系統使用這個模型。使用direct類型的交換機來代替fanout類型,發送消息。因爲這路由關鍵字咱們能夠嚴格的記錄。接收程序經過這種方式能夠嚴格接收它想接收的。讓咱們首先關注發佈日誌。 總之,咱們首先須要建立個交換機。rest

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

咱們準備發送一個消息:日誌

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

爲了簡化這個事情,咱們保證這severityinof,warning,error中的一個。 ###訂閱 接收消息如先前那樣工做,有一個例外,咱們會把每個咱們感興趣的severity建立一個新的綁定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

###把它們放在一塊兒 python-four.png EmitLogDirect.java類的代碼:

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java類的代碼:

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

如日常那樣編譯(看指南第一部分,編譯和類路徑的建議)。爲了方便,當咱們運行實例是,咱們如今使用一個環境變量$CP(在windows環境上是%CP%)表示類路徑。 若是你想僅保存warningerror記錄不包含info記錄信息到文件裏,打開一個控制平臺並輸入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

若是你想在你的屏幕上看全部的日誌信息,打開一個新的終端並鍵入:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

例如,爲了發佈一個錯誤日誌信息,僅須要鍵入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

EmitLogDirect.java sourceReceiveLogsDirect.java source的全部源代碼。

閱覽指南第五部分,查看如何根據一個模式來監聽消息。

相關文章
相關標籤/搜索