上一篇記錄了rabbitmq的安裝,這一篇記錄一下rabbitmq的java客戶端的簡單使用,固然在項目中咱們有更爲複雜的應用場景,這裏只有最簡單的點對點生產者與消費者模式。java
一、創建工程apache
首先創建一個簡單的maven工程,我這邊使用了平時使用的demo工程服務器
pom.xml配置,本次案例中只須要兩個包便可,是用commons包的序列化,amqp則是rabbitmq的java包。maven
二、新建點對點抽象類ide
由於這個例子只講述很是簡單的點對點生產者與消費者關係,在某種程度上二者有不少共性,因此這裏乾脆抽象成一個類了。具體代碼以下:測試
package ucs_test.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author 做者 ucs_fuqing * @date 建立時間:2017年8月11日 下午2:21:27 * @version 1.0 * @parameter * @since * @return */ public abstract class PointToPoint { protected Channel channel; protected Connection connection; protected String pointName; /** * 獲取一個隊列的鏈接 * @param pointName * @throws IOException */ public PointToPoint(String pointName) throws IOException{ this.pointName = pointName; //建立鏈接工廠 ConnectionFactory cf = new ConnectionFactory(); //設置rabbitmq服務器地址 cf.setHost("192.168.149.133"); //設置rabbitmq服務器用戶名 cf.setUsername("hxb"); //設置rabbitmq服務器密碼 cf.setPassword("hxb"); //獲取一個新的鏈接 connection = cf.newConnection(); //建立一個通道 channel = connection.createChannel(); //申明一個隊列,若是這個隊列不存在,將會被建立 channel.queueDeclare(pointName, false, false, false, null); } /** * * @Title: close * @Description: 其實在程序完成時通常會自動關閉鏈接,可是這裏提供手動操做的入口, * @param @throws IOException 設定文件 * @return void 返回類型 * @throws */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } }
在上面代碼中,實現的是建立一個隊列或者關閉它,在默認的狀況下channel和connection會自動關閉,可是我以爲仍是提供手動關閉的入口更好一些。this
三、生產者spa
這個例子中的生產者其實很是簡單,咱們建立了一個鏈接,而且獲取了通道,接下來就能夠直接往咱們指定的隊列(queue)中發送消息了,若是這個隊列不存在,則會被程序自動建立。code
package ucs_test.rabbitmq; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; import com.mchange.io.SerializableUtils; /** * @author 做者 ucs_fuqing * @date 建立時間:2017年8月11日 下午2:33:13 * @version 1.0 * @parameter * @since * @return */ public class Producer extends PointToPoint{ public Producer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } /** * * @Title: sendMessage * @Description: 生產消息 * @param @param Object * @param @throws IOException 設定文件 * @return void 返回類型 * @throws */ public void sendMessage(Serializable Object) throws IOException{ channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object)); } }
上面代碼看到,咱們只是簡單的向pointName的隊列發送了一個對象。xml
四、消費者
咱們這裏的消費者也很是簡單,僅僅只是拿到並打印出消息便可
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * @author 做者 ucs_fuqing * @date 建立時間:2017年8月11日 下午2:39:51 * @version 1.0 * @parameter * @since * @return */ public class QueueConsumer extends PointToPoint implements Runnable,Consumer{ public QueueConsumer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } public void run(){ try { channel.basicConsume(pointName,true,this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void handleConsumeOk(String consumerTag) { // TODO Auto-generated method stub System.out.println("Consumer "+consumerTag +" registered"); } @Override public void handleCancelOk(String consumerTag) { // TODO Auto-generated method stub } @Override public void handleCancel(String consumerTag) throws IOException { // TODO Auto-generated method stub } @Override public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { // TODO Auto-generated method stub Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("tagId") + " received."); //channel.basicAck(env.getDeliveryTag(), false); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { // TODO Auto-generated method stub } @Override public void handleRecoverOk(String consumerTag) { // TODO Auto-generated method stub } }
以上代碼中,咱們指定了消費的隊列,並從中拿到消息,打印出來。
五、測試類
至此咱們的生產者與消費者都寫完了,接着寫個測試類來驗證一下
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 做者 ucs_fuqing * @date 建立時間:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); cuThread.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); System.out.println("發送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在這裏咱們的生產者生產10000條消息,消費者拿到並打印出來。看看運行結果:
能夠看到雖然有點亂序,可是10000條消息所有被消費完畢。
六、消息應答
在上面的例子中,咱們的生產者只管發送消息,消費者只管消費消息,而RabbitMQ在上面的例子中,將消息交付給消費者以後,會從內存中移除掉這個消息。在正式的項目中,消費消息可能須要那麼幾秒鐘,那麼問題來了:若是咱們拿到消息後須要進行更爲複雜的業務處理,而這個業務處理失敗或者中斷了,那麼意味着這條消息表明的工做並未完成,可是消息已經不存在了,咱們會丟失掉正在處理的信息,也會丟失掉髮給消費者可是並未被消費的消息。
如今咱們使用兩個消費者來接受同一個隊列的消息,測試類以下:
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 做者 ucs_fuqing * @date 建立時間:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); QueueConsumer consumer2 = new QueueConsumer("testqueue"); Thread cuThread2 = new Thread(consumer2); cuThread.start(); cuThread2.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); //System.out.println("發送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在這種狀況下,MQ將會均勻的將消息發送給兩個消費者消費,可是若是consumer2半路終止或者異常,那麼將會致使咱們的測試結果顯示接受到的消息少於10000條,消失的消息被異常的消費者吃掉了,而咱們沒有任何辦法。。。
爲了保證消息不會丟失,或者說確定被消費,RabbitMQ支持消息應答模式,簡單的只須要修改兩個位置:
消費者類QueueConsumer中
設置basicConsume方法參數爲false,打開消息應答
消費完成以後,向mq返回應答消息。
這樣,當消費者異常時,MQ沒有收到消費者消息應答,將會把消息發送給其餘消費者,保證這條消息被消費掉。
OK,簡單的RabbitMQ服務器Java端例子就這樣了。下一篇會在此基礎上增長一些高級的應用。