本地安裝好RabbitMQ Server後,就能夠在Java語言中使用RabbitMQ了。java
RabbitMQ是一個消息代理,從「生產者」接收消息並傳遞消息至「消費者」,期間可根據規則路由、緩存、持久化消息。「生產者」也即message發送者如下簡稱P,相對應的「消費者」乃message接收者如下簡稱C,message經過queue由P到C,queue存在於RabbitMQ,可存儲儘量多的message,多個P可向同一queue發送message,多個C可從同一queue接收message。sql
幾個關鍵概念:apache
Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。緩存
由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的惟一的線路。服務器
消息隊列的使用過程大概以下:maven
(1)客戶端鏈接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
(5)客戶端投遞消息到exchange。函數
如今,能夠上代碼了。首先,是在項目中加入須要的jar包,我使用的是maven項目,直接配置maven及可:測試
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.0.4</version> </dependency>
後面還會用到的jar包,配置以下this
<dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency>
先寫一個類,將產生產者和消費者統一爲 EndPoint類型的隊列。無論是生產者仍是消費者, 鏈接隊列的代碼都是同樣的,這樣能夠通用一些。spa
package cn.com.shopec.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException { this.endPointName = endpointName; // Create a connection factory ConnectionFactory factory = new ConnectionFactory(); // 與RabbitMQ Server創建鏈接 // 鏈接到的broker在本機localhost上 factory.setHost("localhost"); // getting a connection connection = factory.newConnection(); // creating a channel channel = connection.createChannel(); // declaring a queue for this channel. If queue does not exist, // it will be created on the server. // queueDeclare的參數:queue 隊列名;durable true爲持久化;exclusive 是否排外,true爲隊列只能夠在本次的鏈接中被訪問, // autoDelete true爲connection斷開隊列自動刪除;arguments 用於拓展參數 channel.queueDeclare(endpointName, false, false, false, null); } /** * 關閉channel和connection。並不是必須,由於隱含是自動調用的。 * @throws IOException */ public void close() throws IOException { this.channel.close(); this.connection.close(); } }
生產者類的任務是向隊列裏寫一條消息
package cn.com.shopec.rabbitmq; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; public class Producer extends EndPoint { public Producer(String endPointName) throws IOException { super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object)); } }
消費者能夠以線程方式運行,對於不一樣的事件有不一樣的回調函數,其中最主要的是處理新消息到來的事件。
package cn.com.shopec.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; public class QueueConsumer extends EndPoint implements Runnable, Consumer { public QueueConsumer(String endPointName) throws IOException { super(endPointName); } public void run() { try { // start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true, this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer " + consumerTag + " registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(body); System.out.println("Message Number " + map.get("message number") + " received."); } public void handleCancel(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleRecoverOk(String consumerTag) { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) { } }
測試類中,先運行一個消費者線程,而後開始產生大量的消息,這些消息會被消費者取走。
package cn.com.shopec.rabbitmq; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; public class Main { public Main() throws Exception { // 建立消費者,即消息接收者,並啓動線程 QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); // 建立生產者,即消息發送者 Producer producer = new Producer("queue"); // 循環發送消息 for (int i = 0; i < 20; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number " + i + " sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception { new Main(); } }
運行結果:
Consumer amq.ctag-8TFduKUwrE1I8iT2L5DaZg registeredMessage Number 0 sent.Message Number 1 sent.Message Number 2 sent.Message Number 3 sent.Message Number 4 sent.Message Number 5 sent.Message Number 6 sent.Message Number 7 sent.Message Number 8 sent.Message Number 9 sent.Message Number 10 sent.Message Number 11 sent.Message Number 12 sent.Message Number 13 sent.Message Number 14 sent.Message Number 15 sent.Message Number 16 sent.Message Number 17 sent.Message Number 18 sent.Message Number 19 sent.Message Number 0 received.Message Number 1 received.Message Number 2 received.Message Number 3 received.Message Number 4 received.Message Number 5 received.Message Number 6 received.Message Number 7 received.Message Number 8 received.Message Number 9 received.Message Number 10 received.Message Number 11 received.Message Number 12 received.Message Number 13 received.Message Number 14 received.Message Number 15 received.Message Number 16 received.Message Number 17 received.Message Number 18 received.Message Number 19 received.