RabbitMQ安裝以及java使用(二)

上一篇記錄了rabbitmq的安裝,這一篇記錄一下rabbitmq的java客戶端的簡單使用,固然在項目中咱們有更爲複雜的應用場景,這裏只有最簡單的點對點生產者與消費者模式。java

一、創建工程apache

首先創建一個簡單的maven工程,我這邊使用了平時使用的demo工程服務器

pom.xml配置,本次案例中只須要兩個包便可,是用commons包的序列化,amqp則是rabbitmq的java包。maven

 


 

 

二、新建點對點抽象類ide

由於這個例子只講述很是簡單的點對點生產者與消費者關係,在某種程度上二者有不少共性,因此這裏乾脆抽象成一個類了。具體代碼以下:測試

package ucs_test.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/** 
* @author  做者 ucs_fuqing
* @date 建立時間:2017年8月11日 下午2:21:27 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public abstract class PointToPoint {
    protected Channel channel;
    protected Connection connection;
    protected String pointName;
    
    /**
     * 獲取一個隊列的鏈接
     * @param pointName
     * @throws IOException
     */
    public PointToPoint(String pointName) throws IOException{
        this.pointName = pointName;
        
        //建立鏈接工廠
        ConnectionFactory cf = new ConnectionFactory();
        
        //設置rabbitmq服務器地址
        cf.setHost("192.168.149.133");
        
        //設置rabbitmq服務器用戶名
        cf.setUsername("hxb");
        
        //設置rabbitmq服務器密碼
        cf.setPassword("hxb");
        
        //獲取一個新的鏈接
        connection = cf.newConnection();
        
        //建立一個通道
        channel = connection.createChannel();
        
        //申明一個隊列,若是這個隊列不存在,將會被建立
        channel.queueDeclare(pointName, false, false, false, null);
    }
    
    
    /**
     * 
    * @Title: close 
    * @Description: 其實在程序完成時通常會自動關閉鏈接,可是這裏提供手動操做的入口, 
    * @param @throws IOException    設定文件 
    * @return void    返回類型 
    * @throws
     */
    public void close() throws IOException{
        this.channel.close();
        this.connection.close();
    }
}

在上面代碼中,實現的是建立一個隊列或者關閉它,在默認的狀況下channel和connection會自動關閉,可是我以爲仍是提供手動關閉的入口更好一些。this

 


 

 

三、生產者spa

這個例子中的生產者其實很是簡單,咱們建立了一個鏈接,而且獲取了通道,接下來就能夠直接往咱們指定的隊列(queue)中發送消息了,若是這個隊列不存在,則會被程序自動建立。code

package ucs_test.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

import com.mchange.io.SerializableUtils;

/** 
* @author  做者 ucs_fuqing
* @date 建立時間:2017年8月11日 下午2:33:13 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class Producer extends PointToPoint{

    public Producer(String pointName) throws IOException {
        super(pointName);
        // TODO Auto-generated constructor stub
    }
    
    /**
     * 
    * @Title: sendMessage 
    * @Description: 生產消息
    * @param @param Object
    * @param @throws IOException    設定文件 
    * @return void    返回類型 
    * @throws
     */
    public void sendMessage(Serializable Object) throws IOException{
        channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object));
    }
}

上面代碼看到,咱們只是簡單的向pointName的隊列發送了一個對象。xml

 


 

 

四、消費者

咱們這裏的消費者也很是簡單,僅僅只是拿到並打印出消息便可

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/** 
* @author  做者 ucs_fuqing
* @date 建立時間:2017年8月11日 下午2:39:51 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class QueueConsumer extends PointToPoint implements Runnable,Consumer{

    public QueueConsumer(String pointName) throws IOException {
        super(pointName);
        // TODO Auto-generated constructor stub
    }
    
    public void run(){
        try {
            channel.basicConsume(pointName,true,this);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        // TODO Auto-generated method stub
        System.out.println("Consumer "+consumerTag +" registered");
        
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException {
        // TODO Auto-generated method stub
        Map map = (HashMap)SerializationUtils.deserialize(body);
        System.out.println("Message Number "+ map.get("tagId") + " received.");
        //channel.basicAck(env.getDeliveryTag(), false);
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        // TODO Auto-generated method stub
        
    }

}

以上代碼中,咱們指定了消費的隊列,並從中拿到消息,打印出來。

 


 

 

五、測試類

至此咱們的生產者與消費者都寫完了,接着寫個測試類來驗證一下

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;

/** 
* @author  做者 ucs_fuqing
* @date 建立時間:2017年8月11日 下午2:44:59 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class MainTest {
    public MainTest() throws IOException{
        QueueConsumer consumer = new QueueConsumer("testqueue");
        Thread cuThread = new Thread(consumer);
        
        cuThread.start();
        
        Producer producer = new Producer("testqueue");
        int i = 0;
        while (i<10000) {
            HashMap<String, Object> hm = new HashMap<>();
            hm.put("tagId", i);
            producer.sendMessage(hm);
            System.out.println("發送第"+i+"消息");
            i++;
        }
    }
    
    public static void main(String[] args) throws IOException {
        new MainTest();
    }
}

在這裏咱們的生產者生產10000條消息,消費者拿到並打印出來。看看運行結果:

能夠看到雖然有點亂序,可是10000條消息所有被消費完畢。

 


 

 

六、消息應答

在上面的例子中,咱們的生產者只管發送消息,消費者只管消費消息,而RabbitMQ在上面的例子中,將消息交付給消費者以後,會從內存中移除掉這個消息。在正式的項目中,消費消息可能須要那麼幾秒鐘,那麼問題來了:若是咱們拿到消息後須要進行更爲複雜的業務處理,而這個業務處理失敗或者中斷了,那麼意味着這條消息表明的工做並未完成,可是消息已經不存在了,咱們會丟失掉正在處理的信息,也會丟失掉髮給消費者可是並未被消費的消息。
如今咱們使用兩個消費者來接受同一個隊列的消息,測試類以下:

package ucs_test.rabbitmq;

import java.io.IOException;
import java.util.HashMap;

/** 
* @author  做者 ucs_fuqing
* @date 建立時間:2017年8月11日 下午2:44:59 
* @version 1.0 
* @parameter  
* @since  
* @return  
*/
public class MainTest {
    public MainTest() throws IOException{
        QueueConsumer consumer = new QueueConsumer("testqueue");
        Thread cuThread = new Thread(consumer);
        QueueConsumer consumer2 = new QueueConsumer("testqueue");
        Thread cuThread2 = new Thread(consumer2);
        cuThread.start();
        cuThread2.start();
        Producer producer = new Producer("testqueue");
        int i = 0;
        while (i<10000) {
            HashMap<String, Object> hm = new HashMap<>();
            hm.put("tagId", i);
            producer.sendMessage(hm);
            //System.out.println("發送第"+i+"消息");
            i++;
        }
    }
    
    public static void main(String[] args) throws IOException {
        new MainTest();
    }
}

在這種狀況下,MQ將會均勻的將消息發送給兩個消費者消費,可是若是consumer2半路終止或者異常,那麼將會致使咱們的測試結果顯示接受到的消息少於10000條,消失的消息被異常的消費者吃掉了,而咱們沒有任何辦法。。。

爲了保證消息不會丟失,或者說確定被消費,RabbitMQ支持消息應答模式,簡單的只須要修改兩個位置:

消費者類QueueConsumer中

設置basicConsume方法參數爲false,打開消息應答

消費完成以後,向mq返回應答消息。

這樣,當消費者異常時,MQ沒有收到消費者消息應答,將會把消息發送給其餘消費者,保證這條消息被消費掉。

 

OK,簡單的RabbitMQ服務器Java端例子就這樣了。下一篇會在此基礎上增長一些高級的應用。

相關文章
相關標籤/搜索