RabbitMQ 入門<轉載經典>

RabbitMQ是一個受歡迎的消息代理,一般用於應用程序之間或者程序的不一樣組件之間經過消息來進行集成。本文簡單介紹瞭如何使用 RabbitMQ,假定你已經配置好了rabbitmq服務器。java

RabbitMQ是用Erlang,對於主要的編程語言都有驅動或者客戶端。咱們這裏要用的是Java,因此先要得到Java客戶端。。下面是Java客戶端的maven依賴的配置。sql

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId> 
     <version>3.0.4</version>
</dependency>

 

像RabbitMQ這樣的消息代理可用來模擬不一樣的場景,例如點對點的消息分發或者訂閱/推送。咱們的程序足夠簡單,有兩個基本的組件,一個生產者用於產生消息,還有一個消費者用來使用產生的消息。apache

在這個例子裏,生產者會產生大量的消息,每一個消息帶有一個序列號,另外一個線程中的消費者會使用這些消息。編程

抽象類EndPoint:

咱們首先寫一個類,將產生產者和消費者統一爲 EndPoint類型的隊列。無論是生產者仍是消費者, 鏈接隊列的代碼都是同樣的,這樣能夠通用一些。數組

package co.syntx.examples.rabbitmq;
 
import java.io.IOException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
/**
 * Represents a connection with a queue
 * @author syntx
 *
 */
public abstract class EndPoint{
     
    protected Channel channel;
    protected Connection connection;
    protected String endPointName;
     
    public EndPoint(String endpointName) throws IOException{
         this.endPointName = endpointName;
         
         //Create a connection factory
         ConnectionFactory factory = new ConnectionFactory();
         
         //hostname of your rabbitmq server
         factory.setHost("localhost");
         
         //getting a connection
         connection = factory.newConnection();
         
         //creating a channel
         channel = connection.createChannel();
         
         //declaring a queue for this channel. If queue does not exist,
         //it will be created on the server.
         channel.queueDeclare(endpointName, false, false, false, null);
    }
     
     
    /**
     * 關閉channel和connection。並不是必須,由於隱含是自動調用的。
     * @throws IOException
     */
     public void close() throws IOException{
         this.channel.close();
         this.connection.close();
     }
}

生產者:

生產者類的任務是向隊列裏寫一條消息。咱們使用Apache Commons Lang把可序列化的Java對象轉換成 byte 數組。commons lang的maven依賴以下:服務器

<dependency>
	<groupId>commons-lang</groupId>
	<artifactId>commons-lang</artifactId>
	<version>2.6</version>
</dependency>
package co.syntx.examples.rabbitmq;
 
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
  
/**
 * The producer endpoint that writes to the queue.
 * @author syntx
 *
 */
public class Producer extends EndPoint{
     
    public Producer(String endPointName) throws IOException{
        super(endPointName);
    }
 
    public void sendMessage(Serializable object) throws IOException {
        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
    }  
}

消費者:

消費者能夠以線程方式運行,對於不一樣的事件有不一樣的回調函數,其中最主要的是處理新消息到來的事件。maven

package co.syntx.examples.rabbitmq;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
 
 
/**
 * 讀取隊列的程序端,實現了Runnable接口。
 * @author syntx
 *
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer{
     
    public QueueConsumer(String endPointName) throws IOException{
        super(endPointName);       
    }
     
    public void run() {
        try {
            //start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");    
    }
 
    /**
     * Called when new message is available.
     */
    public void handleDelivery(String consumerTag, Envelope env,
            BasicProperties props, byte[] body) throws IOException {
        Map map = (HashMap)SerializationUtils.deserialize(body);
        System.out.println("Message Number "+ map.get("message number") + " received.");
         
    }
 
    public void handleCancel(String consumerTag) {}
    public void handleCancelOk(String consumerTag) {}
    public void handleRecoverOk(String consumerTag) {}
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

Test

package co.syntx.examples.rabbitmq;
 
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
 
public class Main {
    public Main() throws Exception{
         
        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
         
        Producer producer = new Producer("queue");
         
        for (int i = 0; i < 100000; i++) {
            HashMap message = new HashMap();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number "+ i +" sent.");
        }
    }
     
    /**
     * @param args
     * @throws SQLException
     * @throws IOException
     */
    public static void main(String[] args) throws Exception{
      new Main();
    }
}
相關文章
相關標籤/搜索