RabbitMQ入門(5)--主題

###主題(topic) ###(使用Java客戶端) 在先前的指南中咱們改進了咱們的日誌系統。取代使用fanout類型的交易所,那個僅僅有能力實現啞的廣播,咱們使用一個direct類型的交易所,得到一個能夠有選擇性的接收日誌。java

雖然使用direct交易所類型已經改善了咱們的系統,但它依舊有限制-它不能根據多個條件進行路由。python

咱們的日誌系統中,咱們可能想要訂閱不單單基於嚴格的日誌,一樣基於發佈日誌的源碼。你可能瞭解到syslog unix tool的概念,那個基於嚴格的(info/warn/crit...)和靈巧的(auth/cron/kern...)路由日誌。git

那個將會給咱們許多靈活性-咱們可能僅僅想監聽來自於cron的關鍵性的錯誤和全部來自於kern的日誌。github

爲了在咱們日誌系統中實現那個,咱們須要學習更復雜的topic類型交易所。shell

###topic類型交易所 發送到topic類型的交易所不能有任意的路由的關鍵字-它必須是一個關鍵字列表,由點分隔。這關鍵字能夠是任意的,可是一般能夠說明消息的基本的聯繫。幾個合法的路由關鍵字例子:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。可能有不少你想要的路由關鍵字,上限是255個字節。 這綁定關鍵字必須也在這一樣的表單裏。topic交易所邏輯背後是與direct交易所類型相似-一個帶特別的路由關鍵字的消息將會被傳遞到全部匹配綁定的關鍵字的隊列。可是有兩個特別重要的綁定關鍵字。windows

>* (星標) 能替代任意一個單詞。 ># (哈希) 能代替零個或多個單詞。學習

這個例子中是很容易解釋的: python-five.pngui

在這個例子中,咱們發送的消息都描述的是動物。被髮送的消息的路由關鍵字是由三個單詞(兩個點)組成。路由關鍵字中第一個單詞描述的是速度,第二個描述的是顏色,第三個是物種: "<速度>.<顏色>.<物種>"。unix

咱們建立了三個綁定:Q1s是由綁定關鍵字"。orange."所約束,Q2由"。rabbit"和"lazy.#"所約束。 這些綁定能夠歸納爲:日誌

>Q1 是對orange顏色的動物感興趣。 >Q2 想了解關於兔子的全部信息和全部慢吞吞的動物信息。

一個路由關鍵字爲"quick.orange.rabbit"消息將會被傳遞到全部隊列。消息"lazy.orange.elephant"一樣也傳遞到全部隊列。另外一方面"quick.orange.fox"僅進入第一隊列,"lazy.brown.fox"僅進入第二個隊列。"lazy.pink.rabbit"僅傳遞到第二個隊列一次,即便它會匹配兩個綁定。"quick.brown.fox"不符合任何綁定關鍵字,因此會被丟棄。

若是咱們打破咱們的約定,發送一個消息帶一個或四個單詞的關鍵字,像"orange"或"quick.orange.male.rabbit",會發生什麼呢?好吧,這些消息不會匹配任何綁定,將會丟失。 另外一方面"lazy.orange.male.rabbit",即便它有四個單詞,將會匹配最最後那個綁定,將消息傳遞到第二個隊列。

topic類型交易所 topic類型交易所是強大的,能表現的像其餘的交易所。 Topic exchange is powerful and can behave like other exchanges. 當一個隊列綁定到了"#"(哈希)綁定關鍵字-它會接收全部消息,無論路由關鍵字是什麼-相似於fanout類型交易所 當topic類型交易所中沒有使用像"*"(星標)和"#"(哈希)的特殊字符,它的行爲相似於direct類型交易所

###把全部放在一塊兒 咱們將會在咱們的日誌系統中使用topic類型交易所。咱們假設咱們的工做的日誌消息的路由關鍵字是由兩個單詞組成,格式爲:"<facility>.<severity>"。 這代碼和先前的幾乎同樣: EmitLogTopic.java的代碼:

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代碼:

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

運行接下來的例子,在windows環境中,使用%CP%,包含指南一種的類路徑。 接收全部日誌:

$ java -cp $CP ReceiveLogsTopic "#"

接收全部靈巧的kern日誌:

$ java -cp $CP ReceiveLogsTopic "kern.*"

或者你僅僅想接收'critical'日誌:

$ java -cp $CP ReceiveLogsTopic "*.critical"

你能夠建立多個綁定:

$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

發出一個路由關鍵字爲"kern.critical"的日誌,輸入:

$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

跟這些程序玩的開心。注意代碼沒有對特定的路由和綁定關鍵字作臆斷,你能夠操做多於兩個的路由關鍵字參數。

一些難題:

>""綁定會捕獲路由關鍵字是空的消息嗎? >"#."會捕獲消息中關鍵字帶".."的嗎?它會捕獲一個單詞的關鍵字嗎? >"a.*.#和"a.#"之間有什麼不一樣?

EmitLogTopic.javaReceiveLogsTopic.java的源代碼。

接下來,讓咱們在指南的第六部分,弄清當一個遠端程序被調用,如何作一個一個往返的消息。

相關文章
相關標籤/搜索