RabbitMQ開放的編程接口主要是經過下面4個類來實現了。java
類名 | 做用 | 說明 |
---|---|---|
Connection | 鏈接對象,表示客戶端與服務器端的一個鏈接。 | 用於打開Channel,註冊connection各個生命週期的處理方法,以及關閉鏈接。 |
ConnectionFactory | 用於建立connection對象。 | 能夠設置服務器的地址、端口、用戶名、密碼、虛擬空間等全局配置 |
Channel | AMQP協議操做的入口 | 對消息的絕大多數操做都是經過channel操做的 |
Consumer | 消費者對象 | 該對象裏面有一些對消息到達後的調用方法 |
#Connection and Channel編程
核心api類Connection、Channel分別表明AMQP 0-9-1協議中的connection和channel。api
Connection是用來鏈接AMQP broker的。使用方式以下服務器
ConnectionFactory factory = new ConnectionFactory(); // 建立一個鏈接工廠 factory.setUsername(userName); // 設置用戶名 factory.setPassword(password); // 設置密碼 factory.setVirtualHost(virtualHost); // 設置虛擬主機 factory.setHost(hostName); // 設置主機域名 factory.setPort(portNumber); // 設置端口 Connection conn = factory.newConnection(); // 建立一個鏈接
以上配置參數,都有默認的值。默認值是鏈接到本地的服務器。多線程
其實你也何以經過URIs 來鏈接broker。ide
ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost"); // 相似於http協議請求同樣 Connection conn = factory.newConnection();
獲得connection後,則能夠經過connection打開channel編碼
Channel channel = conn.createChannel();
如今你能夠經過channel發送消息和接受消息了。線程
記住,但使用完畢後,要記得關閉鏈接。不過通常的程序,應該會一直髮送消息或則等待消息。代理
channel.close(); conn.close();
注意。這兒手動關閉channel並非嚴格必須的,但倒是一個較好的編碼習慣。當connection關閉的時候,也會自動去關閉channel。code
#使用Exchange and Queue
客戶端應用程序工做是須要exchange和queue的。exchange和queue在使用以前必須先定義。定義他們的類型,名字,在必要的時候建立他們,以確保他們是必須存在。
咱們繼續上面的例子,經過代碼來定義一個exchange和queue,並把他們二者綁定在一塊兒。
channel.exchangeDeclare(exchangeName, "direct",true); String queueName = channel.queueDeclar().getQueue(); channel.queueBind(queueName, exchangeName, routingKey);
經過上面幾行代碼的參數配置,能夠獲得自定義的channel和queue。
##Channels and Concurrency Considerations(Thread Safety) Channel對象是不能在多線程之間共享的。應用程序應該保證一個channel對象只在一個線程中使用。
消息是先發送到exchange的。exchange再根據routingkey發送的對應的queue中去。 發送消息要用到Channel.basicPublish()方法。
byte[] messageBodyBytes = "Hello world!".getBytes("UTF-8"); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
接收消息,須要用到Consumer接口及其子類。當有消息到達時,消息會自動的被投送到Consumer,而不是手動的去獲取。
當執行消費者Consumer中的API方法時,代碼中每每會用到消費的tags。一個channel中能夠註冊多個消費者,因此每一個消費者要有本身的tag用於區分。
boolean autoAck = false; // 是否開啓自動確認消息回覆,此處的false須要手動回覆消息確認。 // 下面的語句相似於在channel上註冊了一個消費者。 channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { // Called when a basic.recover-ok is received @Override public void handleDelivery(String consumerTag, // 消費者標籤 Envelope envelope, // AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); // 手動回覆消息確認 } });
經過上面的代碼,咱們就在channel中註冊了一個消費者。固然你也能夠明確的經過下面的一句代碼放棄一個消費者。
channel.basicCannel(consumerTag);
消息生產者,Send類
package com.benny.amqp.rabbitmq_provider; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Send { private final static String TASK_QUEUE_NAME = "hello_task"; public static void main(String[] argv) throws java.io.IOException { try { ConnectionFactory factory = new ConnectionFactory(); // 鏈接工廠對象 factory.setHost("localhost"); // 設置消息broker服務器的域名地址,其餘配置項使用默認值 Connection connection = factory.newConnection(); // 建立一個鏈接 Channel channel = connection.createChannel(); // 經過connection獲得一個channel // 定義一個隊列, channel.queueDeclare(TASK_QUEUE_NAME, // 隊列名 true, // 隊列可持久化 false, // 隊列非獨佔,若是是true則只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除 false, // 當最後一個消費者退訂後,隊列當即刪除 null // ther properties (construction arguments) for the queue,一些消息代理用他來完成相似與TTL的某些額外功能 ); String message = "Hello World ......."; // channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // message = getMessage(argv); channel.basicPublish("", // exchangeName, 使用默認exchange,名字有服務器生產。 TASK_QUEUE_NAME, // routingKey, 這兒使用與隊列名相同的名字 MessageProperties.PERSISTENT_TEXT_PLAIN, // 消息的屬性, 內容是文本,並能夠持久化 message.getBytes() // 消息的內容 ); System.out.println(" [x] Sent '" + message + "'"); channel.close(); // 關閉channel connection.close(); // 關閉鏈接 } catch (Exception e) { // TODO: handle exception } finally { } } }
消息消費者,Recv類
package com.benny.amqp.rabbitmq_provider; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Recv { private final static String TASK_QUEUE_NAME = "hello_task"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); // 設置限速。在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息 final Consumer consumer = new DefaultConsumer(channel) { // 定義一個消費者類 // 當有消息到來則該方法被調用。 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ System.out.println(" [x] "+message+" Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); // 把上面new出的消費者註冊到channel中。 } catch (Exception e) { // TODO: handle exception } } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(3000); } } }