翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.htmlhtml
在前面的教程中,咱們建立了一個工做隊列,都是假設一個任務只交給一個消費者。此次咱們作一些徹底不一樣的事兒——將消息發送給多個消費者。這種模式叫作「發佈/訂閱」。java
爲了說明這個模式,咱們將構建一個簡單日誌系統。它包含2段程序:第一個將發出日誌消息,第二個接受並打印消息。算法
若是在日誌系統中每個接受者(訂閱者)都會的獲得消息的拷貝。那樣的話,咱們能夠運行一個接受者(訂閱者)程序,直接把日誌記錄到硬盤。同時運行另外一個接受者(訂閱者)程序,打印日誌到屏幕上。服務器
說白了,發表日誌消息將被廣播給全部的接收者。less
Exchanges(轉發器)前面的博文彙總,咱們都是基於一個隊列發送和接受消息。如今介紹一下完整的消息傳遞模式。ide
RabbitMQ消息模式的核心理念是:生產者沒有直接發送任何消費到隊列。實際上,生產者都不知道這個消費是發送給哪一個隊列的。翻譯
相反,生產者只能發送消息給轉發器,轉發器是很是簡單的。一方面它接受生產者的消息,另外一方面向隊列推送消息。轉發器必須清楚的知道如何處理接收到的消息。附加一個特定的隊列嗎?附加多個隊列?或者是否丟棄?這些規則經過轉發器的類型進行定義。3d
類型有:Direct、Topic、Headers和Fanout。咱們關注最後一個。如今讓咱們建立一個該類型的轉發器,定義以下:日誌
channel.exchangeDeclare("logs", "fanout");
fanout轉發器很是簡單,從名字就能夠看出,它是廣播接受到的消息給全部的隊列。而這正好符合日誌系統的需求。code
Nameless exchange(匿名轉發)以前咱們對轉換器一無所知,卻能夠將消息發送到隊列,那是多是咱們用了默認的轉發器,轉發器名爲空字符串""。以前咱們發佈消息的代碼是:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數就是轉發器的名字,空字符串表示模式或者匿名的轉發器。消息經過隊列的routingKey路由到指定的隊列中去,若是存在的話。
如今咱們能夠指定轉發器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(臨時隊列)
你可能還記得以前咱們用隊列時,會指定一個名字。隊列有名字對咱們來講是很是重要的——咱們須要爲消費者指定同一個隊列。
但這並非咱們的日誌系統所關心的。咱們要監聽全部日誌消息,而不只僅是一類日誌。咱們只對對當前流動的消息感興趣。解決這些問題,我盟須要完成兩件事。
首先,每當我盟鏈接到RabbitMQ時,須要一個新的空隊列。爲此咱們須要建立一個隨機名字的空隊列,或者更好的,讓服務器選好年則一個隨機名字的空隊列給咱們。
其次,一旦消費者斷開鏈接,隊列將自動刪除。
咱們提供一個無參的queueDeclare()方法,建立一個非持久化、獨立的、自動刪除的隊列,且名字是隨機生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一個隨機隊列名。看起來會像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(綁定)咱們已經建立了一個廣播的轉發器和一個隨機隊列。如今須要告訴轉發器轉發消息到隊列。這個關聯轉發器和隊列的咱們叫它Binding。
channel.queueBind(queueName, "logs", "");
這樣,日誌轉發器將附加到日誌隊列上去。
完整的例子:發送端代碼(生產者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立鏈接鏈接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 發送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和鏈接 channel.close(); connection.close(); } }
消費者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消費者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能夠看到咱們1個生產者用於發送log消息,2個消費者,一個用於顯示,一個用於記錄文件。
生產者聲明瞭一個廣播模式的轉換器,訂閱這個轉換器的消費者均可以收到每一條消息。能夠看到在生產者中,沒有聲明隊列。這也驗證了以前說的。生產者其實只關心exchange,至於exchange會把消息轉發給哪些隊列,並非生產者關心的。
2個消費者,一個打印日誌,一個寫入文件,除了這2個地方不同,其餘地方如出一轍。也是聲明一下廣播模式的轉換器,而隊列則是隨機生成的,消費者實例啓動後,會建立一個隨機實例,這個在管理頁面能夠看到(如圖)。而實例關閉後,隨機隊列也會自動刪除。最後將隊列與轉發器綁定。
注:運行的時候要先運行2個消費者實例,而後在運行生產者實例。不然獲取不到實例。
看看最終的結果吧:
翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
在前面的教程中,咱們建立了一個工做隊列,都是假設一個任務只交給一個消費者。此次咱們作一些徹底不一樣的事兒——將消息發送給多個消費者。這種模式叫作「發佈/訂閱」。
爲了說明這個模式,咱們將構建一個簡單日誌系統。它包含2段程序:第一個將發出日誌消息,第二個接受並打印消息。
若是在日誌系統中每個接受者(訂閱者)都會的獲得消息的拷貝。那樣的話,咱們能夠運行一個接受者(訂閱者)程序,直接把日誌記錄到硬盤。同時運行另外一個接受者(訂閱者)程序,打印日誌到屏幕上。
說白了,發表日誌消息將被廣播給全部的接收者。
Exchanges(轉發器)前面的博文彙總,咱們都是基於一個隊列發送和接受消息。如今介紹一下完整的消息傳遞模式。
RabbitMQ消息模式的核心理念是:生產者沒有直接發送任何消費到隊列。實際上,生產者都不知道這個消費是發送給哪一個隊列的。
相反,生產者只能發送消息給轉發器,轉發器是很是簡單的。一方面它接受生產者的消息,另外一方面向隊列推送消息。轉發器必須清楚的知道如何處理接收到的消息。附加一個特定的隊列嗎?附加多個隊列?或者是否丟棄?這些規則經過轉發器的類型進行定義。
類型有:Direct、Topic、Headers和Fanout。咱們關注最後一個。如今讓咱們建立一個該類型的轉發器,定義以下:
channel.exchangeDeclare("logs", "fanout");
fanout轉發器很是簡單,從名字就能夠看出,它是廣播接受到的消息給全部的隊列。而這正好符合日誌系統的需求。
Nameless exchange(匿名轉發)以前咱們對轉換器一無所知,卻能夠將消息發送到隊列,那是多是咱們用了默認的轉發器,轉發器名爲空字符串""。以前咱們發佈消息的代碼是:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數就是轉發器的名字,空字符串表示模式或者匿名的轉發器。消息經過隊列的routingKey路由到指定的隊列中去,若是存在的話。
如今咱們能夠指定轉發器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(臨時隊列)
你可能還記得以前咱們用隊列時,會指定一個名字。隊列有名字對咱們來講是很是重要的——咱們須要爲消費者指定同一個隊列。
但這並非咱們的日誌系統所關心的。咱們要監聽全部日誌消息,而不只僅是一類日誌。咱們只對對當前流動的消息感興趣。解決這些問題,我盟須要完成兩件事。
首先,每當我盟鏈接到RabbitMQ時,須要一個新的空隊列。爲此咱們須要建立一個隨機名字的空隊列,或者更好的,讓服務器選好年則一個隨機名字的空隊列給咱們。
其次,一旦消費者斷開鏈接,隊列將自動刪除。
咱們提供一個無參的queueDeclare()方法,建立一個非持久化、獨立的、自動刪除的隊列,且名字是隨機生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一個隨機隊列名。看起來會像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(綁定)咱們已經建立了一個廣播的轉發器和一個隨機隊列。如今須要告訴轉發器轉發消息到隊列。這個關聯轉發器和隊列的咱們叫它Binding。
channel.queueBind(queueName, "logs", "");
這樣,日誌轉發器將附加到日誌隊列上去。
完整的例子:發送端代碼(生產者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立鏈接鏈接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 發送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和鏈接 channel.close(); connection.close(); } }
消費者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消費者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能夠看到咱們1個生產者用於發送log消息,2個消費者,一個用於顯示,一個用於記錄文件。
生產者聲明瞭一個廣播模式的轉換器,訂閱這個轉換器的消費者均可以收到每一條消息。能夠看到在生產者中,沒有聲明隊列。這也驗證了以前說的。生產者其實只關心exchange,至於exchange會把消息轉發給哪些隊列,並非生產者關心的。
2個消費者,一個打印日誌,一個寫入文件,除了這2個地方不同,其餘地方如出一轍。也是聲明一下廣播模式的轉換器,而隊列則是隨機生成的,消費者實例啓動後,會建立一個隨機實例,這個在管理頁面能夠看到(如圖)。而實例關閉後,隨機隊列也會自動刪除。最後將隊列與轉發器綁定。
注:運行的時候要先運行2個消費者實例,而後在運行生產者實例。不然獲取不到實例。
看看最終的結果吧:
翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html
在前篇博文中,咱們創建了一個簡單的日誌系統。能夠廣播消息給多個消費者。本篇博文,咱們將添加新的特性——咱們能夠只訂閱部分消息。好比:咱們能夠接收Error級別的消息寫入文件。同時仍然能夠在控制檯打印全部日誌。
Bindings(綁定)在上一篇博客中咱們已經使用過綁定。相似下面的代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定表示轉換器與隊列之間的關係。能夠簡單的人爲:隊列對該轉發器上的消息感興趣。
綁定能夠設定額外的routingKey參數。爲了與避免basicPublish方法(發佈消息的方法)的參數混淆,咱們準備把它稱做綁定鍵(binding key)。下面展現如何使用綁定鍵(binding key)來建立一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵關鍵取決於轉換器的類型。對於fanout類型,忽略此參數。
Direct exchange(直接轉發)前面講到咱們的日誌系統廣播消息給全部的消費者。咱們想對其擴展,根據消息的嚴重性來過濾消息。例如:咱們但願將致命錯誤的日誌消息記錄到文件,而不是把磁盤空間浪費在warn和info類型的日誌上。咱們使用的fanout轉發器,不能給咱們太多的靈活性。它僅僅只是盲目的廣播而已。咱們使用direct轉發器進行代替,其背後的算法很簡單——消息會被推送至綁定鍵(binding key)和消息發佈附帶的選擇鍵(routing key)徹底匹配的隊列。
在上圖中,咱們能夠看到direct類型的轉發器與2個隊列進行了綁定。第一個隊列使用的綁定鍵是orange,第二個隊列綁定鍵爲black和green。這樣當消息發佈到轉發器是,附帶orange綁定鍵的消息將被路由到隊列Q1中去。附帶black和green綁定鍵的消息被路由到Q2中去。其餘消息所有丟棄。
Multiple bindings(多重綁定)使用一個綁定鍵綁定多個隊列是徹底合法的。如上圖,綁定鍵black綁定了2個隊列——Q1和Q2。
Emitting logs(發送日誌)咱們將這種模式用於日誌系統,發送消息給direct類型的轉發器。咱們將 提供日誌嚴重性作爲綁定鍵。那樣,接收程序能夠選擇性的接收嚴重性的消息。首先關注發送日誌的代碼:
像往常同樣首先建立一個轉換器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
而後爲發送消息作準備:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
爲了簡化代碼,咱們假定日誌的嚴重性是‘info’,‘warning’,‘error’中之一。
Subscribing(訂閱)接收消息跟前面博文中的同樣。咱們僅須要修改一個地方:爲每個咱們感興趣的嚴重性的消息,建立一個新的綁定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }完整的例子
發送端代碼(EmitLogDirect.java)
public class EmitLogDirect { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException { /** * 建立鏈接鏈接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //全部日誌嚴重性級別 String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ String severity = severities[i%3];//每一次發送一條不一樣嚴重性的日誌 // 發送的消息 String message = "Hello World"+Strings.repeat(".", i+1); //參數1:exchange name //參數2:routing key channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity +"':'"+ message + "'"); } // 關閉頻道和鏈接 channel.close(); connection.close(); } }
消費者1(ReceiveLogs2Console.java)
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); //全部日誌嚴重性級別 String[] severities={"error","info","warning"}; for (String severity : severities) { //關注全部級別的日誌(多重綁定) channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消費者2(ReceiveLogs2File.java)
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); String severity="error";//只關注error級別的日誌,而後記錄到文件中去。 channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //記錄日誌到文件: print2File( "["+ envelope.getRoutingKey() + "] "+message); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
最終結果:
羅哩羅嗦的說這麼多,其實就是說了這麼一件事:咱們可使用Direct exchange+routingKey來過濾本身感興趣的消息。一個隊列能夠綁定多個routingKey。這就是咱們今天的主題——路由選擇。
翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
在前面的教程中,咱們建立了一個工做隊列,都是假設一個任務只交給一個消費者。此次咱們作一些徹底不一樣的事兒——將消息發送給多個消費者。這種模式叫作「發佈/訂閱」。
爲了說明這個模式,咱們將構建一個簡單日誌系統。它包含2段程序:第一個將發出日誌消息,第二個接受並打印消息。
若是在日誌系統中每個接受者(訂閱者)都會的獲得消息的拷貝。那樣的話,咱們能夠運行一個接受者(訂閱者)程序,直接把日誌記錄到硬盤。同時運行另外一個接受者(訂閱者)程序,打印日誌到屏幕上。
說白了,發表日誌消息將被廣播給全部的接收者。
Exchanges(轉發器)前面的博文彙總,咱們都是基於一個隊列發送和接受消息。如今介紹一下完整的消息傳遞模式。
RabbitMQ消息模式的核心理念是:生產者沒有直接發送任何消費到隊列。實際上,生產者都不知道這個消費是發送給哪一個隊列的。
相反,生產者只能發送消息給轉發器,轉發器是很是簡單的。一方面它接受生產者的消息,另外一方面向隊列推送消息。轉發器必須清楚的知道如何處理接收到的消息。附加一個特定的隊列嗎?附加多個隊列?或者是否丟棄?這些規則經過轉發器的類型進行定義。
類型有:Direct、Topic、Headers和Fanout。咱們關注最後一個。如今讓咱們建立一個該類型的轉發器,定義以下:
channel.exchangeDeclare("logs", "fanout");
fanout轉發器很是簡單,從名字就能夠看出,它是廣播接受到的消息給全部的隊列。而這正好符合日誌系統的需求。
Nameless exchange(匿名轉發)以前咱們對轉換器一無所知,卻能夠將消息發送到隊列,那是多是咱們用了默認的轉發器,轉發器名爲空字符串""。以前咱們發佈消息的代碼是:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數就是轉發器的名字,空字符串表示模式或者匿名的轉發器。消息經過隊列的routingKey路由到指定的隊列中去,若是存在的話。
如今咱們能夠指定轉發器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(臨時隊列)
你可能還記得以前咱們用隊列時,會指定一個名字。隊列有名字對咱們來講是很是重要的——咱們須要爲消費者指定同一個隊列。
但這並非咱們的日誌系統所關心的。咱們要監聽全部日誌消息,而不只僅是一類日誌。咱們只對對當前流動的消息感興趣。解決這些問題,我盟須要完成兩件事。
首先,每當我盟鏈接到RabbitMQ時,須要一個新的空隊列。爲此咱們須要建立一個隨機名字的空隊列,或者更好的,讓服務器選好年則一個隨機名字的空隊列給咱們。
其次,一旦消費者斷開鏈接,隊列將自動刪除。
咱們提供一個無參的queueDeclare()方法,建立一個非持久化、獨立的、自動刪除的隊列,且名字是隨機生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一個隨機隊列名。看起來會像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(綁定)咱們已經建立了一個廣播的轉發器和一個隨機隊列。如今須要告訴轉發器轉發消息到隊列。這個關聯轉發器和隊列的咱們叫它Binding。
channel.queueBind(queueName, "logs", "");
這樣,日誌轉發器將附加到日誌隊列上去。
完整的例子:發送端代碼(生產者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立鏈接鏈接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("127.0.0.1"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個頻道 Channel channel = connection.createChannel(); // 指定轉發——廣播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 發送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和鏈接 channel.close(); connection.close(); } }
消費者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消費者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打開鏈接和建立頻道,與發送端同樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明一個隨機隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能夠看到咱們1個生產者用於發送log消息,2個消費者,一個用於顯示,一個用於記錄文件。
生產者聲明瞭一個廣播模式的轉換器,訂閱這個轉換器的消費者均可以收到每一條消息。能夠看到在生產者中,沒有聲明隊列。這也驗證了以前說的。生產者其實只關心exchange,至於exchange會把消息轉發給哪些隊列,並非生產者關心的。
2個消費者,一個打印日誌,一個寫入文件,除了這2個地方不同,其餘地方如出一轍。也是聲明一下廣播模式的轉換器,而隊列則是隨機生成的,消費者實例啓動後,會建立一個隨機實例,這個在管理頁面能夠看到(如圖)。而實例關閉後,隨機隊列也會自動刪除。最後將隊列與轉發器綁定。
注:運行的時候要先運行2個消費者實例,而後在運行生產者實例。不然獲取不到實例。
看看最終的結果吧: